author      dennnniska <dennnis@ydb.tech>   2025-04-18 15:12:51 +0300
committer   GitHub <noreply@github.com>     2025-04-18 15:12:51 +0300
commit      68c43aed9070c11917f3b1bb79f11197910d4a6a (patch)
tree        bdf0bd8b0c81f1d1b3384574a86fd23d0bbe9b2c
parent      62614f2f7fa863a2743bb9fabbbb6816f5b79426 (diff)
download    ydb-68c43aed9070c11917f3b1bb79f11197910d4a6a.tar.gz
Test for async replication for all types (#16670)
-rw-r--r--   ydb/tests/datashard/async_replication/test_async_replication.py    90
-rw-r--r--   ydb/tests/datashard/async_replication/ya.make                       23
-rw-r--r--   ydb/tests/datashard/dml/test_dml.py                                405
-rw-r--r--   ydb/tests/datashard/lib/dml_operations.py                          396
-rw-r--r--   ydb/tests/datashard/lib/multicluster_test_base.py                   70
-rw-r--r--   ydb/tests/datashard/lib/ya.make                                      9
-rw-r--r--   ydb/tests/datashard/ya.make                                          3
7 files changed, 601 insertions, 395 deletions
diff --git a/ydb/tests/datashard/async_replication/test_async_replication.py b/ydb/tests/datashard/async_replication/test_async_replication.py
new file mode 100644
index 00000000000..793d2a1e590
--- /dev/null
+++ b/ydb/tests/datashard/async_replication/test_async_replication.py
@@ -0,0 +1,90 @@
+import pytest
+import time
+
+from ydb.tests.sql.lib.test_query import Query
+from ydb.tests.library.common.wait_for import wait_for
+from ydb.tests.datashard.lib.multicluster_test_base import MulticlusterTestBase
+from ydb.tests.datashard.lib.dml_operations import DMLOperations
+from ydb.tests.datashard.lib.types_of_variables import pk_types, non_pk_types, index_first, index_second, \
+ index_first_sync, index_second_sync, index_three_sync, index_four_sync, index_zero_sync
+
+
+class TestAsyncReplication(MulticlusterTestBase):
+ @pytest.mark.parametrize(
+ "table_name, pk_types, all_types, index, ttl, unique, sync",
+ [
+ # ("table_index_4_UNIQUE_SYNC", pk_types, {},
+ # index_four_sync, "", "UNIQUE", "SYNC"),
+ # ("table_index_3_UNIQUE_SYNC", pk_types, {},
+ # index_three_sync_not_Bool, "", "UNIQUE", "SYNC"),
+ # ("table_index_2_UNIQUE_SYNC", pk_types, {},
+ # index_second_sync, "", "UNIQUE", "SYNC"),
+ # ("table_index_1_UNIQUE_SYNC", pk_types, {},
+ # index_first_sync, "", "UNIQUE", "SYNC"),
+ # ("table_index_0_UNIQUE_SYNC", pk_types, {},
+ # index_zero_sync, "", "UNIQUE", "SYNC"),
+ ("table_index_4__SYNC", pk_types, {},
+ index_four_sync, "", "", "SYNC"),
+ ("table_index_3__SYNC", pk_types, {},
+ index_three_sync, "", "", "SYNC"),
+ ("table_index_2__SYNC", pk_types, {},
+ index_second_sync, "", "", "SYNC"),
+ ("table_index_1__SYNC", pk_types, {},
+ index_first_sync, "", "", "SYNC"),
+ ("table_index_0__SYNC", pk_types, {},
+ index_zero_sync, "", "", "SYNC"),
+ ("table_index_1__ASYNC", pk_types, {}, index_second, "", "", "ASYNC"),
+ ("table_index_0__ASYNC", pk_types, {}, index_first, "", "", "ASYNC"),
+ ("table_all_types", pk_types, {
+ **pk_types, **non_pk_types}, {}, "", "", ""),
+ ("table_ttl_DyNumber", pk_types, {}, {}, "DyNumber", "", ""),
+ ("table_ttl_Uint32", pk_types, {}, {}, "Uint32", "", ""),
+ ("table_ttl_Uint64", pk_types, {}, {}, "Uint64", "", ""),
+ ("table_ttl_Datetime", pk_types, {}, {}, "Datetime", "", ""),
+ ("table_ttl_Timestamp", pk_types, {}, {}, "Timestamp", "", ""),
+ ("table_ttl_Date", pk_types, {}, {}, "Date", "", ""),
+ ]
+ )
+ def test_async_replication(self, table_name: str, pk_types: dict[str, str], all_types: dict[str, str], index: dict[str, str], ttl: str, unique: str, sync: str):
+ dml_cluster_1 = DMLOperations(Query.create(
+ self.get_database(), self.get_endpoint(self.clusters[0])))
+ dml_cluster_2 = DMLOperations(Query.create(
+ self.get_database(), self.get_endpoint(self.clusters[1])))
+
+ dml_cluster_1.create_table(table_name, pk_types, all_types,
+ index, ttl, unique, sync)
+ dml_cluster_1.insert(table_name, all_types, pk_types, index, ttl)
+ dml_cluster_2.query(f"""
+ CREATE ASYNC REPLICATION `replication_{table_name}`
+ FOR `{self.get_database()}/{table_name}` AS `{self.get_database()}/{table_name}`
+ WITH (
+ CONNECTION_STRING = 'grpc://{self.get_endpoint(self.clusters[0])}/?database={self.get_database()}'
+ )
+ """)
+ for _ in range(100):
+ try:
+ dml_cluster_2.query(
+ f"select count(*) as count from {table_name}")
+ break
+ except Exception:
+ time.sleep(1)
+ dml_cluster_2.select_after_insert(
+ table_name, all_types, pk_types, index, ttl)
+ dml_cluster_1.query(f"delete from {table_name}")
+ assert wait_for(self.create_predicate(True, table_name, dml_cluster_2.query),
+ timeout_seconds=100) is True, "Expected zero rows after delete"
+ dml_cluster_1.insert(table_name, all_types, pk_types, index, ttl)
+ wait_for(self.create_predicate(False, table_name,
+ dml_cluster_2.query), timeout_seconds=100)
+ dml_cluster_2.select_after_insert(
+ table_name, all_types, pk_types, index, ttl)
+
+ def create_predicate(self, is_zero, table_name, dml_cluster):
+ def predicate():
+ rows = dml_cluster(
+ f"select count(*) as count from {table_name}")
+ if is_zero:
+ return len(rows) == 1 and rows[0].count == 0
+ else:
+ return len(rows) == 1 and rows[0].count != 0
+ return predicate
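The heart of the new test is the CREATE ASYNC REPLICATION statement issued on the second (replica) cluster, which points back at the first (source) cluster through a gRPC connection string; the test then polls the replica until the replicated table becomes queryable and its row count converges. A minimal sketch of what that f-string renders to, with hypothetical database, table, and endpoint values rather than ones taken from the test harness:

    # Sketch only: the database, table and endpoint values are hypothetical.
    database = "/Root"
    table_name = "table_all_types"
    source_endpoint = "localhost:2136"  # endpoint of the cluster owning the source table

    create_replication_yql = f"""
    CREATE ASYNC REPLICATION `replication_{table_name}`
    FOR `{database}/{table_name}` AS `{database}/{table_name}`
    WITH (
        CONNECTION_STRING = 'grpc://{source_endpoint}/?database={database}'
    )
    """
    # The test passes the rendered statement to DMLOperations.query() on the replica cluster.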
diff --git a/ydb/tests/datashard/async_replication/ya.make b/ydb/tests/datashard/async_replication/ya.make
new file mode 100644
index 00000000000..a6350c8eb5b
--- /dev/null
+++ b/ydb/tests/datashard/async_replication/ya.make
@@ -0,0 +1,23 @@
+PY3TEST()
+ENV(YDB_DRIVER_BINARY="ydb/apps/ydbd/ydbd")
+
+FORK_SUBTESTS()
+SPLIT_FACTOR(20)
+SIZE(MEDIUM)
+
+TEST_SRCS(
+ test_async_replication.py
+)
+
+PEERDIR(
+ ydb/tests/datashard/lib
+ ydb/tests/library
+ ydb/tests/sql/lib
+)
+
+DEPENDS(
+ ydb/apps/ydb
+ ydb/apps/ydbd
+)
+
+END()
diff --git a/ydb/tests/datashard/dml/test_dml.py b/ydb/tests/datashard/dml/test_dml.py
index 19317363aa0..3126d413035 100644
--- a/ydb/tests/datashard/dml/test_dml.py
+++ b/ydb/tests/datashard/dml/test_dml.py
@@ -1,10 +1,8 @@
import pytest
-import math
-from datetime import datetime, timedelta
from ydb.tests.sql.lib.test_base import TestBase
-from ydb.tests.datashard.lib.create_table import create_table_sql_request, create_ttl_sql_request
-from ydb.tests.datashard.lib.types_of_variables import cleanup_type_name, format_sql_value, pk_types, non_pk_types, index_first, index_second, ttl_types, \
+from ydb.tests.datashard.lib.dml_operations import DMLOperations
+from ydb.tests.datashard.lib.types_of_variables import pk_types, non_pk_types, index_first, index_second, \
index_first_sync, index_second_sync, index_three_sync, index_three_sync_not_Bool, index_four_sync, index_zero_sync
@@ -45,393 +43,12 @@ class TestDML(TestBase):
]
)
def test_dml(self, table_name: str, pk_types: dict[str, str], all_types: dict[str, str], index: dict[str, str], ttl: str, unique: str, sync: str):
- self.create_table(table_name, pk_types, all_types,
- index, ttl, unique, sync)
- self.insert(table_name, all_types, pk_types, index, ttl)
- self.select_all_type(table_name, all_types, pk_types, index, ttl)
- self.select_after_insert(table_name, all_types, pk_types, index, ttl)
- self.update(table_name, all_types, index, ttl, unique)
- self.upsert(table_name, all_types, pk_types, index, ttl)
- self.delete(table_name, all_types, pk_types, index, ttl)
-
- def create_table(self, table_name: str, pk_types: dict[str, str], all_types: dict[str, str], index: dict[str, str], ttl: str, unique: str, sync: str):
- columns = {
- "pk_": pk_types.keys(),
- "col_": all_types.keys(),
- "col_index_": index.keys(),
- "ttl_": [ttl]
- }
- pk_columns = {
- "pk_": pk_types.keys()
- }
- index_columns = {
- "col_index_": index.keys()
- }
- sql_create_table = create_table_sql_request(
- table_name, columns, pk_columns, index_columns, unique, sync)
- self.query(sql_create_table)
- if ttl != "":
- sql_ttl = create_ttl_sql_request(f"ttl_{cleanup_type_name(ttl)}", {"P18262D": ""}, "SECONDS" if ttl ==
- "Uint32" or ttl == "Uint64" or ttl == "DyNumber" else "", table_name)
- self.query(sql_ttl)
-
- def insert(self, table_name: str, all_types: dict[str, str], pk_types: dict[str, str], index: dict[str, str], ttl: str):
- number_of_columns = len(pk_types) + len(all_types) + len(index)
-
- if ttl != "":
- number_of_columns += 1
- for count in range(1, number_of_columns + 1):
- self.create_insert(table_name, count, all_types,
- pk_types, index, ttl)
-
- def create_insert(self, table_name: str, value: int, all_types: dict[str, str], pk_types: dict[str, str], index: dict[str, str], ttl: str):
- insert_sql = f"""
- INSERT INTO {table_name}(
- {", ".join(["pk_" + cleanup_type_name(type_name) for type_name in pk_types.keys()])}{", " if len(all_types) != 0 else ""}
- {", ".join(["col_" + cleanup_type_name(type_name) for type_name in all_types.keys()])}{", " if len(index) != 0 else ""}
- {", ".join(["col_index_" + cleanup_type_name(type_name) for type_name in index.keys()])}{", " if len(ttl) != 0 else ""}
- {f"ttl_{ttl}" if ttl != "" else ""}
- )
- VALUES(
- {", ".join([format_sql_value(pk_types[type_name](value), type_name) for type_name in pk_types.keys()])}{", " if len(all_types) != 0 else ""}
- {", ".join([format_sql_value(all_types[type_name](value), type_name) for type_name in all_types.keys()])}{", " if len(index) != 0 else ""}
- {", ".join([format_sql_value(index[type_name](value), type_name) for type_name in index.keys()])}{", " if len(ttl) != 0 else ""}
- {format_sql_value(ttl_types[ttl](value), ttl) if ttl != "" else ""}
- );
- """
- self.query(insert_sql)
-
- def select_after_insert(self, table_name: str, all_types: dict[str, str], pk_types: dict[str, str], index: dict[str, str], ttl: str):
-
- number_of_columns = len(pk_types) + len(all_types) + len(index)
-
- if ttl != "":
- number_of_columns += 1
-
- for count in range(1, number_of_columns + 1):
- create_all_type = []
- for type_name in all_types.keys():
- if type_name != "Json" and type_name != "Yson" and type_name != "JsonDocument":
- create_all_type.append(
- f"col_{cleanup_type_name(type_name)}={format_sql_value(all_types[type_name](count), type_name)}")
- sql_select = f"""
- SELECT COUNT(*) as count FROM `{table_name}` WHERE
- {" and ".join([f"pk_{cleanup_type_name(type_name)}={format_sql_value(pk_types[type_name](count), type_name)}" for type_name in pk_types.keys()])}
- {" and " if len(index) != 0 else ""}
- {" and ".join([f"col_index_{cleanup_type_name(type_name)}={format_sql_value(index[type_name](count), type_name)}" for type_name in index.keys()])}
- {" and " if len(create_all_type) != 0 else ""}
- {" and ".join(create_all_type)}
- {f" and ttl_{ttl}={format_sql_value(ttl_types[ttl](count), ttl)}" if ttl != "" else ""}
- """
- rows = self.query(sql_select)
- assert len(
- rows) == 1 and rows[0].count == 1, f"Expected one rows, faild in {count} value, table {table_name}"
-
- rows = self.query(f"SELECT COUNT(*) as count FROM `{table_name}`")
- assert len(
- rows) == 1 and rows[0].count == number_of_columns, f"Expected {number_of_columns} rows, after select all line"
-
- def update(self, table_name: str, all_types: dict[str, str], index: dict[str, str], ttl: str, unique: str):
- count = 1
-
- if ttl != "":
- self.create_update(
- count, "ttl_", ttl, ttl_types[ttl], table_name)
- count += 1
-
- for type_name in all_types.keys():
- self.create_update(
- count, "col_", type_name, all_types[type_name], table_name)
- count += 1
-
- if unique == "":
- for type_name in index.keys():
- self.create_update(
- count, "col_index_", type_name, index[type_name], table_name)
- count += 1
- else:
- number_of_columns = len(pk_types) + len(all_types) + len(index)+1
- if ttl != "":
- number_of_columns += 1
- for i in range(1, number_of_columns + 1):
- self.create_update_unique(
- number_of_columns + i, i, index, table_name)
-
- count_assert = 1
-
- number_of_columns = len(pk_types) + len(all_types) + len(index)
- if ttl != "":
- number_of_columns += 1
-
- if ttl != "":
- rows = self.query(
- f"SELECT COUNT(*) as count FROM `{table_name}` WHERE ttl_{cleanup_type_name(ttl)}={format_sql_value(ttl_types[ttl](count_assert), ttl)}")
- assert len(
- rows) == 1 and rows[0].count == number_of_columns, f"Expected {number_of_columns} rows after insert, faild in ttl_{cleanup_type_name(ttl)}, table {table_name}"
- count_assert += 1
-
- for type_name in all_types.keys():
- if type_name != "Json" and type_name != "Yson" and type_name != "JsonDocument":
- rows = self.query(
- f"SELECT COUNT(*) as count FROM `{table_name}` WHERE col_{cleanup_type_name(type_name)}={format_sql_value(all_types[type_name](count_assert), type_name)}")
- assert len(
- rows) == 1 and rows[0].count == number_of_columns, f"Expected {number_of_columns} rows after insert, faild in col_{cleanup_type_name(type_name)}, table {table_name}"
- count_assert += 1
- if unique == "":
- for type_name in index.keys():
- rows = self.query(
- f"SELECT COUNT(*) as count FROM `{table_name}` WHERE col_index_{cleanup_type_name(type_name)}={format_sql_value(index[type_name](count_assert), type_name)}")
- assert len(
- rows) == 1 and rows[0].count == number_of_columns, f"Expected {number_of_columns} rows after insert, faild in col_index_{cleanup_type_name(type_name)}, table {table_name}"
- count_assert += 1
- else:
- number_of_columns = len(pk_types) + len(all_types) + len(index) + 2
- if ttl != "":
- number_of_columns += 1
- for type_name in index.keys():
- rows = self.query(
- f"SELECT COUNT(*) as count FROM `{table_name}` WHERE col_index_{cleanup_type_name(type_name)}={format_sql_value(index[type_name](number_of_columns), type_name)}")
- assert len(
- rows) == 1 and rows[0].count == 1, f"Expected {1} rows after insert, faild in col_index_{cleanup_type_name(type_name)}, table {table_name}"
- number_of_columns += 1
-
- def create_update(self, value: int, prefix: str, type_name: str, key: str, table_name: str):
- update_sql = f""" UPDATE `{table_name}` SET {prefix}{cleanup_type_name(type_name)} = {format_sql_value(key(value), type_name)} """
- self.query(update_sql)
-
- def create_update_unique(self, value: int, search: int, index: dict[str, str], table_name: str):
- update_sql = f""" UPDATE `{table_name}` SET
- {", ".join([f"col_index_{cleanup_type_name(type_name)} = {format_sql_value(index[type_name](value), type_name)}" for type_name in index.keys()])}
- WHERE
- {" and ".join(f"col_index_{cleanup_type_name(type_name)} = {format_sql_value(index[type_name](search), type_name)}" for type_name in index.keys())}
- """
- self.query(update_sql)
-
- def upsert(self, table_name: str, all_types: dict[str, str], pk_types: dict[str, str], index: dict[str, str], ttl: str):
- number_of_columns = len(pk_types) + len(all_types) + len(index)
- if ttl != "":
- number_of_columns += 1
-
- for count in range(1, number_of_columns+1):
- self.create_upsert(table_name, number_of_columns + 1 -
- count, count, all_types, pk_types, index, ttl)
-
- for count in range(number_of_columns+1, 2*number_of_columns+1):
- self.create_upsert(table_name, count, count,
- all_types, pk_types, index, ttl)
-
- for count in range(1, number_of_columns + 1):
- create_all_type = []
- for type_name in all_types.keys():
- if type_name != "Json" and type_name != "Yson" and type_name != "JsonDocument":
- create_all_type.append(
- f"col_{cleanup_type_name(type_name)}={format_sql_value(all_types[type_name](number_of_columns - count + 1), type_name)}")
- sql_select = f"""
- SELECT COUNT(*) as count FROM `{table_name}` WHERE
- {" and ".join([f"pk_{cleanup_type_name(type_name)}={format_sql_value(pk_types[type_name](count), type_name)}" for type_name in pk_types.keys()])}
- {" and " if len(index) != 0 else ""}
- {" and ".join([f"col_index_{cleanup_type_name(type_name)}={format_sql_value(index[type_name](number_of_columns - count + 1), type_name)}" for type_name in index.keys()])}
- {" and " if len(create_all_type) != 0 else ""}
- {" and ".join(create_all_type)}
- {f" and ttl_{ttl}={format_sql_value(ttl_types[ttl](number_of_columns - count + 1), ttl)}" if ttl != "" else ""}
- """
- rows = self.query(sql_select)
- assert len(
- rows) == 1 and rows[0].count == 1, f"Expected one rows, faild in {count} value, table {table_name}"
-
- for count in range(number_of_columns + 1, 2*number_of_columns + 1):
- create_all_type = []
- for type_name in all_types.keys():
- if type_name != "Json" and type_name != "Yson" and type_name != "JsonDocument" and ((type_name != "Date" and type_name != "Datetime") or count < 106):
- create_all_type.append(
- f"col_{cleanup_type_name(type_name)}={format_sql_value(all_types[type_name](count), type_name)}")
- create_pk = []
- for type_name in pk_types.keys():
- if (type_name != "Date" and type_name != "Datetime") or count < 106:
- create_pk.append(
- f"pk_{cleanup_type_name(type_name)}={format_sql_value(pk_types[type_name](count), type_name)}")
- create_index = []
- for type_name in index.keys():
- if (type_name != "Date" and type_name != "Datetime") or count < 106:
- create_index.append(
- f"col_index_{cleanup_type_name(type_name)}={format_sql_value(index[type_name](count), type_name)}")
- sql_select = f"""
- SELECT COUNT(*) as count FROM `{table_name}` WHERE
- {" and ".join(create_pk)}
- {" and " if len(create_index) != 0 else ""}
- {" and ".join(create_index)}
- {" and " if len(create_all_type) != 0 else ""}
- {" and ".join(create_all_type)}
- {f" and ttl_{ttl}={format_sql_value(ttl_types[ttl](count), ttl)}" if ttl != "" and ((type_name != "Date" and type_name != "Datetime") or count < 106) else ""}
- """
- rows = self.query(sql_select)
- assert len(
- rows) == 1 and rows[0].count == 1, f"Expected one rows, faild in {count} value, table {table_name}"
- rows = self.query(f"SELECT COUNT(*) as count FROM `{table_name}`")
- assert len(
- rows) == 1 and rows[0].count == 2*number_of_columns, f"Expected {2*number_of_columns} rows, after select all line"
-
- def create_upsert(self, table_name: str, value: int, search: int, all_types: dict[str, str], pk_types: dict[str, str], index: dict[str, str], ttl: str):
- upsert_sql = f"""
- UPSERT INTO {table_name} (
- {", ".join(["pk_" + cleanup_type_name(type_name) for type_name in pk_types.keys()])}{", " if len(all_types) != 0 else ""}
- {", ".join(["col_" + cleanup_type_name(type_name) for type_name in all_types.keys()])}{", " if len(index) != 0 else ""}
- {", ".join(["col_index_" + cleanup_type_name(type_name) for type_name in index.keys()])}{", " if len(ttl) != 0 else ""}
- {f" ttl_{ttl}" if ttl != "" else ""}
- )
- VALUES
- (
- {", ".join([format_sql_value(pk_types[type_name](search), type_name) for type_name in pk_types.keys()])}{", " if len(all_types) != 0 else ""}
- {", ".join([format_sql_value(all_types[type_name](value), type_name) for type_name in all_types.keys()])}{", " if len(index) != 0 else ""}
- {", ".join([format_sql_value(index[type_name](value), type_name) for type_name in index.keys()])}{", " if len(ttl) != 0 else ""}
- {format_sql_value(ttl_types[ttl](value), ttl) if ttl != "" else ""}
- )
- ;
- """
- self.query(upsert_sql)
-
- def delete(self, table_name: str, all_types: dict[str, str], pk_types: dict[str, str], index: dict[str, str], ttl: str):
- number_of_columns = len(pk_types) + len(all_types) + len(index) + 1
-
- if ttl != "":
- number_of_columns += 1
-
- if ttl != "":
- self.create_delete(number_of_columns,
- "ttl_", ttl, ttl_types[ttl], table_name)
- number_of_columns += 1
-
- for type_name in pk_types.keys():
- if type_name != "Bool":
- self.create_delete(
- number_of_columns, "pk_", type_name, pk_types[type_name], table_name)
- else:
- self.create_delete(
- number_of_columns, "pk_", "Int64", pk_types["Int64"], table_name)
- number_of_columns += 1
-
- for type_name in all_types.keys():
- if type_name != "Bool" and type_name != "Json" and type_name != "Yson" and type_name != "JsonDocument":
- self.create_delete(
- number_of_columns, "col_", type_name, all_types[type_name], table_name)
- else:
- self.create_delete(
- number_of_columns, "pk_", "Int64", pk_types["Int64"], table_name)
- number_of_columns += 1
-
- for type_name in index.keys():
- if type_name != "Bool":
- self.create_delete(
- number_of_columns, "col_index_", type_name, index[type_name], table_name)
- else:
- self.create_delete(
- number_of_columns, "pk_", "Int64", pk_types["Int64"], table_name)
- number_of_columns += 1
-
- number_of_columns = len(pk_types) + len(all_types) + len(index)
-
- if ttl != "":
- number_of_columns += 1
-
- for count in range(1, number_of_columns + 1):
- create_all_type = []
- for type_name in all_types.keys():
- if type_name != "Json" and type_name != "Yson" and type_name != "JsonDocument":
- create_all_type.append(
- f"col_{cleanup_type_name(type_name)}={format_sql_value(all_types[type_name](number_of_columns - count + 1), type_name)}")
- sql_select = f"""
- SELECT COUNT(*) as count FROM `{table_name}` WHERE
- {" and ".join([f"pk_{cleanup_type_name(type_name)}={format_sql_value(pk_types[type_name](count), type_name)}" for type_name in pk_types.keys()])}
- {" and " if len(index) != 0 else ""}
- {" and ".join([f"col_index_{cleanup_type_name(type_name)}={format_sql_value(index[type_name](number_of_columns - count + 1), type_name)}" for type_name in index.keys()])}
- {" and " if len(create_all_type) != 0 else ""}
- {" and ".join(create_all_type)}
- {f" and ttl_{ttl}={format_sql_value(ttl_types[ttl](number_of_columns - count + 1), ttl)}" if ttl != "" else ""}
- """
- rows = self.query(sql_select)
- assert len(
- rows) == 1 and rows[0].count == 1, f"Expected one rows, faild in {count} value, table {table_name}"
-
- for count in range(number_of_columns + 1, 2*number_of_columns + 1):
- create_all_type = []
- for type_name in all_types.keys():
- if type_name != "Json" and type_name != "Yson" and type_name != "JsonDocument":
- create_all_type.append(
- f"col_{cleanup_type_name(type_name)}={format_sql_value(all_types[type_name](count), type_name)}")
- sql_select = f"""
- SELECT COUNT(*) as count FROM `{table_name}` WHERE
- {" and ".join([f"pk_{cleanup_type_name(type_name)}={format_sql_value(pk_types[type_name](count), type_name)}" for type_name in pk_types.keys()])}
- {" and " if len(index) != 0 else ""}
- {" and ".join([f"col_index_{cleanup_type_name(type_name)}={format_sql_value(index[type_name](count), type_name)}" for type_name in index.keys()])}
- {" and " if len(create_all_type) != 0 else ""}
- {" and ".join(create_all_type)}
- {f" and ttl_{ttl}={format_sql_value(ttl_types[ttl](count), ttl)}" if ttl != "" and ((type_name != "Date" and type_name != "Datetime") or count < 106) else ""}
- """
- rows = self.query(sql_select)
- assert len(
- rows) == 1 and rows[0].count == 0, f"Expected one rows, faild in {count} value, table {table_name}"
- rows = self.query(f"SELECT COUNT(*) as count FROM `{table_name}`")
- assert len(
- rows) == 1 and rows[0].count == number_of_columns, f"Expected {number_of_columns} rows, after select all line"
-
- def create_delete(self, value: int, prefix: str, type_name: str, key: str, table_name: str):
- delete_sql = f"""
- DELETE FROM {table_name} WHERE {prefix}{cleanup_type_name(type_name)} = {format_sql_value(key(value), type_name)};
- """
- self.query(delete_sql)
-
- def select_all_type(self, table_name: str, all_types: dict[str, str], pk_types: dict[str, str], index: dict[str, str], ttl: str):
- statements = []
- # delete if after https://github.com/ydb-platform/ydb/issues/16930
- for type in all_types.keys():
- if type != "Date32" and type != "Datetime64" and type != "Timestamp64" and type != 'Interval64':
- statements.append(f"col_{cleanup_type_name(type)}")
- for type in pk_types.keys():
- if type != "Date32" and type != "Datetime64" and type != "Timestamp64" and type != 'Interval64':
- statements.append(f"pk_{cleanup_type_name(type)}")
- for type in index.keys():
- if type != "Date32" and type != "Datetime64" and type != "Timestamp64" and type != 'Interval64':
- statements.append(f"col_index_{cleanup_type_name(type)}")
- if ttl != "":
- statements.append(f"ttl_{cleanup_type_name(ttl)}")
-
- rows = self.query(f"select {", ".join(statements)} from {table_name}")
- count = 0
- for type in all_types.keys():
- if type != "Date32" and type != "Datetime64" and type != "Timestamp64" and type != 'Interval64':
- for i in range(len(rows)):
- self.assert_type(all_types, type, i+1, rows[i][count])
- count += 1
- for type in pk_types.keys():
- if type != "Date32" and type != "Datetime64" and type != "Timestamp64" and type != 'Interval64':
- for i in range(len(rows)):
- self.assert_type(pk_types, type, i+1, rows[i][count])
- count += 1
- for type in index.keys():
- if type != "Date32" and type != "Datetime64" and type != "Timestamp64" and type != 'Interval64':
- for i in range(len(rows)):
- self.assert_type(index, type, i+1, rows[i][count])
- count += 1
- if ttl != "":
- for i in range(len(rows)):
- self.assert_type(ttl_types, ttl, i+1, rows[i][count])
- count += 1
-
- def assert_type(self, key, type: str, values: int, values_from_rows):
- if type == "String" or type == "Yson":
- assert values_from_rows.decode(
- "utf-8") == key[type](values), f"{type}"
- elif type == "Float" or type == "DyNumber":
- assert math.isclose(float(values_from_rows), float(
- key[type](values)), rel_tol=1e-3), f"{type}"
- elif type == "Interval" or type == "Interval64":
- assert values_from_rows == timedelta(
- microseconds=key[type](values)), f"{type}"
- elif type == "Timestamp" or type == "Timestamp64":
- assert values_from_rows == datetime.fromtimestamp(
- key[type](values)/1_000_000), f"{type}"
- elif type == "Json" or type == "JsonDocument":
- assert str(values_from_rows).replace(
- "'", "\"") == str(key[type](values)), f"{type}"
- else:
- assert str(values_from_rows) == str(key[type](values)), f"{type}"
+ dml = DMLOperations(self)
+ dml.create_table(table_name, pk_types, all_types,
+ index, ttl, unique, sync)
+ dml.insert(table_name, all_types, pk_types, index, ttl)
+ dml.select_all_type(table_name, all_types, pk_types, index, ttl)
+ dml.select_after_insert(table_name, all_types, pk_types, index, ttl)
+ dml.update(table_name, all_types, pk_types, index, ttl, unique)
+ dml.upsert(table_name, all_types, pk_types, index, ttl)
+ dml.delete(table_name, all_types, pk_types, index, ttl)
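This refactor replaces the in-class helpers with calls into the new DMLOperations class, which depends only on an object exposing a query(text) method that returns result rows; that is what lets the same helpers drive either a TestBase instance (as here) or a per-cluster Query session in the async replication test. A minimal hypothetical stand-in that satisfies this contract, for illustration only:

    # Hypothetical stand-in showing the only interface DMLOperations relies on.
    class RecordingQuerySession:
        def __init__(self):
            self.executed = []

        def query(self, text):
            # A real session would send the statement to YDB and return its rows;
            # this stand-in just records the statement text.
            self.executed.append(text)
            return []

    # dml = DMLOperations(RecordingQuerySession())
    # dml.create_table(...) would then accumulate the generated YQL in .executed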
diff --git a/ydb/tests/datashard/lib/dml_operations.py b/ydb/tests/datashard/lib/dml_operations.py
new file mode 100644
index 00000000000..9db3abfa88b
--- /dev/null
+++ b/ydb/tests/datashard/lib/dml_operations.py
@@ -0,0 +1,396 @@
+import math
+from datetime import datetime, timedelta
+
+from ydb.tests.sql.lib.test_query import Query
+from ydb.tests.datashard.lib.create_table import create_table_sql_request, create_ttl_sql_request
+from ydb.tests.datashard.lib.types_of_variables import cleanup_type_name, format_sql_value, ttl_types
+
+
+class DMLOperations():
+ def __init__(self, query_object: Query):
+ self.query_object = query_object
+
+ def query(self, text):
+ return self.query_object.query(text)
+
+ def create_table(self, table_name: str, pk_types: dict[str, str], all_types: dict[str, str], index: dict[str, str], ttl: str, unique: str, sync: str):
+ columns = {
+ "pk_": pk_types.keys(),
+ "col_": all_types.keys(),
+ "col_index_": index.keys(),
+ "ttl_": [ttl]
+ }
+ pk_columns = {
+ "pk_": pk_types.keys()
+ }
+ index_columns = {
+ "col_index_": index.keys()
+ }
+ sql_create_table = create_table_sql_request(
+ table_name, columns, pk_columns, index_columns, unique, sync)
+ self.query(sql_create_table)
+ if ttl != "":
+ sql_ttl = create_ttl_sql_request(f"ttl_{cleanup_type_name(ttl)}", {"P18262D": ""}, "SECONDS" if ttl ==
+ "Uint32" or ttl == "Uint64" or ttl == "DyNumber" else "", table_name)
+ self.query(sql_ttl)
+
+ def insert(self, table_name: str, all_types: dict[str, str], pk_types: dict[str, str], index: dict[str, str], ttl: str):
+ number_of_columns = len(pk_types) + len(all_types) + len(index)
+
+ if ttl != "":
+ number_of_columns += 1
+ for count in range(1, number_of_columns + 1):
+ self.create_insert(table_name, count, all_types,
+ pk_types, index, ttl)
+
+ def create_insert(self, table_name: str, value: int, all_types: dict[str, str], pk_types: dict[str, str], index: dict[str, str], ttl: str):
+ insert_sql = f"""
+ INSERT INTO {table_name}(
+ {", ".join(["pk_" + cleanup_type_name(type_name) for type_name in pk_types.keys()])}{", " if len(all_types) != 0 else ""}
+ {", ".join(["col_" + cleanup_type_name(type_name) for type_name in all_types.keys()])}{", " if len(index) != 0 else ""}
+ {", ".join(["col_index_" + cleanup_type_name(type_name) for type_name in index.keys()])}{", " if len(ttl) != 0 else ""}
+ {f"ttl_{ttl}" if ttl != "" else ""}
+ )
+ VALUES(
+ {", ".join([format_sql_value(pk_types[type_name](value), type_name) for type_name in pk_types.keys()])}{", " if len(all_types) != 0 else ""}
+ {", ".join([format_sql_value(all_types[type_name](value), type_name) for type_name in all_types.keys()])}{", " if len(index) != 0 else ""}
+ {", ".join([format_sql_value(index[type_name](value), type_name) for type_name in index.keys()])}{", " if len(ttl) != 0 else ""}
+ {format_sql_value(ttl_types[ttl](value), ttl) if ttl != "" else ""}
+ );
+ """
+ self.query(insert_sql)
+
+ def select_after_insert(self, table_name: str, all_types: dict[str, str], pk_types: dict[str, str], index: dict[str, str], ttl: str):
+
+ number_of_columns = len(pk_types) + len(all_types) + len(index)
+
+ if ttl != "":
+ number_of_columns += 1
+
+ for count in range(1, number_of_columns + 1):
+ create_all_type = []
+ for type_name in all_types.keys():
+ if type_name != "Json" and type_name != "Yson" and type_name != "JsonDocument":
+ create_all_type.append(
+ f"col_{cleanup_type_name(type_name)}={format_sql_value(all_types[type_name](count), type_name)}")
+ sql_select = f"""
+ SELECT COUNT(*) as count FROM `{table_name}` WHERE
+ {" and ".join([f"pk_{cleanup_type_name(type_name)}={format_sql_value(pk_types[type_name](count), type_name)}" for type_name in pk_types.keys()])}
+ {" and " if len(index) != 0 else ""}
+ {" and ".join([f"col_index_{cleanup_type_name(type_name)}={format_sql_value(index[type_name](count), type_name)}" for type_name in index.keys()])}
+ {" and " if len(create_all_type) != 0 else ""}
+ {" and ".join(create_all_type)}
+ {f" and ttl_{ttl}={format_sql_value(ttl_types[ttl](count), ttl)}" if ttl != "" else ""}
+ """
+ rows = self.query(sql_select)
+ assert len(
+ rows) == 1 and rows[0].count == 1, f"Expected one rows, failed in {count} value, table {table_name}"
+
+ rows = self.query(f"SELECT COUNT(*) as count FROM `{table_name}`")
+ assert len(
+ rows) == 1 and rows[0].count == number_of_columns, f"Expected {number_of_columns} rows, after select all line"
+
+ def update(self, table_name: str, all_types: dict[str, str], pk_types: dict[str, str], index: dict[str, str], ttl: str, unique: str):
+ count = 1
+
+ if ttl != "":
+ self.create_update(
+ count, "ttl_", ttl, ttl_types[ttl], table_name)
+ count += 1
+
+ for type_name in all_types.keys():
+ self.create_update(
+ count, "col_", type_name, all_types[type_name], table_name)
+ count += 1
+
+ if unique == "":
+ for type_name in index.keys():
+ self.create_update(
+ count, "col_index_", type_name, index[type_name], table_name)
+ count += 1
+ else:
+ number_of_columns = len(pk_types) + len(all_types) + len(index)+1
+ if ttl != "":
+ number_of_columns += 1
+ for i in range(1, number_of_columns + 1):
+ self.create_update_unique(
+ number_of_columns + i, i, index, table_name)
+
+ count_assert = 1
+
+ number_of_columns = len(pk_types) + len(all_types) + len(index)
+ if ttl != "":
+ number_of_columns += 1
+
+ if ttl != "":
+ rows = self.query(
+ f"SELECT COUNT(*) as count FROM `{table_name}` WHERE ttl_{cleanup_type_name(ttl)}={format_sql_value(ttl_types[ttl](count_assert), ttl)}")
+ assert len(
+ rows) == 1 and rows[0].count == number_of_columns, f"Expected {number_of_columns} rows after insert, faild in ttl_{cleanup_type_name(ttl)}, table {table_name}"
+ count_assert += 1
+
+ for type_name in all_types.keys():
+ if type_name != "Json" and type_name != "Yson" and type_name != "JsonDocument":
+ rows = self.query(
+ f"SELECT COUNT(*) as count FROM `{table_name}` WHERE col_{cleanup_type_name(type_name)}={format_sql_value(all_types[type_name](count_assert), type_name)}")
+ assert len(
+ rows) == 1 and rows[0].count == number_of_columns, f"Expected {number_of_columns} rows after insert, faild in col_{cleanup_type_name(type_name)}, table {table_name}"
+ count_assert += 1
+ if unique == "":
+ for type_name in index.keys():
+ rows = self.query(
+ f"SELECT COUNT(*) as count FROM `{table_name}` WHERE col_index_{cleanup_type_name(type_name)}={format_sql_value(index[type_name](count_assert), type_name)}")
+ assert len(
+ rows) == 1 and rows[0].count == number_of_columns, f"Expected {number_of_columns} rows after insert, faild in col_index_{cleanup_type_name(type_name)}, table {table_name}"
+ count_assert += 1
+ else:
+ number_of_columns = len(pk_types) + len(all_types) + len(index) + 2
+ if ttl != "":
+ number_of_columns += 1
+ for type_name in index.keys():
+ rows = self.query(
+ f"SELECT COUNT(*) as count FROM `{table_name}` WHERE col_index_{cleanup_type_name(type_name)}={format_sql_value(index[type_name](number_of_columns), type_name)}")
+ assert len(
+ rows) == 1 and rows[0].count == 1, f"Expected {1} rows after insert, faild in col_index_{cleanup_type_name(type_name)}, table {table_name}"
+ number_of_columns += 1
+
+ def create_update(self, value: int, prefix: str, type_name: str, key: str, table_name: str):
+ update_sql = f""" UPDATE `{table_name}` SET {prefix}{cleanup_type_name(type_name)} = {format_sql_value(key(value), type_name)} """
+ self.query(update_sql)
+
+ def create_update_unique(self, value: int, search: int, index: dict[str, str], table_name: str):
+ update_sql = f""" UPDATE `{table_name}` SET
+ {", ".join([f"col_index_{cleanup_type_name(type_name)} = {format_sql_value(index[type_name](value), type_name)}" for type_name in index.keys()])}
+ WHERE
+ {" and ".join(f"col_index_{cleanup_type_name(type_name)} = {format_sql_value(index[type_name](search), type_name)}" for type_name in index.keys())}
+ """
+ self.query(update_sql)
+
+ def upsert(self, table_name: str, all_types: dict[str, str], pk_types: dict[str, str], index: dict[str, str], ttl: str):
+ number_of_columns = len(pk_types) + len(all_types) + len(index)
+ if ttl != "":
+ number_of_columns += 1
+
+ for count in range(1, number_of_columns+1):
+ self.create_upsert(table_name, number_of_columns + 1 -
+ count, count, all_types, pk_types, index, ttl)
+
+ for count in range(number_of_columns+1, 2*number_of_columns+1):
+ self.create_upsert(table_name, count, count,
+ all_types, pk_types, index, ttl)
+
+ for count in range(1, number_of_columns + 1):
+ create_all_type = []
+ for type_name in all_types.keys():
+ if type_name != "Json" and type_name != "Yson" and type_name != "JsonDocument":
+ create_all_type.append(
+ f"col_{cleanup_type_name(type_name)}={format_sql_value(all_types[type_name](number_of_columns - count + 1), type_name)}")
+ sql_select = f"""
+ SELECT COUNT(*) as count FROM `{table_name}` WHERE
+ {" and ".join([f"pk_{cleanup_type_name(type_name)}={format_sql_value(pk_types[type_name](count), type_name)}" for type_name in pk_types.keys()])}
+ {" and " if len(index) != 0 else ""}
+ {" and ".join([f"col_index_{cleanup_type_name(type_name)}={format_sql_value(index[type_name](number_of_columns - count + 1), type_name)}" for type_name in index.keys()])}
+ {" and " if len(create_all_type) != 0 else ""}
+ {" and ".join(create_all_type)}
+ {f" and ttl_{ttl}={format_sql_value(ttl_types[ttl](number_of_columns - count + 1), ttl)}" if ttl != "" else ""}
+ """
+ rows = self.query(sql_select)
+ assert len(
+ rows) == 1 and rows[0].count == 1, f"Expected one rows, faild in {count} value, table {table_name}"
+
+ for count in range(number_of_columns + 1, 2*number_of_columns + 1):
+ create_all_type = []
+ for type_name in all_types.keys():
+ if type_name != "Json" and type_name != "Yson" and type_name != "JsonDocument" and ((type_name != "Date" and type_name != "Datetime") or count < 106):
+ create_all_type.append(
+ f"col_{cleanup_type_name(type_name)}={format_sql_value(all_types[type_name](count), type_name)}")
+ create_pk = []
+ for type_name in pk_types.keys():
+ if (type_name != "Date" and type_name != "Datetime") or count < 106:
+ create_pk.append(
+ f"pk_{cleanup_type_name(type_name)}={format_sql_value(pk_types[type_name](count), type_name)}")
+ create_index = []
+ for type_name in index.keys():
+ if (type_name != "Date" and type_name != "Datetime") or count < 106:
+ create_index.append(
+ f"col_index_{cleanup_type_name(type_name)}={format_sql_value(index[type_name](count), type_name)}")
+ sql_select = f"""
+ SELECT COUNT(*) as count FROM `{table_name}` WHERE
+ {" and ".join(create_pk)}
+ {" and " if len(create_index) != 0 else ""}
+ {" and ".join(create_index)}
+ {" and " if len(create_all_type) != 0 else ""}
+ {" and ".join(create_all_type)}
+ {f" and ttl_{ttl}={format_sql_value(ttl_types[ttl](count), ttl)}" if ttl != "" and ((type_name != "Date" and type_name != "Datetime") or count < 106) else ""}
+ """
+ rows = self.query(sql_select)
+ assert len(
+ rows) == 1 and rows[0].count == 1, f"Expected one rows, faild in {count} value, table {table_name}"
+ rows = self.query(f"SELECT COUNT(*) as count FROM `{table_name}`")
+ assert len(
+ rows) == 1 and rows[0].count == 2*number_of_columns, f"Expected {2*number_of_columns} rows, after select all line"
+
+ def create_upsert(self, table_name: str, value: int, search: int, all_types: dict[str, str], pk_types: dict[str, str], index: dict[str, str], ttl: str):
+ upsert_sql = f"""
+ UPSERT INTO {table_name} (
+ {", ".join(["pk_" + cleanup_type_name(type_name) for type_name in pk_types.keys()])}{", " if len(all_types) != 0 else ""}
+ {", ".join(["col_" + cleanup_type_name(type_name) for type_name in all_types.keys()])}{", " if len(index) != 0 else ""}
+ {", ".join(["col_index_" + cleanup_type_name(type_name) for type_name in index.keys()])}{", " if len(ttl) != 0 else ""}
+ {f" ttl_{ttl}" if ttl != "" else ""}
+ )
+ VALUES
+ (
+ {", ".join([format_sql_value(pk_types[type_name](search), type_name) for type_name in pk_types.keys()])}{", " if len(all_types) != 0 else ""}
+ {", ".join([format_sql_value(all_types[type_name](value), type_name) for type_name in all_types.keys()])}{", " if len(index) != 0 else ""}
+ {", ".join([format_sql_value(index[type_name](value), type_name) for type_name in index.keys()])}{", " if len(ttl) != 0 else ""}
+ {format_sql_value(ttl_types[ttl](value), ttl) if ttl != "" else ""}
+ )
+ ;
+ """
+ self.query(upsert_sql)
+
+ def delete(self, table_name: str, all_types: dict[str, str], pk_types: dict[str, str], index: dict[str, str], ttl: str):
+ number_of_columns = len(pk_types) + len(all_types) + len(index) + 1
+
+ if ttl != "":
+ number_of_columns += 1
+
+ if ttl != "":
+ self.create_delete(number_of_columns,
+ "ttl_", ttl, ttl_types[ttl], table_name)
+ number_of_columns += 1
+
+ for type_name in pk_types.keys():
+ if type_name != "Bool":
+ self.create_delete(
+ number_of_columns, "pk_", type_name, pk_types[type_name], table_name)
+ else:
+ self.create_delete(
+ number_of_columns, "pk_", "Int64", pk_types["Int64"], table_name)
+ number_of_columns += 1
+
+ for type_name in all_types.keys():
+ if type_name != "Bool" and type_name != "Json" and type_name != "Yson" and type_name != "JsonDocument":
+ self.create_delete(
+ number_of_columns, "col_", type_name, all_types[type_name], table_name)
+ else:
+ self.create_delete(
+ number_of_columns, "pk_", "Int64", pk_types["Int64"], table_name)
+ number_of_columns += 1
+
+ for type_name in index.keys():
+ if type_name != "Bool":
+ self.create_delete(
+ number_of_columns, "col_index_", type_name, index[type_name], table_name)
+ else:
+ self.create_delete(
+ number_of_columns, "pk_", "Int64", pk_types["Int64"], table_name)
+ number_of_columns += 1
+
+ number_of_columns = len(pk_types) + len(all_types) + len(index)
+
+ if ttl != "":
+ number_of_columns += 1
+
+ for count in range(1, number_of_columns + 1):
+ create_all_type = []
+ for type_name in all_types.keys():
+ if type_name != "Json" and type_name != "Yson" and type_name != "JsonDocument":
+ create_all_type.append(
+ f"col_{cleanup_type_name(type_name)}={format_sql_value(all_types[type_name](number_of_columns - count + 1), type_name)}")
+ sql_select = f"""
+ SELECT COUNT(*) as count FROM `{table_name}` WHERE
+ {" and ".join([f"pk_{cleanup_type_name(type_name)}={format_sql_value(pk_types[type_name](count), type_name)}" for type_name in pk_types.keys()])}
+ {" and " if len(index) != 0 else ""}
+ {" and ".join([f"col_index_{cleanup_type_name(type_name)}={format_sql_value(index[type_name](number_of_columns - count + 1), type_name)}" for type_name in index.keys()])}
+ {" and " if len(create_all_type) != 0 else ""}
+ {" and ".join(create_all_type)}
+ {f" and ttl_{ttl}={format_sql_value(ttl_types[ttl](number_of_columns - count + 1), ttl)}" if ttl != "" else ""}
+ """
+ rows = self.query(sql_select)
+ assert len(
+ rows) == 1 and rows[0].count == 1, f"Expected one rows, faild in {count} value, table {table_name}"
+
+ for count in range(number_of_columns + 1, 2*number_of_columns + 1):
+ create_all_type = []
+ for type_name in all_types.keys():
+ if type_name != "Json" and type_name != "Yson" and type_name != "JsonDocument":
+ create_all_type.append(
+ f"col_{cleanup_type_name(type_name)}={format_sql_value(all_types[type_name](count), type_name)}")
+ sql_select = f"""
+ SELECT COUNT(*) as count FROM `{table_name}` WHERE
+ {" and ".join([f"pk_{cleanup_type_name(type_name)}={format_sql_value(pk_types[type_name](count), type_name)}" for type_name in pk_types.keys()])}
+ {" and " if len(index) != 0 else ""}
+ {" and ".join([f"col_index_{cleanup_type_name(type_name)}={format_sql_value(index[type_name](count), type_name)}" for type_name in index.keys()])}
+ {" and " if len(create_all_type) != 0 else ""}
+ {" and ".join(create_all_type)}
+ {f" and ttl_{ttl}={format_sql_value(ttl_types[ttl](count), ttl)}" if ttl != "" and ((type_name != "Date" and type_name != "Datetime") or count < 106) else ""}
+ """
+ rows = self.query(sql_select)
+ assert len(
+ rows) == 1 and rows[0].count == 0, f"Expected one rows, faild in {count} value, table {table_name}"
+ rows = self.query(f"SELECT COUNT(*) as count FROM `{table_name}`")
+ assert len(
+ rows) == 1 and rows[0].count == number_of_columns, f"Expected {number_of_columns} rows, after select all line"
+
+ def create_delete(self, value: int, prefix: str, type_name: str, key: str, table_name: str):
+ delete_sql = f"""
+ DELETE FROM {table_name} WHERE {prefix}{cleanup_type_name(type_name)} = {format_sql_value(key(value), type_name)};
+ """
+ self.query(delete_sql)
+
+ def select_all_type(self, table_name: str, all_types: dict[str, str], pk_types: dict[str, str], index: dict[str, str], ttl: str):
+ statements = []
+ # delete if after https://github.com/ydb-platform/ydb/issues/16930
+ for data_type in all_types.keys():
+ if data_type != "Date32" and data_type != "Datetime64" and data_type != "Timestamp64" and data_type != 'Interval64':
+ statements.append(f"col_{cleanup_type_name(data_type)}")
+ for data_type in pk_types.keys():
+ if data_type != "Date32" and data_type != "Datetime64" and data_type != "Timestamp64" and data_type != 'Interval64':
+ statements.append(f"pk_{cleanup_type_name(data_type)}")
+ for data_type in index.keys():
+ if data_type != "Date32" and data_type != "Datetime64" and data_type != "Timestamp64" and data_type != 'Interval64':
+ statements.append(f"col_index_{cleanup_type_name(data_type)}")
+ if ttl != "":
+ statements.append(f"ttl_{cleanup_type_name(ttl)}")
+
+ rows = self.query(f"select {', '.join(statements)} from {table_name}")
+ count = 0
+ for data_type in all_types.keys():
+ if data_type != "Date32" and data_type != "Datetime64" and data_type != "Timestamp64" and data_type != 'Interval64':
+ for i in range(len(rows)):
+ self.assert_type(all_types, data_type, i+1, rows[i][count])
+ count += 1
+ for data_type in pk_types.keys():
+ if data_type != "Date32" and data_type != "Datetime64" and data_type != "Timestamp64" and data_type != 'Interval64':
+ for i in range(len(rows)):
+ self.assert_type(pk_types, data_type, i+1, rows[i][count])
+ count += 1
+ for data_type in index.keys():
+ if data_type != "Date32" and data_type != "Datetime64" and data_type != "Timestamp64" and data_type != 'Interval64':
+ for i in range(len(rows)):
+ self.assert_type(index, data_type, i+1, rows[i][count])
+ count += 1
+ if ttl != "":
+ for i in range(len(rows)):
+ self.assert_type(ttl_types, ttl, i+1, rows[i][count])
+ count += 1
+
+ def assert_type(self, key, data_type: str, values: int, values_from_rows):
+ if data_type == "String" or data_type == "Yson":
+ assert values_from_rows.decode(
+ "utf-8") == key[data_type](values), f"{data_type}"
+ elif data_type == "Float" or data_type == "DyNumber":
+ assert math.isclose(float(values_from_rows), float(
+ key[data_type](values)), rel_tol=1e-3), f"{data_type}"
+ elif data_type == "Interval" or data_type == "Interval64":
+ assert values_from_rows == timedelta(
+ microseconds=key[data_type](values)), f"{data_type}"
+ elif data_type == "Timestamp" or data_type == "Timestamp64":
+ assert values_from_rows == datetime.fromtimestamp(
+ key[data_type](values)/1_000_000), f"{data_type}"
+ elif data_type == "Json" or data_type == "JsonDocument":
+ assert str(values_from_rows).replace(
+ "'", "\"") == str(key[data_type](values)), f"{data_type}"
+ else:
+ assert str(values_from_rows) == str(key[data_type](values)), f"{data_type}"
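Note that although pk_types, all_types and index are annotated as dict[str, str], their values are used as callables: each maps a YDB type name to a function producing the typed value for a given row index (for example pk_types[type_name](value)), which format_sql_value then renders as a YQL literal. The real dictionaries live in ydb/tests/datashard/lib/types_of_variables.py; a hypothetical illustration of the expected shape:

    # Hypothetical generators; the real ones are defined in types_of_variables.py.
    pk_types_example = {
        "Int64": lambda i: i,
        "Utf8": lambda i: f"text_{i}",
    }

    row_index = 3
    values = {name: make(row_index) for name, make in pk_types_example.items()}
    # values == {"Int64": 3, "Utf8": "text_3"}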
diff --git a/ydb/tests/datashard/lib/multicluster_test_base.py b/ydb/tests/datashard/lib/multicluster_test_base.py
new file mode 100644
index 00000000000..974271631c3
--- /dev/null
+++ b/ydb/tests/datashard/lib/multicluster_test_base.py
@@ -0,0 +1,70 @@
+import os
+import yatest.common
+import logging
+import hashlib
+
+
+from ydb.tests.library.harness.kikimr_runner import KiKiMR
+from ydb.tests.library.harness.kikimr_config import KikimrConfigGenerator
+from ydb.tests.library.common.types import Erasure
+from ydb.tests.library.harness.util import LogLevels
+
+logger = logging.getLogger(__name__)
+
+
+class MulticlusterTestBase():
+ @classmethod
+ def setup_class(cls):
+ ydb_path = yatest.common.build_path(os.environ.get(
+ "YDB_DRIVER_BINARY", "ydb/apps/ydbd/ydbd"))
+ logger.error(yatest.common.execute(
+ [ydb_path, "-V"], wait=True).stdout.decode("utf-8"))
+
+ cls.ydb_cli_path = yatest.common.build_path("ydb/apps/ydb/ydb")
+
+ cls.database = "/Root"
+ cls.clusters = [cls.build_cluster(), cls.build_cluster()]
+
+ @classmethod
+ def build_cluster(cls):
+ cluster = KiKiMR(KikimrConfigGenerator(erasure=cls.get_cluster_configuration(),
+ extra_feature_flags=["enable_resource_pools",
+ "enable_external_data_sources",
+ "enable_tiering_in_column_shard"],
+ column_shard_config={
+ 'disabled_on_scheme_shard': False,
+ 'lag_for_compaction_before_tierings_ms': 0,
+ 'compaction_actualization_lag_ms': 0,
+ 'optimizer_freshness_check_duration_ms': 0,
+ 'small_portion_detect_size_limit': 0,
+ },
+ additional_log_configs={
+ 'TX_TIERING': LogLevels.DEBUG}))
+ cluster.start()
+ return cluster
+
+ @staticmethod
+ def get_cluster_configuration():
+ return Erasure.NONE
+
+ @classmethod
+ def get_database(cls):
+ return cls.database
+
+ @staticmethod
+ def get_endpoint(cluster):
+ return "%s:%s" % (
+ cluster.nodes[1].host, cluster.nodes[1].port
+ )
+
+ @classmethod
+ def teardown_class(cls):
+ for cluster in cls.clusters:
+ cluster.stop()
+
+ def setup_method(self):
+ current_test_full_name = os.environ.get("PYTEST_CURRENT_TEST")
+ self.table_path = "insert_table_" + \
+ current_test_full_name.replace("::", ".").removesuffix(" (setup)")
+ self.hash = hashlib.md5(self.table_path.encode()).hexdigest()
+ self.hash_short = self.hash[:8]
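setup_method derives a per-test name from pytest's PYTEST_CURRENT_TEST environment variable, which during the setup phase normally looks like "path/to/test_file.py::TestClass::test_name (setup)"; it replaces "::" with ".", strips the " (setup)" suffix, and stores an 8-character prefix of the md5 digest for later use as a short unique identifier. A small worked example with a hypothetical test id:

    import hashlib

    # Hypothetical PYTEST_CURRENT_TEST value seen during setup.
    current_test = "ydb/tests/datashard/dml/test_dml.py::TestDML::test_dml[table_all_types] (setup)"

    table_path = "insert_table_" + current_test.replace("::", ".").removesuffix(" (setup)")
    hash_short = hashlib.md5(table_path.encode()).hexdigest()[:8]
    # table_path -> "insert_table_ydb/tests/datashard/dml/test_dml.py.TestDML.test_dml[table_all_types]"
    # hash_short -> first 8 hex characters of the md5 digest of table_path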
diff --git a/ydb/tests/datashard/lib/ya.make b/ydb/tests/datashard/lib/ya.make
index ec3405575e0..ddeba33cd5f 100644
--- a/ydb/tests/datashard/lib/ya.make
+++ b/ydb/tests/datashard/lib/ya.make
@@ -1,8 +1,17 @@
PY3_LIBRARY()
+ENV(YDB_DRIVER_BINARY="ydb/apps/ydbd/ydbd")
+ENV(MOTO_SERVER_PATH="contrib/python/moto/bin/moto_server")
PY_SRCS(
+ dml_operations.py
create_table.py
types_of_variables.py
+ multicluster_test_base.py
+)
+
+PEERDIR(
+ ydb/tests/library
+ ydb/tests/sql/lib
)
END()
diff --git a/ydb/tests/datashard/ya.make b/ydb/tests/datashard/ya.make
index 6a74e3cb5d3..9adfddd954c 100644
--- a/ydb/tests/datashard/ya.make
+++ b/ydb/tests/datashard/ya.make
@@ -1,4 +1,5 @@
RECURSE(
- lib
+ async_replication
dml
+ lib
)