1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
|
#!/usr/bin/env python3
# Example arguments: --query_path .github/scripts/analytics/data_mart_queries/perfomance_olap_mart.sql --table_path perfomance/olap/fast_results --store_type column --partition_keys Run_start_timestamp --primary_keys Db Suite Test Branch Run_start_timestamp --ttl_min 43200 --ttl_key Run_start_timestamp
import argparse
import ydb
import configparser
import os
import time
# Load configuration.
# The script directory is made absolute before the relative "/../.." suffixes
# are appended. When ``__file__`` is a relative path (e.g. a script started as
# ``python script.py`` on Python < 3.9), ``os.path.dirname(__file__)`` is an
# empty string and the paths below would otherwise be resolved from the
# filesystem root instead of from the script location.
# NOTE: ``dir`` shadows the builtin of the same name; the name is kept so that
# any existing reference to it keeps working.
dir = os.path.dirname(os.path.abspath(__file__))
config = configparser.ConfigParser()
config_file_path = f"{dir}/../../config/ydb_qa_db.ini"
# ConfigParser.read() skips files it cannot open without raising, so a missing
# config file surfaces as a KeyError on the "QA_DB" lookups below.
config.read(config_file_path)
repo_path = os.path.abspath(f"{dir}/../../../")
DATABASE_ENDPOINT = config["QA_DB"]["DATABASE_ENDPOINT"]
DATABASE_PATH = config["QA_DB"]["DATABASE_PATH"]
def get_data_from_query_with_metadata(driver, query):
    """Run ``query`` as a scan query and gather every row plus column metadata.

    Args:
        driver: Object exposing ``table_client.scan_query``.
        query: Query text passed to ``ydb.ScanQuery``.

    Returns:
        A ``(rows, column_types)`` tuple. ``rows`` is a list with the rows of
        every result part concatenated in arrival order. ``column_types`` is a
        list of ``(name, type)`` pairs read from the first part's
        ``result_set.columns``, or ``None`` if the stream produced no parts.
    """
    collected_rows = []
    stream = driver.table_client.scan_query(ydb.ScanQuery(query, {}))
    print("Executing query")
    started_at = time.time()
    columns = None
    for part in stream:
        result_set = part.result_set
        # Column metadata is captured once, from the first part received.
        if columns is None:
            columns = [(column.name, column.type) for column in result_set.columns]
        collected_rows.extend(result_set.rows)
    finished_at = time.time()
    print(f'Captured {len(collected_rows)} rows, duration: {finished_at - started_at}s')
    return collected_rows, columns
def ydb_type_to_str(ydb_type, store_type='ROW'):
    """Convert a YDB protobuf type into an SDK type object and a type name.

    Args:
        ydb_type: Type message of a result column. It may wrap the actual type
            in ``optional_type``; the unwrapped message must carry a
            ``type_id`` matching one of ``ydb.PrimitiveType``.
        store_type: Table store type, compared case-insensitively. With
            ``'COLUMN'``, ``Bool`` is replaced by ``Uint8``.

    Returns:
        A ``(type_object, name)`` tuple. ``type_object`` is a
        ``ydb.PrimitiveType`` member, wrapped in ``ydb.OptionalType`` when the
        input was optional. ``name`` is the string used for table creation.

    Raises:
        ValueError: If the type does not correspond to any ``ydb.PrimitiveType``.
            Previously the lookup loop fell through silently and the last enum
            member was used, producing a wrong column type.
    """
    is_optional = ydb_type.HasField('optional_type')
    base_type = ydb_type.optional_type.item if is_optional else ydb_type

    primitive = next(
        (
            candidate
            for candidate in ydb.PrimitiveType
            if candidate.proto.type_id == base_type.type_id
        ),
        None,
    )
    if primitive is None:
        raise ValueError(f"Unsupported YDB type for table creation: {base_type}")

    # The Bool check is made on the primitive itself rather than on the rendered
    # name: the rendered name of an optional type carries extra decoration
    # (callers strip a '?' from it), which made the comparison with 'BOOL'
    # fail and left Optional<Bool> unconverted for column tables.
    if primitive.name.upper() == 'BOOL' and store_type.upper() == 'COLUMN':
        replacement = ydb.PrimitiveType.Uint8
        result_type = ydb.OptionalType(replacement) if is_optional else replacement
        return result_type, 'Uint8'

    if is_optional:
        result_type = ydb.OptionalType(primitive)
        # NOTE(review): ``_repr`` is a private attribute of ydb.OptionalType;
        # kept as in the original code — confirm whether str(result_type) is
        # an equivalent public alternative.
        return result_type, result_type._repr

    return primitive, primitive.name
