aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorSemyon <yentsovsemyon@ydb.tech>2025-02-12 14:26:30 +0300
committerGitHub <noreply@github.com>2025-02-12 14:26:30 +0300
commit39ecbe0625acd9cf9e79deeddab7762474d29dcc (patch)
tree9962cbebb9deda2178c64a774ecf90c0d16dc1ed
parent82981ccb32e9522bc09564bde0613e9aa4e12bea (diff)
downloadydb-39ecbe0625acd9cf9e79deeddab7762474d29dcc.tar.gz
functional tiering tests: data correctness, unstable connection (#13629)
-rw-r--r--ydb/tests/olap/ttl_tiering/base.py17
-rw-r--r--ydb/tests/olap/ttl_tiering/data_correctness.py191
-rw-r--r--ydb/tests/olap/ttl_tiering/ttl_delete_s3.py6
-rw-r--r--ydb/tests/olap/ttl_tiering/ttl_unavailable_s3.py26
-rw-r--r--ydb/tests/olap/ttl_tiering/unstable_connection.py154
-rw-r--r--ydb/tests/olap/ttl_tiering/ya.make2
6 files changed, 386 insertions, 10 deletions
diff --git a/ydb/tests/olap/ttl_tiering/base.py b/ydb/tests/olap/ttl_tiering/base.py
index 16e12719d0..3882ef632b 100644
--- a/ydb/tests/olap/ttl_tiering/base.py
+++ b/ydb/tests/olap/ttl_tiering/base.py
@@ -67,6 +67,13 @@ class YdbClient:
def query(self, statement):
return self.session_pool.execute_with_retries(statement)
+ def bulk_upsert(self, table_path, column_types: ydb.BulkUpsertColumns, data_slice):
+ self.driver.table_client.bulk_upsert(
+ table_path,
+ data_slice,
+ column_types
+ )
+
class ColumnTableHelper:
def __init__(self, ydb_client: YdbClient, path: str):
@@ -92,6 +99,15 @@ class ColumnTableHelper:
results = self.ydb_client.query(stmt)
return {row["TierName"]: {"Portions": row["Portions"], "BlobSize": row["BlobSize"], "BlobCount": row["BlobCount"]} for result_set in results for row in result_set.rows}
+ def set_fast_compaction(self):
+ self.ydb_client.query(
+ f"""
+ ALTER OBJECT `{self.path}` (TYPE TABLE) SET (ACTION=UPSERT_OPTIONS, `COMPACTION_PLANNER.CLASS_NAME`=`lc-buckets`, `COMPACTION_PLANNER.FEATURES`=`
+ {{"levels" : [{{"class_name" : "Zero", "portions_live_duration" : "5s", "expected_blobs_size" : 1000000000000, "portions_count_available" : 2}},
+ {{"class_name" : "Zero"}}]}}`);
+ """
+ )
+
class TllTieringTestBase(object):
@classmethod
@@ -120,6 +136,7 @@ class TllTieringTestBase(object):
"optimizer_freshness_check_duration_ms": 0,
"small_portion_detect_size_limit": 0,
"max_read_staleness_ms": 5000,
+ "alter_object_enabled": True,
},
additional_log_configs={
"TX_COLUMNSHARD_TIERING": LogLevels.DEBUG,
diff --git a/ydb/tests/olap/ttl_tiering/data_correctness.py b/ydb/tests/olap/ttl_tiering/data_correctness.py
new file mode 100644
index 0000000000..efc9b46638
--- /dev/null
+++ b/ydb/tests/olap/ttl_tiering/data_correctness.py
@@ -0,0 +1,191 @@
+import time
+import logging
+from .base import TllTieringTestBase, ColumnTableHelper
+import ydb
+import concurrent
+import random
+import datetime
+
+logger = logging.getLogger(__name__)
+
+
+class TestDataCorrectness(TllTieringTestBase):
+ test_name = "data_correctness"
+ cold_bucket = "cold"
+ n_shards = 4
+
+ @classmethod
+ def setup_class(cls):
+ super(TestDataCorrectness, cls).setup_class()
+ cls.s3_client.create_bucket(cls.cold_bucket)
+
+ def write_data(
+ self,
+ table: str,
+ timestamp_from_ms: int,
+ rows: int,
+ value: int = 1,
+ ):
+ chunk_size = 100
+ while rows:
+ current_chunk_size = min(chunk_size, rows)
+ data = [
+ {
+ "ts": timestamp_from_ms + i,
+ "s": random.randbytes(1024 * 10),
+ "val": value,
+ }
+ for i in range(current_chunk_size)
+ ]
+ self.ydb_client.bulk_upsert(
+ table,
+ self.column_types,
+ data,
+ )
+ timestamp_from_ms += current_chunk_size
+ rows -= current_chunk_size
+ assert rows >= 0
+
+ def total_values(self, table: str) -> int:
+ return (
+ self.ydb_client.query(f"select sum(val) as Values from `{table}`")[0].rows[
+ 0
+ ]["Values"]
+ or 0
+ )
+
+ def wait_eviction(self, table: ColumnTableHelper):
+ deadline = datetime.datetime.now() + datetime.timedelta(seconds=60)
+ while (
+ table.get_portion_stat_by_tier().get("__DEFAULT", {}).get("Portions", 0)
+ > self.n_shards
+ ):
+ assert (
+ datetime.datetime.now() < deadline
+ ), "Timeout for wait eviction is exceeded"
+
+ logger.info(
+ f"Waiting for data eviction: {table.get_portion_stat_by_tier()}"
+ )
+ time.sleep(1)
+
+ stats = table.get_portion_stat_by_tier()
+ assert len(stats) > 1 or '__DEFAULT' not in stats
+
+ def test(self):
+ """Implements https://github.com/ydb-platform/ydb/issues/13465"""
+ test_dir = f"{self.ydb_client.database}/{self.test_name}"
+ table_path = f"{test_dir}/table"
+ secret_prefix = self.test_name
+ access_key_id_secret_name = f"{secret_prefix}_key_id"
+ access_key_secret_secret_name = f"{secret_prefix}_key_secret"
+ eds_path = f"{test_dir}/{self.cold_bucket}"
+
+ # Expect empty buckets to avoid unintentional data deletion/modification
+ if self.s3_client.get_bucket_stat(self.cold_bucket) != (0, 0):
+ raise Exception("Bucket for cold data is not empty")
+
+ self.ydb_client.query(
+ f"""
+ CREATE TABLE `{table_path}` (
+ ts Timestamp NOT NULL,
+ s String,
+ val Uint64,
+ PRIMARY KEY(ts),
+ )
+ WITH (
+ STORE = COLUMN,
+ AUTO_PARTITIONING_MIN_PARTITIONS_COUNT = {self.n_shards}
+ )
+ """
+ )
+
+ table = ColumnTableHelper(self.ydb_client, table_path)
+ table.set_fast_compaction()
+
+ self.column_types = ydb.BulkUpsertColumns()
+ self.column_types.add_column("ts", ydb.PrimitiveType.Timestamp)
+ self.column_types.add_column("s", ydb.PrimitiveType.String)
+ self.column_types.add_column("val", ydb.PrimitiveType.Uint64)
+
+ logger.info(f"Table {table_path} created")
+
+ self.ydb_client.query(
+ f"CREATE OBJECT {access_key_id_secret_name} (TYPE SECRET) WITH value='{self.s3_client.key_id}'"
+ )
+ self.ydb_client.query(
+ f"CREATE OBJECT {access_key_secret_secret_name} (TYPE SECRET) WITH value='{self.s3_client.key_secret}'"
+ )
+
+ self.ydb_client.query(
+ f"""
+ CREATE EXTERNAL DATA SOURCE `{eds_path}` WITH (
+ SOURCE_TYPE="ObjectStorage",
+ LOCATION="{self.s3_client.endpoint}/{self.cold_bucket}",
+ AUTH_METHOD="AWS",
+ AWS_ACCESS_KEY_ID_SECRET_NAME="{access_key_id_secret_name}",
+ AWS_SECRET_ACCESS_KEY_SECRET_NAME="{access_key_secret_secret_name}",
+ AWS_REGION="{self.s3_client.region}"
+ )
+ """
+ )
+
+ stmt = f"""
+ ALTER TABLE `{table_path}` SET (TTL =
+ Interval("PT1S") TO EXTERNAL DATA SOURCE `{eds_path}`
+ ON ts
+ )
+ """
+ logger.info(stmt)
+ self.ydb_client.query(stmt)
+
+ ts_start = int(datetime.datetime.now().timestamp() * 1000000)
+ rows = 10000
+ num_threads = 10
+ assert rows % num_threads == 0
+ chunk_size = rows // num_threads
+
+ # Write data
+ with concurrent.futures.ThreadPoolExecutor(max_workers=num_threads) as executor:
+ insert_futures = [
+ executor.submit(
+ self.write_data,
+ table_path,
+ ts_start + i * chunk_size,
+ chunk_size,
+ 1,
+ )
+ for i in range(num_threads)
+ ]
+
+ concurrent.futures.wait(insert_futures)
+ for future in insert_futures:
+ future.result()
+
+ self.wait_eviction(table)
+ assert self.total_values(table_path) == rows
+
+ # Update data
+ with concurrent.futures.ThreadPoolExecutor(max_workers=num_threads) as executor:
+ insert_futures = [
+ executor.submit(
+ self.write_data,
+ table_path,
+ ts_start + i * chunk_size,
+ chunk_size,
+ 2,
+ )
+ for i in range(num_threads)
+ ]
+
+ concurrent.futures.wait(insert_futures)
+ for future in insert_futures:
+ future.result()
+
+ self.wait_eviction(table)
+ assert self.total_values(table_path) == rows * 2
+
+ # Delete data
+ self.ydb_client.query(f"delete from `{table_path}`")
+
+ assert not self.total_values(table_path)
diff --git a/ydb/tests/olap/ttl_tiering/ttl_delete_s3.py b/ydb/tests/olap/ttl_tiering/ttl_delete_s3.py
index 9ea6848ced..9990bf8693 100644
--- a/ydb/tests/olap/ttl_tiering/ttl_delete_s3.py
+++ b/ydb/tests/olap/ttl_tiering/ttl_delete_s3.py
@@ -66,7 +66,8 @@ class TestDeleteS3Ttl(TllTieringTestBase):
if self.s3_client.get_bucket_stat(medium_bucket) != (0, 0):
raise Exception("Bucket for medium data is not empty")
- self.ydb_client.query(f"""
+ self.ydb_client.query(
+ f"""
CREATE TABLE `{table_path}` (
ts Timestamp NOT NULL,
s String,
@@ -229,7 +230,8 @@ class TestDeleteS3Ttl(TllTieringTestBase):
if self.s3_client.get_bucket_stat(self.frozen_bucket) != (0, 0):
raise Exception("Bucket for frozen data is not empty")
- self.ydb_client.query(f"""
+ self.ydb_client.query(
+ f"""
CREATE TABLE `{table_path}` (
ts Timestamp NOT NULL,
s String,
diff --git a/ydb/tests/olap/ttl_tiering/ttl_unavailable_s3.py b/ydb/tests/olap/ttl_tiering/ttl_unavailable_s3.py
index db813d282a..c8758f7132 100644
--- a/ydb/tests/olap/ttl_tiering/ttl_unavailable_s3.py
+++ b/ydb/tests/olap/ttl_tiering/ttl_unavailable_s3.py
@@ -2,8 +2,11 @@ import os
import signal
import sys
import time
+import logging
-from .base import TllTieringTestBase
+from .base import TllTieringTestBase, ColumnTableHelper
+
+logger = logging.getLogger(__name__)
ROWS_CHUNK_SIZE = 1000000
ROWS_CHUNKS_COUNT = 10
@@ -21,9 +24,15 @@ class TestUnavailableS3(TllTieringTestBase):
v String,
PRIMARY KEY(ts),
)
- WITH (STORE = COLUMN)
+ WITH (
+ STORE = COLUMN,
+ AUTO_PARTITIONING_MIN_PARTITIONS_COUNT = 2
+ )
""")
+ table = ColumnTableHelper(self.ydb_client, f"{self.ydb_client.database}/table")
+ table.set_fast_compaction()
+
self.s3_client.create_bucket(bucket_s3_name)
self.ydb_client.query(f"CREATE OBJECT s3_id (TYPE SECRET) WITH value = '{self.s3_client.key_id}'")
@@ -58,6 +67,9 @@ class TestUnavailableS3(TllTieringTestBase):
SELECT * FROM AS_TABLE($rows_list);
""")
+ def get_stat():
+ return self.s3_client.get_bucket_stat(bucket_s3_name)[0]
+
for i in range(0, ROWS_CHUNKS_COUNT // 2):
upsert_chunk(i)
@@ -68,6 +80,8 @@ class TestUnavailableS3(TllTieringTestBase):
)
""")
+ assert self.wait_for(get_stat, 30), "initial eviction"
+
print("!!! simulating S3 hang up -- sending SIGSTOP", file=sys.stderr)
os.kill(self.s3_pid, signal.SIGSTOP)
@@ -76,13 +90,9 @@ class TestUnavailableS3(TllTieringTestBase):
print("!!! simulating S3 recovery -- sending SIGCONT", file=sys.stderr)
os.kill(self.s3_pid, signal.SIGCONT)
- def get_stat():
- return self.s3_client.get_bucket_stat(bucket_s3_name)[0]
-
- # stat_old = get_stat()
+ stat_old = get_stat()
for i in range(ROWS_CHUNKS_COUNT // 2, ROWS_CHUNKS_COUNT):
upsert_chunk(i)
- # Uncomment after fixing https://github.com/ydb-platform/ydb/issues/13719
- # assert self.wait_for(lambda: get_stat() != stat_old, 120), "data distribution continuation"
+ assert self.wait_for(lambda: get_stat() != stat_old, 120), "data distribution continuation"
diff --git a/ydb/tests/olap/ttl_tiering/unstable_connection.py b/ydb/tests/olap/ttl_tiering/unstable_connection.py
new file mode 100644
index 0000000000..0c85a80035
--- /dev/null
+++ b/ydb/tests/olap/ttl_tiering/unstable_connection.py
@@ -0,0 +1,154 @@
+import time
+import logging
+from .base import TllTieringTestBase, ColumnTableHelper
+import ydb
+import concurrent
+import random
+import datetime
+import threading
+import subprocess
+
+logger = logging.getLogger(__name__)
+
+
+class TestUnstableConnection(TllTieringTestBase):
+ test_name = "unstable_connection"
+ cold_bucket = "cold"
+ num_writers = 10
+ num_readers = 4
+
+ @classmethod
+ def setup_class(cls):
+ super(TestUnstableConnection, cls).setup_class()
+ cls.s3_client.create_bucket(cls.cold_bucket)
+
+ def write_data(
+ self,
+ table: str,
+ timestamp_from_ms: int,
+ rows: int,
+ ):
+ chunk_size = 100
+ while rows:
+ current_chunk_size = min(chunk_size, rows)
+ data = [
+ {
+ 'ts': timestamp_from_ms + i,
+ 's': random.randbytes(1024),
+ } for i in range(current_chunk_size)
+ ]
+ self.ydb_client.bulk_upsert(
+ table,
+ self.column_types,
+ data,
+ )
+ timestamp_from_ms += current_chunk_size
+ rows -= current_chunk_size
+ assert rows >= 0
+
+ def writer(self, table: str, stop_event):
+ logger.info("Writer started")
+ while not stop_event.is_set():
+ self.write_data(table, int(datetime.datetime.now().timestamp() * 1000000), 1000)
+ logger.info("Writer stopped")
+
+ def reader(self, table: str, stop_event):
+ logger.info("Reader started")
+ while not stop_event.is_set():
+ self.ydb_client.query(f"SELECT ts FROM `{table}` WHERE StartsWith(s, \"a\")")
+ logger.info("Reader stopped")
+
+ def adversary(self, pid: int, stop_event):
+ while not stop_event.is_set():
+ logger.info(f"Stopping s3 process, pid={pid}")
+ subprocess.run(["kill", "-STOP", str(pid)])
+ time.sleep(1)
+ logger.info(f"Resuming s3 process, pid={pid}")
+ subprocess.run(["kill", "-CONT", str(pid)])
+ time.sleep(2)
+ subprocess.run(["kill", "-CONT", str(pid)])
+ logger.info("Adversary stopped")
+
+ def stopwatch(self, seconds: int, stop_event):
+ time.sleep(seconds)
+ stop_event.set()
+
+ def test(self):
+ ''' Implements https://github.com/ydb-platform/ydb/issues/13544'''
+ test_dir = f"{self.ydb_client.database}/{self.test_name}"
+ table_path = f"{test_dir}/table"
+ secret_prefix = self.test_name
+ access_key_id_secret_name = f"{secret_prefix}_key_id"
+ access_key_secret_secret_name = f"{secret_prefix}_key_secret"
+ eds_path = f"{test_dir}/{self.cold_bucket}"
+
+ # Expect empty buckets to avoid unintentional data deletion/modification
+ if self.s3_client.get_bucket_stat(self.cold_bucket) != (0, 0):
+ raise Exception("Bucket for cold data is not empty")
+
+ self.ydb_client.query(
+ f"""
+ CREATE TABLE `{table_path}` (
+ ts Timestamp NOT NULL,
+ s String,
+ val Uint64,
+ PRIMARY KEY(ts),
+ )
+ WITH (
+ STORE = COLUMN,
+ AUTO_PARTITIONING_MIN_PARTITIONS_COUNT = 4
+ )
+ """
+ )
+
+ table = ColumnTableHelper(self.ydb_client, table_path)
+ table.set_fast_compaction()
+
+ self.column_types = ydb.BulkUpsertColumns()
+ self.column_types.add_column("ts", ydb.PrimitiveType.Timestamp)
+ self.column_types.add_column("s", ydb.PrimitiveType.String)
+
+ logger.info(f"Table {table_path} created")
+
+ self.ydb_client.query(f"CREATE OBJECT {access_key_id_secret_name} (TYPE SECRET) WITH value='{self.s3_client.key_id}'")
+ self.ydb_client.query(f"CREATE OBJECT {access_key_secret_secret_name} (TYPE SECRET) WITH value='{self.s3_client.key_secret}'")
+
+ self.ydb_client.query(f"""
+ CREATE EXTERNAL DATA SOURCE `{eds_path}` WITH (
+ SOURCE_TYPE="ObjectStorage",
+ LOCATION="{self.s3_client.endpoint}/{self.cold_bucket}",
+ AUTH_METHOD="AWS",
+ AWS_ACCESS_KEY_ID_SECRET_NAME="{access_key_id_secret_name}",
+ AWS_SECRET_ACCESS_KEY_SECRET_NAME="{access_key_secret_secret_name}",
+ AWS_REGION="{self.s3_client.region}"
+ )
+ """)
+
+ stmt = f"""
+ ALTER TABLE `{table_path}` SET (TTL =
+ Interval("PT1S") TO EXTERNAL DATA SOURCE `{eds_path}`,
+ Interval("PT1M") DELETE
+ ON ts
+ )
+ """
+ logger.info(stmt)
+ self.ydb_client.query(stmt)
+
+ with concurrent.futures.ThreadPoolExecutor(max_workers=self.num_writers+self.num_readers+2) as executor:
+ stop_event = threading.Event()
+ workers = []
+
+ for _ in range(self.num_writers):
+ workers.append(executor.submit(self.writer, table_path, stop_event))
+
+ for _ in range(self.num_readers):
+ workers.append(executor.submit(self.reader, table_path, stop_event))
+
+ workers.append(executor.submit(self.adversary, self.s3_pid, stop_event))
+ workers.append(executor.submit(self.stopwatch, 120, stop_event))
+
+ time.sleep(60)
+ logger.info("Checking eviction")
+ assert table.get_portion_stat_by_tier().get(eds_path, {}).get("Rows", 0), f"Nothing is evicted after 1 minute: {table.get_portion_stat_by_tier()}"
+
+ concurrent.futures.wait(workers)
diff --git a/ydb/tests/olap/ttl_tiering/ya.make b/ydb/tests/olap/ttl_tiering/ya.make
index 2caf761764..84d1a5b676 100644
--- a/ydb/tests/olap/ttl_tiering/ya.make
+++ b/ydb/tests/olap/ttl_tiering/ya.make
@@ -5,9 +5,11 @@ ENV(YDB_ADDITIONAL_LOG_CONFIGS="TX_TIERING:DEBUG")
TEST_SRCS(
base.py
+ data_correctness.py
ttl_delete_s3.py
ttl_unavailable_s3.py
data_migration_when_alter_ttl.py
+ unstable_connection.py
)
SIZE(MEDIUM)