author     Kirill Rysin <35688753+naspirato@users.noreply.github.com>   2024-08-14 17:03:27 +0200
committer  GitHub <noreply@github.com>                                  2024-08-14 15:03:27 +0000
commit     79898e65a1845ff700b9920e7aec0680bd174b4a
tree       e959e6597885103f36a6c71d2016aefb3b33dfef
parent     d21cb124754a91822133906fbebbf730c5942144
TESTOWNERS and migration to column shard (#7733)
-rw-r--r--   .github/TESTOWNERS                                           |  82
-rwxr-xr-x   .github/scripts/analytics/decode_imported_csv.py             |  35
-rwxr-xr-x   .github/scripts/analytics/flaky_tests_history.py             |   6
-rwxr-xr-x   .github/scripts/analytics/import_to_column.sh                |  32
-rwxr-xr-x   .github/scripts/analytics/migration_row_to_columt_tests.py   | 216
-rw-r--r--   .github/scripts/tests/get_test_history.py                    |  87
-rwxr-xr-x   .github/scripts/upload_tests_results.py                      | 301
7 files changed, 555 insertions(+), 204 deletions(-)
diff --git a/.github/TESTOWNERS b/.github/TESTOWNERS
new file mode 100644
index 00000000000..3d1fbc40aba
--- /dev/null
+++ b/.github/TESTOWNERS
@@ -0,0 +1,82 @@
+# TEAM:@ydb-platform/docs
+/*.md @ydb-platform/docs
+/ydb/docs/ @ydb-platform/docs
+
+# YQL @ydb-platform/yql
+/ydb/library/yql/ @ydb-platform/yql
+/ydb/library/yql/dq @ydb-platform/yql @ydb-platform/qp
+/ydb/library/yql/yt @Krock21 @Krisha11 @zlobober @gritukan
+
+#TEAM:@ydb-platform/Topics
+/ydb/core/kafka_proxy @ydb-platform/Topics
+/ydb/core/persqueue @ydb-platform/Topics
+/ydb/services/datastreams @ydb-platform/Topics
+/ydb/services/deprecated/persqueue_v0 @ydb-platform/Topics
+/ydb/services/persqueue_v1 @ydb-platform/Topics
+
+#Row-oriented tables development team @azevaykin TEAM:?
+/ydb/core/tablet_flat @azevaykin
+/ydb/core/tx/datashard @azevaykin
+/ydb/core/mon_alloc @azevaykin
+
+# Coordinator
+/ydb/core/tx/coordinator @snaury
+
+#Distributed system infrastructure team @ijon TEAM:?
+/ydb/core/mind/ @ijon
+/ydb/core/blob_depot @ijon
+/ydb/core/tx/schemeshard @ijon
+/ydb/core/tx/columnshard @ivanmorozov333
+/ydb/services/ydb/ @ijon
+/ydb/core/util @ijon
+/ydb/core/persqueue @ijon
+/ydb/core/client @ijon
+/ydb/core/engine @ijon
+
+#YDB Query Execution Team @gridnevvvit TEAM:@ydb-platform/qp
+/ydb/core/kqp @ydb-platform/qp
+
+#YDB Application Team @asmyasnikov TEAM:?
+/ydb/public/sdk @asmyasnikov
+/ydb/tests/functional/ydb_cli @asmyasnikov
+/ydb/public/lib/ydb_cli @asmyasnikov
+
+#YDB Query Optimizer Team @pavelvelikhov TEAM:???
+/ydb/tests/functional/canonical @pavelvelikhov
+
+
+#YDB Engineering Team @maximyurchuk
+ # - not clear yet
+
+#Distributed storage development team @the-ancient-1 TEAM:???
+/ydb/core/blobstorage @the-ancient-1
+/ydb/core/mind/bscontroller @the-ancient-1
+
+#Core functionality team @CyberROFL TEAM:@ydb-platform/core
+
+/ydb/core/config/ut @ydb-platform/core
+/ydb/core/scheme @ydb-platform/core
+/ydb/core/cms @ydb-platform/core
+/ydb/tests/functional/cms @ydb-platform/core
+
+
+#Data delivery systems / LogBroker team @alexnick88 TEAM:???
+/ydb/tests/functional/sqs
+
+#YDB Analytics TEAM:???
+
+#Federative query TEAM:@ydb-platform/fq
+/ydb/tests/fq @ydb-platform/fq
+/ydb/core/fq/ @ydb-platform/fq
+/ydb/services/fq/ @ydb-platform/fq
+/ydb/library/yql/providers/common/http_gateway @ydb-platform/fq
+/ydb/library/yql/providers/common/db_id_async_resolver @ydb-platform/fq
+/ydb/library/yql/providers/generic @ydb-platform/fq
+/ydb/library/yql/providers/pq @ydb-platform/fq
+/ydb/library/yql/providers/s3 @ydb-platform/fq
+/ydb/library/yql/providers/solomon @ydb-platform/fq
+/ydb/core/public_http/ @ydb-platform/fq
+
+#@ydb-platform/ui-backend
+/ydb/core/viewer @ydb-platform/ui-backend
+/ydb/core/protos/node_whiteboard.proto @ydb-platform/ui-backend
\ No newline at end of file
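TESTOWNERS uses CODEOWNERS syntax, so it can be resolved with the codeowners PyPI package, the same one upload_tests_results.py imports further down. A minimal sketch, assuming that package and an illustrative test path:

    from codeowners import CodeOwners

    with open(".github/TESTOWNERS") as f:
        owners = CodeOwners(f.read())

    # of() returns (kind, owner) tuples for the matching pattern,
    # e.g. [("TEAM", "@ydb-platform/qp")] for a path under /ydb/core/kqp
    print(owners.of("ydb/core/kqp/ut/some_test.cpp"))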
diff --git a/.github/scripts/analytics/decode_imported_csv.py b/.github/scripts/analytics/decode_imported_csv.py
new file mode 100755
index 00000000000..544aa1a86aa
--- /dev/null
+++ b/.github/scripts/analytics/decode_imported_csv.py
@@ -0,0 +1,35 @@
+# NOT USED ANYWHERE, YOU CAN DELETE THIS IF YOU KNOW WHAT YOU ARE DOING
+import os
+import csv
+import urllib.parse
+
+# Decode a percent-encoded string
+def decode_percent_encoded_string(encoded_string):
+    return urllib.parse.unquote(encoded_string)
+
+# Decode a CSV file
+def decode_csv(file_path):
+    with open(file_path, mode='r', encoding='utf-8') as infile, open(file_path + '.decoded', mode='w', encoding='utf-8', newline='') as outfile:
+        reader = csv.reader(infile)
+        writer = csv.writer(outfile, escapechar='\\', quotechar='"', quoting=csv.QUOTE_ALL, doublequote=True)
+
+        for row in reader:
+            decoded_row = [decode_percent_encoded_string(cell) for cell in row]
+            writer.writerow(decoded_row)
+
+# Process all CSV files in a directory
+def decode_all_csv_files_in_directory(directory_path):
+    for filename in os.listdir(directory_path):
+        if filename.endswith('.csv'):
+            file_path = os.path.join(directory_path, filename)
+            print(f"Processing file: {file_path}")
+            decode_csv(file_path)
+            os.replace(file_path + '.decoded', file_path)
+
+def main():
+    directory_path = 'place_your_path_here'
+    decode_all_csv_files_in_directory(directory_path)
+
+
+if __name__ == "__main__":
+    main()
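The decoding itself is just urllib.parse.unquote applied cell by cell; a quick illustration of the transformation it performs:

    >>> import urllib.parse
    >>> urllib.parse.unquote("suite%2Ffolder%20name")
    'suite/folder name'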
"$ENDPOINT" \ + -d "$DATABASE" \ + --sa-key-file "$SA_KEY_FILE" \ + import file csv \ + -p "$TABLE" \ + "$FILE" + + if [[ $? -eq 0 ]]; then + echo "Импорт файла $FILE успешно завершен." + else + echo "Ошибка при импорте файла $FILE." >&2 + exit 1 + fi + fi +done + +echo "Импорт всех файлов завершен." diff --git a/.github/scripts/analytics/migration_row_to_columt_tests.py b/.github/scripts/analytics/migration_row_to_columt_tests.py new file mode 100755 index 00000000000..0aeaf7aa63a --- /dev/null +++ b/.github/scripts/analytics/migration_row_to_columt_tests.py @@ -0,0 +1,216 @@ +#!/usr/bin/env python3 +# NOT USED ANYWHERE, YOU CAN DELETE THIS IF YOU KNOW WHAT ARE YOU DOING +import argparse +import configparser +import datetime +import os +import posixpath +import traceback +import time +import ydb +from collections import Counter + +dir = os.path.dirname(__file__) +config = configparser.ConfigParser() +config_file_path = f"{dir}/../../config/ydb_qa_db.ini" +config.read(config_file_path) + +build_preset = os.environ.get("build_preset") +branch = os.environ.get("branch_to_compare") + +DATABASE_ENDPOINT = config["QA_DB"]["DATABASE_ENDPOINT"] +DATABASE_PATH = config["QA_DB"]["DATABASE_PATH"] + + +def create_tables(pool, table_path): + print(f"> create table: {table_path}") + + def callee(session): + session.execute_scheme(f""" + CREATE table IF NOT EXISTS`{table_path}` ( + branch Utf8 NOT NULL, + build_type Utf8 NOT NULL, + commit Utf8 NOT NULL, + duration Double, + job_id Uint64, + job_name Utf8, + log Utf8, + logsdir Utf8, + owners Utf8, + pull Utf8, + run_timestamp Timestamp NOT NULL, + status_description Utf8, + status Utf8 NOT NULL, + stderr Utf8, + stdout Utf8, + suite_folder Utf8 NOT NULL, + test_id Utf8 NOT NULL, + test_name Utf8 NOT NULL, + PRIMARY KEY (`test_name`, `suite_folder`,build_type, status, run_timestamp) + ) + PARTITION BY HASH(`test_name`, `suite_folder`, branch, build_type ) + WITH (STORE = COLUMN) + """) + + return pool.retry_operation_sync(callee) + + +def bulk_upsert(table_client, table_path, rows): + print(f"> bulk upsert: {table_path}") + column_types = ( + ydb.BulkUpsertColumns() + .add_column("branch", ydb.OptionalType(ydb.PrimitiveType.Utf8)) + .add_column("build_type", ydb.OptionalType(ydb.PrimitiveType.Utf8)) + .add_column("commit", ydb.OptionalType(ydb.PrimitiveType.Utf8)) + .add_column("duration", ydb.OptionalType(ydb.PrimitiveType.Double)) + .add_column("job_id", ydb.OptionalType(ydb.PrimitiveType.Uint64)) + .add_column("job_name", ydb.OptionalType(ydb.PrimitiveType.Utf8)) + .add_column("log", ydb.OptionalType(ydb.PrimitiveType.Utf8)) + .add_column("logsdir", ydb.OptionalType(ydb.PrimitiveType.Utf8)) + .add_column("owners", ydb.OptionalType(ydb.PrimitiveType.Utf8)) + .add_column("pull", ydb.OptionalType(ydb.PrimitiveType.Utf8)) + .add_column("run_timestamp", ydb.OptionalType(ydb.PrimitiveType.Timestamp)) + .add_column("status", ydb.OptionalType(ydb.PrimitiveType.Utf8)) + .add_column("status_description", ydb.OptionalType(ydb.PrimitiveType.Utf8)) + .add_column("stderr", ydb.OptionalType(ydb.PrimitiveType.Utf8)) + .add_column("stdout", ydb.OptionalType(ydb.PrimitiveType.Utf8)) + .add_column("suite_folder", ydb.OptionalType(ydb.PrimitiveType.Utf8)) + .add_column("test_id", ydb.OptionalType(ydb.PrimitiveType.Utf8)) + .add_column("test_name", ydb.OptionalType(ydb.PrimitiveType.Utf8)) + + ) + table_client.bulk_upsert(table_path, rows, column_types) + + +def main(): + + + + if "CI_YDB_SERVICE_ACCOUNT_KEY_FILE_CREDENTIALS" not in os.environ: + print( + "Error: Env 
diff --git a/.github/scripts/analytics/import_to_column.sh b/.github/scripts/analytics/import_to_column.sh
new file mode 100755
index 00000000000..a2e7e093a1a
--- /dev/null
+++ b/.github/scripts/analytics/import_to_column.sh
@@ -0,0 +1,32 @@
+#!/bin/bash
+
+# NOT USED ANYWHERE, YOU CAN DELETE THIS IF YOU KNOW WHAT YOU ARE DOING
+# YDB connection parameters
+ENDPOINT="grpcs://lb.etnvsjbk7kh1jc6bbfi8.ydb.mdb.yandexcloud.net:2135"
+DATABASE="/ru-central1/b1ggceeul2pkher8vhb6/etnvsjbk7kh1jc6bbfi8"
+SA_KEY_FILE="/home/kirrysin/fork_2/.github/scripts/my-robot-key.json"
+TABLE="test_results/test_runs_column"
+DIRECTORY="/home/kirrysin/fork_2/~tmp_backup/test_runs_results"
+
+# Process every .csv file in the given directory
+for FILE in "$DIRECTORY"/*.csv; do
+    if [[ -f "$FILE" ]]; then
+        echo "Importing file: $FILE"
+
+        ydb -e "$ENDPOINT" \
+            -d "$DATABASE" \
+            --sa-key-file "$SA_KEY_FILE" \
+            import file csv \
+            -p "$TABLE" \
+            "$FILE"
+
+        if [[ $? -eq 0 ]]; then
+            echo "Import of $FILE completed successfully."
+        else
+            echo "Error importing file $FILE." >&2
+            exit 1
+        fi
+    fi
+done
+
+echo "All files imported."
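For a one-off import, the loop body above reduces to a single CLI call. A sketch with placeholder endpoint, database, and file name (the real values come from the variables at the top of the script):

    ydb -e grpcs://<endpoint>:2135 \
        -d /<database> \
        --sa-key-file my-robot-key.json \
        import file csv \
        -p test_results/test_runs_column \
        part_00.csv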
diff --git a/.github/scripts/analytics/migration_row_to_columt_tests.py b/.github/scripts/analytics/migration_row_to_columt_tests.py
new file mode 100755
index 00000000000..0aeaf7aa63a
--- /dev/null
+++ b/.github/scripts/analytics/migration_row_to_columt_tests.py
@@ -0,0 +1,216 @@
+#!/usr/bin/env python3
+# NOT USED ANYWHERE, YOU CAN DELETE THIS IF YOU KNOW WHAT YOU ARE DOING
+import argparse
+import configparser
+import datetime
+import os
+import posixpath
+import traceback
+import time
+import ydb
+from collections import Counter
+
+dir = os.path.dirname(__file__)
+config = configparser.ConfigParser()
+config_file_path = f"{dir}/../../config/ydb_qa_db.ini"
+config.read(config_file_path)
+
+build_preset = os.environ.get("build_preset")
+branch = os.environ.get("branch_to_compare")
+
+DATABASE_ENDPOINT = config["QA_DB"]["DATABASE_ENDPOINT"]
+DATABASE_PATH = config["QA_DB"]["DATABASE_PATH"]
+
+
+def create_tables(pool, table_path):
+    print(f"> create table: {table_path}")
+
+    def callee(session):
+        session.execute_scheme(f"""
+            CREATE table IF NOT EXISTS `{table_path}` (
+                branch Utf8 NOT NULL,
+                build_type Utf8 NOT NULL,
+                commit Utf8 NOT NULL,
+                duration Double,
+                job_id Uint64,
+                job_name Utf8,
+                log Utf8,
+                logsdir Utf8,
+                owners Utf8,
+                pull Utf8,
+                run_timestamp Timestamp NOT NULL,
+                status_description Utf8,
+                status Utf8 NOT NULL,
+                stderr Utf8,
+                stdout Utf8,
+                suite_folder Utf8 NOT NULL,
+                test_id Utf8 NOT NULL,
+                test_name Utf8 NOT NULL,
+                PRIMARY KEY (`test_name`, `suite_folder`, build_type, status, run_timestamp)
+            )
+            PARTITION BY HASH(`test_name`, `suite_folder`, branch, build_type)
+            WITH (STORE = COLUMN)
+        """)
+
+    return pool.retry_operation_sync(callee)
+
+
+def bulk_upsert(table_client, table_path, rows):
+    print(f"> bulk upsert: {table_path}")
+    column_types = (
+        ydb.BulkUpsertColumns()
+        .add_column("branch", ydb.OptionalType(ydb.PrimitiveType.Utf8))
+        .add_column("build_type", ydb.OptionalType(ydb.PrimitiveType.Utf8))
+        .add_column("commit", ydb.OptionalType(ydb.PrimitiveType.Utf8))
+        .add_column("duration", ydb.OptionalType(ydb.PrimitiveType.Double))
+        .add_column("job_id", ydb.OptionalType(ydb.PrimitiveType.Uint64))
+        .add_column("job_name", ydb.OptionalType(ydb.PrimitiveType.Utf8))
+        .add_column("log", ydb.OptionalType(ydb.PrimitiveType.Utf8))
+        .add_column("logsdir", ydb.OptionalType(ydb.PrimitiveType.Utf8))
+        .add_column("owners", ydb.OptionalType(ydb.PrimitiveType.Utf8))
+        .add_column("pull", ydb.OptionalType(ydb.PrimitiveType.Utf8))
+        .add_column("run_timestamp", ydb.OptionalType(ydb.PrimitiveType.Timestamp))
+        .add_column("status", ydb.OptionalType(ydb.PrimitiveType.Utf8))
+        .add_column("status_description", ydb.OptionalType(ydb.PrimitiveType.Utf8))
+        .add_column("stderr", ydb.OptionalType(ydb.PrimitiveType.Utf8))
+        .add_column("stdout", ydb.OptionalType(ydb.PrimitiveType.Utf8))
+        .add_column("suite_folder", ydb.OptionalType(ydb.PrimitiveType.Utf8))
+        .add_column("test_id", ydb.OptionalType(ydb.PrimitiveType.Utf8))
+        .add_column("test_name", ydb.OptionalType(ydb.PrimitiveType.Utf8))
+    )
+    table_client.bulk_upsert(table_path, rows, column_types)
+
+
+def main():
+    if "CI_YDB_SERVICE_ACCOUNT_KEY_FILE_CREDENTIALS" not in os.environ:
+        print(
+            "Error: Env variable CI_YDB_SERVICE_ACCOUNT_KEY_FILE_CREDENTIALS is missing, skipping"
+        )
+        return 1
+    else:
+        # Do not set up the 'real' variable from gh workflows because it interferes with ydb tests,
+        # so set it up locally
+        os.environ["YDB_SERVICE_ACCOUNT_KEY_FILE_CREDENTIALS"] = os.environ[
+            "CI_YDB_SERVICE_ACCOUNT_KEY_FILE_CREDENTIALS"
+        ]
+    with ydb.Driver(
+        endpoint=DATABASE_ENDPOINT,
+        database=DATABASE_PATH,
+        credentials=ydb.credentials_from_env_variables(),
+    ) as driver:
+        driver.wait(timeout=10, fail_fast=True)
+        session = ydb.retry_operation_sync(
+            lambda: driver.table_client.session().create()
+        )
+
+        # settings, paths, consts
+        tc_settings = ydb.TableClientSettings().with_native_date_in_result_sets(enabled=True)
+        table_client = ydb.TableClient(driver, tc_settings)
+
+        table_path = 'test_results/test_runs_column'
+
+        with ydb.SessionPool(driver) as pool:
+            create_tables(pool, table_path)
+
+        # getting the last timestamp from the column table
+        default_start_date = datetime.datetime(2024, 7, 1)
+        last_date_query = f"select max(run_timestamp) as last_run_timestamp from `{table_path}`"
+        query = ydb.ScanQuery(last_date_query, {})
+        it = table_client.scan_query(query)
+        results = []
+        start_time = time.time()
+        while True:
+            try:
+                result = next(it)
+                results = results + result.result_set.rows
+            except StopIteration:
+                break
+
+        end_time = time.time()
+        print(f"transaction 'getting last timestamp from runs column' duration: {end_time - start_time}")
+
+        if results[0] and results[0].get('last_run_timestamp', default_start_date) is not None:
+            last_date = results[0].get(
+                'last_run_timestamp', default_start_date).strftime("%Y-%m-%dT%H:%M:%SZ")
+        else:
+            last_date = default_start_date.strftime("%Y-%m-%dT%H:%M:%SZ")
+        print(f'last run_datetime in table: {last_date}')
+
+        # getting the timestamp list from the row table
+        last_date_query = f"""select distinct run_timestamp from `test_results/test_runs_results`
+        where run_timestamp >= Timestamp('{last_date}')"""
+        query = ydb.ScanQuery(last_date_query, {})
+        it = table_client.scan_query(query)
+        timestamps = []
+        start_time = time.time()
+        while True:
+            try:
+                result = next(it)
+                timestamps = timestamps + result.result_set.rows
+            except StopIteration:
+                break
+        end_time = time.time()
+        print(f"transaction 'getting timestamp list from runs' duration: {end_time - start_time}")
+        print(f'count of timestamps: {len(timestamps)}')
+        for ts in timestamps:
+            # getting history for dates >= last_date
+            query_get_runs = f"""
+            select * from `test_results/test_runs_results`
+            where run_timestamp = cast({ts['run_timestamp']} as Timestamp)
+            """
+            query = ydb.ScanQuery(query_get_runs, {})
+            start_time = time.time()
+            it = driver.table_client.scan_query(query)
+
+            results = []
+            prepared_for_update_rows = []
+            while True:
+                try:
+                    result = next(it)
+                    results = results + result.result_set.rows
+                except StopIteration:
+                    break
+            end_time = time.time()
+            print(f'transaction duration: {end_time - start_time}')
+
+            print(f'runs data captured, {len(results)} rows')
+            for row in results:
+                prepared_for_update_rows.append({
+                    'branch': row['branch'],
+                    'build_type': row['build_type'],
+                    'commit': row['commit'],
+                    'duration': row['duration'],
+                    'job_id': row['job_id'],
+                    'job_name': row['job_name'],
+                    'log': row['log'],
+                    'logsdir': row['logsdir'],
+                    'owners': row['owners'],
+                    'pull': row['pull'],
+                    'run_timestamp': row['run_timestamp'],
+                    'status_description': row['status_description'],
+                    'status': row['status'],
+                    'stderr': row['stderr'],
+                    'stdout': row['stdout'],
+                    'suite_folder': row['suite_folder'],
+                    'test_id': row['test_id'],
+                    'test_name': row['test_name'],
+                })
+            print('upserting runs')
+            with ydb.SessionPool(driver) as pool:
+                full_path = posixpath.join(DATABASE_PATH, table_path)
+                bulk_upsert(driver.table_client, full_path,
+                            prepared_for_update_rows)
+
+            print('history updated')
+
+
+if __name__ == "__main__":
+    main()
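The migration script repeats one idiom several times: wrap YQL in ydb.ScanQuery, pull from the stream iterator, and accumulate result_set.rows until StopIteration. Factored into a helper, the pattern looks like this (a sketch; driver is an already-connected ydb.Driver, as in the script):

    def scan_rows(driver, yql):
        # Stream every row of a scan query into a single list.
        it = driver.table_client.scan_query(ydb.ScanQuery(yql, {}))
        rows = []
        while True:
            try:
                rows += next(it).result_set.rows
            except StopIteration:
                return rows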
diff --git a/.github/scripts/tests/get_test_history.py b/.github/scripts/tests/get_test_history.py
index f76da957422..ad2fe187b00 100644
--- a/.github/scripts/tests/get_test_history.py
+++ b/.github/scripts/tests/get_test_history.py
@@ -3,6 +3,7 @@
 import configparser
 import datetime
 import os
+import time
 
 import ydb
 
@@ -14,6 +15,7 @@ config.read(config_file_path)
 DATABASE_ENDPOINT = config["QA_DB"]["DATABASE_ENDPOINT"]
 DATABASE_PATH = config["QA_DB"]["DATABASE_PATH"]
 
+
 def get_test_history(test_names_array, last_n_runs_of_test_amount, build_type):
     if "CI_YDB_SERVICE_ACCOUNT_KEY_FILE_CREDENTIALS" not in os.environ:
         print(
@@ -27,18 +29,36 @@ def get_test_history(test_names_array, last_n_runs_of_test_amount, build_type):
             "CI_YDB_SERVICE_ACCOUNT_KEY_FILE_CREDENTIALS"
         ]
 
-    query = f"""
+    results = {}
+    with ydb.Driver(
+        endpoint=DATABASE_ENDPOINT,
+        database=DATABASE_PATH,
+        credentials=ydb.credentials_from_env_variables(),
+    ) as driver:
+        driver.wait(timeout=10, fail_fast=True)
+        session = ydb.retry_operation_sync(
+            lambda: driver.table_client.session().create()
+        )
+        batch_size = 500
+        start_time = time.time()
+        for start in range(0, len(test_names_array), batch_size):
+            test_names_batch = test_names_array[start:start + batch_size]
+            history_query = f"""
     PRAGMA AnsiInForEmptyOrNullableItemsCollections;
     DECLARE $test_names AS List<Utf8>;
     DECLARE $rn_max AS Int32;
     DECLARE $build_type AS Utf8;
+
+    $test_names = [{','.join("'{0}'".format(x) for x in test_names_batch)}];
+    $rn_max = {last_n_runs_of_test_amount};
+    $build_type = '{build_type}';
 
     $tests = (
     SELECT suite_folder ||'/' || test_name as full_name, test_name, build_type, commit, branch, run_timestamp, status, status_description,
     ROW_NUMBER() OVER (PARTITION BY test_name ORDER BY run_timestamp DESC) AS rn
     FROM
-        `test_results/test_runs_results`
+        `test_results/test_runs_column`
     where (job_name = 'Nightly-run' or job_name like 'Postcommit%')
     and build_type = $build_type
     and suite_folder ||'/' || test_name in $test_names
@@ -50,46 +70,31 @@ def get_test_history(test_names_array, last_n_runs_of_test_amount, build_type):
     WHERE rn <= $rn_max
     ORDER BY test_name, run_timestamp;
     """
-
-    with ydb.Driver(
-        endpoint=DATABASE_ENDPOINT,
-        database=DATABASE_PATH,
-        credentials=ydb.credentials_from_env_variables(),
-    ) as driver:
-        driver.wait(timeout=10, fail_fast=True)
-        session = ydb.retry_operation_sync(
-            lambda: driver.table_client.session().create()
-        )
-
-        with session.transaction() as transaction:
-            prepared_query = session.prepare(query)
-
-            results = {}
-            batch_size = 100
-            for start in range(0, len(test_names_array), batch_size):
-                test_names_batch = test_names_array[start:start + batch_size]
-
-                query_params = {
-                    "$test_names": test_names_batch,
-                    "$rn_max": last_n_runs_of_test_amount,
-                    "$build_type": build_type,
+            query = ydb.ScanQuery(history_query, {})
+            it = driver.table_client.scan_query(query)
+            query_result = []
+
+            while True:
+                try:
+                    result = next(it)
+                    query_result = query_result + result.result_set.rows
+                except StopIteration:
+                    break
+
+            for row in query_result:
+                if not row["full_name"].decode("utf-8") in results:
+                    results[row["full_name"].decode("utf-8")] = {}
+
+                results[row["full_name"].decode("utf-8")][row["run_timestamp"]] = {
+                    "status": row["status"],
+                    "commit": row["commit"],
+                    "datetime": datetime.datetime.fromtimestamp(int(row["run_timestamp"] / 1000000)).strftime("%H:%M %B %d %Y"),
+                    "status_description": row["status_description"],
                 }
-
-                result_set = session.transaction(ydb.SerializableReadWrite()).execute(
-                    prepared_query, parameters=query_params, commit_tx=True
-                )
-
-                for row in result_set[0].rows:
-                    if not row["full_name"].decode("utf-8") in results:
-                        results[row["full_name"].decode("utf-8")] = {}
-
-                    results[row["full_name"].decode("utf-8")][row["run_timestamp"]] = {
-                        "status": row["status"],
-                        "commit": row["commit"],
-                        "datetime": datetime.datetime.fromtimestamp(int(row["run_timestamp"] / 1000000)).strftime("%H:%m %B %d %Y"),
-                        "status_description": row["status_description"],
-                    }
-    return results
+        end_time = time.time()
+        print(
+            f'duration of getting history for {len(test_names_array)} tests: {end_time - start_time} sec')
+    return results
 
 
 if __name__ == "__main__":
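A hypothetical call of the reworked function, using the parameter names from its signature (the test name and build type values are illustrative, not taken from this commit):

    history = get_test_history(
        test_names_array=["ydb/core/kqp/ut/SomeSuite/SomeCase"],
        last_n_runs_of_test_amount=10,
        build_type="relwithdebinfo",
    )
    # -> {full_name: {run_timestamp: {"status", "commit", "datetime", "status_description"}}}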
diff --git a/.github/scripts/upload_tests_results.py b/.github/scripts/upload_tests_results.py
index 3e913e800db..399a3f983ca 100755
--- a/.github/scripts/upload_tests_results.py
+++ b/.github/scripts/upload_tests_results.py
@@ -2,16 +2,81 @@
 
 import argparse
 import configparser
+import datetime
 import fnmatch
 import os
+import posixpath
+import shlex
 import subprocess
 import sys
 import time
-import ydb
 import xml.etree.ElementTree as ET
 
+import ydb
+
 from codeowners import CodeOwners
 from concurrent.futures import ThreadPoolExecutor, as_completed
+from decimal import Decimal
+
+
+def create_tables(pool, table_path):
+    print(f"> create table: {table_path}")
+
+    def callee(session):
+        session.execute_scheme(f"""
+            CREATE table IF NOT EXISTS `{table_path}` (
+                build_type Utf8 NOT NULL,
+                job_name Utf8,
+                job_id Uint64,
+                commit Utf8,
+                branch Utf8 NOT NULL,
+                pull Utf8,
+                run_timestamp Timestamp NOT NULL,
+                test_id Utf8 NOT NULL,
+                suite_folder Utf8 NOT NULL,
+                test_name Utf8 NOT NULL,
+                duration Double,
+                status Utf8 NOT NULL,
+                status_description Utf8,
+                owners Utf8,
+                log Utf8,
+                logsdir Utf8,
+                stderr Utf8,
+                stdout Utf8,
+                PRIMARY KEY (`test_name`, `suite_folder`, build_type, branch, status, run_timestamp)
+            )
+            PARTITION BY HASH(`suite_folder`, test_name, build_type, branch)
+            WITH (STORE = COLUMN)
+        """)
+
+    return pool.retry_operation_sync(callee)
+
+
+def bulk_upsert(table_client, table_path, rows):
+    print(f"> bulk upsert: {table_path}")
+    column_types = (
+        ydb.BulkUpsertColumns()
+        .add_column("branch", ydb.OptionalType(ydb.PrimitiveType.Utf8))
+        .add_column("build_type", ydb.OptionalType(ydb.PrimitiveType.Utf8))
+        .add_column("commit", ydb.OptionalType(ydb.PrimitiveType.Utf8))
+        .add_column("duration", ydb.OptionalType(ydb.PrimitiveType.Double))
+        .add_column("job_id", ydb.OptionalType(ydb.PrimitiveType.Uint64))
+        .add_column("job_name", ydb.OptionalType(ydb.PrimitiveType.Utf8))
+        .add_column("log", ydb.OptionalType(ydb.PrimitiveType.Utf8))
+        .add_column("logsdir", ydb.OptionalType(ydb.PrimitiveType.Utf8))
+        .add_column("owners", ydb.OptionalType(ydb.PrimitiveType.Utf8))
+        .add_column("pull", ydb.OptionalType(ydb.PrimitiveType.Utf8))
+        .add_column("run_timestamp", ydb.OptionalType(ydb.PrimitiveType.Timestamp))
+        .add_column("status", ydb.OptionalType(ydb.PrimitiveType.Utf8))
+        .add_column("status_description", ydb.OptionalType(ydb.PrimitiveType.Utf8))
+        .add_column("stderr", ydb.OptionalType(ydb.PrimitiveType.Utf8))
+        .add_column("stdout", ydb.OptionalType(ydb.PrimitiveType.Utf8))
+        .add_column("suite_folder", ydb.OptionalType(ydb.PrimitiveType.Utf8))
+        .add_column("test_id", ydb.OptionalType(ydb.PrimitiveType.Utf8))
+        .add_column("test_name", ydb.OptionalType(ydb.PrimitiveType.Utf8))
+    )
+    table_client.bulk_upsert(table_path, rows, column_types)
 
 
 def parse_junit_xml(test_results_file, build_type, job_name, job_id, commit, branch, pull, run_timestamp):
@@ -23,7 +88,7 @@ def parse_junit_xml(test_results_file, build_type, job_name, job_id, commit, bra
         suite_folder = testsuite.get("name")
 
         for testcase in testsuite.findall("testcase"):
-            name = testcase.get("name") 
+            name = testcase.get("name")
             duration = testcase.get("time")
 
             status_description = ""
@@ -49,12 +114,12 @@ def parse_junit_xml(test_results_file, build_type, job_name, job_id, commit, bra
                 "commit": commit,
                 "branch": branch,
                 "pull": pull,
-                "run_timestamp": run_timestamp,
+                "run_timestamp": int(run_timestamp)*1000000,
                 "job_name": job_name,
-                "job_id": job_id,
+                "job_id": int(job_id),
                 "suite_folder": suite_folder,
                 "test_name": name,
-                "duration": float(duration),
+                "duration": Decimal(duration),
                 "status": status,
                 "status_description": status_description.replace("\r\n", ";;").replace("\n", ";;").replace("\"", "'"),
                 "log": "" if testcase.find("properties/property/[@name='url:log']") is None else testcase.find("properties/property/[@name='url:log']").get('value'),
@@ -75,151 +140,32 @@ def get_codeowners_for_tests(codeowners_file_path, tests_data):
     tests_data_with_owners = []
     for test in tests_data:
         target_path = f'{test["suite_folder"]}'
-        owners = owners_odj.of(target_path)
-        test["owners"] = joined_owners=";;".join([(":".join(x)) for x in owners])
+        owners = owners_odj.of(target_path)
+        test["owners"] = joined_owners = ";;".join(
+            [(":".join(x)) for x in owners])
         tests_data_with_owners.append(test)
 
     return tests_data_with_owners
 
 
-def create_table(session, sql):
-
-    try:
-        session.execute_scheme(sql)
-    except ydb.issues.AlreadyExists:
-        pass
-    except Exception as e:
-        print(f"Error creating table: {e}, sql:\n{sql}", file=sys.stderr)
-        raise e
-
-
-def create_tests_table(session, table_path):
-    # Creating of new table if not exist yet
-    sql = f"""
-    --!syntax_v1
-    CREATE TABLE IF NOT EXISTS `{table_path}` (
-        build_type Utf8,
-        job_name Utf8,
-        job_id Uint64,
-        commit Utf8,
-        branch Utf8,
-        pull Utf8,
-        run_timestamp Timestamp,
-        test_id Utf8,
-        suite_folder Utf8,
-        test_name Utf8,
-        duration Double,
-        status Utf8,
-        status_description Utf8,
-        owners Utf8,
-        log Utf8,
-        logsdir Utf8,
-        stderr Utf8,
-        stdout Utf8,
-        PRIMARY KEY(test_id)
-    );
-    """
-    create_table(session, sql)
-
-
-def upload_results(pool, sql):
-    with pool.checkout() as session:
-        try:
-            session.transaction().execute(sql, commit_tx=True)
-        except Exception as e:
-            print(f"Error executing query {e}, sql:\n{sql}", file=sys.stderr)
-            raise e
-
-
-def prepare_and_upload_tests(pool, path, results, batch_size):
-
-    total_records = len(results)
-    if total_records == 0:
-        print("Error:Object stored test results is empty, check test results artifacts")
-        return 1
-
-    with ThreadPoolExecutor(max_workers=10) as executor:
-
-        for start in range(0, total_records, batch_size):
-            futures = []
-            end = min(start + batch_size, total_records)
-            batch = results[start:end]
-
-            sql = f"""--!syntax_v1
-            UPSERT INTO `{path}`
-            (build_type,
-            job_name,
-            job_id,
-            commit,
-            branch,
-            pull,
-            run_timestamp,
-            test_id,
-            suite_folder,
-            test_name,
-            duration,
-            status,
-            status_description,
-            log,
-            logsdir,
-            stderr,
-            stdout,
-            owners) VALUES"""
-            values = []
-            for index, result in enumerate(batch):
-
-                values.append(
-                    f"""
-                    ("{result['build_type']}",
-                    "{result['job_name']}",
-                    {result['job_id']},
-                    "{result['commit']}",
-                    "{result['branch']}",
-                    "{result['pull']}",
-                    DateTime::FromSeconds({result['run_timestamp']}),
-                    "{result['pull']}_{result['run_timestamp']}_{start+index}",
-                    "{result['suite_folder']}",
-                    "{result['test_name']}",
-                    {result['duration']},
-                    "{result['status']}",
-                    "{result['status_description']}",
-                    "{result['log']}",
-                    "{result['logsdir']}",
-                    "{result['stderr']}",
-                    "{result['stdout']}",
-                    "{result['owners']}")
-                    """
-                )
-            sql += ", ".join(values) + ";"
-
-            futures.append(executor.submit(upload_results, pool, sql))
-
-            for future in as_completed(futures):
-                future.result()  # Raise exception if occurred
-
-
-def create_pool(endpoint, database):
-    driver_config = ydb.DriverConfig(
-        endpoint,
-        database,
-        credentials=ydb.credentials_from_env_variables()
-    )
-
-    driver = ydb.Driver(driver_config)
-    driver.wait(fail_fast=True, timeout=5)
-    return ydb.SessionPool(driver)
-
-
 def main():
     parser = argparse.ArgumentParser()
-    parser.add_argument('--test-results-file', action='store', dest="test_results_file", required=True, help='XML with results of tests')
-    parser.add_argument('--build-type', action='store', dest="build_type", required=True, help='build type')
-    parser.add_argument('--commit', default='store', dest="commit", required=True, help='commit sha')
-    parser.add_argument('--branch', default='store', dest="branch", required=True, help='branch name ')
-    parser.add_argument('--pull', action='store', dest="pull",required=True, help='pull number')
-    parser.add_argument('--run-timestamp', action='store', dest="run_timestamp",required=True, help='time of test run start')
-    parser.add_argument('--job-name', action='store', dest="job_name",required=True, help='job name where launched')
-    parser.add_argument('--job-id', action='store', dest="job_id",required=True, help='job id of workflow')
+    parser.add_argument('--test-results-file', action='store',
+                        required=True, help='XML with results of tests')
+    parser.add_argument('--build-type', action='store',
+                        required=True, help='build type')
+    parser.add_argument('--commit', default='store',
+                        dest="commit", required=True, help='commit sha')
+    parser.add_argument('--branch', default='store',
+                        dest="branch", required=True, help='branch name ')
+    parser.add_argument('--pull', action='store', dest="pull",
+                        required=True, help='pull number')
+    parser.add_argument('--run-timestamp', action='store',
+                        dest="run_timestamp", required=True, help='time of test run start')
+    parser.add_argument('--job-name', action='store', dest="job_name",
+                        required=True, help='job name where launched')
+    parser.add_argument('--job-id', action='store', dest="job_id",
+                        required=True, help='job id of workflow')
 
     args = parser.parse_args()
 
@@ -233,10 +179,9 @@ def main():
     job_id = args.job_id
 
     path_in_database = "test_results"
-    batch_size_default = 30
     dir = os.path.dirname(__file__)
     git_root = f"{dir}/../.."
-    codeowners = f"{git_root}/.github/CODEOWNERS"
+    codeowners = f"{git_root}/.github/TESTOWNERS"
    config = configparser.ConfigParser()
    config_file_path = f"{git_root}/.github/config/ydb_qa_db.ini"
    config.read(config_file_path)
@@ -248,28 +193,64 @@ def main():
         print(
             "Error: Env variable CI_YDB_SERVICE_ACCOUNT_KEY_FILE_CREDENTIALS is missing, skipping"
         )
-        return 0
+        return 1
     else:
         # Do not set up 'real' variable from gh workflows because it interfere with ydb tests
         # So, set up it locally
         os.environ["YDB_SERVICE_ACCOUNT_KEY_FILE_CREDENTIALS"] = os.environ[
             "CI_YDB_SERVICE_ACCOUNT_KEY_FILE_CREDENTIALS"
         ]
+    test_table_name = f"{path_in_database}/test_runs_column"
+    full_path = posixpath.join(DATABASE_PATH, test_table_name)
+
+    with ydb.Driver(
+        endpoint=DATABASE_ENDPOINT,
+        database=DATABASE_PATH,
+        credentials=ydb.credentials_from_env_variables(),
+    ) as driver:
+        driver.wait(timeout=10, fail_fast=True)
+        session = ydb.retry_operation_sync(
+            lambda: driver.table_client.session().create()
+        )
 
-    test_table_name = f"{path_in_database}/test_runs_results"
-    # Prepare connection and table
-    pool = create_pool(DATABASE_ENDPOINT, DATABASE_PATH)
-    with pool.checkout() as session:
-        create_tests_table(session, test_table_name)
-
-    # Parse and upload
-    results = parse_junit_xml(
-        test_results_file, build_type, job_name, job_id, commit, branch, pull, run_timestamp
-    )
-    result_with_owners = get_codeowners_for_tests(codeowners, results)
-    prepare_and_upload_tests(
-        pool, test_table_name, results, batch_size=batch_size_default
-    )
+        # Parse and upload
+        results = parse_junit_xml(
+            test_results_file, build_type, job_name, job_id, commit, branch, pull, run_timestamp
+        )
+        result_with_owners = get_codeowners_for_tests(codeowners, results)
+        prepared_for_update_rows = []
+        for index, row in enumerate(result_with_owners):
+            prepared_for_update_rows.append({
+                'branch': row['branch'],
+                'build_type': row['build_type'],
+                'commit': row['commit'],
+                'duration': row['duration'],
+                'job_id': row['job_id'],
+                'job_name': row['job_name'],
+                'log': row['log'],
+                'logsdir': row['logsdir'],
+                'owners': row['owners'],
+                'pull': row['pull'],
+                'run_timestamp': row['run_timestamp'],
+                'status_description': row['status_description'],
+                'status': row['status'],
+                'stderr': row['stderr'],
+                'stdout': row['stdout'],
+                'suite_folder': row['suite_folder'],
+                'test_id': f"{row['pull']}_{row['run_timestamp']}_{index}",
+                'test_name': row['test_name'],
+            })
+        print(f'upserting runs: {len(prepared_for_update_rows)} rows')
+        if prepared_for_update_rows:
+            with ydb.SessionPool(driver) as pool:
+                create_tables(pool, test_table_name)
+                bulk_upsert(driver.table_client, full_path,
+                            prepared_for_update_rows)
+            print('tests updated')
+        else:
+            print('nothing to upsert')
+
 
 if __name__ == "__main__":
     main()
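Based on the argparse options above, a hypothetical invocation of the rewritten uploader would look like this (every value is a placeholder):

    python3 .github/scripts/upload_tests_results.py \
        --test-results-file junit.xml \
        --build-type relwithdebinfo \
        --commit 79898e65a1845ff700b9920e7aec0680bd174b4a \
        --branch main \
        --pull 7733 \
        --run-timestamp 1723640607 \
        --job-name Nightly-run \
        --job-id 123456789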