author    Alexander Avdonkin <aavdonkin@yandex.ru>  2025-04-16 13:34:46 +0300
committer GitHub <noreply@github.com>  2025-04-16 13:34:46 +0300
commit    d9afcf2dd4af2df2dc7c779d996d197562ff9a33 (patch)
tree      f546f7fa2c2a09e14388e5da79cb1fe73be738f5
parent    cf08f1c9fbbff9a6a8617ca76aeeb2c80ef1bfab (diff)
download  ydb-d9afcf2dd4af2df2dc7c779d996d197562ff9a33.tar.gz
Test checking that data is deleted from tiers after a DELETE SQL query (#13591)
-rw-r--r--  ydb/tests/olap/ttl_tiering/base.py          77
-rw-r--r--  ydb/tests/olap/ttl_tiering/tier_delete.py  205
-rw-r--r--  ydb/tests/olap/ttl_tiering/ya.make           2
3 files changed, 253 insertions, 31 deletions
diff --git a/ydb/tests/olap/ttl_tiering/base.py b/ydb/tests/olap/ttl_tiering/base.py
index 63de525ce28..663b71967a9 100644
--- a/ydb/tests/olap/ttl_tiering/base.py
+++ b/ydb/tests/olap/ttl_tiering/base.py
@@ -8,6 +8,7 @@ from ydb.tests.library.harness.kikimr_config import KikimrConfigGenerator
from ydb.tests.library.harness.util import LogLevels
from ydb.tests.olap.common.s3_client import S3Mock, S3Client
from ydb.tests.olap.common.ydb_client import YdbClient
+from ydb.tests.olap.lib.utils import get_external_param
logger = logging.getLogger(__name__)
@@ -15,6 +16,13 @@ logger = logging.getLogger(__name__)
class TllTieringTestBase(object):
@classmethod
def setup_class(cls):
+ cls.endpoint = get_external_param("endpoint", None)
+ cls.database = get_external_param("database", default="/Root/test")
+ cls.s3_endpoint = get_external_param("s3-endpoint", None)
+ cls.s3_region = get_external_param("s3-region", "us-east-1")
+ cls.s3_key_id = get_external_param("s3-key-id", "fake_access_key_id")
+ cls.s3_key_secret = get_external_param("s3-key-secret", "fake_secret_access_key")
+
cls._setup_ydb()
cls._setup_s3()
@@ -26,43 +34,50 @@ class TllTieringTestBase(object):
@classmethod
def _setup_ydb(cls):
- ydb_path = yatest.common.build_path(os.environ.get("YDB_DRIVER_BINARY"))
- logger.info(yatest.common.execute([ydb_path, "-V"], wait=True).stdout.decode("utf-8"))
- config = KikimrConfigGenerator(
- extra_feature_flags={
- "enable_external_data_sources": True,
- "enable_tiering_in_column_shard": True
- },
- column_shard_config={
- "lag_for_compaction_before_tierings_ms": 0,
- "compaction_actualization_lag_ms": 0,
- "optimizer_freshness_check_duration_ms": 0,
- "small_portion_detect_size_limit": 0,
- "max_read_staleness_ms": 60000,
- "alter_object_enabled": True,
- },
- additional_log_configs={
- "TX_COLUMNSHARD_TIERING": LogLevels.DEBUG,
- "TX_COLUMNSHARD_ACTUALIZATION": LogLevels.TRACE,
- "TX_COLUMNSHARD_BLOBS_TIER": LogLevels.DEBUG,
- },
- query_service_config=dict(
- available_external_data_sources=["ObjectStorage"]
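+        # When an endpoint is supplied via external parameters, reuse that cluster
+        # instead of bootstrapping a local KiKiMR instance.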
+ if cls.endpoint is None:
+ ydb_path = yatest.common.build_path(os.environ.get("YDB_DRIVER_BINARY"))
+ logger.info(yatest.common.execute([ydb_path, "-V"], wait=True).stdout.decode("utf-8"))
+ config = KikimrConfigGenerator(
+ extra_feature_flags={
+ "enable_external_data_sources": True,
+ "enable_tiering_in_column_shard": True
+ },
+ column_shard_config={
+ "lag_for_compaction_before_tierings_ms": 0,
+ "compaction_actualization_lag_ms": 0,
+ "optimizer_freshness_check_duration_ms": 0,
+ "small_portion_detect_size_limit": 0,
+ "max_read_staleness_ms": 60000,
+ "alter_object_enabled": True,
+ },
+ additional_log_configs={
+ "TX_COLUMNSHARD_TIERING": LogLevels.DEBUG,
+ "TX_COLUMNSHARD_ACTUALIZATION": LogLevels.TRACE,
+ "TX_COLUMNSHARD_BLOBS_TIER": LogLevels.DEBUG,
+ },
+ query_service_config=dict(
+ available_external_data_sources=["ObjectStorage"]
+ )
)
- )
- cls.cluster = KiKiMR(config)
- cls.cluster.start()
- node = cls.cluster.nodes[1]
- cls.ydb_client = YdbClient(database=f"/{config.domain_name}", endpoint=f"grpc://{node.host}:{node.port}")
+ cls.cluster = KiKiMR(config)
+ cls.cluster.start()
+ node = cls.cluster.nodes[1]
+ cls.ydb_client = YdbClient(database=f"/{config.domain_name}", endpoint=f"grpc://{node.host}:{node.port}")
+ else:
+ cls.ydb_client = YdbClient(database=cls.database, endpoint=cls.endpoint)
+
cls.ydb_client.wait_connection()
@classmethod
def _setup_s3(cls):
- cls.s3_mock = S3Mock(yatest.common.binary_path(os.environ["MOTO_SERVER_PATH"]))
- cls.s3_mock.start()
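+        # Start the local moto S3 mock unless an external S3 endpoint was supplied.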
+ if cls.s3_endpoint is None:
+ cls.s3_mock = S3Mock(yatest.common.binary_path(os.environ["MOTO_SERVER_PATH"]))
+ cls.s3_mock.start()
- cls.s3_pid = cls.s3_mock.s3_pid
- cls.s3_client = S3Client(endpoint=cls.s3_mock.endpoint)
+ cls.s3_pid = cls.s3_mock.s3_pid
+ cls.s3_client = S3Client(endpoint=cls.s3_mock.endpoint)
+ else:
+ cls.s3_client = S3Client(cls.s3_endpoint, cls.s3_region, cls.s3_key_id, cls.s3_key_secret)
@staticmethod
def wait_for(condition_func, timeout_seconds):
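The body of wait_for is outside this hunk's context. Its call sites in tier_delete.py below pass a condition callback plus a timeout in seconds and branch on a boolean result, so a minimal polling sketch could look like the following (the one-second poll interval is an assumption, and time is assumed to be imported in base.py):

    @staticmethod
    def wait_for(condition_func, timeout_seconds):
        # Poll condition_func until it returns a truthy value or the timeout expires.
        deadline = time.time() + timeout_seconds
        while time.time() < deadline:
            if condition_func():
                return True
            time.sleep(1)  # assumed poll interval
        return False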
diff --git a/ydb/tests/olap/ttl_tiering/tier_delete.py b/ydb/tests/olap/ttl_tiering/tier_delete.py
new file mode 100644
index 00000000000..2062ca00d62
--- /dev/null
+++ b/ydb/tests/olap/ttl_tiering/tier_delete.py
@@ -0,0 +1,205 @@
+import sys
+import time
+import logging
+
+from .base import TllTieringTestBase
+from ydb.tests.olap.lib.utils import get_external_param
+
+
+logger = logging.getLogger(__name__)
+
+
+class TestTierDelete(TllTieringTestBase):
+ test_name = "test_delete_s3_ttl"
+
+ @classmethod
+ def setup_class(cls):
+        super(TestTierDelete, cls).setup_class()
+
+ def test_delete_s3_ttl(self):
+ path_prefix = get_external_param("prefix", "olap_tests")
+ row_count = int(get_external_param("row-count", 10 ** 5))
+ rows_in_upsert = int(get_external_param("rows-in-upsert", 10 ** 4))
+ cold_bucket = get_external_param("bucket-cold", "cold")
+ frozen_bucket = get_external_param("bucket-frozen", "frozen")
+
+        self.s3_client.create_bucket(cold_bucket)
+        self.s3_client.create_bucket(frozen_bucket)
+
+        # Implements https://github.com/ydb-platform/ydb/issues/13467
+ test_name = "delete_tiering"
+ test_dir = f"{self.ydb_client.database}/{path_prefix}/{test_name}"
+ table_path = f"{test_dir}/table"
+ secret_prefix = f"{path_prefix}_{test_name}"
+ access_key_id_secret_name = f"{secret_prefix}_key_id"
+ access_key_secret_secret_name = f"{secret_prefix}_key_secret"
+ cold_eds_path = f"{test_dir}/cold"
+ frozen_eds_path = f"{test_dir}/frozen"
+ days_to_cool = 1000
+        days_to_freeze = 3000
+
+ # Expect empty buckets to avoid unintentional data deletion/modification
+ if self.s3_client.get_bucket_stat(cold_bucket) != (0, 0):
+ raise Exception("Bucket for cold data is not empty")
+ if self.s3_client.get_bucket_stat(frozen_bucket) != (0, 0):
+ raise Exception("Bucket for frozen data is not empty")
+
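+        # Best-effort cleanup of resources left over from previous runs;
+        # on a clean database these DROPs are expected to fail.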
+ delete_resource_statements = [
+ f"DROP TABLE `{table_path}`",
+ f"DROP EXTERNAL DATA SOURCE `{cold_eds_path}`",
+ f"DROP EXTERNAL DATA SOURCE `{frozen_eds_path}`",
+ f"DROP OBJECT {access_key_id_secret_name} (TYPE SECRET)",
+ f"DROP OBJECT {access_key_secret_secret_name} (TYPE SECRET)"
+ ]
+
+ for s in delete_resource_statements:
+ try:
+ print(s)
+ self.ydb_client.query(s)
+ print("OK")
+ except Exception:
+ print("FAIL")
+
+ self.ydb_client.query(f"""
+ CREATE TABLE `{table_path}` (
+ ts Timestamp NOT NULL,
+ s String,
+ val Uint64,
+ PRIMARY KEY(ts),
+ )
+ WITH (STORE = COLUMN, AUTO_PARTITIONING_MIN_PARTITIONS_COUNT=2)
+ """)
+
+ self.ydb_client.query(f"CREATE OBJECT {access_key_id_secret_name} (TYPE SECRET) WITH value='{self.s3_client.key_id}'")
+ self.ydb_client.query(f"CREATE OBJECT {access_key_secret_secret_name} (TYPE SECRET) WITH value='{self.s3_client.key_secret}'")
+
+ self.ydb_client.query(f"""
+ CREATE EXTERNAL DATA SOURCE `{cold_eds_path}` WITH (
+ SOURCE_TYPE="ObjectStorage",
+ LOCATION="{self.s3_client.endpoint}/{cold_bucket}",
+ AUTH_METHOD="AWS",
+ AWS_ACCESS_KEY_ID_SECRET_NAME="{access_key_id_secret_name}",
+ AWS_SECRET_ACCESS_KEY_SECRET_NAME="{access_key_secret_secret_name}",
+ AWS_REGION="{self.s3_client.region}"
+ )
+ """)
+
+ self.ydb_client.query(f"""
+ CREATE EXTERNAL DATA SOURCE `{frozen_eds_path}` WITH (
+ SOURCE_TYPE="ObjectStorage",
+ LOCATION="{self.s3_client.endpoint}/{frozen_bucket}",
+ AUTH_METHOD="AWS",
+ AWS_ACCESS_KEY_ID_SECRET_NAME="{access_key_id_secret_name}",
+ AWS_SECRET_ACCESS_KEY_SECRET_NAME="{access_key_secret_secret_name}",
+ AWS_REGION="{self.s3_client.region}"
+ )
+ """)
+
+ def get_row_count() -> int:
+ return self.ydb_client.query(f"SELECT count(*) as Rows from `{table_path}`")[0].rows[0]["Rows"]
+
+ def get_row_count_by_date(past_days: int) -> int:
+ return self.ydb_client.query(f"SELECT count(*) as Rows from `{table_path}` WHERE ts < CurrentUtcTimestamp() - DateTime::IntervalFromDays({past_days})")[0].rows[0]["Rows"]
+
+ def get_portion_count() -> int:
+ return self.ydb_client.query(f"select count(*) as Rows from `{table_path}/.sys/primary_index_portion_stats`")[0].rows[0]["Rows"]
+
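+        # Upsert in batches of at most rows_in_upsert rows; timestamps are spread
+        # uniformly over 2010-2030, so both TTL thresholds below should match some rows.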
+ cur_rows = 0
+ while cur_rows < row_count:
+ self.ydb_client.query("""
+ $row_count = %i;
+ $from_us = CAST(Timestamp('2010-01-01T00:00:00.000000Z') as Uint64);
+ $to_us = CAST(Timestamp('2030-01-01T00:00:00.000000Z') as Uint64);
+ $dt = $to_us - $from_us;
+ $k = ((1ul << 64) - 1) / CAST($dt - 1 as Double);
+ $rows= ListMap(ListFromRange(0, $row_count), ($i)->{
+ $us = CAST(RandomNumber($i) / $k as Uint64) + $from_us;
+ $ts = Unwrap(CAST($us as Timestamp));
+ return <|
+ ts: $ts,
+ s: 'some date:' || CAST($ts as String),
+ val: $us
+ |>;
+ });
+ upsert into `%s`
+ select * FROM AS_TABLE($rows);
+ """ % (min(row_count - cur_rows, rows_in_upsert), table_path))
+ cur_rows = get_row_count()
+ print(f"{cur_rows} rows inserted in total, portions: {get_portion_count()}")
+
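+        # The portion-stats system view reports which tier each portion currently occupies;
+        # portions not yet evicted to S3 are accounted under the __DEFAULT tier.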
+ def get_rows_by_tier() -> dict[str, int]:
+ results = self.ydb_client.query(f"select TierName, sum(Rows) as Rows from `{table_path}/.sys/primary_index_portion_stats` GROUP BY TierName")
+ return {row["TierName"]: row["Rows"] for result_set in results for row in result_set.rows}
+
+ print(f"After inserting: {get_rows_by_tier()}, portions: {get_portion_count()}")
+ print(f"Rows older than {days_to_cool} days: {get_row_count_by_date(days_to_cool)}")
+        print(f"Rows older than {days_to_freeze} days: {get_row_count_by_date(days_to_freeze)}")
+
+ def portions_actualized_in_sys():
+ rows_by_tier = get_rows_by_tier()
+ print(f"rows by tier: {rows_by_tier}, portions: {get_portion_count()}", file=sys.stderr)
+ if len(rows_by_tier) != 1:
+ return False
+ return row_count <= rows_by_tier["__DEFAULT"]
+
+        if not self.wait_for(portions_actualized_in_sys, 60):
+ raise Exception(".sys reports incorrect data portions")
+
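+        # Two-level tiering: rows older than days_to_cool move to the cold bucket,
+        # and rows older than days_to_freeze move on to the frozen bucket.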
+ t0 = time.time()
+ stmt = f"""
+ ALTER TABLE `{table_path}` SET (TTL =
+ Interval("P{days_to_cool}D") TO EXTERNAL DATA SOURCE `{cold_eds_path}`,
+                Interval("P{days_to_freeze}D") TO EXTERNAL DATA SOURCE `{frozen_eds_path}`
+ ON ts
+ )
+ """
+ print(stmt)
+ self.ydb_client.query(stmt)
+ print(f"TTL set in {time.time() - t0} seconds")
+
+ def data_distributes_across_tiers():
+ rows_by_tier = get_rows_by_tier()
+ cold_bucket_stat = self.s3_client.get_bucket_stat(cold_bucket)
+ frozen_bucket_stat = self.s3_client.get_bucket_stat(frozen_bucket)
+ print(f"rows by tier: {rows_by_tier}, portions: {get_portion_count()}, cold bucket stat: {cold_bucket_stat}, frozen bucket stat: {frozen_bucket_stat}")
+ # TODO FIXME
+            # We cannot expect a proper distribution of data across tiers yet due to https://github.com/ydb-platform/ydb/issues/13525,
+            # so we only wait until some data appears in either bucket.
+ return cold_bucket_stat[0] != 0 or frozen_bucket_stat[0] != 0
+
+        if not self.wait_for(data_distributes_across_tiers, 120):
+            # There is a bug in tiering; once it is fixed, replace the print below with:
+ # raise Exception("Data eviction has not been started")
+ print("Data eviction has not been started")
+ t0 = time.time()
+ stmt = f"""
+ DELETE FROM `{table_path}`
+ """
+ print(stmt)
+ self.ydb_client.query(stmt)
+
+ # TODO FIXME after https://github.com/ydb-platform/ydb/issues/13523
+ def data_deleted_from_buckets():
+ rows_by_tier = get_rows_by_tier()
+ cold_bucket_stat = self.s3_client.get_bucket_stat(cold_bucket)
+ frozen_bucket_stat = self.s3_client.get_bucket_stat(frozen_bucket)
+ print(f"rows by tier: {rows_by_tier}, portions: {get_portion_count()}, cold bucket stat: {cold_bucket_stat}, frozen bucket stat: {frozen_bucket_stat}")
+ return cold_bucket_stat[0] == 0 and frozen_bucket_stat[0] == 0
+
+        if self.wait_for(data_deleted_from_buckets, 180):
+ print("all data deleted")
+ return
+        # ydbd currently has a bug that leaves some tier data undeleted.
+        # Once it is fixed, the print below should be replaced with:
+ # raise Exception('Not all data deleted')
+ print('Not all data deleted')
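S3Client.get_bucket_stat is not part of this diff; the checks above treat its result as an (object_count, total_bytes) tuple, with (0, 0) meaning an empty bucket. A minimal boto3-based sketch under that assumption follows; the function signature here is illustrative, not the actual S3Client API:

    import boto3

    def get_bucket_stat(endpoint, region, key_id, key_secret, bucket):
        # Count the objects in a bucket and sum their sizes.
        s3 = boto3.session.Session().client(
            service_name="s3",
            endpoint_url=endpoint,
            region_name=region,
            aws_access_key_id=key_id,
            aws_secret_access_key=key_secret,
        )
        count, total_size = 0, 0
        for page in s3.get_paginator("list_objects_v2").paginate(Bucket=bucket):
            for obj in page.get("Contents", []):
                count += 1
                total_size += obj["Size"]
        return count, total_size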
diff --git a/ydb/tests/olap/ttl_tiering/ya.make b/ydb/tests/olap/ttl_tiering/ya.make
index 4592f4ccacc..bb532ff9636 100644
--- a/ydb/tests/olap/ttl_tiering/ya.make
+++ b/ydb/tests/olap/ttl_tiering/ya.make
@@ -12,6 +12,7 @@ TEST_SRCS(
ttl_unavailable_s3.py
data_migration_when_alter_ttl.py
unstable_connection.py
+ tier_delete.py
)
SIZE(MEDIUM)
@@ -19,6 +20,7 @@ SIZE(MEDIUM)
PEERDIR(
ydb/tests/library
ydb/tests/library/test_meta
+ ydb/tests/olap/lib
ydb/public/sdk/python
ydb/public/sdk/python/enable_v3_new_behavior
contrib/python/boto3
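The new ydb/tests/olap/lib dependency provides get_external_param, which supplies the endpoint, database, and bucket parameters read in base.py and tier_delete.py. Its implementation is not shown in this diff; a purely illustrative stand-in that reads environment variables (the real helper presumably plugs into ya's test-parameter machinery) might look like:

    import os

    def get_external_param(name, default=None):
        # Hypothetical mapping: "s3-endpoint" -> TEST_S3_ENDPOINT, etc.
        return os.environ.get("TEST_" + name.upper().replace("-", "_"), default)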