author     zverevgeny <zverevgeny@ydb.tech>   2024-12-24 14:50:24 +0300
committer  GitHub <noreply@github.com>        2024-12-24 14:50:24 +0300
commit     c35f0028b0f9fdf6f1ebeda60b1f1aaed74504a2 (patch)
tree       6ad81004117128e875c1d3d573601e6348f2be42
parent     3b45c906bef6671a321a223bf3c6e3aa117fc6df (diff)
download   ydb-c35f0028b0f9fdf6f1ebeda60b1f1aaed74504a2.tar.gz
rework olap_workload (#12870)
-rw-r--r--  ydb/tools/olap_workload/__main__.py  417
-rw-r--r--  ydb/tools/olap_workload/ya.make        1
2 files changed, 257 insertions, 161 deletions
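
A minimal sketch of how the reworked pieces compose (endpoint and database
values here are illustrative, not taken from this commit):

    client = YdbClient("localhost:2135", "/Root/test", use_query_service=True)
    client.wait_connection()
    with WorkloadRunner(client, "olap_workload", duration=60) as runner:
        runner.run()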
diff --git a/ydb/tools/olap_workload/__main__.py b/ydb/tools/olap_workload/__main__.py
index 68f4c88590..19d7359724 100644
--- a/ydb/tools/olap_workload/__main__.py
+++ b/ydb/tools/olap_workload/__main__.py
@@ -4,65 +4,134 @@ import ydb
import time
import os
import random
-import string
+import threading
ydb.interceptor.monkey_patch_event_handler()
-def timestamp():
- return int(1000 * time.time())
-
-
-def table_name_with_timestamp():
- return os.path.join("column_table_" + str(timestamp()))
-
-
-def random_string(length):
- letters = string.ascii_lowercase
- return bytes(''.join(random.choice(letters) for i in range(length)), encoding='utf8')
-
-
-def random_type():
- return random.choice([ydb.PrimitiveType.Int64, ydb.PrimitiveType.String])
-
-
-def random_value(type):
- if isinstance(type, ydb.OptionalType):
- return random_value(type.item)
- if type == ydb.PrimitiveType.Int64:
- return random.randint(0, 1 << 31)
- if type == ydb.PrimitiveType.String:
- return random_string(random.randint(1, 32))
-
-
-class Workload(object):
- def __init__(self, endpoint, database, duration, batch_size):
+class YdbClient:
+ def __init__(self, endpoint, database, use_query_service=False):
+ self.driver = ydb.Driver(endpoint=endpoint, database=database, oauth=None)
self.database = database
- self.driver = ydb.Driver(ydb.DriverConfig(endpoint, database))
- self.pool = ydb.SessionPool(self.driver, size=200)
- self.duration = duration
- self.batch_size = batch_size
-
- def __enter__(self):
- return self
-
- def __exit__(self, exc_type, exc_val, exc_tb):
- self.pool.stop()
- self.driver.stop()
-
- def run_query_ignore_errors(self, callee):
+ self.use_query_service = use_query_service
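+ # Query service (QuerySessionPool) handles both DDL and DML; the legacy
+ # table-service SessionPool below only covers DDL in this client.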
+ self.session_pool = ydb.QuerySessionPool(self.driver) if use_query_service else ydb.SessionPool(self.driver)
+
+ def wait_connection(self, timeout=5):
+ self.driver.wait(timeout, fail_fast=True)
+
+ def query(self, statement, is_ddl):
+ if self.use_query_service:
+ return self.session_pool.execute_with_retries(statement)
+ else:
+ if is_ddl:
+ return self.session_pool.retry_operation_sync(lambda session: session.execute_scheme(statement))
+ else:
+ raise "Unsuppported dml" # TODO implement me
+
+ def drop_table(self, path_to_table):
+ if self.use_query_service:
+ self.session_pool.execute_with_retries(f"DROP TABLE `{path_to_table}`")
+ else:
+ self.session_pool.retry_operation_sync(lambda session: session.drop_table(path_to_table))
+
+ def describe(self, path):
try:
- self.pool.retry_operation_sync(callee)
- except Exception as e:
- print(type(e), e)
-
- def create_table(self, table_name):
- print(f"Create table {table_name}")
-
- def callee(session):
- session.execute_scheme(
- f"""
- CREATE TABLE {table_name} (
+ return self.driver.scheme_client.describe_path(path)
+ except ydb.issues.SchemeError as e:
+ if "Path not found" in e.message:
+ return None
+ raise
+
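+ # Depth-first cleanup: drop every table under the path, then remove the
+ # (now empty) directory itself; returns the number of tables dropped.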
+ def _remove_recursively(self, path):
+ deleted = 0
+ d = self.driver.scheme_client.list_directory(path)
+ for entry in d.children:
+ entry_path = "/".join([path, entry.name])
+ if entry.is_directory():
+ deleted += self._remove_recursively(entry_path)
+ elif entry.is_column_table() or entry.is_table():
+ self.drop_table(entry_path)
+ deleted += 1
+ else:
+ raise f"Scheme entry {entry_path} of unexpected type"
+ self.driver.scheme_client.remove_directory(path)
+ return deleted
+
+ def remove_recursively(self, path):
+ d = self.describe(path)
+ if d is None:
+ return
+ if not d.is_directory():
+ raise f"{path} has unexpected type"
+ return self._remove_recursively(path)
+
+
+class WorkloadBase:
+ def __init__(self, client, tables_prefix, workload_name, stop):
+ self.client = client
+ self.table_prefix = tables_prefix + '/' + workload_name
+ self.name = workload_name
+ self.stop = stop
+ self.workload_threads = []
+
+ def get_name(self):
+ return self.name
+
+ def get_table_path(self, table_name):
+ return "/".join([self.client.database, self.table_prefix, table_name])
+
+ def is_stop_requested(self):
+ return self.stop.is_set()
+
+ def start(self):
+ funcs = self.get_workload_thread_funcs()
+
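+ # Fail fast: an uncaught exception in any worker thread kills the whole
+ # process; otherwise the thread would die silently and the run would
+ # look healthy while doing nothing.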
+ def wrapper(f):
+ try:
+ f()
+ except Exception as e:
+ print(f"FATAL: {e}")
+ os._exit(1)
+
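+ # Pass f through args= instead of capturing it in a lambda, so each
+ # thread runs its own function (a lambda would late-bind the loop variable
+ # and every thread could end up running the last f).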
+ for f in funcs:
+ t = threading.Thread(target=wrapper, args=(f,))
+ t.start()
+ self.workload_threads.append(t)
+
+ def join(self):
+ for t in self.workload_threads:
+ t.join()
+
+
+class WorkloadTablesCreateDrop(WorkloadBase):
+ def __init__(self, client, prefix, stop):
+ super().__init__(client, prefix, "create_drop", stop)
+ self.created = 0
+ self.deleted = 0
+ self.tables = set()
+ self.lock = threading.Lock()
+
+ def get_stat(self):
+ with self.lock:
+ return f"Created: {self.created}, Deleted: {self.deleted}, Exists: {len(self.tables)}"
+
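+ # Draw random table numbers until one is found that is not already in use.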
+ def _generate_new_table_n(self):
+ while True:
+ r = random.randint(1, 40000)
+ with self.lock:
+ if r not in self.tables:
+ return r
+
+ def _get_existing_table_n(self):
+ with self.lock:
+ if len(self.tables) == 0:
+ return None
+ return next(iter(self.tables))
+
+ def create_table(self, table):
+ path = self.get_table_path(table)
+ stmt = f"""
+ CREATE TABLE `{path}` (
id Int64 NOT NULL,
i64Val Int64,
PRIMARY KEY(id)
@@ -72,124 +141,150 @@ class Workload(object):
STORE = COLUMN
)
"""
+ self.client.query(stmt, True)
+
+ def _create_tables_loop(self):
+ while not self.is_stop_requested():
+ n = self._generate_new_table_n()
+ self.create_table(str(n))
+ with self.lock:
+ self.tables.add(n)
+ self.created += 1
+
+ def _delete_tables_loop(self):
+ while not self.is_stop_requested():
+ n = self._get_existing_table_n()
+ if n is None:
+ print("create_drop: No tables to delete")
+ time.sleep(10)
+ continue
+ self.client.drop_table(self.get_table_path(str(n)))
+ with self.lock:
+ self.tables.remove(n)
+ self.deleted += 1
+
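+ # Ten creator threads race against a single deleter thread.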
+ def get_workload_thread_funcs(self):
+ r = [self._create_tables_loop for _ in range(10)]
+ r.append(self._delete_tables_loop)
+ return r
+
+
+class WorkloadInsertDelete(WorkloadBase):
+ def __init__(self, client, prefix, stop):
+ super().__init__(client, prefix, "insert_delete", stop)
+ self.inserted = 0
+ self.current = 0
+ self.table_name = "table"
+ self.lock = threading.Lock()
+
+ def get_stat(self):
+ with self.lock:
+ return f"Inserted: {self.inserted}, Current: {self.current}"
+
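+ # Single insert/delete loop that verifies aggregate invariants over the
+ # whole table after every iteration.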
+ def _loop(self):
+ table_path = self.get_table_path(self.table_name)
+ self.client.query(
+ f"""
+ CREATE TABLE `{table_path}` (
+ id Int64 NOT NULL,
+ i64Val Int64,
+ PRIMARY KEY(id)
+ )
+ PARTITION BY HASH(id)
+ WITH (
+ STORE = COLUMN
+ )
+ """,
+ True,
+ )
+ i = 1
+ while not self.is_stop_requested():
+ self.client.query(
+ f"""
+ INSERT INTO `{table_path}` (`id`, `i64Val`)
+ VALUES
+ ({i * 2}, {i * 10}),
+ ({i * 2 + 1}, {i * 10 + 1})
+ """,
+ False,
)
- self.run_query_ignore_errors(callee)
-
- def drop_table(self, table_name):
- print(f"Drop table {table_name}")
-
- def callee(session):
- session.drop_table(self.database + "/" + table_name)
-
- self.run_query_ignore_errors(callee)
-
- def add_column(self, table_name, col_name, col_type):
- print(f"Add column {table_name}.{col_name} {str(col_type)}")
-
- def callee(session):
- session.execute_scheme(f"ALTER TABLE {table_name} ADD COLUMN {col_name} {str(col_type)}")
-
- self.run_query_ignore_errors(callee)
-
- def drop_column(self, table_name, col_name):
- print(f"Drop column {table_name}.{col_name}")
-
- def callee(session):
- session.execute_scheme(f"ALTER TABLE {table_name} DROP COLUMN {col_name}")
-
- self.run_query_ignore_errors(callee)
-
- def generate_batch(self, schema):
- data = []
-
- for i in range(self.batch_size):
- data.append({c.name: random_value(c.type) for c in schema})
-
- return data
-
- def add_batch(self, table_name, schema):
- print(f"Add batch {table_name}")
-
- column_types = ydb.BulkUpsertColumns()
-
- for c in schema:
- column_types.add_column(c.name, c.type)
-
- batch = self.generate_batch(schema)
-
- self.driver.table_client.bulk_upsert(self.database + "/" + table_name, batch, column_types)
-
- def list_tables(self):
- db = self.driver.scheme_client.list_directory(self.database)
- return [t.name for t in db.children if t.type == ydb.SchemeEntryType.COLUMN_TABLE]
-
- def list_columns(self, table_name):
- path = self.database + "/" + table_name
-
- def callee(session):
- return session.describe_table(path).columns
-
- return self.pool.retry_operation_sync(callee)
-
- def rows_count(self, table_name):
- return self.driver.table_client.scan_query(f"SELECT count(*) FROM {table_name}").next().result_set.rows[0][0]
-
- def select_n(self, table_name, limit):
- print(f"Select {limit} from {table_name}")
- self.driver.table_client.scan_query(f"SELECT * FROM {table_name} limit {limit}").next()
-
- def drop_all_tables(self):
- for t in self.list_tables():
- if t.startswith("column_table_"):
- self.drop_table(t)
-
- def drop_all_columns(self, table_name):
- for c in self.list_columns(table_name):
- if c.name != "id":
- self.drop_column(table_name, c.name)
-
- def queries_while_alter(self, table_name):
-
- schema = self.list_columns(table_name)
+ self.client.query(
+ f"""
+ DELETE FROM `{table_path}`
+ WHERE i64Val % 2 == 1
+ """,
+ False,
+ )
- self.select_n(table_name, 1000)
- self.add_batch(table_name, schema)
- self.select_n(table_name, 100)
- self.add_batch(table_name, schema)
- self.select_n(table_name, 300)
+ actual = self.client.query(
+ f"""
+ SELECT COUNT(*) as cnt, SUM(i64Val) as vals, SUM(id) as ids FROM `{table_path}`
+ """,
+ False,
+ )[0].rows[0]
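+ # Iteration i inserts rows (2i, 10i) and (2i+1, 10i+1) and the DELETE
+ # removes the odd-valued one, so rows (2k, 10k) for k = 1..i survive:
+ # cnt = i, vals = 10 * i*(i+1)/2, ids = 2 * i*(i+1)/2.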
+ expected = {"cnt": i, "vals": i * (i + 1) * 5, "ids": i * (i + 1)}
+ if actual != expected:
+ raise Exception(f"Incorrect result: expected:{expected}, actual:{actual}")
+ i += 1
+ with self.lock:
+ self.inserted += 2
+ self.current = actual["cnt"]
+
+ def get_workload_thread_funcs(self):
+ return [self._loop]
+
+
+class WorkloadRunner:
+ def __init__(self, client, name, duration):
+ self.client = client
+ self.name = name
+ self.tables_prefix = "/".join([self.client.database, self.name])
+ self.duration = duration
- if len(schema) > 50:
- self.drop_all_columns(table_name)
+ def __enter__(self):
+ self._cleanup()
+ return self
- if self.rows_count(table_name) > 100000:
- self.drop_table(table_name)
+ def __exit__(self, exc_type, exc_value, traceback):
+ self._cleanup()
- col = "col_" + str(timestamp())
- self.add_column(table_name, col, random_type())
+ def _cleanup(self):
+ print(f"Cleaning up {self.tables_prefix}...")
+ deleted = self.client.remove_recursively(self.tables_prefix)
+ print(f"Cleaning up {self.tables_prefix}... done, {deleted} tables deleted")
def run(self):
- started_at = time.time()
-
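+ # One shared Event tells every workload thread to stop once the duration elapses.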
+ stop = threading.Event()
+ workloads = [
+ WorkloadTablesCreateDrop(self.client, self.name, stop),
+ WorkloadInsertDelete(self.client, self.name, stop),
+ ]
+ for w in workloads:
+ w.start()
+ started_at = time.time()
while time.time() - started_at < self.duration:
- try:
- # TODO: drop only created tables by current workload
- self.drop_all_tables()
- table_name = table_name_with_timestamp()
- self.create_table(table_name)
- self.queries_while_alter(table_name)
- except Exception as e:
- print(type(e), e)
-
-
-if __name__ == '__main__':
+ print(f"Elapsed {(int)(time.time() - started_at)} seconds, stat:")
+ for w in workloads:
+ print(f"\t{w.name}: {w.get_stat()}")
+ time.sleep(10)
+ stop.set()
+ print("Waiting for stop...")
+ for w in workloads:
+ w.join()
+ print("Waiting for stop... stopped")
+
+
+if __name__ == "__main__":
parser = argparse.ArgumentParser(
description="olap stability workload", formatter_class=argparse.RawDescriptionHelpFormatter
)
- parser.add_argument('--endpoint', default='localhost:2135', help="An endpoint to be used")
- parser.add_argument('--database', default=None, required=True, help='A database to connect')
- parser.add_argument('--duration', default=10 ** 9, type=lambda x: int(x), help='A duration of workload in seconds.')
- parser.add_argument('--batch_size', default=1000, help='Batch size for bulk insert')
+ parser.add_argument("--endpoint", default="localhost:2135", help="An endpoint to be used")
+ parser.add_argument("--database", default="Root/test", help="A database to connect")
+ parser.add_argument("--path", default="olap_workload", help="A path prefix for tables")
+ parser.add_argument("--duration", default=10 ** 9, type=lambda x: int(x), help="A duration of workload in seconds.")
args = parser.parse_args()
- with Workload(args.endpoint, args.database, args.duration, args.batch_size) as workload:
- workload.run()
+ client = YdbClient(args.endpoint, args.database, True)
+ client.wait_connection()
+ with WorkloadRunner(client, args.path, args.duration) as runner:
+ runner.run()
diff --git a/ydb/tools/olap_workload/ya.make b/ydb/tools/olap_workload/ya.make
index 939ecf1af9..85338ccf88 100644
--- a/ydb/tools/olap_workload/ya.make
+++ b/ydb/tools/olap_workload/ya.make
@@ -6,6 +6,7 @@ PY_SRCS(
PEERDIR(
ydb/public/sdk/python
+ ydb/public/sdk/python/enable_v3_new_behavior
library/python/monlib
)