1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
|
#!/usr/bin/env python3
import argparse
import json
import os
import sys
import ydb
from decimal import Decimal
from testowners_utils import get_testowners_for_tests
from ydb_wrapper import YDBWrapper
# Workaround for the YDB error "cannot split batch in according to limits: there is
# row with size more then limit (7340032)": status_description is capped at one
# third of that limit.
max_characters_for_status_description = 7340032 // 3
def get_column_types():
    """Build the base bulk-upsert column set.

    Every base column is declared as an optional type. Columns that depend on
    the actual table schema (full_name, metadata, error_type, metrics) are not
    part of this set; prepare_column_types adds them conditionally.
    """
    utf8 = ydb.PrimitiveType.Utf8
    base_columns = (
        ("branch", utf8),
        ("build_type", utf8),
        ("commit", utf8),
        ("duration", ydb.PrimitiveType.Double),
        ("job_id", ydb.PrimitiveType.Uint64),
        ("job_name", utf8),
        ("log", utf8),
        ("logsdir", utf8),
        ("owners", utf8),
        ("pull", utf8),
        ("run_timestamp", ydb.PrimitiveType.Timestamp),
        ("status", utf8),
        ("status_description", utf8),
        ("stderr", utf8),
        ("stdout", utf8),
        ("suite_folder", utf8),
        ("test_id", utf8),
        ("test_name", utf8),
    )

    column_types = ydb.BulkUpsertColumns()
    for column_name, primitive_type in base_columns:
        # Re-bind on every step, exactly like the fluent add_column() chain does.
        column_types = column_types.add_column(column_name, ydb.OptionalType(primitive_type))
    return column_types
def get_table_schema(table_path):
    """Return the CREATE TABLE IF NOT EXISTS statement for the test results table.

    Args:
        table_path: Table path, interpolated between backticks in the statement.

    Returns:
        str: The DDL text, starting and ending with a newline.
    """
    ddl_lines = [
        f"CREATE TABLE IF NOT EXISTS `{table_path}` (",
        "-- Primary key columns (in PRIMARY KEY order)",
        "run_timestamp Timestamp NOT NULL,",
        "build_type Utf8 NOT NULL,",
        "branch Utf8 NOT NULL,",
        "full_name Utf8 NOT NULL,",
        "test_name Utf8 NOT NULL,",
        "suite_folder Utf8 NOT NULL,",
        "status Utf8 NOT NULL,",
        "-- Other required columns",
        "test_id Utf8 NOT NULL,",
        "-- Optional columns",
        "job_name Utf8,",
        "job_id Uint64,",
        "commit Utf8,",
        "pull Utf8,",
        "duration Double,",
        "status_description Utf8,",
        "owners Utf8,",
        "log Utf8,",
        "logsdir Utf8,",
        "stderr Utf8,",
        "stdout Utf8,",
        "error_type Utf8,",
        "metadata Json,",
        "metrics Json,",
        "-- Primary key definition",
        "PRIMARY KEY (run_timestamp, build_type, branch, full_name, test_name, suite_folder, status)",
        ")",
        "PARTITION BY HASH(run_timestamp, build_type, branch, suite_folder)",
        "WITH (",
        "STORE = COLUMN",
        ");",
    ]
    return "\n" + "\n".join(ddl_lines) + "\n"
# Report status -> status value stored in the table; anything else maps to "passed".
_STATUS_BY_REPORT_STATUS = {
    "MUTE": "mute",
    "FAILED": "failure",
    "ERROR": "error",
    "SKIPPED": "skipped",
}


def _format_test_name(name_part, subtest_name):
    """Join name and subtest name as "name.subtest_name", tolerating a missing part."""
    if not subtest_name:
        return name_part
    if name_part:
        return f"{name_part}.{subtest_name}"
    return subtest_name


def _map_status(status_str):
    """Translate a report status into the stored status ("passed" is the fallback)."""
    if isinstance(status_str, str):
        return _STATUS_BY_REPORT_STATUS.get(status_str, "passed")
    return "passed"


