summaryrefslogtreecommitdiffstats
path: root/.github/scripts/analytics/sync_area_to_owner_mapping.py
blob: 182062160e0710853d2b6812c9eb1e2b1c256f9b (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
#!/usr/bin/env python3
"""
Sync area -> owner_team mapping from .github/config/owner_area_mapping.json to YDB.
Table: one row per area, join in SQL (e.g. github.sql) to get owner_team for issues by area.
"""

import json
import os
import sys
import ydb
from typing import List, Set, Tuple
from ydb_wrapper import YDBWrapper


def load_area_to_owner(config_dir: str) -> List[Tuple[str, str]]:
    """Build (area, owner_team) from owner_area_mapping.json (owner -> area or list of areas). owner_team stored lowercase for consistency (Topics -> topics)."""
    path = os.path.join(config_dir, 'owner_area_mapping.json')
    area_to_owner = {}
    with open(path, 'r', encoding='utf-8') as f:
        owner_to_area = json.load(f)
    for owner_team, area_or_list in owner_to_area.items():
        areas = [area_or_list] if isinstance(area_or_list, str) else area_or_list
        owner_lower = owner_team.lower()
        for area in areas:
            if area:
                area_to_owner[area] = owner_lower
    return list(area_to_owner.items())


def create_table(ydb_wrapper, table_path: str):
    create_sql = f"""
        CREATE TABLE IF NOT EXISTS `{table_path}` (
            `area` Utf8 NOT NULL,
            `owner_team` Utf8 NOT NULL,
            PRIMARY KEY (`area`)
        )
        WITH (STORE = COLUMN)
    """
    ydb_wrapper.create_table(table_path, create_sql)


def get_existing_areas(ydb_wrapper: YDBWrapper, table_path: str) -> Set[str]:
    query = f"""
        SELECT area
        FROM `{table_path}`
    """
    rows = ydb_wrapper.execute_scan_query(query, "sync_area_to_owner_mapping_existing_areas")
    return {row["area"] for row in rows if row.get("area")}


def delete_stale_areas(ydb_wrapper: YDBWrapper, table_path: str, stale_areas: List[str]) -> None:
    if not stale_areas:
        return

    delete_query = f"""
        DECLARE $area AS Utf8;

        DELETE FROM `{table_path}`
        WHERE area = $area;
    """
    for area in stale_areas:
        ydb_wrapper.execute_dml(
            delete_query,
            {"$area": area},
            query_name="sync_area_to_owner_mapping_delete_stale_area",
        )


def main():
    script_dir = os.path.dirname(os.path.abspath(__file__))
    config_dir = os.path.join(script_dir, '..', '..', 'config')
    rows = load_area_to_owner(config_dir)
    if not rows:
        print("No mapping rows loaded")
        return 1

    with YDBWrapper() as ydb_wrapper:
        if not ydb_wrapper.check_credentials():
            return 1
        table_path = ydb_wrapper.get_table_path("area_to_owner_mapping")
        create_table(ydb_wrapper, table_path)

        bulk_rows = [{'area': a, 'owner_team': o} for a, o in rows]
        desired_areas = {a for a, _ in rows}
        col_types = (
            ydb.BulkUpsertColumns()
            .add_column("area", ydb.PrimitiveType.Utf8)
            .add_column("owner_team", ydb.PrimitiveType.Utf8)
        )
        ydb_wrapper.bulk_upsert(table_path, bulk_rows, col_types)
        print(f"Upserted {len(bulk_rows)} rows to {table_path}")

        existing_areas = get_existing_areas(ydb_wrapper, table_path)
        stale_areas = sorted(existing_areas - desired_areas)
        delete_stale_areas(ydb_wrapper, table_path, stale_areas)
        if stale_areas:
            print(f"Deleted {len(stale_areas)} stale area rows from {table_path}")
        else:
            print("No stale area rows to delete")
    return 0


if __name__ == "__main__":
    sys.exit(main())