#!/usr/bin/env python3 import datetime import inspect import json import os import sys import time import uuid import ydb from typing import List, Dict, Any, Optional, Callable, Tuple from contextlib import contextmanager # Explicit settings for long-running read queries via Query Service. # Without these the server applies its default deadline (~600s) which # is too short for full-day analytics scans. _LONG_QUERY_SETTINGS = ( ydb.BaseRequestSettings() .with_operation_timeout(3600) # 1h server-side .with_cancel_after(3600) .with_timeout(3600) # 1h gRPC deadline ) # Preserve Table Service semantics: return raw integers/bytes instead of # Python datetime/str so downstream scripts (e.g. get_test_history.py) # don't break. Table Service had native_*_in_result_sets=False by default; # Query Service flips them all to True by default. _QUERY_CLIENT_SETTINGS = ( ydb.QueryClientSettings() .with_native_date_in_result_sets(False) .with_native_datetime_in_result_sets(False) .with_native_timestamp_in_result_sets(False) .with_native_interval_in_result_sets(False) .with_native_json_in_result_sets(False) ) class YDBWrapper: """Wrapper for YDB with statistics logging. Use as:: with YDBWrapper() as w: w.execute_scan_query_with_metadata(...) w.create_table(...) w.bulk_upsert_batches(...) All data operations (scan, create_table, bulk_upsert, execute_dml) use a single cached main driver (no reconnection between calls). Statistics are written via a separate cached stats driver. Both drivers are created lazily and closed when exiting the ``with`` block. """ def __init__(self, config_path: str = None, enable_statistics: bool = None, script_name: str = None, silent: bool = False, use_local_config: bool = None): # use_local_config=None (default): YDB_QA_CONFIG env if set, else local config file # use_local_config=True: only local config file (ignore YDB_QA_CONFIG env) # use_local_config=False: YDB_QA_CONFIG env > config file (JSON) # By default logs go to stdout (as before) # If silent=True, logs go to stderr (for scripts called from other scripts) self._log_stream = sys.stderr if silent else sys.stdout _explicit_local = use_local_config # remember if caller forced the choice if use_local_config is None: use_local_config = not os.environ.get("YDB_QA_CONFIG") if use_local_config: resolved_path = self._default_config_path(config_path) enable_statistics = self._load_config_from_file(resolved_path, enable_statistics) if _explicit_local is True: self._config_source = f"file:{resolved_path} (forced, vars.YDB_QA_CONFIG ignored)" else: self._config_source = f"file:{resolved_path} (vars.YDB_QA_CONFIG not set)" else: ydb_qa_config_env = os.environ.get("YDB_QA_CONFIG") if ydb_qa_config_env: try: config_dict = json.loads(ydb_qa_config_env) enable_statistics = self._load_config_from_dict(config_dict, enable_statistics) self._config_source = "env:YDB_QA_CONFIG" except (json.JSONDecodeError, KeyError) as e: raise RuntimeError(f"Invalid YDB_QA_CONFIG format: {e}") else: resolved_path = self._default_config_path(config_path) enable_statistics = self._load_config_from_file(resolved_path, enable_statistics) if _explicit_local is False: self._config_source = f"file:{resolved_path} (fallback: vars.YDB_QA_CONFIG not set)" else: self._config_source = f"file:{resolved_path} (vars.YDB_QA_CONFIG not set)" self._log("info", f"YDB config source: {self._config_source}") # Statistics settings self._enable_statistics = enable_statistics self._cluster_version = None self._session_id = str(uuid.uuid4()) # One driver per endpoint for the whole wrapper lifetime self._driver = None self._stats_driver = None # Automatically determine script_name if not provided if script_name is None: script_name = self._get_caller_script_name() self._script_name = script_name # GitHub Action info - get once self._github_info = self._get_github_action_info() # Get cluster version once during initialization (uses cached main driver) try: self._get_cluster_version(self._get_driver()) except Exception as e: self._log("warning", f"Failed to get cluster version: {e}") self._cluster_version = "unknown" # Statistics: if stats and main use the same endpoint/path, reuse main driver (no extra connection) self._stats_available = None if self._enable_statistics: stats_same_as_main = ( self.stats_endpoint == self.database_endpoint and self.stats_path == self.database_path ) if stats_same_as_main: self._stats_available = True self._log("info", f"Statistics logging enabled (same DB as main) - session_id: {self._session_id}") else: self._stats_available = self._check_stats_availability() if self._stats_available: self._log("info", f"Statistics logging enabled - session_id: {self._session_id}") else: self._log("warning", "Statistics database is not available, statistics will not be logged") else: self._log("info", "Statistics logging disabled") @property def config_source(self) -> str: """Human-readable description of where YDB QA config was loaded from.""" return self._config_source @staticmethod def _default_config_path(config_path: Optional[str]) -> str: """Resolve default config path relative to this file if not provided.""" if config_path is not None: return config_path dir_path = os.path.dirname(__file__) return os.path.normpath(os.path.join(dir_path, "..", "..", "config", "ydb_qa_config.json")) def _load_config_from_file(self, config_path: str, enable_statistics: Optional[bool]) -> Optional[bool]: """Load config dict from JSON file and apply it.""" try: with open(config_path, 'r') as f: config_dict = json.load(f) return self._load_config_from_dict(config_dict, enable_statistics) except FileNotFoundError: raise RuntimeError(f"Config file not found: {config_path}") except json.JSONDecodeError as e: raise RuntimeError(f"Invalid JSON config file: {e}") def _load_config_from_dict(self, config_dict: dict, enable_statistics: bool = None): """Load configuration from dictionary""" dbs = config_dict["databases"] main = dbs["main"] stats = dbs.get("statistics", {}) variables = config_dict.get("variables", {}) flags = config_dict.get("flags", {}) # Store flags as attribute for access from other scripts self._flags = flags # Automatic field mapping config_mapping = { 'database_endpoint': main["endpoint"], 'database_path': main["path"], '_connection_timeout': main.get("connection_timeout", 60), '_main_db_tables': main.get("tables", {}), 'stats_endpoint': stats.get("endpoint", main["endpoint"]), 'stats_path': stats.get("path", main["path"]), '_stats_connection_timeout': stats.get("connection_timeout",60), 'stats_table': stats.get("tables", {}).get("query_statistics", "analytics/query_statistics"), '_stats_db_tables': stats.get("tables", {}) } for key, value in config_mapping.items(): setattr(self, key, value) # Enable statistics from flags (default True) if enable_statistics is None: enable_statistics = flags.get("enable_statistics", True) return enable_statistics def _get_caller_script_name(self) -> str: """Automatically determine the name of the script that called YDBWrapper""" try: # Get call stack stack = inspect.stack() # Find first frame that is outside ydb_wrapper.py for frame_info in stack: frame_filename = frame_info.filename # Skip current file (ydb_wrapper.py) if 'ydb_wrapper.py' not in frame_filename: # Get filename without path but with extension script_name = os.path.basename(frame_filename) return script_name # If not found, use default value return "unknown_script" except Exception: # On error return default value return "unknown_script" def _make_full_path(self, table_path: str) -> str: """Convert relative path to full path (with database_path) Args: table_path: Relative path to table Returns: Full path to table """ # If path is already full (starts with database_path), return as is if table_path.startswith(self.database_path): return table_path # Remove leading slash if present table_path = table_path.lstrip('/') # Add database_path return f"{self.database_path}/{table_path}" def __enter__(self): """Context manager - entry""" return self def __exit__(self, exc_type, exc_val, exc_tb): """Context manager - exit: close cached drivers""" if self._driver is not None: try: self._driver.stop() except Exception: pass self._driver = None if self._stats_driver is not None: try: self._stats_driver.stop() except Exception: pass self._stats_driver = None def _log(self, level: str, message: str, details: str = ""): """Universal logging""" timestamp = datetime.datetime.now().strftime("%H:%M:%S") icons = {"start": "🚀", "progress": "âŗ", "success": "✅", "error": "❌", "info": "â„šī¸", "warning": "âš ī¸", "stats": "📊"} if details: print(f"🕐 [{timestamp}] {icons.get(level, '📝')} {message} | {details}", file=self._log_stream) else: print(f"🕐 [{timestamp}] {icons.get(level, '📝')} {message}", file=self._log_stream) def _setup_credentials(self): """Setup YDB credentials""" if "CI_YDB_SERVICE_ACCOUNT_KEY_FILE_CREDENTIALS" not in os.environ: raise RuntimeError("Env variable CI_YDB_SERVICE_ACCOUNT_KEY_FILE_CREDENTIALS is missing") os.environ["YDB_SERVICE_ACCOUNT_KEY_FILE_CREDENTIALS"] = os.environ[ "CI_YDB_SERVICE_ACCOUNT_KEY_FILE_CREDENTIALS" ] def check_credentials(self): """Check for YDB credentials""" if "CI_YDB_SERVICE_ACCOUNT_KEY_FILE_CREDENTIALS" not in os.environ: print("Error: Env variable CI_YDB_SERVICE_ACCOUNT_KEY_FILE_CREDENTIALS is missing, skipping", file=self._log_stream) return False # Do not set up 'real' variable from gh workflows because it interfere with ydb tests # So, set up it locally os.environ["YDB_SERVICE_ACCOUNT_KEY_FILE_CREDENTIALS"] = os.environ[ "CI_YDB_SERVICE_ACCOUNT_KEY_FILE_CREDENTIALS" ] return True def _get_cluster_version(self, driver) -> str: """Get YDB cluster version""" if self._cluster_version is not None: return self._cluster_version try: with ydb.QuerySessionPool(driver, query_client_settings=_QUERY_CLIENT_SETTINGS) as pool: result_sets = pool.execute_with_retries("SELECT Version() as version") for result_set in result_sets: if result_set.rows: self._cluster_version = result_set.rows[0]["version"] return self._cluster_version raise RuntimeError("No version data returned from cluster") except Exception as e: raise RuntimeError(f"Failed to get cluster version: {e}") def _get_driver(self): """Return the single cached main driver; create and connect once if needed.""" self._setup_credentials() if self._driver is None: self._driver = ydb.Driver( endpoint=self.database_endpoint, database=self.database_path, credentials=ydb.credentials_from_env_variables(), ) try: self._driver.wait(timeout=self._connection_timeout, fail_fast=True) except TimeoutError as e: self._log("error", f"Failed to connect to YDB: {e}") self._log("error", f"Endpoint: {self.database_endpoint}") self._log("error", f"Database: {self.database_path}") self._log("error", f"Timeout: {self._connection_timeout} seconds") self._log("error", "Possible causes: network issues, YDB service unavailable, or invalid credentials") self._log("error", f"Try increasing timeout with: export YDB_CONNECTION_TIMEOUT=30") try: self._driver.stop() except Exception: pass self._driver = None raise RuntimeError(f"YDB connection timeout after {self._connection_timeout}s: {e}") from e except Exception as e: self._log("error", f"Failed to connect to YDB: {e}") self._log("error", f"Endpoint: {self.database_endpoint}") self._log("error", f"Database: {self.database_path}") self._log("error", f"Timeout: {self._connection_timeout} seconds") self._log("error", "Check your YDB credentials and network connectivity") try: self._driver.stop() except Exception: pass self._driver = None raise RuntimeError(f"YDB connection failed: {e}") from e if self._cluster_version is None: try: self._get_cluster_version(self._driver) except Exception as e: self._log("warning", f"Failed to get cluster version during operation: {e}") self._cluster_version = "unknown" return self._driver @contextmanager def get_driver(self): """Context manager that yields the cached main driver (for compatibility).""" yield self._get_driver() def _check_stats_availability(self) -> bool: """Check statistics database availability and cache stats driver on success (called once in __init__).""" driver = None try: self._setup_credentials() driver = ydb.Driver( endpoint=self.stats_endpoint, database=self.stats_path, credentials=ydb.credentials_from_env_variables() ) driver.wait(timeout=self._stats_connection_timeout, fail_fast=True) with ydb.QuerySessionPool(driver, query_client_settings=_QUERY_CLIENT_SETTINGS) as pool: pool.execute_with_retries("SELECT 1 as test") self._stats_driver = driver return True except Exception: if driver is not None: try: driver.stop() except Exception: pass return False def _log_statistics(self, operation_type: str, query: str, duration: float, status: str, error: str = None, rows_affected: int = None, cluster_version: str = None, table_path: str = None, query_name: str = None): """Log operation statistics (synchronously) Args: query_name: Optional name for the query (e.g., monitoring query filename) """ # Check if we need to log statistics if not self._enable_statistics or not self._stats_available: return # Prepare data for writing # Use timestamp in microseconds (as YDB Timestamp) timestamp_us = int(time.time() * 1000000) stats_data = { 'timestamp': timestamp_us, 'session_id': self._session_id, 'operation_type': operation_type, 'query': query, 'duration_ms': int(duration * 1000), 'status': status, 'error': error, 'rows_affected': rows_affected, 'script_name': self._script_name, 'query_name': query_name, 'cluster_version': cluster_version, 'database_endpoint': self.database_endpoint, 'database_path': self.database_path, 'table_path': self._normalize_table_path(table_path), 'github_workflow_name': self._github_info['workflow_name'], 'github_run_id': self._github_info['run_id'], 'github_run_url': self._github_info['run_url'] } # Log details of statistics being sent self._log("stats", "Sending statistics", f"operation={operation_type}, status={status}, duration={duration:.2f}s, rows={rows_affected or 0}, cluster={cluster_version or 'unknown'}") # Write statistics synchronously send_start = time.time() success = self._write_stats_sync(stats_data) send_duration = time.time() - send_start # Log send result if success: self._log("success", f"Statistics sent successfully in {send_duration:.2f}s") def _write_stats_sync(self, stats_data): """Synchronous statistics writing (uses stats driver or main driver if same DB).""" if not self._stats_available: return False try: self._setup_credentials() driver = self._stats_driver if self._stats_driver is not None else self._get_driver() # Ensure statistics table exists self._ensure_stats_table_exists(driver) stats_data_list = [stats_data] table_client = ydb.TableClient(driver) column_types = ( ydb.BulkUpsertColumns() .add_column("timestamp", ydb.OptionalType(ydb.PrimitiveType.Timestamp)) .add_column("session_id", ydb.OptionalType(ydb.PrimitiveType.Utf8)) .add_column("operation_type", ydb.OptionalType(ydb.PrimitiveType.Utf8)) .add_column("query", ydb.OptionalType(ydb.PrimitiveType.Utf8)) .add_column("duration_ms", ydb.OptionalType(ydb.PrimitiveType.Uint64)) .add_column("status", ydb.OptionalType(ydb.PrimitiveType.Utf8)) .add_column("error", ydb.OptionalType(ydb.PrimitiveType.Utf8)) .add_column("rows_affected", ydb.OptionalType(ydb.PrimitiveType.Uint64)) .add_column("script_name", ydb.OptionalType(ydb.PrimitiveType.Utf8)) .add_column("query_name", ydb.OptionalType(ydb.PrimitiveType.Utf8)) .add_column("cluster_version", ydb.OptionalType(ydb.PrimitiveType.Utf8)) .add_column("database_endpoint", ydb.OptionalType(ydb.PrimitiveType.Utf8)) .add_column("database_path", ydb.OptionalType(ydb.PrimitiveType.Utf8)) .add_column("table_path", ydb.OptionalType(ydb.PrimitiveType.Utf8)) .add_column("github_workflow_name", ydb.OptionalType(ydb.PrimitiveType.Utf8)) .add_column("github_run_id", ydb.OptionalType(ydb.PrimitiveType.Utf8)) .add_column("github_run_url", ydb.OptionalType(ydb.PrimitiveType.Utf8)) ) full_path = f"{self.stats_path}/{self.stats_table}" table_client.bulk_upsert(full_path, stats_data_list, column_types) return True except TimeoutError as e: self._log("warning", f"Failed to send statistics: connection timeout (timeout={self._stats_connection_timeout}s)") self._stats_available = False return False except Exception as e: error_type = type(e).__name__ error_msg = str(e) self._log("warning", f"Failed to send statistics ({error_type}): {error_msg}") return False def _ensure_stats_table_exists(self, driver): """Create statistics table if it doesn't exist""" def create_stats_table(session): create_sql = f""" CREATE TABLE IF NOT EXISTS `{self.stats_table}` ( `timestamp` Timestamp NOT NULL, `session_id` Utf8 NOT NULL, `operation_type` Utf8 NOT NULL, `query` Utf8, `duration_ms` Uint64 NOT NULL, `status` Utf8 NOT NULL, `error` Utf8, `rows_affected` Uint64, `script_name` Utf8 NOT NULL, `query_name` Utf8, `cluster_version` Utf8, `database_endpoint` Utf8, `database_path` Utf8, `table_path` Utf8, `github_workflow_name` Utf8, `github_run_id` Utf8, `github_run_url` Utf8, PRIMARY KEY (`timestamp`, `session_id`, `operation_type`, `script_name`) ) PARTITION BY HASH(`session_id`) WITH (STORE = COLUMN) """ session.execute_scheme(create_sql) with ydb.SessionPool(driver) as pool: pool.retry_operation_sync(create_stats_table) def _execute_with_logging(self, operation_type: str, operation_func: Callable, query: str = None, table_path: str = None, query_name: str = None) -> Any: """Universal method for executing operations with logging""" start_time = None # Log operation start self._log("start", f"Executing {operation_type}") if query: # For bulk_upsert don't show details for each batch if operation_type != "bulk_upsert": self._log("info", f"Query details:\n{query}") status = "success" error = None rows_affected = 0 # Use _cluster_version or 'unknown' if it's None cluster_version = self._cluster_version or "unknown" try: driver = self._get_driver() start_time = time.time() if operation_type == "scan_query": results = [] batch_count = 0 scan_start = time.time() with ydb.QuerySessionPool(driver, query_client_settings=_QUERY_CLIENT_SETTINGS) as pool: with pool.checkout() as session: for result_set in session.execute(query, settings=_LONG_QUERY_SETTINGS): batch_count += 1 batch_rows = result_set.rows or [] batch_size = len(batch_rows) results.extend(batch_rows) rows_affected += batch_size if (batch_count % 50 == 0 or (rows_affected > 0 and rows_affected % 10000 == 0)) and rows_affected > 0: elapsed = time.time() - scan_start self._log("progress", f"Batch {batch_count}: {batch_size} rows (total: {rows_affected})", f"{elapsed:.2f}s") duration = time.time() - scan_start if rows_affected == 0: self._log("success", "Scan query completed", f"No results found, Duration: {duration:.2f}s") else: self._log("success", "Scan query completed", f"Total results: {rows_affected} rows, Duration: {duration:.2f}s") self._log_statistics( operation_type=operation_type, query=query, duration=duration, status=status, rows_affected=rows_affected, cluster_version=cluster_version, table_path=table_path, query_name=query_name ) return results elif operation_type == "scan_query_with_metadata": result = operation_func(driver) data, metadata = result rows_affected = len(data) if isinstance(data, list) else 0 duration = getattr(self, "_last_scan_duration", None) if duration is None: duration = time.time() - start_time else: del self._last_scan_duration if rows_affected == 0: self._log("success", f"Scan query with metadata completed", f"No results found, Duration: {duration:.2f}s") else: self._log("success", f"Scan query with metadata completed", f"Total results: {rows_affected} rows, Duration: {duration:.2f}s") self._log_statistics( operation_type="scan_query", query=query, duration=duration, status=status, rows_affected=rows_affected, cluster_version=cluster_version, table_path=table_path, query_name=query_name ) return result else: result = operation_func(driver) if isinstance(result, (int, float)) and operation_type in ["bulk_upsert", "create_table"]: rows_affected = int(result) elif isinstance(result, tuple) and len(result) == 2 and operation_type == "scan_query": data, metadata = result rows_affected = len(data) if isinstance(data, list) else 0 elif isinstance(result, list) and operation_type == "scan_query": rows_affected = len(result) end_time = time.time() duration = end_time - start_time self._log("success", f"{operation_type} completed", f"Duration: {duration:.2f}s") self._log_statistics( operation_type=operation_type, query=query or f"{operation_type} operation", duration=duration, status=status, rows_affected=rows_affected, cluster_version=cluster_version, table_path=table_path, query_name=query_name ) return result except Exception as e: end_time = time.time() duration = end_time - start_time if start_time is not None else 0 status = "error" error = str(e) self._log("error", f"{operation_type} failed", f"Error: {error}, Duration: {duration:.2f}s") # Normalize operation_type for statistics (use "scan_query" for both scan operations) stats_operation_type = "scan_query" if operation_type == "scan_query_with_metadata" else operation_type # Log error statistics self._log_statistics( operation_type=stats_operation_type, query=query or f"{operation_type} operation", duration=duration, status=status, error=error, cluster_version=cluster_version, table_path=table_path if operation_type == "bulk_upsert" else None, query_name=query_name ) raise def execute_scan_query(self, query: str, query_name: str = None) -> List[Dict[str, Any]]: """Execute scan query with logging""" return self._execute_with_logging("scan_query", None, query, None, query_name) def execute_scan_query_with_metadata(self, query: str, query_name: str = None) -> Tuple[List[Dict[str, Any]], List[Tuple[str, Any]]]: """Execute scan query with return of data and column metadata""" def operation(driver): results = [] column_types = None scan_start = time.time() with ydb.QuerySessionPool(driver, query_client_settings=_QUERY_CLIENT_SETTINGS) as pool: with pool.checkout() as session: for result_set in session.execute(query, settings=_LONG_QUERY_SETTINGS): if column_types is None: column_types = [(col.name, col.type) for col in result_set.columns] results.extend(result_set.rows or []) self._last_scan_duration = time.time() - scan_start if column_types is None: column_types = [] return results, column_types return self._execute_with_logging("scan_query_with_metadata", operation, query, None, query_name) def create_table(self, table_path: str, create_sql: str): """Create table with logging Args: table_path: Relative path to table (e.g., 'test_results/test_runs') create_sql: SQL for table creation """ # Convert to full path for YDB (if not already full) full_path = self._make_full_path(table_path) def operation(driver): def callee(session): session.execute_scheme(create_sql) with ydb.SessionPool(driver) as pool: pool.retry_operation_sync(callee) return 1 # Return 1 to indicate table creation return self._execute_with_logging("create_table", operation, create_sql, table_path) def bulk_upsert(self, table_path: str, rows: List[Dict[str, Any]], column_types: ydb.BulkUpsertColumns): """Execute bulk upsert with logging Args: table_path: Relative path to table (e.g., 'test_results/test_runs') """ # Convert to full path for YDB full_path = self._make_full_path(table_path) rows_count = len(rows) if rows else 0 def operation(driver): table_client = ydb.TableClient(driver) table_client.bulk_upsert(full_path, rows, column_types) return rows_count # Return row count for statistics return self._execute_with_logging("bulk_upsert", operation, f"BULK_UPSERT to {table_path}", table_path) def bulk_upsert_batches(self, table_path: str, all_rows: List[Dict[str, Any]], column_types: ydb.BulkUpsertColumns, batch_size: int = 1000, query_name: str = None): """Execute bulk upsert with batching and aggregated statistics Args: table_path: Relative path to table (e.g., 'test_results/test_runs') all_rows: All data to insert column_types: Column types batch_size: Batch size (default 1000) query_name: Optional name for the query (e.g., table name) """ # Convert to full path for YDB full_path = self._make_full_path(table_path) start_time = None total_rows = len(all_rows) if total_rows == 0: self._log("info", "bulk_upsert_batches: no rows to insert") return num_batches = (total_rows - 1) // batch_size + 1 self._log("start", f"Executing bulk_upsert_batches", f"{total_rows} rows in {num_batches} batches of {batch_size}") status = "success" error = None # Use _cluster_version or 'unknown' if it's None cluster_version = self._cluster_version or "unknown" try: driver = self._get_driver() start_time = time.time() table_client = ydb.TableClient(driver) for batch_num, start_idx in enumerate(range(0, total_rows, batch_size), 1): batch_rows = all_rows[start_idx:start_idx + batch_size] table_client.bulk_upsert(full_path, batch_rows, column_types) if batch_num % 10 == 0 or start_idx + batch_size >= total_rows: elapsed = time.time() - start_time processed = min(start_idx + batch_size, total_rows) self._log("progress", f"Batch {batch_num}/{num_batches}", f"{processed}/{total_rows} rows, {elapsed:.2f}s") duration = time.time() - start_time self._log("success", f"bulk_upsert_batches completed", f"Total: {total_rows} rows in {num_batches} batches, Duration: {duration:.2f}s") # Log ONE statistics record for the entire operation self._log_statistics( operation_type="bulk_upsert", query=f"BULK_UPSERT to {table_path} ({total_rows} rows in {num_batches} batches)", duration=duration, status=status, rows_affected=total_rows, cluster_version=cluster_version, table_path=table_path, query_name=query_name ) except Exception as e: duration = time.time() - start_time if start_time is not None else 0 status = "error" error = str(e) self._log("error", f"bulk_upsert_batches failed", f"Error: {error}, Duration: {duration:.2f}s") self._log_statistics( operation_type="bulk_upsert", query=f"BULK_UPSERT to {table_path}", duration=duration, status=status, error=error, cluster_version=cluster_version, table_path=table_path, query_name=query_name ) raise def execute_dml(self, query: str, parameters: Dict[str, Any] = None, query_name: str = None): """Execute DML query (INSERT/UPDATE/DELETE) with parameters Args: query: SQL query with DECLARE parameters parameters: Parameters dictionary {$param_name: value} query_name: Query name for logging """ def operation(driver): # Use Query Service API because Table Service DML does not support # modifications for column shard tables. with ydb.QuerySessionPool(driver, query_client_settings=_QUERY_CLIENT_SETTINGS) as pool: pool.execute_with_retries(query=query, parameters=parameters or {}) return 1 # Successful execution return self._execute_with_logging("dml_query", operation, query, None, query_name) def _get_github_action_info(self) -> dict: """Get GitHub Action information if script is run in GitHub Actions""" github_info = { "workflow_name": None, "run_id": None, "run_url": None } try: # GitHub Actions sets these environment variables workflow_name = os.environ.get("GITHUB_WORKFLOW") run_id = os.environ.get("GITHUB_RUN_ID") repository = os.environ.get("GITHUB_REPOSITORY") if workflow_name: github_info["workflow_name"] = workflow_name if run_id and repository: github_info["run_id"] = run_id github_info["run_url"] = f"https://github.com/{repository}/actions/runs/{run_id}" except Exception: # Ignore errors getting GitHub information pass return github_info def _normalize_table_path(self, table_path: str) -> str: """Normalize table path - exclude database_path for brevity""" if not table_path: return None # If path starts with database_path, remove it if self.database_path and table_path.startswith(self.database_path): normalized = table_path[len(self.database_path):] # Remove leading slash if present if normalized.startswith('/'): normalized = normalized[1:] return normalized return table_path def get_table_path(self, table_name: str, database: str = "main") -> str: """Get table path from configuration Args: table_name: Table name (e.g., 'test_results') database: Database ('main' or 'statistics') Returns: Table path relative to database Raises: KeyError: If table not found in configuration """ if database == "main": if table_name not in self._main_db_tables: raise KeyError(f"Table '{table_name}' not found in databases.main.tables config") return self._main_db_tables[table_name] elif database == "statistics": if table_name not in self._stats_db_tables: raise KeyError(f"Table '{table_name}' not found in databases.statistics.tables config") return self._stats_db_tables[table_name] else: raise ValueError(f"Unknown database: {database}. Use 'main' or 'statistics'") def get_flag(self, flag_name: str, default: Any = None) -> Any: """Get flag value from configuration Args: flag_name: Flag name (e.g., 'enable_backup_write') default: Default value if flag is not found Returns: Flag value or default if not found """ flags = getattr(self, '_flags', {}) return flags.get(flag_name, default) def get_available_tables(self, database: str = "main") -> dict: """Get dictionary of all available tables Args: database: Database ('main' or 'statistics') Returns: Dictionary {table_name: table_path} """ if database == "main": return self._main_db_tables.copy() elif database == "statistics": return self._stats_db_tables.copy() else: raise ValueError(f"Unknown database: {database}") def get_cluster_info(self) -> Dict[str, Any]: """Get cluster and statistics information (without creating new connection)""" try: # Use already obtained version, don't create new connection version = self._cluster_version # Check statistics status stats_status = "disabled" if self._enable_statistics: stats_status = "enabled_and_available" if self._stats_available else "enabled_but_unavailable" return { 'session_id': self._session_id, 'version': version, 'endpoint': self.database_endpoint, 'database': self.database_path, 'statistics_enabled': self._enable_statistics, 'statistics_status': stats_status, 'statistics_endpoint': self.stats_endpoint, 'statistics_database': self.stats_path, 'statistics_table': self.stats_table, 'github_workflow': self._github_info['workflow_name'], 'github_run_id': self._github_info['run_id'], 'github_run_url': self._github_info['run_url'] } except Exception as e: return { 'session_id': self._session_id, 'version': None, 'endpoint': self.database_endpoint, 'database': self.database_path, 'error': str(e), 'statistics_enabled': self._enable_statistics, 'statistics_status': "disabled", 'statistics_endpoint': self.stats_endpoint, 'statistics_database': self.stats_path, 'statistics_table': self.stats_table }