def _first_link_url(links, link_type):
    """Return the first URL listed under link_type, or "" when there is none."""
    urls = links.get(link_type) if isinstance(links, dict) else None
    if isinstance(urls, list) and urls:
        return urls[0]
    return ""


def _build_metadata_json(result):
    """Serialize the identifying fields of a report entry, dropping absent ones."""
    if result.get("suite"):
        entity_type = "suite"
    elif result.get("chunk"):
        entity_type = "chunk"
    else:
        entity_type = "test"

    metadata = {
        "chunk_hid": result.get("chunk_hid"),
        "suite_hid": result.get("suite_hid"),
        "uid": result.get("uid"),
        "toolchain": result.get("toolchain"),
        "size": result.get("size"),
        "name": result.get("name"),
        "id": result.get("id"),
        "hid": result.get("hid"),
        "entity_type": entity_type,
    }
    # entity_type is always set, so the filtered dict is never empty.
    return json.dumps({k: v for k, v in metadata.items() if v is not None})


def parse_build_results_report(test_results_file, build_type, job_name, job_id, commit, branch, pull, run_timestamp):
    """Parse a build-results-report JSON file into rows for the test results table.

    Entries without a truthy "status" are skipped. JSON nulls in "results",
    "links", "rich-snippet" and "duration" are treated like missing values
    instead of raising TypeError.

    Args:
        test_results_file: Path to the build-results-report JSON file.
        build_type: Copied unchanged into every row.
        job_name: Copied unchanged into every row.
        job_id: Converted with int() for every row.
        commit: Copied unchanged into every row.
        branch: Copied unchanged into every row.
        pull: Copied unchanged into every row.
        run_timestamp: Converted with int() and multiplied by 1000000
            (presumably epoch seconds -> microseconds for the Timestamp column).

    Returns:
        list[dict]: One dict per accepted entry with the keys build_type,
        commit, branch, pull, run_timestamp, job_name, job_id, suite_folder,
        test_name, duration (Decimal), status, status_description, log,
        logsdir, stderr, stdout, error_type, metadata and metrics (the last
        two are JSON strings; metrics is None only for an explicit null).

    Raises:
        OSError: If the file cannot be opened.
        json.JSONDecodeError: If the file is not valid JSON.
        ValueError: If job_id or run_timestamp is not an integer literal
            (only raised when there is at least one accepted entry).
    """
    # Read as UTF-8 explicitly instead of relying on the platform default encoding.
    with open(test_results_file, 'r', encoding='utf-8') as f:
        report = json.load(f)

    results = []
    for result in report.get("results") or []:
        # Filtering (suite, build, configure) is done by transform_build_results.py
        status_str = result.get("status")
        if not status_str:
            continue

        # Format test_name: name.subtest_name (same as generate-summary.py)
        test_name = _format_test_name(result.get("name", ""), result.get("subtest_name", ""))

        # A null duration is stored as 0 rather than crashing Decimal().
        duration = result.get("duration", 0)
        if duration is None:
            duration = 0

        # "rich-snippet" is already cleaned by transform_build_results.py; null -> "".
        status_description = result.get("rich-snippet") or ""

        # Links format: {"log": ["https://..."], "stdout": ["https://..."], "logsdir": ["https://..."]}
        links = result.get("links") or {}

        # Metrics default to an empty dict; only an explicit null yields None.
        metrics = result.get("metrics", {})
        metrics_json = json.dumps(metrics) if metrics is not None else None

        results.append(
            {
                "build_type": build_type,
                "commit": commit,
                "branch": branch,
                "pull": pull,
                "run_timestamp": int(run_timestamp) * 1000000,
                "job_name": job_name,
                "job_id": int(job_id),
                "suite_folder": result.get("path", ""),
                "test_name": test_name,
                "duration": Decimal(duration),
                "status": _map_status(status_str),
                "status_description": status_description[:max_characters_for_status_description],
                "log": _first_link_url(links, "log"),
                "logsdir": _first_link_url(links, "logsdir"),
                "stderr": _first_link_url(links, "stderr"),
                "stdout": _first_link_url(links, "stdout"),
                "error_type": result.get("error_type", "") or None,
                "metadata": _build_metadata_json(result),
                "metrics": metrics_json,
            }
        )
    return results
