author    Maxim Yurchuk <maxim-yurchuk@ydb.tech>    2024-12-24 17:53:52 +0000
committer GitHub <noreply@github.com>               2024-12-24 20:53:52 +0300
commit    8165ac169019ce18729318c775eb674f82843c6d (patch)
tree      0261336089c8558ff0ee5bd85bc30949913390d5
parent    3e28916df1854d5db04229c6542ef69494a0fb95 (diff)
download  ydb-8165ac169019ce18729318c775eb674f82843c6d.tar.gz
remove forgotten files (#12939)
-rw-r--r--  ydb/tools/statistics_workload/__main__.py  205
-rw-r--r--  ydb/tools/statistics_workload/ya.make        13
2 files changed, 0 insertions, 218 deletions
diff --git a/ydb/tools/statistics_workload/__main__.py b/ydb/tools/statistics_workload/__main__.py
deleted file mode 100644
index 14359e5a05..0000000000
--- a/ydb/tools/statistics_workload/__main__.py
+++ /dev/null
@@ -1,205 +0,0 @@
-# -*- coding: utf-8 -*-
-import argparse
-import ydb
-import logging
-import time
-import os
-import random
-import string
-from ydb.tests.library.clients.kikimr_client import kikimr_client_factory
-from ydb.tests.library.common.protobuf_ss import SchemeDescribeRequest
-
-ydb.interceptor.monkey_patch_event_handler()
-
-
-logger = logging.getLogger("StatisticsWorkload")
-
-
-def table_name_with_prefix(table_prefix):
-    table_suffix = ''.join(random.choices(string.ascii_uppercase + string.digits, k=5))
-    return os.path.join(table_prefix + "_" + table_suffix)
-
-
-def random_string(length):
-    letters = string.ascii_lowercase
-    return ''.join(random.choice(letters) for i in range(length))
-
-
-def random_type():
-    return random.choice([ydb.PrimitiveType.Int64, ydb.PrimitiveType.String])
-
-
-def random_value(type):
-    if isinstance(type, ydb.OptionalType):
-        return random_value(type.item)
-    if type == ydb.PrimitiveType.Int64:
-        return random.randint(0, 1 << 31)
-    if type == ydb.PrimitiveType.String:
-        return bytes(random_string(random.randint(1, 32)), encoding='utf8')
-
-
-class Workload(object):
-    def __init__(self, host, port, database, duration, batch_size, batch_count):
-        self.database = database
-        self.driver = ydb.Driver(ydb.DriverConfig(f"{host}:{port}", database))
-        self.kikimr_client = kikimr_client_factory(host, port)
-        self.pool = ydb.SessionPool(self.driver, size=200)
-        self.duration = duration
-        self.batch_size = batch_size
-        self.batch_count = batch_count
-
-    def __enter__(self):
-        return self
-
-    def __exit__(self, exc_type, exc_val, exc_tb):
-        self.pool.stop()
-        self.driver.stop()
-
-    def run_query_ignore_errors(self, callee):
-        try:
-            self.pool.retry_operation_sync(callee)
-        except Exception as e:
-            logger.error(f'{type(e)}, {e}')
-
-    def generate_batch(self, schema):
-        data = []
-        for i in range(self.batch_size):
-            data.append({c.name: random_value(c.type) for c in schema})
-        return data
-
-    def create_table(self, table_name):
-        def callee(session):
-            session.execute_scheme(f"""
-                CREATE TABLE `{table_name}` (
-                    id Int64 NOT NULL,
-                    value Int64,
-                    PRIMARY KEY(id)
-                )
-                PARTITION BY HASH(id)
-                WITH (
-                    STORE = COLUMN
-                )
-            """)
-        self.run_query_ignore_errors(callee)
-
-    def enable_statistics(self, table_name):
-        def callee(session):
-            session.execute_scheme(f"""
-                ALTER OBJECT `{table_name}` (TYPE TABLE) SET (ACTION=UPSERT_INDEX, NAME=cms_key, TYPE=COUNT_MIN_SKETCH, FEATURES=`{{'column_names': ['id']}}`);
-            """)
-            session.execute_scheme(f"""
-                ALTER OBJECT `{table_name}` (TYPE TABLE) SET (ACTION=UPSERT_INDEX, NAME=cms_value, TYPE=COUNT_MIN_SKETCH, FEATURES=`{{'column_names': ['value']}}`);
-            """)
-        self.run_query_ignore_errors(callee)
-
-    def drop_table(self, table_name):
-        def callee(session):
-            session.drop_table(table_name)
-        self.run_query_ignore_errors(callee)
-
-    def list_columns(self, table_path):
-        def callee(session):
-            return session.describe_table(table_path).columns
-        return self.pool.retry_operation_sync(callee)
-
-    def add_data(self, table_path, trace_id):
-        logger.info(f"[{trace_id}] insert {self.batch_count} batches of {self.batch_size} rows each")
-        schema = self.list_columns(table_path)
-        column_types = ydb.BulkUpsertColumns()
-
-        for c in schema:
-            column_types.add_column(c.name, c.type)
-
-        for i in range(self.batch_count):
-            logger.info(f"[{trace_id}] add batch #{i}")
-            batch = self.generate_batch(schema)
-            self.driver.table_client.bulk_upsert(table_path, batch, column_types)
-
-    def rows_count(self, table_name):
-        return self.driver.table_client.scan_query(f"SELECT count(*) FROM `{table_name}`").next().result_set.rows[0][0]
-
-    def statistics_count(self, table_statistics, path_id):
-        query = f"SELECT count(*) FROM `{table_statistics}` WHERE local_path_id = {path_id}"
-        return self.driver.table_client.scan_query(query).next().result_set.rows[0][0]
-
-    def analyze(self, table_path):
-        def callee(session):
-            session.execute_scheme(f"ANALYZE `{table_path}`")
-        self.run_query_ignore_errors(callee)
-
-    def execute(self):
-        table_prefix = "test_table"
-        table_name = table_name_with_prefix(table_prefix)
-        table_path = self.database + "/" + table_name
-        table_statistics = ".metadata/_statistics"
-        trace_id = random_string(5)
-
-        try:
-            logger.info(f"[{trace_id}] start new round")
-
-            self.pool.acquire()
-
-            logger.info(f"[{trace_id}] create table '{table_name}'")
-            self.create_table(table_name)
-
-            scheme = self.kikimr_client.send(
-                SchemeDescribeRequest(table_path).protobuf,
-                method='SchemeDescribe'
-            )
-            path_id = scheme.PathDescription.Self.PathId
-            logger.info(f"[{trace_id}] table '{table_name}' path id: {path_id}")
-
-            self.add_data(table_path, trace_id)
-            count = self.rows_count(table_name)
-            logger.info(f"[{trace_id}] number of rows in table '{table_name}': {count}")
-            if count != self.batch_count * self.batch_size:
-                raise Exception(f"[{trace_id}] the number of rows in table '{table_name}' does not match the expected value")
-
-            logger.info(f"[{trace_id}] waiting to receive information about the table '{table_name}' from scheme shard")
-            time.sleep(300)
-
-            logger.info(f"[{trace_id}] analyze '{table_name}'")
-            self.analyze(table_path)
-
-            count = self.statistics_count(table_statistics, path_id)
-            logger.info(f"[{trace_id}] number of rows in statistics table '{table_statistics}': {count}")
-            if count == 0:
-                raise Exception(f"[{trace_id}] statistics table '{table_statistics}' is empty")
-        except Exception as e:
-            logger.error(f"[{trace_id}] {type(e)}, {e}")
-
-        logger.info(f"[{trace_id}] drop table '{table_name}'")
-        self.drop_table(table_path)
-
-    def run(self):
-        started_at = time.time()
-
-        while time.time() - started_at < self.duration:
-            self.execute()
-
-
-if __name__ == '__main__':
-    parser = argparse.ArgumentParser(
-        description="statistics stability workload", formatter_class=argparse.RawDescriptionHelpFormatter
-    )
-    parser.add_argument('--host', default='localhost', help="A host to connect to")
-    parser.add_argument('--port', default='2135', help="A port to connect to")
-    parser.add_argument('--database', default=None, required=True, help='A database to connect to')
-    parser.add_argument('--duration', default=120, type=int, help='Duration of the workload in seconds')
-    parser.add_argument('--batch_size', default=1000, type=int, help='Batch size for bulk insert')
-    parser.add_argument('--batch_count', default=3, type=int, help='The number of batches to be inserted')
-    parser.add_argument('--log_file', default=None, help='Append log to the specified file')
-
-    args = parser.parse_args()
-
-    if args.log_file:
-        logging.basicConfig(
-            filename=args.log_file,
-            filemode='a',
-            format='%(asctime)s,%(msecs)d %(name)s %(levelname)s %(message)s',
-            datefmt='%H:%M:%S',
-            level=logging.INFO
-        )
-
-    with Workload(args.host, args.port, args.database, args.duration, args.batch_size, args.batch_count) as workload:
-        workload.run()
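
For context, the file removed above was a self-contained stress tool: it created a column-store table, bulk-upserted random batches, ran ANALYZE, and verified that rows for the table appeared in `.metadata/_statistics`. The sketch below shows the same bulk-upsert-and-count pattern using only the public ydb Python SDK; the endpoint, database, and table path are hypothetical examples, not values taken from this commit.

# Minimal sketch of the bulk-upsert-and-count pattern from the removed
# tool; endpoint, database, and table path are hypothetical examples.
import ydb

driver = ydb.Driver(ydb.DriverConfig("localhost:2135", "/Root/test"))
driver.wait(timeout=5)  # fail fast if the endpoint is unreachable

# Declare column types once, then upsert a whole batch of row dicts,
# as add_data() did via ydb.BulkUpsertColumns.
column_types = (
    ydb.BulkUpsertColumns()
    .add_column("id", ydb.PrimitiveType.Int64)
    .add_column("value", ydb.PrimitiveType.Int64)
)
rows = [{"id": i, "value": i * 10} for i in range(1000)]
driver.table_client.bulk_upsert("/Root/test/test_table", rows, column_types)

# Count the rows back with a scan query, as rows_count() did.
part = next(driver.table_client.scan_query("SELECT count(*) FROM `test_table`"))
print("rows:", part.result_set.rows[0][0])

driver.stop()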
diff --git a/ydb/tools/statistics_workload/ya.make b/ydb/tools/statistics_workload/ya.make
deleted file mode 100644
index 2b136aca24..0000000000
--- a/ydb/tools/statistics_workload/ya.make
+++ /dev/null
@@ -1,13 +0,0 @@
-PY3_PROGRAM(statistics_workload)
-
-PY_SRCS(
-    __main__.py
-)
-
-PEERDIR(
-    ydb/tests/library
-    ydb/public/sdk/python
-    library/python/monlib
-)
-
-END()
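
A side note on the ALTER OBJECT statements in the removed __main__.py: the FEATURES payload is emitted from an f-string, so its literal braces must be doubled ({{ }}) to survive formatting. A minimal sketch, with a hypothetical table name:

# Doubled braces in an f-string become literal single braces, so the
# FEATURES payload reaches the server as {'column_names': ['id']}.
table_name = "test_table"  # hypothetical example
stmt = (
    f"ALTER OBJECT `{table_name}` (TYPE TABLE) "
    f"SET (ACTION=UPSERT_INDEX, NAME=cms_key, TYPE=COUNT_MIN_SKETCH, "
    f"FEATURES=`{{'column_names': ['id']}}`);"
)
print(stmt)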