author | Alexander Avdonkin <aavdonkin@yandex.ru> | 2025-04-16 13:34:46 +0300
committer | GitHub <noreply@github.com> | 2025-04-16 13:34:46 +0300
commit | d9afcf2dd4af2df2dc7c779d996d197562ff9a33 (patch)
tree | f546f7fa2c2a09e14388e5da79cb1fe73be738f5
parent | cf08f1c9fbbff9a6a8617ca76aeeb2c80ef1bfab (diff)
download | ydb-d9afcf2dd4af2df2dc7c779d996d197562ff9a33.tar.gz
Test checking that data is deleted from tiers after a DELETE SQL query (#13591)
-rw-r--r-- | ydb/tests/olap/ttl_tiering/base.py | 77
-rw-r--r-- | ydb/tests/olap/ttl_tiering/tier_delete.py | 195
-rw-r--r-- | ydb/tests/olap/ttl_tiering/ya.make | 2
3 files changed, 243 insertions, 31 deletions
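The base.py change below threads external test parameters (endpoint, database, S3 settings) through get_external_param, so the suite can run against an external cluster and a real S3 endpoint instead of the locally started KiKiMR and moto mock. A minimal sketch of what such a helper might look like, assuming yatest's `--test-param name=value` mechanism backs it (the real implementation in ydb/tests/olap/lib/utils.py may differ):

    # Hypothetical sketch; assumes yatest.common.get_param reads values passed
    # on the command line as `--test-param name=value` and returns None if unset.
    import yatest.common

    def get_external_param(name, default=None):
        value = yatest.common.get_param(name)
        return default if value is None else value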
diff --git a/ydb/tests/olap/ttl_tiering/base.py b/ydb/tests/olap/ttl_tiering/base.py
index 63de525ce28..663b71967a9 100644
--- a/ydb/tests/olap/ttl_tiering/base.py
+++ b/ydb/tests/olap/ttl_tiering/base.py
@@ -8,6 +8,7 @@ from ydb.tests.library.harness.kikimr_config import KikimrConfigGenerator
 from ydb.tests.library.harness.util import LogLevels
 from ydb.tests.olap.common.s3_client import S3Mock, S3Client
 from ydb.tests.olap.common.ydb_client import YdbClient
+from ydb.tests.olap.lib.utils import get_external_param
 
 logger = logging.getLogger(__name__)
 
@@ -15,6 +16,13 @@ logger = logging.getLogger(__name__)
 class TllTieringTestBase(object):
     @classmethod
     def setup_class(cls):
+        cls.endpoint = get_external_param("endpoint", None)
+        cls.database = get_external_param("database", default="/Root/test")
+        cls.s3_endpoint = get_external_param("s3-endpoint", None)
+        cls.s3_region = get_external_param("s3-region", "us-east-1")
+        cls.s3_key_id = get_external_param("s3-key-id", "fake_access_key_id")
+        cls.s3_key_secret = get_external_param("s3-key-secret", "fake_secret_access_key")
+
         cls._setup_ydb()
         cls._setup_s3()
 
@@ -26,43 +34,50 @@ class TllTieringTestBase(object):
     @classmethod
     def _setup_ydb(cls):
-        ydb_path = yatest.common.build_path(os.environ.get("YDB_DRIVER_BINARY"))
-        logger.info(yatest.common.execute([ydb_path, "-V"], wait=True).stdout.decode("utf-8"))
-        config = KikimrConfigGenerator(
-            extra_feature_flags={
-                "enable_external_data_sources": True,
-                "enable_tiering_in_column_shard": True
-            },
-            column_shard_config={
-                "lag_for_compaction_before_tierings_ms": 0,
-                "compaction_actualization_lag_ms": 0,
-                "optimizer_freshness_check_duration_ms": 0,
-                "small_portion_detect_size_limit": 0,
-                "max_read_staleness_ms": 60000,
-                "alter_object_enabled": True,
-            },
-            additional_log_configs={
-                "TX_COLUMNSHARD_TIERING": LogLevels.DEBUG,
-                "TX_COLUMNSHARD_ACTUALIZATION": LogLevels.TRACE,
-                "TX_COLUMNSHARD_BLOBS_TIER": LogLevels.DEBUG,
-            },
-            query_service_config=dict(
-                available_external_data_sources=["ObjectStorage"]
+        if cls.endpoint is None:
+            ydb_path = yatest.common.build_path(os.environ.get("YDB_DRIVER_BINARY"))
+            logger.info(yatest.common.execute([ydb_path, "-V"], wait=True).stdout.decode("utf-8"))
+            config = KikimrConfigGenerator(
+                extra_feature_flags={
+                    "enable_external_data_sources": True,
+                    "enable_tiering_in_column_shard": True
+                },
+                column_shard_config={
+                    "lag_for_compaction_before_tierings_ms": 0,
+                    "compaction_actualization_lag_ms": 0,
+                    "optimizer_freshness_check_duration_ms": 0,
+                    "small_portion_detect_size_limit": 0,
+                    "max_read_staleness_ms": 60000,
+                    "alter_object_enabled": True,
+                },
+                additional_log_configs={
+                    "TX_COLUMNSHARD_TIERING": LogLevels.DEBUG,
+                    "TX_COLUMNSHARD_ACTUALIZATION": LogLevels.TRACE,
+                    "TX_COLUMNSHARD_BLOBS_TIER": LogLevels.DEBUG,
+                },
+                query_service_config=dict(
+                    available_external_data_sources=["ObjectStorage"]
+                )
             )
-        )
-        cls.cluster = KiKiMR(config)
-        cls.cluster.start()
-        node = cls.cluster.nodes[1]
-        cls.ydb_client = YdbClient(database=f"/{config.domain_name}", endpoint=f"grpc://{node.host}:{node.port}")
+            cls.cluster = KiKiMR(config)
+            cls.cluster.start()
+            node = cls.cluster.nodes[1]
+            cls.ydb_client = YdbClient(database=f"/{config.domain_name}", endpoint=f"grpc://{node.host}:{node.port}")
+        else:
+            cls.ydb_client = YdbClient(database=cls.database, endpoint=cls.endpoint)
+        cls.ydb_client.wait_connection()
 
     @classmethod
     def _setup_s3(cls):
-        cls.s3_mock = S3Mock(yatest.common.binary_path(os.environ["MOTO_SERVER_PATH"]))
-        cls.s3_mock.start()
+        if cls.s3_endpoint is None:
+            cls.s3_mock = S3Mock(yatest.common.binary_path(os.environ["MOTO_SERVER_PATH"]))
+            cls.s3_mock.start()
 
-        cls.s3_pid = cls.s3_mock.s3_pid
-        cls.s3_client = S3Client(endpoint=cls.s3_mock.endpoint)
+            cls.s3_pid = cls.s3_mock.s3_pid
+            cls.s3_client = S3Client(endpoint=cls.s3_mock.endpoint)
+        else:
+            cls.s3_client = S3Client(cls.s3_endpoint, cls.s3_region, cls.s3_key_id, cls.s3_key_secret)
 
     @staticmethod
     def wait_for(condition_func, timeout_seconds):
diff --git a/ydb/tests/olap/ttl_tiering/tier_delete.py b/ydb/tests/olap/ttl_tiering/tier_delete.py
new file mode 100644
index 00000000000..2062ca00d62
--- /dev/null
+++ b/ydb/tests/olap/ttl_tiering/tier_delete.py
@@ -0,0 +1,195 @@
+import sys
+import time
+import logging
+
+from .base import TllTieringTestBase
+from ydb.tests.olap.lib.utils import get_external_param
+
+
+logger = logging.getLogger(__name__)
+
+
+class TestTierDelete(TllTieringTestBase):
+    test_name = "test_delete_s3_ttl"
+
+    @classmethod
+    def setup_class(cls):
+        super(TestTierDelete, cls).setup_class()
+
+    def test_delete_s3_ttl(self):
+        ''' Implements https://github.com/ydb-platform/ydb/issues/13467 '''
+        path_prefix = get_external_param("prefix", "olap_tests")
+        row_count = int(get_external_param("row-count", 10 ** 5))
+        rows_in_upsert = int(get_external_param("rows-in-upsert", 10 ** 4))
+        cold_bucket = get_external_param("bucket-cold", "cold")
+        frozen_bucket = get_external_param("bucket-frozen", "frozen")
+
+        self.s3_client.create_bucket(cold_bucket)
+        self.s3_client.create_bucket(frozen_bucket)
+
+        test_name = "delete_tiering"
+        test_dir = f"{self.ydb_client.database}/{path_prefix}/{test_name}"
+        table_path = f"{test_dir}/table"
+        secret_prefix = f"{path_prefix}_{test_name}"
+        access_key_id_secret_name = f"{secret_prefix}_key_id"
+        access_key_secret_secret_name = f"{secret_prefix}_key_secret"
+        cold_eds_path = f"{test_dir}/cold"
+        frozen_eds_path = f"{test_dir}/frozen"
+        days_to_cool = 1000
+        days_to_froze = 3000
+
+        # Expect empty buckets to avoid unintentional data deletion/modification
+        if self.s3_client.get_bucket_stat(cold_bucket) != (0, 0):
+            raise Exception("Bucket for cold data is not empty")
+        if self.s3_client.get_bucket_stat(frozen_bucket) != (0, 0):
+            raise Exception("Bucket for frozen data is not empty")
+
+        delete_resource_statements = [
+            f"DROP TABLE `{table_path}`",
+            f"DROP EXTERNAL DATA SOURCE `{cold_eds_path}`",
+            f"DROP EXTERNAL DATA SOURCE `{frozen_eds_path}`",
+            f"DROP OBJECT {access_key_id_secret_name} (TYPE SECRET)",
+            f"DROP OBJECT {access_key_secret_secret_name} (TYPE SECRET)"
+        ]
+
+        for s in delete_resource_statements:
+            try:
+                print(s)
+                self.ydb_client.query(s)
+                print("OK")
+            except Exception:
+                print("FAIL")
+
+        self.ydb_client.query(f"""
+            CREATE TABLE `{table_path}` (
+                ts Timestamp NOT NULL,
+                s String,
+                val Uint64,
+                PRIMARY KEY(ts),
+            )
+            WITH (STORE = COLUMN, AUTO_PARTITIONING_MIN_PARTITIONS_COUNT=2)
+        """)
+
+        self.ydb_client.query(f"CREATE OBJECT {access_key_id_secret_name} (TYPE SECRET) WITH value='{self.s3_client.key_id}'")
+        self.ydb_client.query(f"CREATE OBJECT {access_key_secret_secret_name} (TYPE SECRET) WITH value='{self.s3_client.key_secret}'")
+
+        self.ydb_client.query(f"""
+            CREATE EXTERNAL DATA SOURCE `{cold_eds_path}` WITH (
+                SOURCE_TYPE="ObjectStorage",
+                LOCATION="{self.s3_client.endpoint}/{cold_bucket}",
+                AUTH_METHOD="AWS",
+                AWS_ACCESS_KEY_ID_SECRET_NAME="{access_key_id_secret_name}",
+                AWS_SECRET_ACCESS_KEY_SECRET_NAME="{access_key_secret_secret_name}",
+                AWS_REGION="{self.s3_client.region}"
+            )
+        """)
+
+        self.ydb_client.query(f"""
+            CREATE EXTERNAL DATA SOURCE `{frozen_eds_path}` WITH (
+                SOURCE_TYPE="ObjectStorage",
+                LOCATION="{self.s3_client.endpoint}/{frozen_bucket}",
+                AUTH_METHOD="AWS",
+                AWS_ACCESS_KEY_ID_SECRET_NAME="{access_key_id_secret_name}",
+                AWS_SECRET_ACCESS_KEY_SECRET_NAME="{access_key_secret_secret_name}",
+                AWS_REGION="{self.s3_client.region}"
+            )
+        """)
+
+        def get_row_count() -> int:
+            return self.ydb_client.query(f"SELECT count(*) as Rows from `{table_path}`")[0].rows[0]["Rows"]
+
+        def get_row_count_by_date(past_days: int) -> int:
+            return self.ydb_client.query(f"SELECT count(*) as Rows from `{table_path}` WHERE ts < CurrentUtcTimestamp() - DateTime::IntervalFromDays({past_days})")[0].rows[0]["Rows"]
+
+        def get_portion_count() -> int:
+            return self.ydb_client.query(f"select count(*) as Rows from `{table_path}/.sys/primary_index_portion_stats`")[0].rows[0]["Rows"]
+
+        cur_rows = 0
+        while cur_rows < row_count:
+            self.ydb_client.query("""
+                $row_count = %i;
+                $from_us = CAST(Timestamp('2010-01-01T00:00:00.000000Z') as Uint64);
+                $to_us = CAST(Timestamp('2030-01-01T00:00:00.000000Z') as Uint64);
+                $dt = $to_us - $from_us;
+                $k = ((1ul << 64) - 1) / CAST($dt - 1 as Double);
+                $rows = ListMap(ListFromRange(0, $row_count), ($i)->{
+                    $us = CAST(RandomNumber($i) / $k as Uint64) + $from_us;
+                    $ts = Unwrap(CAST($us as Timestamp));
+                    return <|
+                        ts: $ts,
+                        s: 'some date:' || CAST($ts as String),
+                        val: $us
+                    |>;
+                });
+                upsert into `%s`
+                select * FROM AS_TABLE($rows);
+            """ % (min(row_count - cur_rows, rows_in_upsert), table_path))
+            cur_rows = get_row_count()
+            print(f"{cur_rows} rows inserted in total, portions: {get_portion_count()}")
+
+        def get_rows_by_tier() -> dict[str, int]:
+            results = self.ydb_client.query(f"select TierName, sum(Rows) as Rows from `{table_path}/.sys/primary_index_portion_stats` GROUP BY TierName")
+            return {row["TierName"]: row["Rows"] for result_set in results for row in result_set.rows}
+
+        print(f"After inserting: {get_rows_by_tier()}, portions: {get_portion_count()}")
+        print(f"Rows older than {days_to_cool} days: {get_row_count_by_date(days_to_cool)}")
+        print(f"Rows older than {days_to_froze} days: {get_row_count_by_date(days_to_froze)}")
+
+        def portions_actualized_in_sys():
+            rows_by_tier = get_rows_by_tier()
+            print(f"rows by tier: {rows_by_tier}, portions: {get_portion_count()}", file=sys.stderr)
+            if len(rows_by_tier) != 1:
+                return False
+            return row_count <= rows_by_tier["__DEFAULT"]
+
+        if not self.wait_for(lambda: portions_actualized_in_sys(), 60):
+            raise Exception(".sys reports incorrect data portions")
+
+        t0 = time.time()
+        stmt = f"""
+            ALTER TABLE `{table_path}` SET (TTL =
+                Interval("P{days_to_cool}D") TO EXTERNAL DATA SOURCE `{cold_eds_path}`,
+                Interval("P{days_to_froze}D") TO EXTERNAL DATA SOURCE `{frozen_eds_path}`
+                ON ts
+            )
+        """
+        print(stmt)
+        self.ydb_client.query(stmt)
+        print(f"TTL set in {time.time() - t0} seconds")
+
+        def data_distributes_across_tiers():
+            rows_by_tier = get_rows_by_tier()
+            cold_bucket_stat = self.s3_client.get_bucket_stat(cold_bucket)
+            frozen_bucket_stat = self.s3_client.get_bucket_stat(frozen_bucket)
+            print(f"rows by tier: {rows_by_tier}, portions: {get_portion_count()}, cold bucket stat: {cold_bucket_stat}, frozen bucket stat: {frozen_bucket_stat}")
+            # TODO FIXME
+            # We cannot expect a proper distribution of data across tiers due to https://github.com/ydb-platform/ydb/issues/13525,
+            # so we wait until some data appears in either bucket.
+            return cold_bucket_stat[0] != 0 or frozen_bucket_stat[0] != 0
+
+        if not self.wait_for(lambda: data_distributes_across_tiers(), 120):
+            # There is a bug in tiering; after it is fixed, replace this print with the next line:
+            # raise Exception("Data eviction has not been started")
+            print("Data eviction has not been started")
+
+        t0 = time.time()
+        stmt = f"""
+            DELETE FROM `{table_path}`
+        """
+        print(stmt)
+        self.ydb_client.query(stmt)
+
+        # TODO FIXME after https://github.com/ydb-platform/ydb/issues/13523
+        def data_deleted_from_buckets():
+            rows_by_tier = get_rows_by_tier()
+            cold_bucket_stat = self.s3_client.get_bucket_stat(cold_bucket)
+            frozen_bucket_stat = self.s3_client.get_bucket_stat(frozen_bucket)
+            print(f"rows by tier: {rows_by_tier}, portions: {get_portion_count()}, cold bucket stat: {cold_bucket_stat}, frozen bucket stat: {frozen_bucket_stat}")
+            return cold_bucket_stat[0] == 0 and frozen_bucket_stat[0] == 0
+
+        if self.wait_for(lambda: data_deleted_from_buckets(), 180):
+            print("all data deleted")
+            return
+        # ydbd currently has a bug that leaves some data in the tiers after DELETE.
+        # After it is fixed, the print below should be replaced with:
+        # raise Exception('Not all data deleted')
+        print('Not all data deleted')
diff --git a/ydb/tests/olap/ttl_tiering/ya.make b/ydb/tests/olap/ttl_tiering/ya.make
index 4592f4ccacc..bb532ff9636 100644
--- a/ydb/tests/olap/ttl_tiering/ya.make
+++ b/ydb/tests/olap/ttl_tiering/ya.make
@@ -12,6 +12,7 @@ TEST_SRCS(
     ttl_unavailable_s3.py
     data_migration_when_alter_ttl.py
     unstable_connection.py
+    tier_delete.py
 )
 
 SIZE(MEDIUM)
@@ -19,6 +20,7 @@ SIZE(MEDIUM)
 PEERDIR(
     ydb/tests/library
     ydb/tests/library/test_meta
+    ydb/tests/olap/lib
     ydb/public/sdk/python
     ydb/public/sdk/python/enable_v3_new_behavior
     contrib/python/boto3