def create_table(session, table_path, column_types, store_type, partition_keys, primary_keys, ttl_min, ttl_key):
    """Create table based on the structure of the provided column types.

    Args:
        session: Object exposing ``execute_scheme``; receives the final SQL.
        table_path: Full path of the table to create.
        column_types: Iterable of ``(column_name, ydb_type)`` pairs.
        store_type: Store type string; ``'column'`` (any case) adds
            ``STORE = COLUMN``.
        partition_keys: Column names for ``PARTITION BY HASH``.
        primary_keys: Column names for ``PRIMARY KEY``; these columns are
            declared ``NOT NULL``.
        ttl_min: TTL in minutes; TTL is added only if this and ``ttl_key``
            are both truthy.
        ttl_key: Name of the column the TTL is based on.

    Raises:
        ValueError: If ``column_types`` is empty or ``None``.
    """
    if not column_types:
        raise ValueError("No column types to create table from.")

    store_type_upper = store_type.upper()

    columns_sql = []
    for column_name, column_ydb_type in column_types:
        _, column_type_str = ydb_type_to_str(column_ydb_type, store_type_upper)
        # Drop the '?' marker that the type name may carry; nullability is
        # expressed through NOT NULL instead.
        column_sql = f"`{column_name}` {column_type_str.replace('?', '')}"
        if column_name in primary_keys:
            column_sql += " NOT NULL"
        columns_sql.append(column_sql)

    partition_keys_sql = ", ".join(f"`{key}`" for key in partition_keys)
    primary_keys_sql = ", ".join(f"`{key}`" for key in primary_keys)

    # Collect WITH options in a list so that separators are only placed
    # between options that are actually present. The previous template
    # produced "WITH ( )" or "WITH ( , TTL ...)" for row tables.
    with_options = []
    if store_type_upper == 'COLUMN':
        with_options.append("STORE = COLUMN")
    # Add TTL only when both arguments are provided.
    if ttl_min and ttl_key:
        with_options.append(f'TTL = Interval("PT{ttl_min}M") ON {ttl_key}')

    # NOTE(review): PARTITION BY HASH is emitted for every store type, as
    # before — confirm it is accepted for row tables.
    statement_lines = [
        f"CREATE TABLE IF NOT EXISTS `{table_path}` (",
        f"    {', '.join(columns_sql)},",
        f"    PRIMARY KEY ({primary_keys_sql})",
        ")",
        f"PARTITION BY HASH({partition_keys_sql})",
    ]
    if with_options:
        statement_lines.append("WITH (")
        statement_lines.append(f"    {', '.join(with_options)}")
        statement_lines.append(")")
    create_table_sql = "\n".join(statement_lines)

    print(f"Creating table with query: {create_table_sql}")
    session.execute_scheme(create_table_sql)
def create_table_if_not_exists(session, table_path, column_types, store_type, partition_keys, primary_keys, ttl_min, ttl_key):
    """Ensure the table exists, creating it from ``column_types`` when needed.

    The table is probed with ``session.describe_table``. Any ``ydb.Error``
    raised by the probe is treated as "table does not exist" and leads to a
    ``create_table`` call with the same arguments.
    """
    try:
        session.describe_table(table_path)
    except ydb.Error:
        print(f"Table '{table_path}' does not exist. Creating table...")
        create_table(
            session,
            table_path,
            column_types,
            store_type,
            partition_keys,
            primary_keys,
            ttl_min,
            ttl_key,
        )
        return
    print(f"Table '{table_path}' already exists.")
def bulk_upsert(table_client, table_path, rows, column_types, store_type='ROW'):
    """Bulk-upsert ``rows`` into ``table_path``.

    Args:
        table_client: Object exposing ``bulk_upsert(path, rows, columns)``.
        table_path: Full path of the destination table.
        rows: Rows to write.
        column_types: Iterable of ``(column_name, ydb_type)`` pairs; each type
            is resolved through ``ydb_type_to_str``.
        store_type: Store type forwarded (upper-cased) to ``ydb_type_to_str``.
    """
    print(f"> Bulk upsert into: {table_path}")
    upsert_columns = ydb.BulkUpsertColumns()
    for name, raw_type in column_types:
        # Only the SDK type object is needed here; the type name is unused.
        resolved = ydb_type_to_str(raw_type, store_type.upper())
        upsert_columns.add_column(name, resolved[0])
    table_client.bulk_upsert(table_path, rows, upsert_columns)