def check_table_schema(wrapper, table_path):
    """Report which schema-dependent columns the table at table_path has.

    Runs a one-row SELECT through wrapper.execute_scan_query_with_metadata and
    inspects the returned column metadata (iterated as (name, type) pairs).

    Returns:
        tuple: (has_full_name, has_metadata, has_error_type, has_metrics).
        All four are False when the query fails for any reason.
    """
    probe_query = f"SELECT * FROM `{table_path}` LIMIT 1"
    probed_columns = ('full_name', 'metadata', 'error_type', 'metrics')
    try:
        _, column_metadata = wrapper.execute_scan_query_with_metadata(probe_query)
        present = {name for name, _ in (column_metadata or [])}
        return tuple(column in present for column in probed_columns)
    except Exception as e:
        # Table doesn't exist or error - assume old schema
        print(f'Table schema check failed (assuming old schema): {e}')
        return (False,) * len(probed_columns)
def prepare_rows_for_upload(results_with_owners):
    """Turn parsed results (with owners) into full upload rows.

    Each output row carries the copied base fields plus a generated test_id
    ("<pull>_<run_timestamp>_<index>"), full_name ("<suite_folder>/<test_name>")
    and the optional error_type / metadata / metrics values (None if absent).
    """
    copied_fields = (
        'branch', 'build_type', 'commit', 'duration', 'job_id', 'job_name',
        'log', 'logsdir', 'owners', 'pull', 'run_timestamp',
        'status_description', 'status', 'stderr', 'stdout', 'suite_folder',
    )

    def build_upload_row(position, source):
        upload_row = {field: source[field] for field in copied_fields}
        upload_row['test_id'] = f"{source['pull']}_{source['run_timestamp']}_{position}"
        upload_row['test_name'] = source['test_name']
        upload_row['error_type'] = source.get('error_type')
        # Format: path/name.subtest_name (same as generate-summary.py)
        upload_row['full_name'] = f"{source['suite_folder']}/{source['test_name']}"
        # metadata and metrics are JSON strings from parse_build_results_report
        upload_row['metadata'] = source.get('metadata')
        upload_row['metrics'] = source.get('metrics')
        return upload_row

    return [build_upload_row(position, source) for position, source in enumerate(results_with_owners)]
def filter_rows_for_schema(rows, has_full_name, has_metadata, has_error_type, has_metrics):
    """Return copies of rows adjusted to the columns the table actually has.

    - full_name / metadata are dropped when the table lacks them.
    - error_type / metrics are dropped when absent from the table, otherwise
      guaranteed to be present (None when the row had no value).
    - A metadata value equal to the empty JSON string "{}" becomes None.

    The input rows are not modified.
    """
    ensured_or_dropped = (('error_type', has_error_type), ('metrics', has_metrics))

    def adapt(source_row):
        adapted = source_row.copy()

        if not has_full_name:
            adapted.pop('full_name', None)

        for column, in_table in ensured_or_dropped:
            if in_table:
                # Keep an existing value; only fill in None when the key is missing.
                adapted.setdefault(column, None)
            else:
                adapted.pop(column, None)

        if not has_metadata:
            adapted.pop('metadata', None)
        elif adapted.get('metadata') == "{}":
            # Empty JSON is stored as NULL (like export_issues_to_ydb.py)
            adapted['metadata'] = None

        return adapted

    return [adapt(row) for row in rows]
def prepare_column_types(has_full_name, has_metadata, has_error_type, has_metrics):
    """Extend the base column set with the schema-dependent columns that exist.

    Starts from get_column_types() and appends, in this order and only when the
    matching flag is truthy: full_name (Utf8), metadata (Json), error_type
    (Utf8), metrics (Json) - each wrapped in an optional type.
    """
    column_types = get_column_types()
    schema_dependent_columns = (
        (has_full_name, "full_name", ydb.PrimitiveType.Utf8),
        (has_metadata, "metadata", ydb.PrimitiveType.Json),
        (has_error_type, "error_type", ydb.PrimitiveType.Utf8),
        (has_metrics, "metrics", ydb.PrimitiveType.Json),
    )
    for in_table, column_name, primitive_type in schema_dependent_columns:
        if in_table:
            column_types = column_types.add_column(column_name, ydb.OptionalType(primitive_type))
    return column_types
