author    Kirill Rysin <35688753+naspirato@users.noreply.github.com> 2024-08-14 17:03:27 +0200
committer GitHub <noreply@github.com> 2024-08-14 15:03:27 +0000
commit    79898e65a1845ff700b9920e7aec0680bd174b4a (patch)
tree      e959e6597885103f36a6c71d2016aefb3b33dfef
parent    d21cb124754a91822133906fbebbf730c5942144 (diff)
download  ydb-79898e65a1845ff700b9920e7aec0680bd174b4a.tar.gz
TESTOWNERS and migration to column shard (#7733)
-rw-r--r--  .github/TESTOWNERS                                            82
-rwxr-xr-x  .github/scripts/analytics/decode_imported_csv.py              35
-rwxr-xr-x  .github/scripts/analytics/flaky_tests_history.py               6
-rwxr-xr-x  .github/scripts/analytics/import_to_column.sh                 32
-rwxr-xr-x  .github/scripts/analytics/migration_row_to_columt_tests.py   216
-rw-r--r--  .github/scripts/tests/get_test_history.py                     87
-rwxr-xr-x  .github/scripts/upload_tests_results.py                      301
7 files changed, 555 insertions, 204 deletions
diff --git a/.github/TESTOWNERS b/.github/TESTOWNERS
new file mode 100644
index 00000000000..3d1fbc40aba
--- /dev/null
+++ b/.github/TESTOWNERS
@@ -0,0 +1,82 @@
+# TEAM:@ydb-platform/docs
+/*.md @ydb-platform/docs
+/ydb/docs/ @ydb-platform/docs
+
+# YQL @ydb-platform/yql
+/ydb/library/yql/ @ydb-platform/yql
+/ydb/library/yql/dq @ydb-platform/yql @ydb-platform/qp
+/ydb/library/yql/yt @Krock21 @Krisha11 @zlobober @gritukan
+
+#TEAM:@ydb-platform/Topics
+/ydb/core/kafka_proxy @ydb-platform/Topics
+/ydb/core/persqueue @ydb-platform/Topics
+/ydb/services/datastreams @ydb-platform/Topics
+/ydb/services/deprecated/persqueue_v0 @ydb-platform/Topics
+/ydb/services/persqueue_v1 @ydb-platform/Topics
+
+#Row-oriented tables development team @azevaykin TEAM:?
+/ydb/core/tablet_flat @azevaykin
+/ydb/core/tx/datashard @azevaykin
+/ydb/core/mon_alloc @azevaykin
+
+# Coordinator
+/ydb/core/tx/coordinator @snaury
+
+#Distributed system infrastructure team @ijon TEAM:?
+/ydb/core/mind/ @ijon
+/ydb/core/blob_depot @ijon
+/ydb/core/tx/schemeshard @ijon
+/ydb/core/tx/columnshard @ivanmorozov333
+/ydb/services/ydb/ @ijon
+/ydb/core/util @ijon
+/ydb/core/persqueue @ijon
+/ydb/core/client @ijon
+/ydb/core/engine @ijon
+
+#YDB Query Execution Team @gridnevvvit TEAM:@ydb-platform/qp
+/ydb/core/kqp @ydb-platform/qp
+
+#YDB Application Team @asmyasnikov TEAM:?
+/ydb/public/sdk @asmyasnikov
+/ydb/tests/functional/ydb_cli @asmyasnikov
+/ydb/public/lib/ydb_cli @asmyasnikov
+
+#YDB Query Optimizer Team @pavelvelikhov TEAM:???
+/ydb/tests/functional/canonical @pavelvelikhov
+
+
+#YDB Engineering Team @maximyurchuk
+ # - not clear yet
+
+#Distributed storage development team @the-ancient-1 TEAM:???
+/ydb/core/blobstorage @the-ancient-1
+/ydb/core/mind/bscontroller @the-ancient-1
+
+#Core functionality team @CyberROFL TEAM:@ydb-platform/core
+
+/ydb/core/config/ut @ydb-platform/core
+/ydb/core/scheme @ydb-platform/core
+/ydb/core/cms @ydb-platform/core
+/ydb/tests/functional/cms @ydb-platform/core
+
+
+#Data delivery systems development team / LogBroker @alexnick88 TEAM:???
+/ydb/tests/functional/sqs
+
+#YDB Analytics TEAM:???
+
+#Federative query TEAM:@ydb-platform/fq
+/ydb/tests/fq @ydb-platform/fq
+/ydb/core/fq/ @ydb-platform/fq
+/ydb/services/fq/ @ydb-platform/fq
+/ydb/library/yql/providers/common/http_gateway @ydb-platform/fq
+/ydb/library/yql/providers/common/db_id_async_resolver @ydb-platform/fq
+/ydb/library/yql/providers/generic @ydb-platform/fq
+/ydb/library/yql/providers/pq @ydb-platform/fq
+/ydb/library/yql/providers/s3 @ydb-platform/fq
+/ydb/library/yql/providers/solomon @ydb-platform/fq
+/ydb/core/public_http/ @ydb-platform/fq
+
+#@ydb-platform/ui-backend
+/ydb/core/viewer @ydb-platform/ui-backend
+/ydb/core/protos/node_whiteboard.proto @ydb-platform/ui-backend \ No newline at end of file
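The TESTOWNERS file above reuses GitHub CODEOWNERS syntax, and upload_tests_results.py further down in this patch resolves an owner for each test suite through the `codeowners` Python package. A minimal sketch of that matching, assuming the package is installed; the rule is copied from the file above and the suite path is hypothetical:

    from codeowners import CodeOwners

    # One rule copied verbatim from TESTOWNERS above.
    owners = CodeOwners("/ydb/core/kqp @ydb-platform/qp\n")

    # .of() returns (type, owner) tuples for the matching rule,
    # e.g. [("TEAM", "@ydb-platform/qp")]; the path is illustrative.
    print(owners.of("/ydb/core/kqp/ut/indexes"))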
diff --git a/.github/scripts/analytics/decode_imported_csv.py b/.github/scripts/analytics/decode_imported_csv.py
new file mode 100755
index 00000000000..544aa1a86aa
--- /dev/null
+++ b/.github/scripts/analytics/decode_imported_csv.py
@@ -0,0 +1,35 @@
+# NOT USED ANYWHERE, YOU CAN DELETE THIS IF YOU KNOW WHAT YOU ARE DOING
+import os
+import csv
+import urllib.parse
+
+# Decode a percent-encoded string
+def decode_percent_encoded_string(encoded_string):
+ return urllib.parse.unquote(encoded_string)
+
+# Decode every cell of a CSV file
+def decode_csv(file_path):
+ with open(file_path, mode='r', encoding='utf-8') as infile, open(file_path + '.decoded', mode='w', encoding='utf-8', newline='') as outfile:
+ reader = csv.reader(infile)
+        writer = csv.writer(outfile, escapechar='\\', quotechar='"', quoting=csv.QUOTE_ALL, doublequote=True)
+
+ for row in reader:
+ decoded_row = [decode_percent_encoded_string(cell) for cell in row]
+ writer.writerow(decoded_row)
+
+# Process all CSV files in a directory
+def decode_all_csv_files_in_directory(directory_path):
+ for filename in os.listdir(directory_path):
+ if filename.endswith('.csv'):
+ file_path = os.path.join(directory_path, filename)
+ print(f"Processing file: {file_path}")
+ decode_csv(file_path)
+ os.replace(file_path + '.decoded', file_path)
+
+def main():
+ directory_path = 'place_your_path_here'
+ decode_all_csv_files_in_directory(directory_path)
+
+
+if __name__ == "__main__":
+ main()
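The decoding above is plain percent-decoding from the standard library; a one-line illustration with a made-up cell value:

    import urllib.parse

    # "ydb%2Fcore%2Fkqp" is an illustrative percent-encoded CSV cell.
    assert urllib.parse.unquote("ydb%2Fcore%2Fkqp") == "ydb/core/kqp"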
diff --git a/.github/scripts/analytics/flaky_tests_history.py b/.github/scripts/analytics/flaky_tests_history.py
index 35764a34607..60ed1afefc8 100755
--- a/.github/scripts/analytics/flaky_tests_history.py
+++ b/.github/scripts/analytics/flaky_tests_history.py
@@ -151,7 +151,7 @@ def main():
DISTINCT suite_folder || '/' || test_name as full_name,
suite_folder,
test_name
- from `test_results/test_runs_results`
+ from `test_results/test_runs_column`
where
status in ('failure','mute')
and job_name in ('Nightly-run', 'Postcommit_relwithdebinfo')
@@ -161,7 +161,7 @@ def main():
cross join (
select
DISTINCT DateTime::MakeDate(run_timestamp) as date_base
- from `test_results/test_runs_results`
+ from `test_results/test_runs_column`
where
status in ('failure','mute')
and job_name in ('Nightly-run', 'Postcommit_relwithdebinfo')
@@ -176,7 +176,7 @@ def main():
run_timestamp,
status
--ROW_NUMBER() OVER (PARTITION BY test_name ORDER BY run_timestamp DESC) AS rn
- from `test_results/test_runs_results`
+ from `test_results/test_runs_column`
where
run_timestamp >= Date('{last_date}') -{history_for_n_day}*Interval("P1D") and
job_name in ('Nightly-run', 'Postcommit_relwithdebinfo')
diff --git a/.github/scripts/analytics/import_to_column.sh b/.github/scripts/analytics/import_to_column.sh
new file mode 100755
index 00000000000..a2e7e093a1a
--- /dev/null
+++ b/.github/scripts/analytics/import_to_column.sh
@@ -0,0 +1,32 @@
+#!/bin/bash
+# NOT USED ANYWHERE, YOU CAN DELETE THIS IF YOU KNOW WHAT YOU ARE DOING
+# YDB connection parameters
+ENDPOINT="grpcs://lb.etnvsjbk7kh1jc6bbfi8.ydb.mdb.yandexcloud.net:2135"
+DATABASE="/ru-central1/b1ggceeul2pkher8vhb6/etnvsjbk7kh1jc6bbfi8"
+SA_KEY_FILE="/home/kirrysin/fork_2/.github/scripts/my-robot-key.json"
+TABLE="test_results/test_runs_column"
+DIRECTORY="/home/kirrysin/fork_2/~tmp_backup/test_runs_results"
+
+# Process every .csv file in the given directory
+for FILE in "$DIRECTORY"/*.csv; do
+ if [[ -f "$FILE" ]]; then
+        echo "Importing file: $FILE"
+
+ ydb -e "$ENDPOINT" \
+ -d "$DATABASE" \
+ --sa-key-file "$SA_KEY_FILE" \
+ import file csv \
+ -p "$TABLE" \
+ "$FILE"
+
+ if [[ $? -eq 0 ]]; then
+        echo "File $FILE imported successfully."
+    else
+        echo "Error importing file $FILE." >&2
+ exit 1
+ fi
+ fi
+done
+
+echo "All files imported."
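For scripting the same import outside bash, a rough Python equivalent that shells out to the identical `ydb import file csv` invocation used above; endpoint, database, key file, and directory are placeholders, as in the script itself:

    import glob
    import subprocess

    ENDPOINT = "grpcs://<endpoint>:2135"      # placeholder
    DATABASE = "/<database-path>"             # placeholder
    SA_KEY_FILE = "/path/to/sa-key.json"      # placeholder
    TABLE = "test_results/test_runs_column"

    for path in sorted(glob.glob("/path/to/backup/*.csv")):  # placeholder dir
        print(f"Importing file: {path}")
        subprocess.run(
            ["ydb", "-e", ENDPOINT, "-d", DATABASE, "--sa-key-file", SA_KEY_FILE,
             "import", "file", "csv", "-p", TABLE, path],
            check=True,  # stop on the first failed import, like the bash loop
        )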
diff --git a/.github/scripts/analytics/migration_row_to_columt_tests.py b/.github/scripts/analytics/migration_row_to_columt_tests.py
new file mode 100755
index 00000000000..0aeaf7aa63a
--- /dev/null
+++ b/.github/scripts/analytics/migration_row_to_columt_tests.py
@@ -0,0 +1,216 @@
+#!/usr/bin/env python3
+# NOT USED ANYWHERE, YOU CAN DELETE THIS IF YOU KNOW WHAT YOU ARE DOING
+import argparse
+import configparser
+import datetime
+import os
+import posixpath
+import traceback
+import time
+import ydb
+from collections import Counter
+
+dir = os.path.dirname(__file__)
+config = configparser.ConfigParser()
+config_file_path = f"{dir}/../../config/ydb_qa_db.ini"
+config.read(config_file_path)
+
+build_preset = os.environ.get("build_preset")
+branch = os.environ.get("branch_to_compare")
+
+DATABASE_ENDPOINT = config["QA_DB"]["DATABASE_ENDPOINT"]
+DATABASE_PATH = config["QA_DB"]["DATABASE_PATH"]
+
+
+def create_tables(pool, table_path):
+ print(f"> create table: {table_path}")
+
+ def callee(session):
+ session.execute_scheme(f"""
+            CREATE TABLE IF NOT EXISTS `{table_path}` (
+ branch Utf8 NOT NULL,
+ build_type Utf8 NOT NULL,
+ commit Utf8 NOT NULL,
+ duration Double,
+ job_id Uint64,
+ job_name Utf8,
+ log Utf8,
+ logsdir Utf8,
+ owners Utf8,
+ pull Utf8,
+ run_timestamp Timestamp NOT NULL,
+ status_description Utf8,
+ status Utf8 NOT NULL,
+ stderr Utf8,
+ stdout Utf8,
+ suite_folder Utf8 NOT NULL,
+ test_id Utf8 NOT NULL,
+ test_name Utf8 NOT NULL,
+                PRIMARY KEY (`test_name`, `suite_folder`, build_type, status, run_timestamp)
+            )
+            PARTITION BY HASH(`test_name`, `suite_folder`, branch, build_type)
+ WITH (STORE = COLUMN)
+ """)
+
+ return pool.retry_operation_sync(callee)
+
+
+def bulk_upsert(table_client, table_path, rows):
+ print(f"> bulk upsert: {table_path}")
+ column_types = (
+ ydb.BulkUpsertColumns()
+ .add_column("branch", ydb.OptionalType(ydb.PrimitiveType.Utf8))
+ .add_column("build_type", ydb.OptionalType(ydb.PrimitiveType.Utf8))
+ .add_column("commit", ydb.OptionalType(ydb.PrimitiveType.Utf8))
+ .add_column("duration", ydb.OptionalType(ydb.PrimitiveType.Double))
+ .add_column("job_id", ydb.OptionalType(ydb.PrimitiveType.Uint64))
+ .add_column("job_name", ydb.OptionalType(ydb.PrimitiveType.Utf8))
+ .add_column("log", ydb.OptionalType(ydb.PrimitiveType.Utf8))
+ .add_column("logsdir", ydb.OptionalType(ydb.PrimitiveType.Utf8))
+ .add_column("owners", ydb.OptionalType(ydb.PrimitiveType.Utf8))
+ .add_column("pull", ydb.OptionalType(ydb.PrimitiveType.Utf8))
+ .add_column("run_timestamp", ydb.OptionalType(ydb.PrimitiveType.Timestamp))
+ .add_column("status", ydb.OptionalType(ydb.PrimitiveType.Utf8))
+ .add_column("status_description", ydb.OptionalType(ydb.PrimitiveType.Utf8))
+ .add_column("stderr", ydb.OptionalType(ydb.PrimitiveType.Utf8))
+ .add_column("stdout", ydb.OptionalType(ydb.PrimitiveType.Utf8))
+ .add_column("suite_folder", ydb.OptionalType(ydb.PrimitiveType.Utf8))
+ .add_column("test_id", ydb.OptionalType(ydb.PrimitiveType.Utf8))
+ .add_column("test_name", ydb.OptionalType(ydb.PrimitiveType.Utf8))
+    )
+ table_client.bulk_upsert(table_path, rows, column_types)
+
+
+def main():
+ if "CI_YDB_SERVICE_ACCOUNT_KEY_FILE_CREDENTIALS" not in os.environ:
+ print(
+ "Error: Env variable CI_YDB_SERVICE_ACCOUNT_KEY_FILE_CREDENTIALS is missing, skipping"
+ )
+ return 1
+ else:
+        # Do not set up the 'real' variable from gh workflows because it interferes with ydb tests
+        # So, set it up locally
+ os.environ["YDB_SERVICE_ACCOUNT_KEY_FILE_CREDENTIALS"] = os.environ[
+ "CI_YDB_SERVICE_ACCOUNT_KEY_FILE_CREDENTIALS"
+ ]
+ with ydb.Driver(
+ endpoint=DATABASE_ENDPOINT,
+ database=DATABASE_PATH,
+ credentials=ydb.credentials_from_env_variables(),
+ ) as driver:
+ driver.wait(timeout=10, fail_fast=True)
+ session = ydb.retry_operation_sync(
+ lambda: driver.table_client.session().create()
+ )
+
+ # settings, paths, consts
+ tc_settings = ydb.TableClientSettings().with_native_date_in_result_sets(enabled=True)
+ table_client = ydb.TableClient(driver, tc_settings)
+
+ table_path = 'test_results/test_runs_column'
+
+ with ydb.SessionPool(driver) as pool:
+ create_tables(pool, table_path)
+
+
+        # getting last timestamp from runs column
+ default_start_date = datetime.datetime(2024, 7, 1)
+        last_date_query = f"select max(run_timestamp) as max_date_window from `{table_path}`"
+ query = ydb.ScanQuery(last_date_query, {})
+ it = table_client.scan_query(query)
+ results = []
+ start_time = time.time()
+ while True:
+ try:
+ result = next(it)
+ results = results + result.result_set.rows
+ except StopIteration:
+ break
+
+ end_time = time.time()
+        print(f"transaction 'getting last timestamp from runs column' duration: {end_time - start_time}")
+
+        if results[0] and results[0].get('max_date_window', default_start_date) is not None:
+            last_date = results[0].get(
+                'max_date_window', default_start_date).strftime("%Y-%m-%dT%H:%M:%SZ")
+        else:
+            last_date = default_start_date.strftime("%Y-%m-%dT%H:%M:%SZ")
+        print(f'last run_timestamp in table: {last_date}')
+        # getting timestamp list from runs
+        last_date_query = f"""select distinct run_timestamp from `test_results/test_runs_results`
+        where run_timestamp >= Timestamp('{last_date}')"""
+ query = ydb.ScanQuery(last_date_query, {})
+ it = table_client.scan_query(query)
+ timestamps = []
+ start_time = time.time()
+ while True:
+ try:
+ result = next(it)
+ timestamps = timestamps + result.result_set.rows
+ except StopIteration:
+ break
+ end_time = time.time()
+        print(f"transaction 'getting timestamp list from runs' duration: {end_time - start_time}")
+        print(f'count of timestamps: {len(timestamps)}')
+ for ts in timestamps:
+ # getting history for dates >= last_date
+ query_get_runs = f"""
+ select * from `test_results/test_runs_results`
+ where run_timestamp = cast({ts['run_timestamp']} as Timestamp)
+ """
+ query = ydb.ScanQuery(query_get_runs, {})
+ # start transaction time
+ start_time = time.time()
+ it = driver.table_client.scan_query(query)
+ # end transaction time
+
+ results = []
+ prepared_for_update_rows = []
+ while True:
+ try:
+ result = next(it)
+ results = results + result.result_set.rows
+ except StopIteration:
+ break
+ end_time = time.time()
+ print(f'transaction duration: {end_time - start_time}')
+
+ print(f'runs data captured, {len(results)} rows')
+ for row in results:
+ prepared_for_update_rows.append({
+ 'branch': row['branch'],
+ 'build_type': row['build_type'],
+ 'commit': row['commit'],
+ 'duration': row['duration'],
+ 'job_id': row['job_id'],
+ 'job_name': row['job_name'],
+ 'log': row['log'],
+ 'logsdir': row['logsdir'],
+ 'owners': row['owners'],
+ 'pull': row['pull'],
+ 'run_timestamp': row['run_timestamp'],
+ 'status_description': row['status_description'],
+ 'status': row['status'],
+ 'stderr': row['stderr'],
+ 'stdout': row['stdout'],
+ 'suite_folder': row['suite_folder'],
+ 'test_id': row['test_id'],
+ 'test_name': row['test_name'],
+ })
+ print('upserting runs')
+ with ydb.SessionPool(driver) as pool:
+
+ full_path = posixpath.join(DATABASE_PATH, table_path)
+ bulk_upsert(driver.table_client, full_path,
+ prepared_for_update_rows)
+
+ print('history updated')
+
+
+if __name__ == "__main__":
+ main()
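main() above repeats one scan-and-accumulate idiom for every query; isolated as a helper it would look like the sketch below (assuming an already-connected driver, as created in main()):

    import ydb

    def scan_all(table_client, yql: str):
        # Run a ScanQuery and drain its iterator, as main() does inline.
        rows = []
        it = table_client.scan_query(ydb.ScanQuery(yql, {}))
        while True:
            try:
                rows += next(it).result_set.rows
            except StopIteration:
                return rows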
diff --git a/.github/scripts/tests/get_test_history.py b/.github/scripts/tests/get_test_history.py
index f76da957422..ad2fe187b00 100644
--- a/.github/scripts/tests/get_test_history.py
+++ b/.github/scripts/tests/get_test_history.py
@@ -3,6 +3,7 @@
import configparser
import datetime
import os
+import time
import ydb
@@ -14,6 +15,7 @@ config.read(config_file_path)
DATABASE_ENDPOINT = config["QA_DB"]["DATABASE_ENDPOINT"]
DATABASE_PATH = config["QA_DB"]["DATABASE_PATH"]
+
def get_test_history(test_names_array, last_n_runs_of_test_amount, build_type):
if "CI_YDB_SERVICE_ACCOUNT_KEY_FILE_CREDENTIALS" not in os.environ:
print(
@@ -27,18 +29,36 @@ def get_test_history(test_names_array, last_n_runs_of_test_amount, build_type):
"CI_YDB_SERVICE_ACCOUNT_KEY_FILE_CREDENTIALS"
]
- query = f"""
+ results = {}
+ with ydb.Driver(
+ endpoint=DATABASE_ENDPOINT,
+ database=DATABASE_PATH,
+ credentials=ydb.credentials_from_env_variables(),
+ ) as driver:
+ driver.wait(timeout=10, fail_fast=True)
+ session = ydb.retry_operation_sync(
+ lambda: driver.table_client.session().create()
+ )
+ batch_size = 500
+ start_time = time.time()
+ for start in range(0, len(test_names_array), batch_size):
+ test_names_batch = test_names_array[start:start + batch_size]
+ history_query = f"""
PRAGMA AnsiInForEmptyOrNullableItemsCollections;
DECLARE $test_names AS List<Utf8>;
DECLARE $rn_max AS Int32;
DECLARE $build_type AS Utf8;
+
+ $test_names=[{','.join("'{0}'".format(x) for x in test_names_batch)}];
+ $rn_max = {last_n_runs_of_test_amount};
+ $build_type = '{build_type}';
$tests=(
SELECT
suite_folder ||'/' || test_name as full_name,test_name,build_type, commit, branch, run_timestamp, status, status_description,
ROW_NUMBER() OVER (PARTITION BY test_name ORDER BY run_timestamp DESC) AS rn
FROM
- `test_results/test_runs_results`
+ `test_results/test_runs_column`
where (job_name ='Nightly-run' or job_name like 'Postcommit%') and
build_type = $build_type and
suite_folder ||'/' || test_name in $test_names
@@ -50,46 +70,31 @@ def get_test_history(test_names_array, last_n_runs_of_test_amount, build_type):
WHERE rn <= $rn_max
ORDER BY test_name, run_timestamp;
"""
-
- with ydb.Driver(
- endpoint=DATABASE_ENDPOINT,
- database=DATABASE_PATH,
- credentials=ydb.credentials_from_env_variables(),
- ) as driver:
- driver.wait(timeout=10, fail_fast=True)
- session = ydb.retry_operation_sync(
- lambda: driver.table_client.session().create()
- )
-
- with session.transaction() as transaction:
- prepared_query = session.prepare(query)
-
- results = {}
- batch_size = 100
- for start in range(0, len(test_names_array), batch_size):
- test_names_batch = test_names_array[start:start + batch_size]
-
- query_params = {
- "$test_names": test_names_batch,
- "$rn_max": last_n_runs_of_test_amount,
- "$build_type": build_type,
+ query = ydb.ScanQuery(history_query, {})
+ it = driver.table_client.scan_query(query)
+ query_result = []
+
+ while True:
+ try:
+ result = next(it)
+ query_result = query_result + result.result_set.rows
+ except StopIteration:
+ break
+
+ for row in query_result:
+ if not row["full_name"].decode("utf-8") in results:
+ results[row["full_name"].decode("utf-8")] = {}
+
+ results[row["full_name"].decode("utf-8")][row["run_timestamp"]] = {
+ "status": row["status"],
+ "commit": row["commit"],
+                "datetime": datetime.datetime.fromtimestamp(int(row["run_timestamp"] / 1000000)).strftime("%H:%M %B %d %Y"),
+ "status_description": row["status_description"],
}
-
- result_set = session.transaction(ydb.SerializableReadWrite()).execute(
- prepared_query, parameters=query_params, commit_tx=True
- )
-
- for row in result_set[0].rows:
- if not row["full_name"].decode("utf-8") in results:
- results[row["full_name"].decode("utf-8")] = {}
-
- results[row["full_name"].decode("utf-8")][row["run_timestamp"]] = {
- "status": row["status"],
- "commit": row["commit"],
- "datetime": datetime.datetime.fromtimestamp(int(row["run_timestamp"] / 1000000)).strftime("%H:%m %B %d %Y"),
- "status_description": row["status_description"],
- }
- return results
+ end_time = time.time()
+    print(
+        f'duration of getting history for {len(test_names_array)} tests: {end_time - start_time} sec')
+ return results
if __name__ == "__main__":
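After this rewrite, a call to the batched lookup looks like the sketch below; the test name and build type are illustrative, and CI_YDB_SERVICE_ACCOUNT_KEY_FILE_CREDENTIALS must be set in the environment:

    # Hypothetical invocation of get_test_history() as defined above.
    history = get_test_history(
        ["ydb/core/kqp/ut/indexes/TestIndexes.SomeCase"],  # suite_folder/test_name
        last_n_runs_of_test_amount=5,
        build_type="relwithdebinfo",
    )
    for full_name, runs in history.items():
        print(full_name, len(runs), "recent runs")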
diff --git a/.github/scripts/upload_tests_results.py b/.github/scripts/upload_tests_results.py
index 3e913e800db..399a3f983ca 100755
--- a/.github/scripts/upload_tests_results.py
+++ b/.github/scripts/upload_tests_results.py
@@ -2,16 +2,81 @@
import argparse
import configparser
+import datetime
import fnmatch
import os
+import posixpath
+import shlex
import subprocess
import sys
import time
-import ydb
import xml.etree.ElementTree as ET
+import ydb
+
from codeowners import CodeOwners
from concurrent.futures import ThreadPoolExecutor, as_completed
+from decimal import Decimal
+
+
+def create_tables(pool, table_path):
+ print(f"> create table: {table_path}")
+
+ def callee(session):
+ session.execute_scheme(f"""
+            CREATE TABLE IF NOT EXISTS `{table_path}` (
+ build_type Utf8 NOT NULL,
+ job_name Utf8,
+ job_id Uint64,
+ commit Utf8,
+ branch Utf8 NOT NULL,
+ pull Utf8,
+ run_timestamp Timestamp NOT NULL,
+ test_id Utf8 NOT NULL,
+ suite_folder Utf8 NOT NULL,
+ test_name Utf8 NOT NULL,
+ duration Double,
+ status Utf8 NOT NULL,
+ status_description Utf8,
+ owners Utf8,
+ log Utf8,
+ logsdir Utf8,
+ stderr Utf8,
+ stdout Utf8,
+                PRIMARY KEY (`test_name`, `suite_folder`, build_type, branch, status, run_timestamp)
+            )
+            PARTITION BY HASH(`suite_folder`, test_name, build_type, branch)
+ WITH (STORE = COLUMN)
+ """)
+
+ return pool.retry_operation_sync(callee)
+
+
+def bulk_upsert(table_client, table_path, rows):
+ print(f"> bulk upsert: {table_path}")
+ column_types = (
+ ydb.BulkUpsertColumns()
+ .add_column("branch", ydb.OptionalType(ydb.PrimitiveType.Utf8))
+ .add_column("build_type", ydb.OptionalType(ydb.PrimitiveType.Utf8))
+ .add_column("commit", ydb.OptionalType(ydb.PrimitiveType.Utf8))
+ .add_column("duration", ydb.OptionalType(ydb.PrimitiveType.Double))
+ .add_column("job_id", ydb.OptionalType(ydb.PrimitiveType.Uint64))
+ .add_column("job_name", ydb.OptionalType(ydb.PrimitiveType.Utf8))
+ .add_column("log", ydb.OptionalType(ydb.PrimitiveType.Utf8))
+ .add_column("logsdir", ydb.OptionalType(ydb.PrimitiveType.Utf8))
+ .add_column("owners", ydb.OptionalType(ydb.PrimitiveType.Utf8))
+ .add_column("pull", ydb.OptionalType(ydb.PrimitiveType.Utf8))
+ .add_column("run_timestamp", ydb.OptionalType(ydb.PrimitiveType.Timestamp))
+ .add_column("status", ydb.OptionalType(ydb.PrimitiveType.Utf8))
+ .add_column("status_description", ydb.OptionalType(ydb.PrimitiveType.Utf8))
+ .add_column("stderr", ydb.OptionalType(ydb.PrimitiveType.Utf8))
+ .add_column("stdout", ydb.OptionalType(ydb.PrimitiveType.Utf8))
+ .add_column("suite_folder", ydb.OptionalType(ydb.PrimitiveType.Utf8))
+ .add_column("test_id", ydb.OptionalType(ydb.PrimitiveType.Utf8))
+ .add_column("test_name", ydb.OptionalType(ydb.PrimitiveType.Utf8))
+    )
+ table_client.bulk_upsert(table_path, rows, column_types)
def parse_junit_xml(test_results_file, build_type, job_name, job_id, commit, branch, pull, run_timestamp):
@@ -23,7 +88,7 @@ def parse_junit_xml(test_results_file, build_type, job_name, job_id, commit, bra
suite_folder = testsuite.get("name")
for testcase in testsuite.findall("testcase"):
- name = testcase.get("name")
+ name = testcase.get("name")
duration = testcase.get("time")
status_description = ""
@@ -49,12 +114,12 @@ def parse_junit_xml(test_results_file, build_type, job_name, job_id, commit, bra
"commit": commit,
"branch": branch,
"pull": pull,
- "run_timestamp": run_timestamp,
+ "run_timestamp": int(run_timestamp)*1000000,
"job_name": job_name,
- "job_id": job_id,
+ "job_id": int(job_id),
"suite_folder": suite_folder,
"test_name": name,
- "duration": float(duration),
+ "duration": Decimal(duration),
"status": status,
"status_description": status_description.replace("\r\n", ";;").replace("\n", ";;").replace("\"", "'"),
"log": "" if testcase.find("properties/property/[@name='url:log']") is None else testcase.find("properties/property/[@name='url:log']").get('value'),
@@ -75,151 +140,32 @@ def get_codeowners_for_tests(codeowners_file_path, tests_data):
tests_data_with_owners = []
for test in tests_data:
target_path = f'{test["suite_folder"]}'
- owners = owners_odj.of(target_path)
- test["owners"] = joined_owners=";;".join([(":".join(x)) for x in owners])
+ owners = owners_odj.of(target_path)
+ test["owners"] = joined_owners = ";;".join(
+ [(":".join(x)) for x in owners])
tests_data_with_owners.append(test)
return tests_data_with_owners
-def create_table(session, sql):
-
- try:
- session.execute_scheme(sql)
- except ydb.issues.AlreadyExists:
- pass
- except Exception as e:
- print(f"Error creating table: {e}, sql:\n{sql}", file=sys.stderr)
- raise e
-
-
-def create_tests_table(session, table_path):
- # Creating of new table if not exist yet
- sql = f"""
- --!syntax_v1
- CREATE TABLE IF NOT EXISTS `{table_path}` (
- build_type Utf8,
- job_name Utf8,
- job_id Uint64,
- commit Utf8,
- branch Utf8,
- pull Utf8,
- run_timestamp Timestamp,
- test_id Utf8,
- suite_folder Utf8,
- test_name Utf8,
- duration Double,
- status Utf8,
- status_description Utf8,
- owners Utf8,
- log Utf8,
- logsdir Utf8,
- stderr Utf8,
- stdout Utf8,
- PRIMARY KEY(test_id)
- );
- """
- create_table(session, sql)
-
-
-def upload_results(pool, sql):
- with pool.checkout() as session:
- try:
- session.transaction().execute(sql, commit_tx=True)
- except Exception as e:
- print(f"Error executing query {e}, sql:\n{sql}", file=sys.stderr)
- raise e
-
-
-def prepare_and_upload_tests(pool, path, results, batch_size):
-
- total_records = len(results)
- if total_records == 0:
- print("Error:Object stored test results is empty, check test results artifacts")
- return 1
-
- with ThreadPoolExecutor(max_workers=10) as executor:
-
- for start in range(0, total_records, batch_size):
- futures = []
- end = min(start + batch_size, total_records)
- batch = results[start:end]
-
- sql = f"""--!syntax_v1
- UPSERT INTO `{path}`
- (build_type,
- job_name,
- job_id,
- commit,
- branch,
- pull,
- run_timestamp,
- test_id,
- suite_folder,
- test_name,
- duration,
- status,
- status_description,
- log,
- logsdir,
- stderr,
- stdout,
- owners) VALUES"""
- values = []
- for index, result in enumerate(batch):
-
- values.append(
- f"""
- ("{result['build_type']}",
- "{result['job_name']}",
- {result['job_id']},
- "{result['commit']}",
- "{result['branch']}",
- "{result['pull']}",
- DateTime::FromSeconds({result['run_timestamp']}),
- "{result['pull']}_{result['run_timestamp']}_{start+index}",
- "{result['suite_folder']}",
- "{result['test_name']}",
- {result['duration']},
- "{result['status']}",
- "{result['status_description']}",
- "{result['log']}",
- "{result['logsdir']}",
- "{result['stderr']}",
- "{result['stdout']}",
- "{result['owners']}")
- """
- )
- sql += ", ".join(values) + ";"
-
- futures.append(executor.submit(upload_results, pool, sql))
-
- for future in as_completed(futures):
- future.result() # Raise exception if occurred
-
-
-def create_pool(endpoint, database):
- driver_config = ydb.DriverConfig(
- endpoint,
- database,
- credentials=ydb.credentials_from_env_variables()
- )
-
- driver = ydb.Driver(driver_config)
- driver.wait(fail_fast=True, timeout=5)
- return ydb.SessionPool(driver)
-
-
def main():
parser = argparse.ArgumentParser()
- parser.add_argument('--test-results-file', action='store', dest="test_results_file", required=True, help='XML with results of tests')
- parser.add_argument('--build-type', action='store', dest="build_type", required=True, help='build type')
- parser.add_argument('--commit', default='store', dest="commit", required=True, help='commit sha')
- parser.add_argument('--branch', default='store', dest="branch", required=True, help='branch name ')
- parser.add_argument('--pull', action='store', dest="pull",required=True, help='pull number')
- parser.add_argument('--run-timestamp', action='store', dest="run_timestamp",required=True, help='time of test run start')
- parser.add_argument('--job-name', action='store', dest="job_name",required=True, help='job name where launched')
- parser.add_argument('--job-id', action='store', dest="job_id",required=True, help='job id of workflow')
+ parser.add_argument('--test-results-file', action='store',
+ required=True, help='XML with results of tests')
+ parser.add_argument('--build-type', action='store',
+ required=True, help='build type')
+    parser.add_argument('--commit', action='store',
+                        dest="commit", required=True, help='commit sha')
+    parser.add_argument('--branch', action='store',
+                        dest="branch", required=True, help='branch name')
+ parser.add_argument('--pull', action='store', dest="pull",
+ required=True, help='pull number')
+ parser.add_argument('--run-timestamp', action='store',
+ dest="run_timestamp", required=True, help='time of test run start')
+ parser.add_argument('--job-name', action='store', dest="job_name",
+ required=True, help='job name where launched')
+ parser.add_argument('--job-id', action='store', dest="job_id",
+ required=True, help='job id of workflow')
args = parser.parse_args()
@@ -233,10 +179,9 @@ def main():
job_id = args.job_id
path_in_database = "test_results"
- batch_size_default = 30
dir = os.path.dirname(__file__)
git_root = f"{dir}/../.."
- codeowners = f"{git_root}/.github/CODEOWNERS"
+ codeowners = f"{git_root}/.github/TESTOWNERS"
config = configparser.ConfigParser()
config_file_path = f"{git_root}/.github/config/ydb_qa_db.ini"
config.read(config_file_path)
@@ -248,28 +193,64 @@ def main():
print(
"Error: Env variable CI_YDB_SERVICE_ACCOUNT_KEY_FILE_CREDENTIALS is missing, skipping"
)
- return 0
+ return 1
else:
# Do not set up the 'real' variable from gh workflows because it interferes with ydb tests
# So, set it up locally
os.environ["YDB_SERVICE_ACCOUNT_KEY_FILE_CREDENTIALS"] = os.environ[
"CI_YDB_SERVICE_ACCOUNT_KEY_FILE_CREDENTIALS"
]
+ test_table_name = f"{path_in_database}/test_runs_column"
+ full_path = posixpath.join(DATABASE_PATH, test_table_name)
+
+ with ydb.Driver(
+ endpoint=DATABASE_ENDPOINT,
+ database=DATABASE_PATH,
+ credentials=ydb.credentials_from_env_variables(),
+ ) as driver:
+ driver.wait(timeout=10, fail_fast=True)
+ session = ydb.retry_operation_sync(
+ lambda: driver.table_client.session().create()
+ )
- test_table_name = f"{path_in_database}/test_runs_results"
- # Prepare connection and table
- pool = create_pool(DATABASE_ENDPOINT, DATABASE_PATH)
- with pool.checkout() as session:
- create_tests_table(session, test_table_name)
-
- # Parse and upload
- results = parse_junit_xml(
- test_results_file, build_type, job_name, job_id, commit, branch, pull, run_timestamp
- )
- result_with_owners = get_codeowners_for_tests(codeowners, results)
- prepare_and_upload_tests(
- pool, test_table_name, results, batch_size=batch_size_default
- )
+ # Parse and upload
+ results = parse_junit_xml(
+ test_results_file, build_type, job_name, job_id, commit, branch, pull, run_timestamp
+ )
+ result_with_owners = get_codeowners_for_tests(codeowners, results)
+ prepared_for_update_rows = []
+ for index, row in enumerate(result_with_owners):
+ prepared_for_update_rows.append({
+ 'branch': row['branch'],
+ 'build_type': row['build_type'],
+ 'commit': row['commit'],
+ 'duration': row['duration'],
+ 'job_id': row['job_id'],
+ 'job_name': row['job_name'],
+ 'log': row['log'],
+ 'logsdir': row['logsdir'],
+ 'owners': row['owners'],
+ 'pull': row['pull'],
+ 'run_timestamp': row['run_timestamp'],
+ 'status_description': row['status_description'],
+ 'status': row['status'],
+ 'stderr': row['stderr'],
+ 'stdout': row['stdout'],
+ 'suite_folder': row['suite_folder'],
+ 'test_id': f"{row['pull']}_{row['run_timestamp']}_{index}",
+ 'test_name': row['test_name'],
+ })
+ print(f'upserting runs: {len(prepared_for_update_rows)} rows')
+ if prepared_for_update_rows:
+ with ydb.SessionPool(driver) as pool:
+ create_tables(pool, test_table_name)
+ bulk_upsert(driver.table_client, full_path,
+ prepared_for_update_rows)
+ print('tests updated')
+ else:
+ print('nothing to upsert')
+
+
if __name__ == "__main__":