def parse_args():
    """Parse the command-line arguments of the table manager.

    Returns:
        The ``argparse.Namespace`` with ``table_path``, ``query_path``,
        ``store_type``, ``partition_keys``, ``primary_keys``, ``ttl_min`` and
        ``ttl_key``. The two TTL options are optional and default to ``None``.
    """
    option_specs = (
        ("--table_path", {"required": True, "help": "Table path and name"}),
        ("--query_path", {"required": True, "help": "Path to the SQL query file"}),
        (
            "--store_type",
            {
                "choices": ["column", "row"],
                "required": True,
                "help": "Table store type (column or row)",
            },
        ),
        ("--partition_keys", {"nargs": "+", "required": True, "help": "List of partition keys"}),
        ("--primary_keys", {"nargs": "+", "required": True, "help": "List of primary keys"}),
        ("--ttl_min", {"type": int, "help": "TTL in minutes"}),
        ("--ttl_key", {"help": "TTL key column name"}),
    )
    cli = argparse.ArgumentParser(description="YDB Table Manager")
    for flag, settings in option_specs:
        cli.add_argument(flag, **settings)
    return cli.parse_args()
def main():
    """Run the query from ``--query_path`` and upload its result to a table.

    Steps: parse the arguments, export the CI credentials variable under the
    name ``YDB_SERVICE_ACCOUNT_KEY_FILE_CREDENTIALS``, read the SQL file,
    execute it as a scan query, create the destination table if needed and
    bulk-upsert the rows in batches of 1000.

    Returns:
        ``1`` when ``CI_YDB_SERVICE_ACCOUNT_KEY_FILE_CREDENTIALS`` is not set;
        ``None`` otherwise (including when the query returned no rows).
    """
    args = parse_args()

    if "CI_YDB_SERVICE_ACCOUNT_KEY_FILE_CREDENTIALS" not in os.environ:
        print("Error: Env variable CI_YDB_SERVICE_ACCOUNT_KEY_FILE_CREDENTIALS is missing, skipping")
        return 1
    # Presumably the variable name ydb.credentials_from_env_variables() looks
    # for — confirm against the SDK.
    os.environ["YDB_SERVICE_ACCOUNT_KEY_FILE_CREDENTIALS"] = os.environ[
        "CI_YDB_SERVICE_ACCOUNT_KEY_FILE_CREDENTIALS"
    ]

    # Built once; used for table creation and for every upsert batch.
    full_table_path = f'{DATABASE_PATH}/{args.table_path}'
    batch_size = 1000

    # Read SQL query from file. The encoding is explicit so that decoding does
    # not depend on the locale of the machine running the script.
    sql_query_path = os.path.join(repo_path, args.query_path)
    print(f'Query found: {sql_query_path}')
    with open(sql_query_path, 'r', encoding='utf-8') as file:
        sql_query = file.read()

    with ydb.Driver(
        endpoint=DATABASE_ENDPOINT,
        database=DATABASE_PATH,
        credentials=ydb.credentials_from_env_variables()
    ) as driver:
        driver.wait(timeout=10, fail_fast=True)
        with ydb.SessionPool(driver) as pool:
            # Run query to get the data and the column types.
            results, column_types = get_data_from_query_with_metadata(driver, sql_query)
            if not results:
                print("No data to create table from.")
                return

            # Create table if not exists, based on the query's column types.
            pool.retry_operation_sync(
                lambda session: create_table_if_not_exists(
                    session, full_table_path, column_types, args.store_type,
                    args.partition_keys, args.primary_keys, args.ttl_min, args.ttl_key
                )
            )

            print(f'Preparing to upsert: {len(results)} rows')
            for start in range(0, len(results), batch_size):
                batch_rows = results[start:start + batch_size]
                print(f'Upserting: {start}-{start + len(batch_rows)}/{len(results)} rows')
                bulk_upsert(driver.table_client, full_table_path, batch_rows, column_types, args.store_type)
            print('Data uploaded')
# Script entry point. NOTE: the value returned by main() (1 when the
# credentials variable is missing) is discarded here, so it is not
# propagated as the process exit status.
if __name__ == "__main__":
    main()
|