def upload_to_table(wrapper, table_path, rows, column_types, table_name="table"):
    """Bulk-upsert rows into table_path in batches of 1000, logging progress.

    Nothing is sent when rows is empty (or otherwise falsy); table_name is
    only used in the printed messages.
    """
    if rows:
        print(f'Uploading {len(rows)} test results to {table_name}')
        wrapper.bulk_upsert_batches(table_path, rows, column_types, batch_size=1000)
        print(f'Successfully uploaded to {table_name}')
    else:
        print(f'No rows to upload to {table_name}')
def main():
    """Parse CLI arguments and upload the test results of one CI job to YDB.

    The upload is best-effort: any exception raised while parsing the report
    or talking to YDB is printed as a warning and does not fail the process.

    Returns:
        int: 1 when wrapper.check_credentials() reports missing credentials,
        0 in every other case (including a failed upload).
    """
    parser = argparse.ArgumentParser()
    parser.add_argument('--test-results-file', action='store',
                        required=True, help='build-results-report JSON with results of tests')
    parser.add_argument('--build-type', action='store',
                        required=True, help='build type')
    # --commit / --branch used default='store', a typo for action='store'
    # (the default was dead anyway because both options are required).
    parser.add_argument('--commit', action='store',
                        dest="commit", required=True, help='commit sha')
    parser.add_argument('--branch', action='store',
                        dest="branch", required=True, help='branch name ')
    parser.add_argument('--pull', action='store', dest="pull",
                        required=True, help='pull number')
    parser.add_argument('--run-timestamp', action='store',
                        dest="run_timestamp", required=True, help='time of test run start')
    parser.add_argument('--job-name', action='store', dest="job_name",
                        required=True, help='job name where launched')
    parser.add_argument('--job-id', action='store', dest="job_id",
                        required=True, help='job id of workflow')
    args = parser.parse_args()

    try:
        with YDBWrapper() as wrapper:
            # Check credentials
            if not wrapper.check_credentials():
                print("Error: Env variable CI_YDB_SERVICE_ACCOUNT_KEY_FILE_CREDENTIALS is missing, skipping")
                return 1

            # Parse build-results-report JSON with test results
            results = parse_build_results_report(
                args.test_results_file, args.build_type, args.job_name, args.job_id,
                args.commit, args.branch, args.pull, args.run_timestamp
            )

            # Add owner information
            result_with_owners = get_testowners_for_tests(results)

            # Get table paths
            test_table_path = wrapper.get_table_path("test_results")

            # Create table if it doesn't exist (IF NOT EXISTS - won't affect existing table)
            wrapper.create_table(test_table_path, get_table_schema(test_table_path))

            # Prepare rows with full data (always includes full_name, metadata and metrics)
            prepared_rows = prepare_rows_for_upload(result_with_owners)

            # Check main table schema: an existing table may predate some columns
            has_full_name, has_metadata, has_error_type, has_metrics = check_table_schema(wrapper, test_table_path)
            print(f'Main table schema: full_name={has_full_name}, metadata={has_metadata}, error_type={has_error_type}, metrics={has_metrics}')

            # Filter rows and prepare column types for main table
            main_rows = filter_rows_for_schema(prepared_rows, has_full_name, has_metadata, has_error_type, has_metrics)
            main_column_types = prepare_column_types(has_full_name, has_metadata, has_error_type, has_metrics)

            # Upload to main table
            upload_to_table(wrapper, test_table_path, main_rows, main_column_types, "main table")
    except Exception as e:
        # Deliberate best-effort boundary: a failed upload must not break CI.
        print(f"Warning: Failed to upload test results to YDB: {e}")
        print("This is not a critical error, continuing with CI process...")

    return 0
if __name__ == "__main__":
    # Use main()'s return value as the process exit status.
    exit_code = main()
    sys.exit(exit_code)
|