1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
|
import datetime
import os
import requests
import argparse
from telegram.send_telegram_message import send_telegram_message
def get_alert_logins() -> str:
    """Return the Telegram logins to mention in alert messages.

    The value is read from the ``GH_ALERTS_TG_LOGINS`` environment variable
    and stripped of surrounding whitespace. When the variable is unset,
    empty, or contains only whitespace, the built-in default login is
    returned instead.
    """
    logins = (os.getenv('GH_ALERTS_TG_LOGINS') or '').strip()
    # Fall back *after* stripping: a whitespace-only value would otherwise
    # yield an empty string and an alert that mentions nobody.
    return logins or "@empEfarinov"
def str_to_date(str):
    """Convert a ``YYYY-MM-DDTHH:MM:SSZ`` timestamp string to a datetime.

    The trailing ``Z`` is matched as a literal character, so the returned
    ``datetime.datetime`` is naive (``tzinfo`` is ``None``).

    Raises:
        ValueError: If the string does not match the expected format.
    """
    timestamp_format = '%Y-%m-%dT%H:%M:%SZ'
    parsed = datetime.datetime.strptime(str, timestamp_format)
    return parsed
def get_workflows_from_ts(owner, repo, token, ts, max_runs=50):
    """Fetch recent cancelled ``pull_request_target`` runs updated since ``ts``.

    Pages through the repository's workflow-runs endpoint until ``max_runs``
    runs have been collected or the API has no more results, then keeps only
    the runs whose ``updated_at`` is not older than ``ts``.

    Args:
        owner: Repository owner.
        repo: Repository name.
        token: GitHub token sent in the ``Authorization`` header.
        ts: Lower bound compared against each run's parsed ``updated_at``
            (a naive ``datetime.datetime``, like the parsed timestamps).
        max_runs: Maximum number of runs to fetch and consider before the
            time filter is applied.

    Returns:
        A list of workflow-run dicts as returned by the API.

    Raises:
        requests.HTTPError: If the API answers with an error status.
    """
    url = f"https://api.github.com/repos/{owner}/{repo}/actions/runs"
    headers = {
        'Accept': 'application/vnd.github.v3+json',
        'Authorization': f'token {token}'
    }
    page_size = min(100, max_runs)
    workflow_runs = []
    # Track the page explicitly: deriving it from len(workflow_runs) re-requests
    # the same page (duplicates, or an endless loop on an empty page) whenever
    # the API has fewer runs than max_runs.
    page = 1
    while len(workflow_runs) < max_runs:
        params = {
            'status': 'cancelled',
            'event': 'pull_request_target',
            'per_page': page_size,
            'page': page
        }
        # A timeout keeps a stalled connection from hanging the check forever.
        response = requests.get(url, headers=headers, params=params, timeout=30)
        response.raise_for_status()
        batch = response.json()['workflow_runs']
        workflow_runs += batch
        if len(batch) < page_size:
            # Short (or empty) page: nothing more to fetch.
            break
        page += 1
    # The last page may overshoot max_runs when it is not a multiple of the
    # page size, so cap before filtering.
    return [
        run for run in workflow_runs[:max_runs]
        if str_to_date(run['updated_at']) >= ts
    ]
def get_workflow_jobs(owner, repo, run_id, token):
    """Get all jobs for a specific workflow run.

    The jobs endpoint is paginated, so every page is requested until a short
    page signals the end; a single request would silently drop jobs beyond
    the first page.

    Args:
        owner: Repository owner.
        repo: Repository name.
        run_id: Identifier of the workflow run.
        token: GitHub token sent in the ``Authorization`` header.

    Returns:
        A list of job dicts as returned by the API.

    Raises:
        requests.HTTPError: If the API answers with an error status.
    """
    url = f"https://api.github.com/repos/{owner}/{repo}/actions/runs/{run_id}/jobs"
    headers = {
        'Accept': 'application/vnd.github.v3+json',
        'Authorization': f'token {token}'
    }
    per_page = 100
    jobs = []
    page = 1
    while True:
        response = requests.get(
            url,
            headers=headers,
            params={'per_page': per_page, 'page': page},
            timeout=30,
        )
        response.raise_for_status()
        batch = response.json()['jobs']
        jobs += batch
        if len(batch) < per_page:
            # Short (or empty) page: this was the last one.
            return jobs
        page += 1
def check_logs_for_shutdown_signal(owner, repo, run_id, token):
    """Tell whether any cancelled job of a run logged a runner shutdown.

    For every job of the run whose conclusion is ``cancelled``, the job's log
    is downloaded and searched for the runner shutdown error line.

    Log reading is best effort: an error while processing one job is printed
    and that job is skipped, so it cannot hide a shutdown found in another
    job. Errors from listing the jobs themselves are not caught here.

    Args:
        owner: Repository owner.
        repo: Repository name.
        run_id: Identifier of the workflow run to inspect.
        token: GitHub token sent in the ``Authorization`` header.

    Returns:
        True if at least one cancelled job's log contains the shutdown
        error, False otherwise.
    """
    shutdown_marker = "##[error]The runner has received a shutdown signal"
    headers = {
        'Accept': 'application/vnd.github.v3+json',
        'Authorization': f'token {token}'
    }
    for job in get_workflow_jobs(owner, repo, run_id, token):
        try:
            if job['conclusion'] != 'cancelled':
                continue
            url = f"https://api.github.com/repos/{owner}/{repo}/actions/jobs/{job['id']}/logs"
            # Get the redirect URL for logs instead of following it, so the
            # download below is made without the Authorization header.
            response = requests.get(url, headers=headers, allow_redirects=False, timeout=30)
            if response.status_code != 302:
                continue
            log_response = requests.get(response.headers['Location'], timeout=60)
            if log_response.status_code == 200 and shutdown_marker in log_response.text:
                # One hit is enough; skip downloading the remaining logs.
                return True
        except Exception as e:
            # Deliberately broad: a broken log must not abort the whole check.
            print(f"Error while log reading: {e}")
    return False
