1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
|
#!/usr/bin/env python3
"""
Send batched Telegram digests for new muted-test GitHub issues.
How it works
------------
Issues are placed into ``digest_queue`` at the moment they are created
(by mute/create_new_muted_ya.py). This script reads unsent rows
(sent_at IS NULL), sends per-team Telegram messages, then marks rows
as sent by writing sent_at = NOW().
There are no timing assumptions, no cursors, no historical-data floods.
The queue is the single source of truth for "what still needs to be sent".
Scheduled by ``.github/workflows/telegram_scheduled_notifications.yml`` (job **Mute digest to Telegram**).
Reads profiles from .github/config/mute_issue_and_digest_config.json
and runs only those whose ``schedule_utc_hours`` contains the current UTC hour
and whose ``schedule_weekdays`` contains the current ISO weekday (1=Mon … 7=Sun).
Omitted ``schedule_weekdays`` defaults to Mon–Fri ``[1,2,3,4,5]``.
``--force`` skips both checks.
Usage:
python send_digest.py [--config PATH] [--dry-run] [--force] [--profile ID]
"""
import argparse
import json
import os
import sys
import ydb
from datetime import datetime, timezone
from pathlib import Path
_scripts = os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))
if _scripts not in sys.path:
sys.path.insert(0, _scripts)
from github_issue_utils import canonical_team_slug, make_profile_id
# ISO weekday numbering as returned by datetime.isoweekday(): Monday=1 … Sunday=7.
# Used whenever a profile does not provide a usable ``schedule_weekdays`` list.
_DEFAULT_SCHEDULE_WEEKDAYS = frozenset(range(1, 6))


def _profile_schedule_weekdays(profile: dict) -> frozenset:
    """Return the ISO weekdays on which *profile* is scheduled to run.

    The value is read from the profile's ``schedule_weekdays`` entry and every
    item is converted with ``int()``.  The Monday-Friday default is returned
    when the entry is missing, ``None``, not a list, an empty list, or contains
    an item that ``int()`` rejects with ``TypeError``/``ValueError``.
    """
    configured = profile.get("schedule_weekdays")
    if isinstance(configured, list) and configured:
        try:
            return frozenset(map(int, configured))
        except (TypeError, ValueError):
            # One unparsable entry invalidates the whole list.
            pass
    return _DEFAULT_SCHEDULE_WEEKDAYS
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "analytics"))
from ydb_wrapper import YDBWrapper
sys.path.insert(0, os.path.dirname(__file__))
from parse_and_send_team_issues import (
_sql_escape_literal,
get_all_team_data,
load_team_channels,
send_team_messages,
)
def _validate_profile_id(profile_id: str) -> str:
    """Check that *profile_id* has the ``branch:build_type`` shape and return it unchanged.

    The identifier must contain exactly one ``:`` with non-empty text on both
    sides (the format that matches ``make_profile_id``).

    Raises:
        ValueError: when the number of ``:`` separators is not one, or when
            the branch or build-type part is empty.
    """
    separator_count = profile_id.count(":")
    if separator_count != 1:
        message = f"Invalid profile_id {profile_id!r}: expected exactly one ':' (branch:build_type)"
        raise ValueError(message)

    # Exactly one separator is guaranteed here, so the split yields two parts.
    branch, build_type = profile_id.split(":")
    if not (branch and build_type):
        message = f"Invalid profile_id {profile_id!r}: expected branch:build_type (e.g. main:relwithdebinfo)"
        raise ValueError(message)

    return profile_id
# ── YDB helpers ───────────────────────────────────────────────────────────────
# DDL template for the digest queue table.  ``{table_path}`` is substituted via
# ``str.format`` in ``_ensure_digest_queue_table``.  Rows are keyed by
# (profile_id, github_issue_number); ``sent_at`` is nullable, and the queries in
# this file treat ``sent_at IS NULL`` as "not sent yet".
_DIGEST_QUEUE_DDL = """\
CREATE TABLE IF NOT EXISTS `{table_path}` (
`profile_id` Utf8 NOT NULL,
`github_issue_number` Uint64 NOT NULL,
`github_issue_url` Utf8,
`github_issue_title` Utf8,
`owner_team` Utf8,
`branch` Utf8,
`build_type` Utf8,
`enqueued_at` Timestamp NOT NULL,
`sent_at` Timestamp,
PRIMARY KEY (profile_id, github_issue_number)
)
WITH (
STORE = COLUMN
)
"""
def _ensure_digest_queue_table(w: YDBWrapper) -> None:
    """Make sure the ``digest_queue`` table exists before it is read.

    The enqueue path creates the table too, but this script may run before
    anything was ever enqueued, so the ``CREATE TABLE IF NOT EXISTS`` statement
    is issued here as well.
    """
    queue_table = w.get_table_path("digest_queue")
    print(f"> ensure table exists: {queue_table}")
    ddl = _DIGEST_QUEUE_DDL.format(table_path=queue_table)
    w.create_table(queue_table, ddl)
