diff options
author | Semyon <yentsovsemyon@ydb.tech> | 2025-02-12 14:26:30 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2025-02-12 14:26:30 +0300 |
commit | 39ecbe0625acd9cf9e79deeddab7762474d29dcc (patch) | |
tree | 9962cbebb9deda2178c64a774ecf90c0d16dc1ed | |
parent | 82981ccb32e9522bc09564bde0613e9aa4e12bea (diff) | |
download | ydb-39ecbe0625acd9cf9e79deeddab7762474d29dcc.tar.gz |
functional tiering tests: data correctness, unstable connection (#13629)
-rw-r--r-- | ydb/tests/olap/ttl_tiering/base.py | 17 | ||||
-rw-r--r-- | ydb/tests/olap/ttl_tiering/data_correctness.py | 191 | ||||
-rw-r--r-- | ydb/tests/olap/ttl_tiering/ttl_delete_s3.py | 6 | ||||
-rw-r--r-- | ydb/tests/olap/ttl_tiering/ttl_unavailable_s3.py | 26 | ||||
-rw-r--r-- | ydb/tests/olap/ttl_tiering/unstable_connection.py | 154 | ||||
-rw-r--r-- | ydb/tests/olap/ttl_tiering/ya.make | 2 |
6 files changed, 386 insertions, 10 deletions
diff --git a/ydb/tests/olap/ttl_tiering/base.py b/ydb/tests/olap/ttl_tiering/base.py index 16e12719d0..3882ef632b 100644 --- a/ydb/tests/olap/ttl_tiering/base.py +++ b/ydb/tests/olap/ttl_tiering/base.py @@ -67,6 +67,13 @@ class YdbClient: def query(self, statement): return self.session_pool.execute_with_retries(statement) + def bulk_upsert(self, table_path, column_types: ydb.BulkUpsertColumns, data_slice): + self.driver.table_client.bulk_upsert( + table_path, + data_slice, + column_types + ) + class ColumnTableHelper: def __init__(self, ydb_client: YdbClient, path: str): @@ -92,6 +99,15 @@ class ColumnTableHelper: results = self.ydb_client.query(stmt) return {row["TierName"]: {"Portions": row["Portions"], "BlobSize": row["BlobSize"], "BlobCount": row["BlobCount"]} for result_set in results for row in result_set.rows} + def set_fast_compaction(self): + self.ydb_client.query( + f""" + ALTER OBJECT `{self.path}` (TYPE TABLE) SET (ACTION=UPSERT_OPTIONS, `COMPACTION_PLANNER.CLASS_NAME`=`lc-buckets`, `COMPACTION_PLANNER.FEATURES`=` + {{"levels" : [{{"class_name" : "Zero", "portions_live_duration" : "5s", "expected_blobs_size" : 1000000000000, "portions_count_available" : 2}}, + {{"class_name" : "Zero"}}]}}`); + """ + ) + class TllTieringTestBase(object): @classmethod @@ -120,6 +136,7 @@ class TllTieringTestBase(object): "optimizer_freshness_check_duration_ms": 0, "small_portion_detect_size_limit": 0, "max_read_staleness_ms": 5000, + "alter_object_enabled": True, }, additional_log_configs={ "TX_COLUMNSHARD_TIERING": LogLevels.DEBUG, diff --git a/ydb/tests/olap/ttl_tiering/data_correctness.py b/ydb/tests/olap/ttl_tiering/data_correctness.py new file mode 100644 index 0000000000..efc9b46638 --- /dev/null +++ b/ydb/tests/olap/ttl_tiering/data_correctness.py @@ -0,0 +1,191 @@ +import time +import logging +from .base import TllTieringTestBase, ColumnTableHelper +import ydb +import concurrent +import random +import datetime + +logger = logging.getLogger(__name__) + + +class TestDataCorrectness(TllTieringTestBase): + test_name = "data_correctness" + cold_bucket = "cold" + n_shards = 4 + + @classmethod + def setup_class(cls): + super(TestDataCorrectness, cls).setup_class() + cls.s3_client.create_bucket(cls.cold_bucket) + + def write_data( + self, + table: str, + timestamp_from_ms: int, + rows: int, + value: int = 1, + ): + chunk_size = 100 + while rows: + current_chunk_size = min(chunk_size, rows) + data = [ + { + "ts": timestamp_from_ms + i, + "s": random.randbytes(1024 * 10), + "val": value, + } + for i in range(current_chunk_size) + ] + self.ydb_client.bulk_upsert( + table, + self.column_types, + data, + ) + timestamp_from_ms += current_chunk_size + rows -= current_chunk_size + assert rows >= 0 + + def total_values(self, table: str) -> int: + return ( + self.ydb_client.query(f"select sum(val) as Values from `{table}`")[0].rows[ + 0 + ]["Values"] + or 0 + ) + + def wait_eviction(self, table: ColumnTableHelper): + deadline = datetime.datetime.now() + datetime.timedelta(seconds=60) + while ( + table.get_portion_stat_by_tier().get("__DEFAULT", {}).get("Portions", 0) + > self.n_shards + ): + assert ( + datetime.datetime.now() < deadline + ), "Timeout for wait eviction is exceeded" + + logger.info( + f"Waiting for data eviction: {table.get_portion_stat_by_tier()}" + ) + time.sleep(1) + + stats = table.get_portion_stat_by_tier() + assert len(stats) > 1 or '__DEFAULT' not in stats + + def test(self): + """Implements https://github.com/ydb-platform/ydb/issues/13465""" + test_dir = f"{self.ydb_client.database}/{self.test_name}" + table_path = f"{test_dir}/table" + secret_prefix = self.test_name + access_key_id_secret_name = f"{secret_prefix}_key_id" + access_key_secret_secret_name = f"{secret_prefix}_key_secret" + eds_path = f"{test_dir}/{self.cold_bucket}" + + # Expect empty buckets to avoid unintentional data deletion/modification + if self.s3_client.get_bucket_stat(self.cold_bucket) != (0, 0): + raise Exception("Bucket for cold data is not empty") + + self.ydb_client.query( + f""" + CREATE TABLE `{table_path}` ( + ts Timestamp NOT NULL, + s String, + val Uint64, + PRIMARY KEY(ts), + ) + WITH ( + STORE = COLUMN, + AUTO_PARTITIONING_MIN_PARTITIONS_COUNT = {self.n_shards} + ) + """ + ) + + table = ColumnTableHelper(self.ydb_client, table_path) + table.set_fast_compaction() + + self.column_types = ydb.BulkUpsertColumns() + self.column_types.add_column("ts", ydb.PrimitiveType.Timestamp) + self.column_types.add_column("s", ydb.PrimitiveType.String) + self.column_types.add_column("val", ydb.PrimitiveType.Uint64) + + logger.info(f"Table {table_path} created") + + self.ydb_client.query( + f"CREATE OBJECT {access_key_id_secret_name} (TYPE SECRET) WITH value='{self.s3_client.key_id}'" + ) + self.ydb_client.query( + f"CREATE OBJECT {access_key_secret_secret_name} (TYPE SECRET) WITH value='{self.s3_client.key_secret}'" + ) + + self.ydb_client.query( + f""" + CREATE EXTERNAL DATA SOURCE `{eds_path}` WITH ( + SOURCE_TYPE="ObjectStorage", + LOCATION="{self.s3_client.endpoint}/{self.cold_bucket}", + AUTH_METHOD="AWS", + AWS_ACCESS_KEY_ID_SECRET_NAME="{access_key_id_secret_name}", + AWS_SECRET_ACCESS_KEY_SECRET_NAME="{access_key_secret_secret_name}", + AWS_REGION="{self.s3_client.region}" + ) + """ + ) + + stmt = f""" + ALTER TABLE `{table_path}` SET (TTL = + Interval("PT1S") TO EXTERNAL DATA SOURCE `{eds_path}` + ON ts + ) + """ + logger.info(stmt) + self.ydb_client.query(stmt) + + ts_start = int(datetime.datetime.now().timestamp() * 1000000) + rows = 10000 + num_threads = 10 + assert rows % num_threads == 0 + chunk_size = rows // num_threads + + # Write data + with concurrent.futures.ThreadPoolExecutor(max_workers=num_threads) as executor: + insert_futures = [ + executor.submit( + self.write_data, + table_path, + ts_start + i * chunk_size, + chunk_size, + 1, + ) + for i in range(num_threads) + ] + + concurrent.futures.wait(insert_futures) + for future in insert_futures: + future.result() + + self.wait_eviction(table) + assert self.total_values(table_path) == rows + + # Update data + with concurrent.futures.ThreadPoolExecutor(max_workers=num_threads) as executor: + insert_futures = [ + executor.submit( + self.write_data, + table_path, + ts_start + i * chunk_size, + chunk_size, + 2, + ) + for i in range(num_threads) + ] + + concurrent.futures.wait(insert_futures) + for future in insert_futures: + future.result() + + self.wait_eviction(table) + assert self.total_values(table_path) == rows * 2 + + # Delete data + self.ydb_client.query(f"delete from `{table_path}`") + + assert not self.total_values(table_path) diff --git a/ydb/tests/olap/ttl_tiering/ttl_delete_s3.py b/ydb/tests/olap/ttl_tiering/ttl_delete_s3.py index 9ea6848ced..9990bf8693 100644 --- a/ydb/tests/olap/ttl_tiering/ttl_delete_s3.py +++ b/ydb/tests/olap/ttl_tiering/ttl_delete_s3.py @@ -66,7 +66,8 @@ class TestDeleteS3Ttl(TllTieringTestBase): if self.s3_client.get_bucket_stat(medium_bucket) != (0, 0): raise Exception("Bucket for medium data is not empty") - self.ydb_client.query(f""" + self.ydb_client.query( + f""" CREATE TABLE `{table_path}` ( ts Timestamp NOT NULL, s String, @@ -229,7 +230,8 @@ class TestDeleteS3Ttl(TllTieringTestBase): if self.s3_client.get_bucket_stat(self.frozen_bucket) != (0, 0): raise Exception("Bucket for frozen data is not empty") - self.ydb_client.query(f""" + self.ydb_client.query( + f""" CREATE TABLE `{table_path}` ( ts Timestamp NOT NULL, s String, diff --git a/ydb/tests/olap/ttl_tiering/ttl_unavailable_s3.py b/ydb/tests/olap/ttl_tiering/ttl_unavailable_s3.py index db813d282a..c8758f7132 100644 --- a/ydb/tests/olap/ttl_tiering/ttl_unavailable_s3.py +++ b/ydb/tests/olap/ttl_tiering/ttl_unavailable_s3.py @@ -2,8 +2,11 @@ import os import signal import sys import time +import logging -from .base import TllTieringTestBase +from .base import TllTieringTestBase, ColumnTableHelper + +logger = logging.getLogger(__name__) ROWS_CHUNK_SIZE = 1000000 ROWS_CHUNKS_COUNT = 10 @@ -21,9 +24,15 @@ class TestUnavailableS3(TllTieringTestBase): v String, PRIMARY KEY(ts), ) - WITH (STORE = COLUMN) + WITH ( + STORE = COLUMN, + AUTO_PARTITIONING_MIN_PARTITIONS_COUNT = 2 + ) """) + table = ColumnTableHelper(self.ydb_client, f"{self.ydb_client.database}/table") + table.set_fast_compaction() + self.s3_client.create_bucket(bucket_s3_name) self.ydb_client.query(f"CREATE OBJECT s3_id (TYPE SECRET) WITH value = '{self.s3_client.key_id}'") @@ -58,6 +67,9 @@ class TestUnavailableS3(TllTieringTestBase): SELECT * FROM AS_TABLE($rows_list); """) + def get_stat(): + return self.s3_client.get_bucket_stat(bucket_s3_name)[0] + for i in range(0, ROWS_CHUNKS_COUNT // 2): upsert_chunk(i) @@ -68,6 +80,8 @@ class TestUnavailableS3(TllTieringTestBase): ) """) + assert self.wait_for(get_stat, 30), "initial eviction" + print("!!! simulating S3 hang up -- sending SIGSTOP", file=sys.stderr) os.kill(self.s3_pid, signal.SIGSTOP) @@ -76,13 +90,9 @@ class TestUnavailableS3(TllTieringTestBase): print("!!! simulating S3 recovery -- sending SIGCONT", file=sys.stderr) os.kill(self.s3_pid, signal.SIGCONT) - def get_stat(): - return self.s3_client.get_bucket_stat(bucket_s3_name)[0] - - # stat_old = get_stat() + stat_old = get_stat() for i in range(ROWS_CHUNKS_COUNT // 2, ROWS_CHUNKS_COUNT): upsert_chunk(i) - # Uncomment after fixing https://github.com/ydb-platform/ydb/issues/13719 - # assert self.wait_for(lambda: get_stat() != stat_old, 120), "data distribution continuation" + assert self.wait_for(lambda: get_stat() != stat_old, 120), "data distribution continuation" diff --git a/ydb/tests/olap/ttl_tiering/unstable_connection.py b/ydb/tests/olap/ttl_tiering/unstable_connection.py new file mode 100644 index 0000000000..0c85a80035 --- /dev/null +++ b/ydb/tests/olap/ttl_tiering/unstable_connection.py @@ -0,0 +1,154 @@ +import time +import logging +from .base import TllTieringTestBase, ColumnTableHelper +import ydb +import concurrent +import random +import datetime +import threading +import subprocess + +logger = logging.getLogger(__name__) + + +class TestUnstableConnection(TllTieringTestBase): + test_name = "unstable_connection" + cold_bucket = "cold" + num_writers = 10 + num_readers = 4 + + @classmethod + def setup_class(cls): + super(TestUnstableConnection, cls).setup_class() + cls.s3_client.create_bucket(cls.cold_bucket) + + def write_data( + self, + table: str, + timestamp_from_ms: int, + rows: int, + ): + chunk_size = 100 + while rows: + current_chunk_size = min(chunk_size, rows) + data = [ + { + 'ts': timestamp_from_ms + i, + 's': random.randbytes(1024), + } for i in range(current_chunk_size) + ] + self.ydb_client.bulk_upsert( + table, + self.column_types, + data, + ) + timestamp_from_ms += current_chunk_size + rows -= current_chunk_size + assert rows >= 0 + + def writer(self, table: str, stop_event): + logger.info("Writer started") + while not stop_event.is_set(): + self.write_data(table, int(datetime.datetime.now().timestamp() * 1000000), 1000) + logger.info("Writer stopped") + + def reader(self, table: str, stop_event): + logger.info("Reader started") + while not stop_event.is_set(): + self.ydb_client.query(f"SELECT ts FROM `{table}` WHERE StartsWith(s, \"a\")") + logger.info("Reader stopped") + + def adversary(self, pid: int, stop_event): + while not stop_event.is_set(): + logger.info(f"Stopping s3 process, pid={pid}") + subprocess.run(["kill", "-STOP", str(pid)]) + time.sleep(1) + logger.info(f"Resuming s3 process, pid={pid}") + subprocess.run(["kill", "-CONT", str(pid)]) + time.sleep(2) + subprocess.run(["kill", "-CONT", str(pid)]) + logger.info("Adversary stopped") + + def stopwatch(self, seconds: int, stop_event): + time.sleep(seconds) + stop_event.set() + + def test(self): + ''' Implements https://github.com/ydb-platform/ydb/issues/13544''' + test_dir = f"{self.ydb_client.database}/{self.test_name}" + table_path = f"{test_dir}/table" + secret_prefix = self.test_name + access_key_id_secret_name = f"{secret_prefix}_key_id" + access_key_secret_secret_name = f"{secret_prefix}_key_secret" + eds_path = f"{test_dir}/{self.cold_bucket}" + + # Expect empty buckets to avoid unintentional data deletion/modification + if self.s3_client.get_bucket_stat(self.cold_bucket) != (0, 0): + raise Exception("Bucket for cold data is not empty") + + self.ydb_client.query( + f""" + CREATE TABLE `{table_path}` ( + ts Timestamp NOT NULL, + s String, + val Uint64, + PRIMARY KEY(ts), + ) + WITH ( + STORE = COLUMN, + AUTO_PARTITIONING_MIN_PARTITIONS_COUNT = 4 + ) + """ + ) + + table = ColumnTableHelper(self.ydb_client, table_path) + table.set_fast_compaction() + + self.column_types = ydb.BulkUpsertColumns() + self.column_types.add_column("ts", ydb.PrimitiveType.Timestamp) + self.column_types.add_column("s", ydb.PrimitiveType.String) + + logger.info(f"Table {table_path} created") + + self.ydb_client.query(f"CREATE OBJECT {access_key_id_secret_name} (TYPE SECRET) WITH value='{self.s3_client.key_id}'") + self.ydb_client.query(f"CREATE OBJECT {access_key_secret_secret_name} (TYPE SECRET) WITH value='{self.s3_client.key_secret}'") + + self.ydb_client.query(f""" + CREATE EXTERNAL DATA SOURCE `{eds_path}` WITH ( + SOURCE_TYPE="ObjectStorage", + LOCATION="{self.s3_client.endpoint}/{self.cold_bucket}", + AUTH_METHOD="AWS", + AWS_ACCESS_KEY_ID_SECRET_NAME="{access_key_id_secret_name}", + AWS_SECRET_ACCESS_KEY_SECRET_NAME="{access_key_secret_secret_name}", + AWS_REGION="{self.s3_client.region}" + ) + """) + + stmt = f""" + ALTER TABLE `{table_path}` SET (TTL = + Interval("PT1S") TO EXTERNAL DATA SOURCE `{eds_path}`, + Interval("PT1M") DELETE + ON ts + ) + """ + logger.info(stmt) + self.ydb_client.query(stmt) + + with concurrent.futures.ThreadPoolExecutor(max_workers=self.num_writers+self.num_readers+2) as executor: + stop_event = threading.Event() + workers = [] + + for _ in range(self.num_writers): + workers.append(executor.submit(self.writer, table_path, stop_event)) + + for _ in range(self.num_readers): + workers.append(executor.submit(self.reader, table_path, stop_event)) + + workers.append(executor.submit(self.adversary, self.s3_pid, stop_event)) + workers.append(executor.submit(self.stopwatch, 120, stop_event)) + + time.sleep(60) + logger.info("Checking eviction") + assert table.get_portion_stat_by_tier().get(eds_path, {}).get("Rows", 0), f"Nothing is evicted after 1 minute: {table.get_portion_stat_by_tier()}" + + concurrent.futures.wait(workers) diff --git a/ydb/tests/olap/ttl_tiering/ya.make b/ydb/tests/olap/ttl_tiering/ya.make index 2caf761764..84d1a5b676 100644 --- a/ydb/tests/olap/ttl_tiering/ya.make +++ b/ydb/tests/olap/ttl_tiering/ya.make @@ -5,9 +5,11 @@ ENV(YDB_ADDITIONAL_LOG_CONFIGS="TX_TIERING:DEBUG") TEST_SRCS( base.py + data_correctness.py ttl_delete_s3.py ttl_unavailable_s3.py data_migration_when_alter_ttl.py + unstable_connection.py ) SIZE(MEDIUM) |