def _build_alert_message(errors):
    """Render the Telegram alert text for the given shutdown-error records."""
    parts = ["🚨 *RUNNER DIED DURING RUN*\n"]
    for error in errors:
        parts.append(
            f"\n• Workflow *{error['workflow_name']}* "
            f"[#{error['workflow_id']}]({error['workflow_url']})"
            f"\nCreated at: {error['created_at']}"
            f"\nFinished at: {error['updated_at']}"
        )
        if error['pr_url']:
            parts.append(f"\nLinked PR: {error['pr_url']}")
        parts.append("\n")
    parts.append(f"\nCC {get_alert_logins()}")
    return "".join(parts)


def main():
    """Check recent cancelled PR-target workflows for runner shutdown errors.

    Parses the command line, resolves the GitHub token and repository,
    inspects the logs of recently updated cancelled runs and, when any of
    them contains the runner shutdown error, sends a Telegram alert.

    Returns early (after printing a message or the help text) when the
    GitHub token or the repository cannot be determined. Any exception
    raised while checking or alerting is printed and re-raised.
    """
    parser = argparse.ArgumentParser(
        description='Check GitHub PR target workflows for runner shutdown errors'
    )
    parser.add_argument('--bot-token', help='Telegram bot token (or use TELEGRAM_BOT_TOKEN env var)')
    parser.add_argument('--chat-id', help='Telegram chat ID')
    parser.add_argument('--channel', help='Telegram channel ID (alternative to --chat-id)')
    parser.add_argument('--thread-id', type=int, help='Telegram thread ID for group messages')
    parser.add_argument('--owner', help='Repository owner')
    parser.add_argument('--repo', help='Repository name')
    parser.add_argument('--token', help='Github token')
    parser.add_argument('--hours-delta', help='Number of hours to analyze from current timestamp', default=12, type=int)
    parser.add_argument('--max-rows', help='Max number of workflow runs to analyze', default=100, type=int)
    args = parser.parse_args()

    # GitHub token: command line first, then environment.
    token = args.token or os.getenv('GITHUB_TOKEN')
    if not token:
        print("Error: GITHUB_TOKEN environment variable not set")
        return

    # Repository: GITHUB_REPOSITORY ("owner/name") takes precedence;
    # --owner/--repo are only used when it is not set.
    repo_full_name = os.getenv('GITHUB_REPOSITORY')
    if repo_full_name:
        owner, repo = repo_full_name.split('/')
    elif args.owner and args.repo:
        owner, repo = args.owner, args.repo
    else:
        parser.print_help()
        return

    try:
        # Run timestamps carry a "Z" (UTC) suffix and are parsed into naive
        # datetimes, so the window must start from naive *UTC* "now";
        # local time would shift the window by the host's UTC offset.
        current_date = datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None)
        since = current_date - datetime.timedelta(hours=args.hours_delta)
        workflows = get_workflows_from_ts(owner, repo, token, since, max_runs=args.max_rows)
        print(f'Got {len(workflows)} workflows created from the last {args.hours_delta} hours')

        # Most recently updated runs first.
        recent_workflows = sorted(workflows, key=lambda x: x['updated_at'], reverse=True)
        errors = []
        for workflow in recent_workflows:
            if not check_logs_for_shutdown_signal(owner, repo, workflow['id'], token):
                continue
            pull_requests = workflow['pull_requests']
            pr_url = pull_requests[0]['url'] if pull_requests else None
            print(f"\n🔴 SHUTDOWN ERROR - Workflow #{workflow['id']}")
            print(f"Created: {workflow['created_at']}")
            print(f"Finished: {workflow['updated_at']}")
            print(f"URL: {workflow['html_url']}")
            print(f"PR: {pr_url if pull_requests else 'N/A'}")
            errors.append({
                "workflow_id": workflow['id'],
                "workflow_name": workflow['name'],
                "created_at": str_to_date(workflow['created_at']).strftime('%Y-%m-%d %H:%M'),
                "updated_at": str_to_date(workflow['updated_at']).strftime('%Y-%m-%d %H:%M'),
                "workflow_url": workflow['html_url'],
                "pr_url": pr_url
            })

        if errors:
            bot_token = args.bot_token or os.getenv('TELEGRAM_BOT_TOKEN')
            chat_id = args.channel or args.chat_id or os.getenv('TELEGRAM_CHAT_ID')
            thread_id = args.thread_id or os.getenv('TELEGRAM_THREAD_ID')
            message = _build_alert_message(errors)
            # Add -100 prefix for supergroup. Only bare numeric ids qualify:
            # prefixing an "@username" would produce an invalid chat id.
            if chat_id and chat_id.isdigit() and len(chat_id) >= 10:
                chat_id = f"-100{chat_id}"
            send_telegram_message(
                bot_token,
                chat_id,
                message,
                message_thread_id=thread_id)

        print(f"\nFound {len(errors)} workflows with shutdown errors out of {len(recent_workflows)} checked")
    except Exception as e:
        print(f"Error: {str(e)}")
        # Bare raise keeps the original traceback intact.
        raise
if __name__ == "__main__":
    # Run the check only when executed as a script, not when imported.
    main()
|