def _fetch_closed_unsent(w: YDBWrapper, profile_id: str) -> list:
    """Fetch the unsent queue rows of *profile_id* whose issue is already closed.

    Rows are selected from ``digest_queue`` joined (INNER JOIN on the issue
    number) with the ``issues`` table, keeping only rows with
    ``sent_at IS NULL`` and issue state ``'CLOSED'``.  The caller marks these
    rows as sent without notifying anyone.

    Raises:
        ValueError: if *profile_id* is not of the form ``branch:build_type``.
    """
    _validate_profile_id(profile_id)
    queue_table = w.get_table_path("digest_queue")
    issues_table = w.get_table_path("issues")
    profile_literal = _sql_escape_literal(profile_id)

    # SQL lines stay flush-left so the query text sent to YDB is unchanged.
    query = f"""
SELECT
q.github_issue_number AS github_issue_number,
q.github_issue_url AS github_issue_url,
q.github_issue_title AS github_issue_title,
q.owner_team AS owner_team,
q.branch AS branch,
q.build_type AS build_type,
q.enqueued_at AS enqueued_at
FROM `{queue_table}` AS q
INNER JOIN `{issues_table}` AS i
ON q.github_issue_number = i.issue_number
WHERE q.profile_id = '{profile_literal}'
AND q.sent_at IS NULL
AND i.state = 'CLOSED'
"""
    return w.execute_scan_query(query, query_name="digest_fetch_closed_unsent")
def _mark_closed_as_sent(w: YDBWrapper, profile_id: str) -> None:
    """Stamp queued issues that are already closed as sent, without notifying anyone.

    This keeps closed issues out of the digest for *profile_id*.  Nothing is
    written or printed when there are no such rows.
    """
    closed_rows = _fetch_closed_unsent(w, profile_id)
    if not closed_rows:
        return
    stamp = datetime.now(tz=timezone.utc)
    _mark_sent(w, profile_id, closed_rows, stamp)
    print(f"Marked {len(closed_rows)} closed issue(s) as sent (skipped from digest)")
def _fetch_unsent(w: YDBWrapper, profile_id: str) -> list:
    """Fetch the queue rows of *profile_id* that still have to be sent.

    Rows with ``sent_at IS NULL`` are selected from ``digest_queue``.  The
    LEFT JOIN with the ``issues`` table drops rows whose issue state is
    ``'CLOSED'`` while keeping rows that have no matching issue record.
    Results are ordered by owner team, then by issue number.

    Raises:
        ValueError: if *profile_id* is not of the form ``branch:build_type``.
    """
    _validate_profile_id(profile_id)
    queue_table = w.get_table_path("digest_queue")
    issues_table = w.get_table_path("issues")
    profile_literal = _sql_escape_literal(profile_id)

    # SQL lines stay flush-left so the query text sent to YDB is unchanged.
    query = f"""
SELECT
q.github_issue_number AS github_issue_number,
q.github_issue_url AS github_issue_url,
q.github_issue_title AS github_issue_title,
q.owner_team AS owner_team,
q.branch AS branch,
q.build_type AS build_type,
q.enqueued_at AS enqueued_at
FROM `{queue_table}` AS q
LEFT JOIN `{issues_table}` AS i
ON q.github_issue_number = i.issue_number
WHERE q.profile_id = '{profile_literal}'
AND q.sent_at IS NULL
AND (i.state IS NULL OR i.state != 'CLOSED')
ORDER BY q.owner_team, q.github_issue_number
"""
    return w.execute_scan_query(query, query_name="digest_fetch_unsent")
