diff options
author | dennnniska <dennnis@ydb.tech> | 2025-04-18 15:12:51 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2025-04-18 15:12:51 +0300 |
commit | 68c43aed9070c11917f3b1bb79f11197910d4a6a (patch) | |
tree | bdf0bd8b0c81f1d1b3384574a86fd23d0bbe9b2c | |
parent | 62614f2f7fa863a2743bb9fabbbb6816f5b79426 (diff) | |
download | ydb-68c43aed9070c11917f3b1bb79f11197910d4a6a.tar.gz |
Test for async replication for all types (#16670)
-rw-r--r-- | ydb/tests/datashard/async_replication/test_async_replication.py | 90 | ||||
-rw-r--r-- | ydb/tests/datashard/async_replication/ya.make | 23 | ||||
-rw-r--r-- | ydb/tests/datashard/dml/test_dml.py | 405 | ||||
-rw-r--r-- | ydb/tests/datashard/lib/dml_operations.py | 396 | ||||
-rw-r--r-- | ydb/tests/datashard/lib/multicluster_test_base.py | 70 | ||||
-rw-r--r-- | ydb/tests/datashard/lib/ya.make | 9 | ||||
-rw-r--r-- | ydb/tests/datashard/ya.make | 3 |
7 files changed, 601 insertions, 395 deletions
diff --git a/ydb/tests/datashard/async_replication/test_async_replication.py b/ydb/tests/datashard/async_replication/test_async_replication.py new file mode 100644 index 00000000000..793d2a1e590 --- /dev/null +++ b/ydb/tests/datashard/async_replication/test_async_replication.py @@ -0,0 +1,90 @@ +import pytest +import time + +from ydb.tests.sql.lib.test_query import Query +from ydb.tests.library.common.wait_for import wait_for +from ydb.tests.datashard.lib.multicluster_test_base import MulticlusterTestBase +from ydb.tests.datashard.lib.dml_operations import DMLOperations +from ydb.tests.datashard.lib.types_of_variables import pk_types, non_pk_types, index_first, index_second, \ + index_first_sync, index_second_sync, index_three_sync, index_four_sync, index_zero_sync + + +class TestAsyncReplication(MulticlusterTestBase): + @pytest.mark.parametrize( + "table_name, pk_types, all_types, index, ttl, unique, sync", + [ + # ("table_index_4_UNIQUE_SYNC", pk_types, {}, + # index_four_sync, "", "UNIQUE", "SYNC"), + # ("table_index_3_UNIQUE_SYNC", pk_types, {}, + # index_three_sync_not_Bool, "", "UNIQUE", "SYNC"), + # ("table_index_2_UNIQUE_SYNC", pk_types, {}, + # index_second_sync, "", "UNIQUE", "SYNC"), + # ("table_index_1_UNIQUE_SYNC", pk_types, {}, + # index_first_sync, "", "UNIQUE", "SYNC"), + # ("table_index_0_UNIQUE_SYNC", pk_types, {}, + # index_zero_sync, "", "UNIQUE", "SYNC"), + ("table_index_4__SYNC", pk_types, {}, + index_four_sync, "", "", "SYNC"), + ("table_index_3__SYNC", pk_types, {}, + index_three_sync, "", "", "SYNC"), + ("table_index_2__SYNC", pk_types, {}, + index_second_sync, "", "", "SYNC"), + ("table_index_1__SYNC", pk_types, {}, + index_first_sync, "", "", "SYNC"), + ("table_index_0__SYNC", pk_types, {}, + index_zero_sync, "", "", "SYNC"), + ("table_index_1__ASYNC", pk_types, {}, index_second, "", "", "ASYNC"), + ("table_index_0__ASYNC", pk_types, {}, index_first, "", "", "ASYNC"), + ("table_all_types", pk_types, { + **pk_types, **non_pk_types}, {}, "", "", ""), + ("table_ttl_DyNumber", pk_types, {}, {}, "DyNumber", "", ""), + ("table_ttl_Uint32", pk_types, {}, {}, "Uint32", "", ""), + ("table_ttl_Uint64", pk_types, {}, {}, "Uint64", "", ""), + ("table_ttl_Datetime", pk_types, {}, {}, "Datetime", "", ""), + ("table_ttl_Timestamp", pk_types, {}, {}, "Timestamp", "", ""), + ("table_ttl_Date", pk_types, {}, {}, "Date", "", ""), + ] + ) + def test_async_replication(self, table_name: str, pk_types: dict[str, str], all_types: dict[str, str], index: dict[str, str], ttl: str, unique: str, sync: str): + dml_cluster_1 = DMLOperations(Query.create( + self.get_database(), self.get_endpoint(self.clusters[0]))) + dml_cluster_2 = DMLOperations(Query.create( + self.get_database(), self.get_endpoint(self.clusters[1]))) + + dml_cluster_1.create_table(table_name, pk_types, all_types, + index, ttl, unique, sync) + dml_cluster_1.insert(table_name, all_types, pk_types, index, ttl) + dml_cluster_2.query(f""" + CREATE ASYNC REPLICATION `replication_{table_name}` + FOR `{self.get_database()}/{table_name}` AS `{self.get_database()}/{table_name}` + WITH ( + CONNECTION_STRING = 'grpc://{self.get_endpoint(self.clusters[0])}/?database={self.get_database()}' + ) + """) + for _ in range(100): + try: + dml_cluster_2.query( + f"select count(*) as count from {table_name}") + break + except Exception: + time.sleep(1) + dml_cluster_2.select_after_insert( + table_name, all_types, pk_types, index, ttl) + dml_cluster_1.query(f"delete from {table_name}") + assert wait_for(self.create_predicate(True, table_name, dml_cluster_2.query), + timeout_seconds=100) is True, "Expected zero rows after delete" + dml_cluster_1.insert(table_name, all_types, pk_types, index, ttl) + wait_for(self.create_predicate(False, table_name, + dml_cluster_2.query), timeout_seconds=100) + dml_cluster_2.select_after_insert( + table_name, all_types, pk_types, index, ttl) + + def create_predicate(self, is_zero, table_name, dml_cluster): + def predicate(): + rows = dml_cluster( + f"select count(*) as count from {table_name}") + if is_zero: + return len(rows) == 1 and rows[0].count == 0 + else: + return len(rows) == 1 and rows[0].count != 0 + return predicate diff --git a/ydb/tests/datashard/async_replication/ya.make b/ydb/tests/datashard/async_replication/ya.make new file mode 100644 index 00000000000..a6350c8eb5b --- /dev/null +++ b/ydb/tests/datashard/async_replication/ya.make @@ -0,0 +1,23 @@ +PY3TEST() +ENV(YDB_DRIVER_BINARY="ydb/apps/ydbd/ydbd") + +FORK_SUBTESTS() +SPLIT_FACTOR(20) +SIZE(MEDIUM) + +TEST_SRCS( + test_async_replication.py +) + +PEERDIR( + ydb/tests/datashard/lib + ydb/tests/library + ydb/tests/sql/lib +) + +DEPENDS( + ydb/apps/ydb + ydb/apps/ydbd +) + +END() diff --git a/ydb/tests/datashard/dml/test_dml.py b/ydb/tests/datashard/dml/test_dml.py index 19317363aa0..3126d413035 100644 --- a/ydb/tests/datashard/dml/test_dml.py +++ b/ydb/tests/datashard/dml/test_dml.py @@ -1,10 +1,8 @@ import pytest -import math -from datetime import datetime, timedelta from ydb.tests.sql.lib.test_base import TestBase -from ydb.tests.datashard.lib.create_table import create_table_sql_request, create_ttl_sql_request -from ydb.tests.datashard.lib.types_of_variables import cleanup_type_name, format_sql_value, pk_types, non_pk_types, index_first, index_second, ttl_types, \ +from ydb.tests.datashard.lib.dml_operations import DMLOperations +from ydb.tests.datashard.lib.types_of_variables import pk_types, non_pk_types, index_first, index_second, \ index_first_sync, index_second_sync, index_three_sync, index_three_sync_not_Bool, index_four_sync, index_zero_sync @@ -45,393 +43,12 @@ class TestDML(TestBase): ] ) def test_dml(self, table_name: str, pk_types: dict[str, str], all_types: dict[str, str], index: dict[str, str], ttl: str, unique: str, sync: str): - self.create_table(table_name, pk_types, all_types, - index, ttl, unique, sync) - self.insert(table_name, all_types, pk_types, index, ttl) - self.select_all_type(table_name, all_types, pk_types, index, ttl) - self.select_after_insert(table_name, all_types, pk_types, index, ttl) - self.update(table_name, all_types, index, ttl, unique) - self.upsert(table_name, all_types, pk_types, index, ttl) - self.delete(table_name, all_types, pk_types, index, ttl) - - def create_table(self, table_name: str, pk_types: dict[str, str], all_types: dict[str, str], index: dict[str, str], ttl: str, unique: str, sync: str): - columns = { - "pk_": pk_types.keys(), - "col_": all_types.keys(), - "col_index_": index.keys(), - "ttl_": [ttl] - } - pk_columns = { - "pk_": pk_types.keys() - } - index_columns = { - "col_index_": index.keys() - } - sql_create_table = create_table_sql_request( - table_name, columns, pk_columns, index_columns, unique, sync) - self.query(sql_create_table) - if ttl != "": - sql_ttl = create_ttl_sql_request(f"ttl_{cleanup_type_name(ttl)}", {"P18262D": ""}, "SECONDS" if ttl == - "Uint32" or ttl == "Uint64" or ttl == "DyNumber" else "", table_name) - self.query(sql_ttl) - - def insert(self, table_name: str, all_types: dict[str, str], pk_types: dict[str, str], index: dict[str, str], ttl: str): - number_of_columns = len(pk_types) + len(all_types) + len(index) - - if ttl != "": - number_of_columns += 1 - for count in range(1, number_of_columns + 1): - self.create_insert(table_name, count, all_types, - pk_types, index, ttl) - - def create_insert(self, table_name: str, value: int, all_types: dict[str, str], pk_types: dict[str, str], index: dict[str, str], ttl: str): - insert_sql = f""" - INSERT INTO {table_name}( - {", ".join(["pk_" + cleanup_type_name(type_name) for type_name in pk_types.keys()])}{", " if len(all_types) != 0 else ""} - {", ".join(["col_" + cleanup_type_name(type_name) for type_name in all_types.keys()])}{", " if len(index) != 0 else ""} - {", ".join(["col_index_" + cleanup_type_name(type_name) for type_name in index.keys()])}{", " if len(ttl) != 0 else ""} - {f"ttl_{ttl}" if ttl != "" else ""} - ) - VALUES( - {", ".join([format_sql_value(pk_types[type_name](value), type_name) for type_name in pk_types.keys()])}{", " if len(all_types) != 0 else ""} - {", ".join([format_sql_value(all_types[type_name](value), type_name) for type_name in all_types.keys()])}{", " if len(index) != 0 else ""} - {", ".join([format_sql_value(index[type_name](value), type_name) for type_name in index.keys()])}{", " if len(ttl) != 0 else ""} - {format_sql_value(ttl_types[ttl](value), ttl) if ttl != "" else ""} - ); - """ - self.query(insert_sql) - - def select_after_insert(self, table_name: str, all_types: dict[str, str], pk_types: dict[str, str], index: dict[str, str], ttl: str): - - number_of_columns = len(pk_types) + len(all_types) + len(index) - - if ttl != "": - number_of_columns += 1 - - for count in range(1, number_of_columns + 1): - create_all_type = [] - for type_name in all_types.keys(): - if type_name != "Json" and type_name != "Yson" and type_name != "JsonDocument": - create_all_type.append( - f"col_{cleanup_type_name(type_name)}={format_sql_value(all_types[type_name](count), type_name)}") - sql_select = f""" - SELECT COUNT(*) as count FROM `{table_name}` WHERE - {" and ".join([f"pk_{cleanup_type_name(type_name)}={format_sql_value(pk_types[type_name](count), type_name)}" for type_name in pk_types.keys()])} - {" and " if len(index) != 0 else ""} - {" and ".join([f"col_index_{cleanup_type_name(type_name)}={format_sql_value(index[type_name](count), type_name)}" for type_name in index.keys()])} - {" and " if len(create_all_type) != 0 else ""} - {" and ".join(create_all_type)} - {f" and ttl_{ttl}={format_sql_value(ttl_types[ttl](count), ttl)}" if ttl != "" else ""} - """ - rows = self.query(sql_select) - assert len( - rows) == 1 and rows[0].count == 1, f"Expected one rows, faild in {count} value, table {table_name}" - - rows = self.query(f"SELECT COUNT(*) as count FROM `{table_name}`") - assert len( - rows) == 1 and rows[0].count == number_of_columns, f"Expected {number_of_columns} rows, after select all line" - - def update(self, table_name: str, all_types: dict[str, str], index: dict[str, str], ttl: str, unique: str): - count = 1 - - if ttl != "": - self.create_update( - count, "ttl_", ttl, ttl_types[ttl], table_name) - count += 1 - - for type_name in all_types.keys(): - self.create_update( - count, "col_", type_name, all_types[type_name], table_name) - count += 1 - - if unique == "": - for type_name in index.keys(): - self.create_update( - count, "col_index_", type_name, index[type_name], table_name) - count += 1 - else: - number_of_columns = len(pk_types) + len(all_types) + len(index)+1 - if ttl != "": - number_of_columns += 1 - for i in range(1, number_of_columns + 1): - self.create_update_unique( - number_of_columns + i, i, index, table_name) - - count_assert = 1 - - number_of_columns = len(pk_types) + len(all_types) + len(index) - if ttl != "": - number_of_columns += 1 - - if ttl != "": - rows = self.query( - f"SELECT COUNT(*) as count FROM `{table_name}` WHERE ttl_{cleanup_type_name(ttl)}={format_sql_value(ttl_types[ttl](count_assert), ttl)}") - assert len( - rows) == 1 and rows[0].count == number_of_columns, f"Expected {number_of_columns} rows after insert, faild in ttl_{cleanup_type_name(ttl)}, table {table_name}" - count_assert += 1 - - for type_name in all_types.keys(): - if type_name != "Json" and type_name != "Yson" and type_name != "JsonDocument": - rows = self.query( - f"SELECT COUNT(*) as count FROM `{table_name}` WHERE col_{cleanup_type_name(type_name)}={format_sql_value(all_types[type_name](count_assert), type_name)}") - assert len( - rows) == 1 and rows[0].count == number_of_columns, f"Expected {number_of_columns} rows after insert, faild in col_{cleanup_type_name(type_name)}, table {table_name}" - count_assert += 1 - if unique == "": - for type_name in index.keys(): - rows = self.query( - f"SELECT COUNT(*) as count FROM `{table_name}` WHERE col_index_{cleanup_type_name(type_name)}={format_sql_value(index[type_name](count_assert), type_name)}") - assert len( - rows) == 1 and rows[0].count == number_of_columns, f"Expected {number_of_columns} rows after insert, faild in col_index_{cleanup_type_name(type_name)}, table {table_name}" - count_assert += 1 - else: - number_of_columns = len(pk_types) + len(all_types) + len(index) + 2 - if ttl != "": - number_of_columns += 1 - for type_name in index.keys(): - rows = self.query( - f"SELECT COUNT(*) as count FROM `{table_name}` WHERE col_index_{cleanup_type_name(type_name)}={format_sql_value(index[type_name](number_of_columns), type_name)}") - assert len( - rows) == 1 and rows[0].count == 1, f"Expected {1} rows after insert, faild in col_index_{cleanup_type_name(type_name)}, table {table_name}" - number_of_columns += 1 - - def create_update(self, value: int, prefix: str, type_name: str, key: str, table_name: str): - update_sql = f""" UPDATE `{table_name}` SET {prefix}{cleanup_type_name(type_name)} = {format_sql_value(key(value), type_name)} """ - self.query(update_sql) - - def create_update_unique(self, value: int, search: int, index: dict[str, str], table_name: str): - update_sql = f""" UPDATE `{table_name}` SET - {", ".join([f"col_index_{cleanup_type_name(type_name)} = {format_sql_value(index[type_name](value), type_name)}" for type_name in index.keys()])} - WHERE - {" and ".join(f"col_index_{cleanup_type_name(type_name)} = {format_sql_value(index[type_name](search), type_name)}" for type_name in index.keys())} - """ - self.query(update_sql) - - def upsert(self, table_name: str, all_types: dict[str, str], pk_types: dict[str, str], index: dict[str, str], ttl: str): - number_of_columns = len(pk_types) + len(all_types) + len(index) - if ttl != "": - number_of_columns += 1 - - for count in range(1, number_of_columns+1): - self.create_upsert(table_name, number_of_columns + 1 - - count, count, all_types, pk_types, index, ttl) - - for count in range(number_of_columns+1, 2*number_of_columns+1): - self.create_upsert(table_name, count, count, - all_types, pk_types, index, ttl) - - for count in range(1, number_of_columns + 1): - create_all_type = [] - for type_name in all_types.keys(): - if type_name != "Json" and type_name != "Yson" and type_name != "JsonDocument": - create_all_type.append( - f"col_{cleanup_type_name(type_name)}={format_sql_value(all_types[type_name](number_of_columns - count + 1), type_name)}") - sql_select = f""" - SELECT COUNT(*) as count FROM `{table_name}` WHERE - {" and ".join([f"pk_{cleanup_type_name(type_name)}={format_sql_value(pk_types[type_name](count), type_name)}" for type_name in pk_types.keys()])} - {" and " if len(index) != 0 else ""} - {" and ".join([f"col_index_{cleanup_type_name(type_name)}={format_sql_value(index[type_name](number_of_columns - count + 1), type_name)}" for type_name in index.keys()])} - {" and " if len(create_all_type) != 0 else ""} - {" and ".join(create_all_type)} - {f" and ttl_{ttl}={format_sql_value(ttl_types[ttl](number_of_columns - count + 1), ttl)}" if ttl != "" else ""} - """ - rows = self.query(sql_select) - assert len( - rows) == 1 and rows[0].count == 1, f"Expected one rows, faild in {count} value, table {table_name}" - - for count in range(number_of_columns + 1, 2*number_of_columns + 1): - create_all_type = [] - for type_name in all_types.keys(): - if type_name != "Json" and type_name != "Yson" and type_name != "JsonDocument" and ((type_name != "Date" and type_name != "Datetime") or count < 106): - create_all_type.append( - f"col_{cleanup_type_name(type_name)}={format_sql_value(all_types[type_name](count), type_name)}") - create_pk = [] - for type_name in pk_types.keys(): - if (type_name != "Date" and type_name != "Datetime") or count < 106: - create_pk.append( - f"pk_{cleanup_type_name(type_name)}={format_sql_value(pk_types[type_name](count), type_name)}") - create_index = [] - for type_name in index.keys(): - if (type_name != "Date" and type_name != "Datetime") or count < 106: - create_index.append( - f"col_index_{cleanup_type_name(type_name)}={format_sql_value(index[type_name](count), type_name)}") - sql_select = f""" - SELECT COUNT(*) as count FROM `{table_name}` WHERE - {" and ".join(create_pk)} - {" and " if len(create_index) != 0 else ""} - {" and ".join(create_index)} - {" and " if len(create_all_type) != 0 else ""} - {" and ".join(create_all_type)} - {f" and ttl_{ttl}={format_sql_value(ttl_types[ttl](count), ttl)}" if ttl != "" and ((type_name != "Date" and type_name != "Datetime") or count < 106) else ""} - """ - rows = self.query(sql_select) - assert len( - rows) == 1 and rows[0].count == 1, f"Expected one rows, faild in {count} value, table {table_name}" - rows = self.query(f"SELECT COUNT(*) as count FROM `{table_name}`") - assert len( - rows) == 1 and rows[0].count == 2*number_of_columns, f"Expected {2*number_of_columns} rows, after select all line" - - def create_upsert(self, table_name: str, value: int, search: int, all_types: dict[str, str], pk_types: dict[str, str], index: dict[str, str], ttl: str): - upsert_sql = f""" - UPSERT INTO {table_name} ( - {", ".join(["pk_" + cleanup_type_name(type_name) for type_name in pk_types.keys()])}{", " if len(all_types) != 0 else ""} - {", ".join(["col_" + cleanup_type_name(type_name) for type_name in all_types.keys()])}{", " if len(index) != 0 else ""} - {", ".join(["col_index_" + cleanup_type_name(type_name) for type_name in index.keys()])}{", " if len(ttl) != 0 else ""} - {f" ttl_{ttl}" if ttl != "" else ""} - ) - VALUES - ( - {", ".join([format_sql_value(pk_types[type_name](search), type_name) for type_name in pk_types.keys()])}{", " if len(all_types) != 0 else ""} - {", ".join([format_sql_value(all_types[type_name](value), type_name) for type_name in all_types.keys()])}{", " if len(index) != 0 else ""} - {", ".join([format_sql_value(index[type_name](value), type_name) for type_name in index.keys()])}{", " if len(ttl) != 0 else ""} - {format_sql_value(ttl_types[ttl](value), ttl) if ttl != "" else ""} - ) - ; - """ - self.query(upsert_sql) - - def delete(self, table_name: str, all_types: dict[str, str], pk_types: dict[str, str], index: dict[str, str], ttl: str): - number_of_columns = len(pk_types) + len(all_types) + len(index) + 1 - - if ttl != "": - number_of_columns += 1 - - if ttl != "": - self.create_delete(number_of_columns, - "ttl_", ttl, ttl_types[ttl], table_name) - number_of_columns += 1 - - for type_name in pk_types.keys(): - if type_name != "Bool": - self.create_delete( - number_of_columns, "pk_", type_name, pk_types[type_name], table_name) - else: - self.create_delete( - number_of_columns, "pk_", "Int64", pk_types["Int64"], table_name) - number_of_columns += 1 - - for type_name in all_types.keys(): - if type_name != "Bool" and type_name != "Json" and type_name != "Yson" and type_name != "JsonDocument": - self.create_delete( - number_of_columns, "col_", type_name, all_types[type_name], table_name) - else: - self.create_delete( - number_of_columns, "pk_", "Int64", pk_types["Int64"], table_name) - number_of_columns += 1 - - for type_name in index.keys(): - if type_name != "Bool": - self.create_delete( - number_of_columns, "col_index_", type_name, index[type_name], table_name) - else: - self.create_delete( - number_of_columns, "pk_", "Int64", pk_types["Int64"], table_name) - number_of_columns += 1 - - number_of_columns = len(pk_types) + len(all_types) + len(index) - - if ttl != "": - number_of_columns += 1 - - for count in range(1, number_of_columns + 1): - create_all_type = [] - for type_name in all_types.keys(): - if type_name != "Json" and type_name != "Yson" and type_name != "JsonDocument": - create_all_type.append( - f"col_{cleanup_type_name(type_name)}={format_sql_value(all_types[type_name](number_of_columns - count + 1), type_name)}") - sql_select = f""" - SELECT COUNT(*) as count FROM `{table_name}` WHERE - {" and ".join([f"pk_{cleanup_type_name(type_name)}={format_sql_value(pk_types[type_name](count), type_name)}" for type_name in pk_types.keys()])} - {" and " if len(index) != 0 else ""} - {" and ".join([f"col_index_{cleanup_type_name(type_name)}={format_sql_value(index[type_name](number_of_columns - count + 1), type_name)}" for type_name in index.keys()])} - {" and " if len(create_all_type) != 0 else ""} - {" and ".join(create_all_type)} - {f" and ttl_{ttl}={format_sql_value(ttl_types[ttl](number_of_columns - count + 1), ttl)}" if ttl != "" else ""} - """ - rows = self.query(sql_select) - assert len( - rows) == 1 and rows[0].count == 1, f"Expected one rows, faild in {count} value, table {table_name}" - - for count in range(number_of_columns + 1, 2*number_of_columns + 1): - create_all_type = [] - for type_name in all_types.keys(): - if type_name != "Json" and type_name != "Yson" and type_name != "JsonDocument": - create_all_type.append( - f"col_{cleanup_type_name(type_name)}={format_sql_value(all_types[type_name](count), type_name)}") - sql_select = f""" - SELECT COUNT(*) as count FROM `{table_name}` WHERE - {" and ".join([f"pk_{cleanup_type_name(type_name)}={format_sql_value(pk_types[type_name](count), type_name)}" for type_name in pk_types.keys()])} - {" and " if len(index) != 0 else ""} - {" and ".join([f"col_index_{cleanup_type_name(type_name)}={format_sql_value(index[type_name](count), type_name)}" for type_name in index.keys()])} - {" and " if len(create_all_type) != 0 else ""} - {" and ".join(create_all_type)} - {f" and ttl_{ttl}={format_sql_value(ttl_types[ttl](count), ttl)}" if ttl != "" and ((type_name != "Date" and type_name != "Datetime") or count < 106) else ""} - """ - rows = self.query(sql_select) - assert len( - rows) == 1 and rows[0].count == 0, f"Expected one rows, faild in {count} value, table {table_name}" - rows = self.query(f"SELECT COUNT(*) as count FROM `{table_name}`") - assert len( - rows) == 1 and rows[0].count == number_of_columns, f"Expected {number_of_columns} rows, after select all line" - - def create_delete(self, value: int, prefix: str, type_name: str, key: str, table_name: str): - delete_sql = f""" - DELETE FROM {table_name} WHERE {prefix}{cleanup_type_name(type_name)} = {format_sql_value(key(value), type_name)}; - """ - self.query(delete_sql) - - def select_all_type(self, table_name: str, all_types: dict[str, str], pk_types: dict[str, str], index: dict[str, str], ttl: str): - statements = [] - # delete if after https://github.com/ydb-platform/ydb/issues/16930 - for type in all_types.keys(): - if type != "Date32" and type != "Datetime64" and type != "Timestamp64" and type != 'Interval64': - statements.append(f"col_{cleanup_type_name(type)}") - for type in pk_types.keys(): - if type != "Date32" and type != "Datetime64" and type != "Timestamp64" and type != 'Interval64': - statements.append(f"pk_{cleanup_type_name(type)}") - for type in index.keys(): - if type != "Date32" and type != "Datetime64" and type != "Timestamp64" and type != 'Interval64': - statements.append(f"col_index_{cleanup_type_name(type)}") - if ttl != "": - statements.append(f"ttl_{cleanup_type_name(ttl)}") - - rows = self.query(f"select {", ".join(statements)} from {table_name}") - count = 0 - for type in all_types.keys(): - if type != "Date32" and type != "Datetime64" and type != "Timestamp64" and type != 'Interval64': - for i in range(len(rows)): - self.assert_type(all_types, type, i+1, rows[i][count]) - count += 1 - for type in pk_types.keys(): - if type != "Date32" and type != "Datetime64" and type != "Timestamp64" and type != 'Interval64': - for i in range(len(rows)): - self.assert_type(pk_types, type, i+1, rows[i][count]) - count += 1 - for type in index.keys(): - if type != "Date32" and type != "Datetime64" and type != "Timestamp64" and type != 'Interval64': - for i in range(len(rows)): - self.assert_type(index, type, i+1, rows[i][count]) - count += 1 - if ttl != "": - for i in range(len(rows)): - self.assert_type(ttl_types, ttl, i+1, rows[i][count]) - count += 1 - - def assert_type(self, key, type: str, values: int, values_from_rows): - if type == "String" or type == "Yson": - assert values_from_rows.decode( - "utf-8") == key[type](values), f"{type}" - elif type == "Float" or type == "DyNumber": - assert math.isclose(float(values_from_rows), float( - key[type](values)), rel_tol=1e-3), f"{type}" - elif type == "Interval" or type == "Interval64": - assert values_from_rows == timedelta( - microseconds=key[type](values)), f"{type}" - elif type == "Timestamp" or type == "Timestamp64": - assert values_from_rows == datetime.fromtimestamp( - key[type](values)/1_000_000), f"{type}" - elif type == "Json" or type == "JsonDocument": - assert str(values_from_rows).replace( - "'", "\"") == str(key[type](values)), f"{type}" - else: - assert str(values_from_rows) == str(key[type](values)), f"{type}" + dml = DMLOperations(self) + dml.create_table(table_name, pk_types, all_types, + index, ttl, unique, sync) + dml.insert(table_name, all_types, pk_types, index, ttl) + dml.select_all_type(table_name, all_types, pk_types, index, ttl) + dml.select_after_insert(table_name, all_types, pk_types, index, ttl) + dml.update(table_name, all_types, pk_types, index, ttl, unique) + dml.upsert(table_name, all_types, pk_types, index, ttl) + dml.delete(table_name, all_types, pk_types, index, ttl) diff --git a/ydb/tests/datashard/lib/dml_operations.py b/ydb/tests/datashard/lib/dml_operations.py new file mode 100644 index 00000000000..9db3abfa88b --- /dev/null +++ b/ydb/tests/datashard/lib/dml_operations.py @@ -0,0 +1,396 @@ +import math +from datetime import datetime, timedelta + +from ydb.tests.sql.lib.test_query import Query +from ydb.tests.datashard.lib.create_table import create_table_sql_request, create_ttl_sql_request +from ydb.tests.datashard.lib.types_of_variables import cleanup_type_name, format_sql_value, ttl_types + + +class DMLOperations(): + def __init__(self, query_object: Query): + self.query_object = query_object + + def query(self, text): + return self.query_object.query(text) + + def create_table(self, table_name: str, pk_types: dict[str, str], all_types: dict[str, str], index: dict[str, str], ttl: str, unique: str, sync: str): + columns = { + "pk_": pk_types.keys(), + "col_": all_types.keys(), + "col_index_": index.keys(), + "ttl_": [ttl] + } + pk_columns = { + "pk_": pk_types.keys() + } + index_columns = { + "col_index_": index.keys() + } + sql_create_table = create_table_sql_request( + table_name, columns, pk_columns, index_columns, unique, sync) + self.query(sql_create_table) + if ttl != "": + sql_ttl = create_ttl_sql_request(f"ttl_{cleanup_type_name(ttl)}", {"P18262D": ""}, "SECONDS" if ttl == + "Uint32" or ttl == "Uint64" or ttl == "DyNumber" else "", table_name) + self.query(sql_ttl) + + def insert(self, table_name: str, all_types: dict[str, str], pk_types: dict[str, str], index: dict[str, str], ttl: str): + number_of_columns = len(pk_types) + len(all_types) + len(index) + + if ttl != "": + number_of_columns += 1 + for count in range(1, number_of_columns + 1): + self.create_insert(table_name, count, all_types, + pk_types, index, ttl) + + def create_insert(self, table_name: str, value: int, all_types: dict[str, str], pk_types: dict[str, str], index: dict[str, str], ttl: str): + insert_sql = f""" + INSERT INTO {table_name}( + {", ".join(["pk_" + cleanup_type_name(type_name) for type_name in pk_types.keys()])}{", " if len(all_types) != 0 else ""} + {", ".join(["col_" + cleanup_type_name(type_name) for type_name in all_types.keys()])}{", " if len(index) != 0 else ""} + {", ".join(["col_index_" + cleanup_type_name(type_name) for type_name in index.keys()])}{", " if len(ttl) != 0 else ""} + {f"ttl_{ttl}" if ttl != "" else ""} + ) + VALUES( + {", ".join([format_sql_value(pk_types[type_name](value), type_name) for type_name in pk_types.keys()])}{", " if len(all_types) != 0 else ""} + {", ".join([format_sql_value(all_types[type_name](value), type_name) for type_name in all_types.keys()])}{", " if len(index) != 0 else ""} + {", ".join([format_sql_value(index[type_name](value), type_name) for type_name in index.keys()])}{", " if len(ttl) != 0 else ""} + {format_sql_value(ttl_types[ttl](value), ttl) if ttl != "" else ""} + ); + """ + self.query(insert_sql) + + def select_after_insert(self, table_name: str, all_types: dict[str, str], pk_types: dict[str, str], index: dict[str, str], ttl: str): + + number_of_columns = len(pk_types) + len(all_types) + len(index) + + if ttl != "": + number_of_columns += 1 + + for count in range(1, number_of_columns + 1): + create_all_type = [] + for type_name in all_types.keys(): + if type_name != "Json" and type_name != "Yson" and type_name != "JsonDocument": + create_all_type.append( + f"col_{cleanup_type_name(type_name)}={format_sql_value(all_types[type_name](count), type_name)}") + sql_select = f""" + SELECT COUNT(*) as count FROM `{table_name}` WHERE + {" and ".join([f"pk_{cleanup_type_name(type_name)}={format_sql_value(pk_types[type_name](count), type_name)}" for type_name in pk_types.keys()])} + {" and " if len(index) != 0 else ""} + {" and ".join([f"col_index_{cleanup_type_name(type_name)}={format_sql_value(index[type_name](count), type_name)}" for type_name in index.keys()])} + {" and " if len(create_all_type) != 0 else ""} + {" and ".join(create_all_type)} + {f" and ttl_{ttl}={format_sql_value(ttl_types[ttl](count), ttl)}" if ttl != "" else ""} + """ + rows = self.query(sql_select) + assert len( + rows) == 1 and rows[0].count == 1, f"Expected one rows, failed in {count} value, table {table_name}" + + rows = self.query(f"SELECT COUNT(*) as count FROM `{table_name}`") + assert len( + rows) == 1 and rows[0].count == number_of_columns, f"Expected {number_of_columns} rows, after select all line" + + def update(self, table_name: str, all_types: dict[str, str], pk_types: dict[str, str], index: dict[str, str], ttl: str, unique: str): + count = 1 + + if ttl != "": + self.create_update( + count, "ttl_", ttl, ttl_types[ttl], table_name) + count += 1 + + for type_name in all_types.keys(): + self.create_update( + count, "col_", type_name, all_types[type_name], table_name) + count += 1 + + if unique == "": + for type_name in index.keys(): + self.create_update( + count, "col_index_", type_name, index[type_name], table_name) + count += 1 + else: + number_of_columns = len(pk_types) + len(all_types) + len(index)+1 + if ttl != "": + number_of_columns += 1 + for i in range(1, number_of_columns + 1): + self.create_update_unique( + number_of_columns + i, i, index, table_name) + + count_assert = 1 + + number_of_columns = len(pk_types) + len(all_types) + len(index) + if ttl != "": + number_of_columns += 1 + + if ttl != "": + rows = self.query( + f"SELECT COUNT(*) as count FROM `{table_name}` WHERE ttl_{cleanup_type_name(ttl)}={format_sql_value(ttl_types[ttl](count_assert), ttl)}") + assert len( + rows) == 1 and rows[0].count == number_of_columns, f"Expected {number_of_columns} rows after insert, faild in ttl_{cleanup_type_name(ttl)}, table {table_name}" + count_assert += 1 + + for type_name in all_types.keys(): + if type_name != "Json" and type_name != "Yson" and type_name != "JsonDocument": + rows = self.query( + f"SELECT COUNT(*) as count FROM `{table_name}` WHERE col_{cleanup_type_name(type_name)}={format_sql_value(all_types[type_name](count_assert), type_name)}") + assert len( + rows) == 1 and rows[0].count == number_of_columns, f"Expected {number_of_columns} rows after insert, faild in col_{cleanup_type_name(type_name)}, table {table_name}" + count_assert += 1 + if unique == "": + for type_name in index.keys(): + rows = self.query( + f"SELECT COUNT(*) as count FROM `{table_name}` WHERE col_index_{cleanup_type_name(type_name)}={format_sql_value(index[type_name](count_assert), type_name)}") + assert len( + rows) == 1 and rows[0].count == number_of_columns, f"Expected {number_of_columns} rows after insert, faild in col_index_{cleanup_type_name(type_name)}, table {table_name}" + count_assert += 1 + else: + number_of_columns = len(pk_types) + len(all_types) + len(index) + 2 + if ttl != "": + number_of_columns += 1 + for type_name in index.keys(): + rows = self.query( + f"SELECT COUNT(*) as count FROM `{table_name}` WHERE col_index_{cleanup_type_name(type_name)}={format_sql_value(index[type_name](number_of_columns), type_name)}") + assert len( + rows) == 1 and rows[0].count == 1, f"Expected {1} rows after insert, faild in col_index_{cleanup_type_name(type_name)}, table {table_name}" + number_of_columns += 1 + + def create_update(self, value: int, prefix: str, type_name: str, key: str, table_name: str): + update_sql = f""" UPDATE `{table_name}` SET {prefix}{cleanup_type_name(type_name)} = {format_sql_value(key(value), type_name)} """ + self.query(update_sql) + + def create_update_unique(self, value: int, search: int, index: dict[str, str], table_name: str): + update_sql = f""" UPDATE `{table_name}` SET + {", ".join([f"col_index_{cleanup_type_name(type_name)} = {format_sql_value(index[type_name](value), type_name)}" for type_name in index.keys()])} + WHERE + {" and ".join(f"col_index_{cleanup_type_name(type_name)} = {format_sql_value(index[type_name](search), type_name)}" for type_name in index.keys())} + """ + self.query(update_sql) + + def upsert(self, table_name: str, all_types: dict[str, str], pk_types: dict[str, str], index: dict[str, str], ttl: str): + number_of_columns = len(pk_types) + len(all_types) + len(index) + if ttl != "": + number_of_columns += 1 + + for count in range(1, number_of_columns+1): + self.create_upsert(table_name, number_of_columns + 1 - + count, count, all_types, pk_types, index, ttl) + + for count in range(number_of_columns+1, 2*number_of_columns+1): + self.create_upsert(table_name, count, count, + all_types, pk_types, index, ttl) + + for count in range(1, number_of_columns + 1): + create_all_type = [] + for type_name in all_types.keys(): + if type_name != "Json" and type_name != "Yson" and type_name != "JsonDocument": + create_all_type.append( + f"col_{cleanup_type_name(type_name)}={format_sql_value(all_types[type_name](number_of_columns - count + 1), type_name)}") + sql_select = f""" + SELECT COUNT(*) as count FROM `{table_name}` WHERE + {" and ".join([f"pk_{cleanup_type_name(type_name)}={format_sql_value(pk_types[type_name](count), type_name)}" for type_name in pk_types.keys()])} + {" and " if len(index) != 0 else ""} + {" and ".join([f"col_index_{cleanup_type_name(type_name)}={format_sql_value(index[type_name](number_of_columns - count + 1), type_name)}" for type_name in index.keys()])} + {" and " if len(create_all_type) != 0 else ""} + {" and ".join(create_all_type)} + {f" and ttl_{ttl}={format_sql_value(ttl_types[ttl](number_of_columns - count + 1), ttl)}" if ttl != "" else ""} + """ + rows = self.query(sql_select) + assert len( + rows) == 1 and rows[0].count == 1, f"Expected one rows, faild in {count} value, table {table_name}" + + for count in range(number_of_columns + 1, 2*number_of_columns + 1): + create_all_type = [] + for type_name in all_types.keys(): + if type_name != "Json" and type_name != "Yson" and type_name != "JsonDocument" and ((type_name != "Date" and type_name != "Datetime") or count < 106): + create_all_type.append( + f"col_{cleanup_type_name(type_name)}={format_sql_value(all_types[type_name](count), type_name)}") + create_pk = [] + for type_name in pk_types.keys(): + if (type_name != "Date" and type_name != "Datetime") or count < 106: + create_pk.append( + f"pk_{cleanup_type_name(type_name)}={format_sql_value(pk_types[type_name](count), type_name)}") + create_index = [] + for type_name in index.keys(): + if (type_name != "Date" and type_name != "Datetime") or count < 106: + create_index.append( + f"col_index_{cleanup_type_name(type_name)}={format_sql_value(index[type_name](count), type_name)}") + sql_select = f""" + SELECT COUNT(*) as count FROM `{table_name}` WHERE + {" and ".join(create_pk)} + {" and " if len(create_index) != 0 else ""} + {" and ".join(create_index)} + {" and " if len(create_all_type) != 0 else ""} + {" and ".join(create_all_type)} + {f" and ttl_{ttl}={format_sql_value(ttl_types[ttl](count), ttl)}" if ttl != "" and ((type_name != "Date" and type_name != "Datetime") or count < 106) else ""} + """ + rows = self.query(sql_select) + assert len( + rows) == 1 and rows[0].count == 1, f"Expected one rows, faild in {count} value, table {table_name}" + rows = self.query(f"SELECT COUNT(*) as count FROM `{table_name}`") + assert len( + rows) == 1 and rows[0].count == 2*number_of_columns, f"Expected {2*number_of_columns} rows, after select all line" + + def create_upsert(self, table_name: str, value: int, search: int, all_types: dict[str, str], pk_types: dict[str, str], index: dict[str, str], ttl: str): + upsert_sql = f""" + UPSERT INTO {table_name} ( + {", ".join(["pk_" + cleanup_type_name(type_name) for type_name in pk_types.keys()])}{", " if len(all_types) != 0 else ""} + {", ".join(["col_" + cleanup_type_name(type_name) for type_name in all_types.keys()])}{", " if len(index) != 0 else ""} + {", ".join(["col_index_" + cleanup_type_name(type_name) for type_name in index.keys()])}{", " if len(ttl) != 0 else ""} + {f" ttl_{ttl}" if ttl != "" else ""} + ) + VALUES + ( + {", ".join([format_sql_value(pk_types[type_name](search), type_name) for type_name in pk_types.keys()])}{", " if len(all_types) != 0 else ""} + {", ".join([format_sql_value(all_types[type_name](value), type_name) for type_name in all_types.keys()])}{", " if len(index) != 0 else ""} + {", ".join([format_sql_value(index[type_name](value), type_name) for type_name in index.keys()])}{", " if len(ttl) != 0 else ""} + {format_sql_value(ttl_types[ttl](value), ttl) if ttl != "" else ""} + ) + ; + """ + self.query(upsert_sql) + + def delete(self, table_name: str, all_types: dict[str, str], pk_types: dict[str, str], index: dict[str, str], ttl: str): + number_of_columns = len(pk_types) + len(all_types) + len(index) + 1 + + if ttl != "": + number_of_columns += 1 + + if ttl != "": + self.create_delete(number_of_columns, + "ttl_", ttl, ttl_types[ttl], table_name) + number_of_columns += 1 + + for type_name in pk_types.keys(): + if type_name != "Bool": + self.create_delete( + number_of_columns, "pk_", type_name, pk_types[type_name], table_name) + else: + self.create_delete( + number_of_columns, "pk_", "Int64", pk_types["Int64"], table_name) + number_of_columns += 1 + + for type_name in all_types.keys(): + if type_name != "Bool" and type_name != "Json" and type_name != "Yson" and type_name != "JsonDocument": + self.create_delete( + number_of_columns, "col_", type_name, all_types[type_name], table_name) + else: + self.create_delete( + number_of_columns, "pk_", "Int64", pk_types["Int64"], table_name) + number_of_columns += 1 + + for type_name in index.keys(): + if type_name != "Bool": + self.create_delete( + number_of_columns, "col_index_", type_name, index[type_name], table_name) + else: + self.create_delete( + number_of_columns, "pk_", "Int64", pk_types["Int64"], table_name) + number_of_columns += 1 + + number_of_columns = len(pk_types) + len(all_types) + len(index) + + if ttl != "": + number_of_columns += 1 + + for count in range(1, number_of_columns + 1): + create_all_type = [] + for type_name in all_types.keys(): + if type_name != "Json" and type_name != "Yson" and type_name != "JsonDocument": + create_all_type.append( + f"col_{cleanup_type_name(type_name)}={format_sql_value(all_types[type_name](number_of_columns - count + 1), type_name)}") + sql_select = f""" + SELECT COUNT(*) as count FROM `{table_name}` WHERE + {" and ".join([f"pk_{cleanup_type_name(type_name)}={format_sql_value(pk_types[type_name](count), type_name)}" for type_name in pk_types.keys()])} + {" and " if len(index) != 0 else ""} + {" and ".join([f"col_index_{cleanup_type_name(type_name)}={format_sql_value(index[type_name](number_of_columns - count + 1), type_name)}" for type_name in index.keys()])} + {" and " if len(create_all_type) != 0 else ""} + {" and ".join(create_all_type)} + {f" and ttl_{ttl}={format_sql_value(ttl_types[ttl](number_of_columns - count + 1), ttl)}" if ttl != "" else ""} + """ + rows = self.query(sql_select) + assert len( + rows) == 1 and rows[0].count == 1, f"Expected one rows, faild in {count} value, table {table_name}" + + for count in range(number_of_columns + 1, 2*number_of_columns + 1): + create_all_type = [] + for type_name in all_types.keys(): + if type_name != "Json" and type_name != "Yson" and type_name != "JsonDocument": + create_all_type.append( + f"col_{cleanup_type_name(type_name)}={format_sql_value(all_types[type_name](count), type_name)}") + sql_select = f""" + SELECT COUNT(*) as count FROM `{table_name}` WHERE + {" and ".join([f"pk_{cleanup_type_name(type_name)}={format_sql_value(pk_types[type_name](count), type_name)}" for type_name in pk_types.keys()])} + {" and " if len(index) != 0 else ""} + {" and ".join([f"col_index_{cleanup_type_name(type_name)}={format_sql_value(index[type_name](count), type_name)}" for type_name in index.keys()])} + {" and " if len(create_all_type) != 0 else ""} + {" and ".join(create_all_type)} + {f" and ttl_{ttl}={format_sql_value(ttl_types[ttl](count), ttl)}" if ttl != "" and ((type_name != "Date" and type_name != "Datetime") or count < 106) else ""} + """ + rows = self.query(sql_select) + assert len( + rows) == 1 and rows[0].count == 0, f"Expected one rows, faild in {count} value, table {table_name}" + rows = self.query(f"SELECT COUNT(*) as count FROM `{table_name}`") + assert len( + rows) == 1 and rows[0].count == number_of_columns, f"Expected {number_of_columns} rows, after select all line" + + def create_delete(self, value: int, prefix: str, type_name: str, key: str, table_name: str): + delete_sql = f""" + DELETE FROM {table_name} WHERE {prefix}{cleanup_type_name(type_name)} = {format_sql_value(key(value), type_name)}; + """ + self.query(delete_sql) + + def select_all_type(self, table_name: str, all_types: dict[str, str], pk_types: dict[str, str], index: dict[str, str], ttl: str): + statements = [] + # delete if after https://github.com/ydb-platform/ydb/issues/16930 + for data_type in all_types.keys(): + if data_type != "Date32" and data_type != "Datetime64" and data_type != "Timestamp64" and data_type != 'Interval64': + statements.append(f"col_{cleanup_type_name(data_type)}") + for data_type in pk_types.keys(): + if data_type != "Date32" and data_type != "Datetime64" and data_type != "Timestamp64" and data_type != 'Interval64': + statements.append(f"pk_{cleanup_type_name(data_type)}") + for data_type in index.keys(): + if data_type != "Date32" and data_type != "Datetime64" and data_type != "Timestamp64" and data_type != 'Interval64': + statements.append(f"col_index_{cleanup_type_name(data_type)}") + if ttl != "": + statements.append(f"ttl_{cleanup_type_name(ttl)}") + + rows = self.query(f"select {", ".join(statements)} from {table_name}") + count = 0 + for data_type in all_types.keys(): + if data_type != "Date32" and data_type != "Datetime64" and data_type != "Timestamp64" and data_type != 'Interval64': + for i in range(len(rows)): + self.assert_type(all_types, data_type, i+1, rows[i][count]) + count += 1 + for data_type in pk_types.keys(): + if data_type != "Date32" and data_type != "Datetime64" and data_type != "Timestamp64" and data_type != 'Interval64': + for i in range(len(rows)): + self.assert_type(pk_types, data_type, i+1, rows[i][count]) + count += 1 + for data_type in index.keys(): + if data_type != "Date32" and data_type != "Datetime64" and data_type != "Timestamp64" and data_type != 'Interval64': + for i in range(len(rows)): + self.assert_type(index, data_type, i+1, rows[i][count]) + count += 1 + if ttl != "": + for i in range(len(rows)): + self.assert_type(ttl_types, ttl, i+1, rows[i][count]) + count += 1 + + def assert_type(self, key, data_type: str, values: int, values_from_rows): + if data_type == "String" or data_type == "Yson": + assert values_from_rows.decode( + "utf-8") == key[data_type](values), f"{data_type}" + elif data_type == "Float" or data_type == "DyNumber": + assert math.isclose(float(values_from_rows), float( + key[data_type](values)), rel_tol=1e-3), f"{data_type}" + elif data_type == "Interval" or data_type == "Interval64": + assert values_from_rows == timedelta( + microseconds=key[data_type](values)), f"{data_type}" + elif data_type == "Timestamp" or data_type == "Timestamp64": + assert values_from_rows == datetime.fromtimestamp( + key[data_type](values)/1_000_000), f"{data_type}" + elif data_type == "Json" or data_type == "JsonDocument": + assert str(values_from_rows).replace( + "'", "\"") == str(key[data_type](values)), f"{data_type}" + else: + assert str(values_from_rows) == str(key[data_type](values)), f"{data_type}" diff --git a/ydb/tests/datashard/lib/multicluster_test_base.py b/ydb/tests/datashard/lib/multicluster_test_base.py new file mode 100644 index 00000000000..974271631c3 --- /dev/null +++ b/ydb/tests/datashard/lib/multicluster_test_base.py @@ -0,0 +1,70 @@ +import os +import yatest.common +import logging +import hashlib + + +from ydb.tests.library.harness.kikimr_runner import KiKiMR +from ydb.tests.library.harness.kikimr_config import KikimrConfigGenerator +from ydb.tests.library.common.types import Erasure +from ydb.tests.library.harness.util import LogLevels + +logger = logging.getLogger(__name__) + + +class MulticlusterTestBase(): + @classmethod + def setup_class(cls): + ydb_path = yatest.common.build_path(os.environ.get( + "YDB_DRIVER_BINARY", "ydb/apps/ydbd/ydbd")) + logger.error(yatest.common.execute( + [ydb_path, "-V"], wait=True).stdout.decode("utf-8")) + + cls.ydb_cli_path = yatest.common.build_path("ydb/apps/ydb/ydb") + + cls.database = "/Root" + cls.clusters = [cls.build_cluster(), cls.build_cluster()] + + @classmethod + def build_cluster(cls): + cluster = KiKiMR(KikimrConfigGenerator(erasure=cls.get_cluster_configuration(), + extra_feature_flags=["enable_resource_pools", + "enable_external_data_sources", + "enable_tiering_in_column_shard"], + column_shard_config={ + 'disabled_on_scheme_shard': False, + 'lag_for_compaction_before_tierings_ms': 0, + 'compaction_actualization_lag_ms': 0, + 'optimizer_freshness_check_duration_ms': 0, + 'small_portion_detect_size_limit': 0, + }, + additional_log_configs={ + 'TX_TIERING': LogLevels.DEBUG})) + cluster.start() + return cluster + + @staticmethod + def get_cluster_configuration(): + return Erasure.NONE + + @classmethod + def get_database(cls): + return cls.database + + @staticmethod + def get_endpoint(cluster): + return "%s:%s" % ( + cluster.nodes[1].host, cluster.nodes[1].port + ) + + @classmethod + def teardown_class(cls): + for cluster in cls.clusters: + cluster.stop() + + def setup_method(self): + current_test_full_name = os.environ.get("PYTEST_CURRENT_TEST") + self.table_path = "insert_table_" + \ + current_test_full_name.replace("::", ".").removesuffix(" (setup)") + self.hash = hashlib.md5(self.table_path.encode()).hexdigest() + self.hash_short = self.hash[:8] diff --git a/ydb/tests/datashard/lib/ya.make b/ydb/tests/datashard/lib/ya.make index ec3405575e0..ddeba33cd5f 100644 --- a/ydb/tests/datashard/lib/ya.make +++ b/ydb/tests/datashard/lib/ya.make @@ -1,8 +1,17 @@ PY3_LIBRARY() +ENV(YDB_DRIVER_BINARY="ydb/apps/ydbd/ydbd") +ENV(MOTO_SERVER_PATH="contrib/python/moto/bin/moto_server") PY_SRCS( + dml_operations.py create_table.py types_of_variables.py + multicluster_test_base.py +) + +PEERDIR( + ydb/tests/library + ydb/tests/sql/lib ) END() diff --git a/ydb/tests/datashard/ya.make b/ydb/tests/datashard/ya.make index 6a74e3cb5d3..9adfddd954c 100644 --- a/ydb/tests/datashard/ya.make +++ b/ydb/tests/datashard/ya.make @@ -1,4 +1,5 @@ RECURSE( - lib + async_replication dml + lib ) |