def _mark_sent(w: YDBWrapper, profile_id: str, unsent_rows: list, sent_at: datetime) -> None:
    """Stamp queue rows with *sent_at* by bulk-upserting them into ``digest_queue``.

    The upsert replaces whole rows, so every column is written again rather
    than just ``sent_at``; otherwise the remaining columns would be nulled out.
    Optional text columns that are missing or empty in a row are written as
    empty strings.  Does nothing when *unsent_rows* is empty.
    """
    if not unsent_rows:
        return

    queue_table = w.get_table_path("digest_queue")

    # Optional Utf8 columns, in the order they appear in the table schema.
    text_columns = (
        'github_issue_url',
        'github_issue_title',
        'owner_team',
        'branch',
        'build_type',
    )

    payload = []
    for row in unsent_rows:
        record = {
            'profile_id': profile_id,
            'github_issue_number': int(row["github_issue_number"]),
        }
        for column in text_columns:
            record[column] = row.get(column) or ""
        record['enqueued_at'] = row.get("enqueued_at")
        record['sent_at'] = sent_at
        payload.append(record)

    utf8 = ydb.PrimitiveType.Utf8
    timestamp = ydb.PrimitiveType.Timestamp
    schema = [
        ('profile_id', utf8),
        ('github_issue_number', ydb.PrimitiveType.Uint64),
    ]
    schema.extend((column, ydb.OptionalType(utf8)) for column in text_columns)
    schema.append(('enqueued_at', timestamp))
    schema.append(('sent_at', ydb.OptionalType(timestamp)))

    column_types = ydb.BulkUpsertColumns()
    for column, column_type in schema:
        # Re-bind the result so this works exactly like the chained calls.
        column_types = column_types.add_column(column, column_type)

    w.bulk_upsert(queue_table, payload, column_types)
# ── Digest logic ──────────────────────────────────────────────────────────────
def _group_by_team(rows: list) -> dict:
    """Bucket queue rows by owning team.

    Returns a dict that maps ``canonical_team_slug(owner_team)`` to a list of
    ``{"url": ..., "title": ...}`` dicts, in the order the rows were given.
    A missing or empty URL becomes ``""`` and a missing or empty title becomes
    ``"(no title)"``.
    """
    grouped: dict = {}
    for row in rows:
        slug = canonical_team_slug(row.get("owner_team"))
        if slug not in grouped:
            grouped[slug] = []
        issue = {
            "url": row.get("github_issue_url") or "",
            "title": row.get("github_issue_title") or "(no title)",
        }
        grouped[slug].append(issue)
    return grouped
def run_digest(
    profile: dict,
    team_channels: dict,
    bot_token: str,
    dry_run: bool = False,
) -> bool:
    """Send the pending digest for one notification profile.

    Args:
        profile: Profile entry from the config.  ``branch`` and ``build_type``
            are required; ``include_plots`` is optional (defaults to False).
        team_channels: Channel configuration handed to ``send_team_messages``.
        bot_token: Telegram bot token handed to ``send_team_messages``.
        dry_run: When True, only print what would be sent.  No message is
            sent and no queue row is marked as sent.

    Returns:
        False when the YDB credentials check fails, True otherwise (including
        the "nothing to send" and dry-run cases).
    """
    profile_id = make_profile_id(profile["branch"], profile["build_type"])
    include_plots = profile.get("include_plots", False)

    print(f"\n{'=' * 60}")
    print(f"Profile : {profile_id}")
    print(f"Branch : {profile['branch']} build_type: {profile['build_type']}")
    print(f"{'=' * 60}")

    with YDBWrapper() as w:
        if not w.check_credentials():
            return False

        # CREATE TABLE IF NOT EXISTS: also run in dry-run mode so the read
        # below cannot fail on a table that was never created.
        _ensure_digest_queue_table(w)

        # A dry run must not change queue state.  Skipping this write does not
        # change what is reported, because _fetch_unsent already filters out
        # closed issues on its own.
        if not dry_run:
            _mark_closed_as_sent(w, profile_id)

        unsent = _fetch_unsent(w, profile_id)
        print(f"Unsent issues in queue: {len(unsent)}")
        if not unsent:
            print("Nothing to send.")
            return True

        teams = _group_by_team(unsent)
        print(f"Teams: {sorted(teams)}")

        if dry_run:
            print("[DRY RUN] Would send:")
            for team, issues in sorted(teams.items()):
                print(f" {team}: {len(issues)} issue(s)")
            return True

        # Muted-test statistics are best-effort: the digest is still sent
        # without them when they cannot be fetched.
        muted_stats = None
        all_team_data = None
        try:
            all_team_data = get_all_team_data(
                build_type=profile["build_type"],
                branch=profile["branch"],
            )
            if all_team_data:
                muted_stats = {t: d["stats"] for t, d in all_team_data.items()}
        except Exception as exc:
            print(f"Warning: could not fetch muted stats: {exc}")

        send_team_messages(
            teams=teams,
            bot_token=bot_token,
            team_channels=team_channels,
            muted_stats=muted_stats,
            include_plots=include_plots,
            ydb_config=(
                {
                    "use_yesterday": False,
                    "build_type": profile["build_type"],
                    "branch": profile["branch"],
                }
                if include_plots
                else None
            ),
            all_team_data=all_team_data,
            show_diff=True,
        )

        # Rows are stamped only after sending returned, so an exception raised
        # while sending leaves them queued for the next run.
        now = datetime.now(tz=timezone.utc)
        _mark_sent(w, profile_id, unsent, now)
        print(f"Marked {len(unsent)} issue(s) as sent.")
        return True
# ── CLI ───────────────────────────────────────────────────────────────────────
def main() -> None:
    """CLI entry point: select the profiles that are due and send their digests.

    A profile is due when the current UTC hour is listed in its
    ``schedule_utc_hours`` and the current ISO weekday is in its
    ``schedule_weekdays`` (Monday-Friday when omitted).  ``--force`` skips both
    checks and ``--profile`` restricts the run to a single profile ID.

    Exits with status 1 when the config file is missing, when the bot token
    (unless ``--dry-run``) or the team channel configuration is not provided,
    or when any profile reports failure; exits with status 0 otherwise.
    """
    parser = argparse.ArgumentParser(
        description="Send batched Telegram digest for new muted-test issues"
    )
    parser.add_argument(
        "--config",
        default=".github/config/mute_issue_and_digest_config.json",
        help="Path to digest profiles config JSON",
    )
    parser.add_argument("--bot-token", help="Telegram bot token")
    parser.add_argument(
        "--team-channels",
        help="JSON string or file path with team channel config",
    )
    parser.add_argument("--dry-run", action="store_true", help="Print without sending")
    parser.add_argument(
        "--force",
        action="store_true",
        help="Run all profiles regardless of schedule",
    )
    parser.add_argument("--profile", help="Run only this profile ID")
    args = parser.parse_args()

    config_path = Path(args.config)
    if not config_path.exists():
        print(f"Config not found: {config_path}")
        sys.exit(1)
    # JSON files are UTF-8; do not depend on the locale's default encoding.
    config = json.loads(config_path.read_text(encoding="utf-8"))
    profiles: list = config.get("profiles", [])

    now_utc = datetime.now(tz=timezone.utc)
    current_hour_utc = now_utc.hour
    current_weekday = now_utc.isoweekday()

    bot_token = args.bot_token or os.environ.get("TELEGRAM_BOT_TOKEN")
    if not bot_token and not args.dry_run:
        print("Error: --bot-token or TELEGRAM_BOT_TOKEN env var is required")
        sys.exit(1)

    team_channels_src = args.team_channels or os.environ.get("TG_TEAM_CHANNELS")
    team_channels = load_team_channels(team_channels_src) if team_channels_src else None
    if not team_channels:
        print("Error: --team-channels or TG_TEAM_CHANNELS env var is required")
        sys.exit(1)

    active = []
    for p in profiles:
        if args.profile and make_profile_id(p["branch"], p["build_type"]) != args.profile:
            continue
        if args.force:
            active.append(p)
            continue
        hours = p.get("schedule_utc_hours") or []
        weekdays = _profile_schedule_weekdays(p)
        if current_hour_utc in hours and current_weekday in weekdays:
            active.append(p)

    if not active:
        print(
            f"No profiles active for UTC weekday {current_weekday}, hour {current_hour_utc} — nothing to do"
        )
        sys.exit(0)

    print(f"Active profiles: {[make_profile_id(p['branch'], p['build_type']) for p in active]}")

    failed = False
    for profile in active:
        channels_var = profile.get("channels_var", "TG_TEAM_CHANNELS")
        # A variable that is set but empty counts as unset, the same way the
        # global TG_TEAM_CHANNELS lookup above treats an empty value.
        profile_channels_src = os.environ.get(channels_var) or team_channels_src
        profile_channels = load_team_channels(profile_channels_src) or team_channels
        ok = run_digest(profile, profile_channels, bot_token, dry_run=args.dry_run)
        if not ok:
            failed = True

    sys.exit(1 if failed else 0)


if __name__ == "__main__":
    main()
|