1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
|
import threading
import traceback
import warnings
from types import TracebackType
from typing import Any
from typing import Callable
from typing import Generator
from typing import Optional
from typing import Type
import pytest
# Copied from cpython/Lib/test/support/threading_helper.py, with modifications.
class catch_threading_exception:
"""Context manager catching threading.Thread exception using
threading.excepthook.
Storing exc_value using a custom hook can create a reference cycle. The
reference cycle is broken explicitly when the context manager exits.
Storing thread using a custom hook can resurrect it if it is set to an
object which is being finalized. Exiting the context manager clears the
stored object.
Usage:
with threading_helper.catch_threading_exception() as cm:
# code spawning a thread which raises an exception
...
# check the thread exception: use cm.args
...
# cm.args attribute no longer exists at this point
# (to break a reference cycle)
"""
def __init__(self) -> None:
# See https://github.com/python/typeshed/issues/4767 regarding the underscore.
self.args: Optional["threading._ExceptHookArgs"] = None
self._old_hook: Optional[Callable[["threading._ExceptHookArgs"], Any]] = None
def _hook(self, args: "threading._ExceptHookArgs") -> None:
self.args = args
def __enter__(self) -> "catch_threading_exception":
self._old_hook = threading.excepthook
threading.excepthook = self._hook
return self
def __exit__(
self,
exc_type: Optional[Type[BaseException]],
exc_val: Optional[BaseException],
exc_tb: Optional[TracebackType],
) -> None:
assert self._old_hook is not None
threading.excepthook = self._old_hook
self._old_hook = None
del self.args
def thread_exception_runtest_hook() -> Generator[None, None, None]:
with catch_threading_exception() as cm:
yield
if cm.args:
if cm.args.thread is not None:
thread_name = cm.args.thread.name
else:
thread_name = "<unknown>"
msg = f"Exception in thread {thread_name}\n\n"
msg += "".join(
traceback.format_exception(
cm.args.exc_type, cm.args.exc_value, cm.args.exc_traceback,
)
)
warnings.warn(pytest.PytestUnhandledThreadExceptionWarning(msg))
@pytest.hookimpl(hookwrapper=True, trylast=True)
def pytest_runtest_setup() -> Generator[None, None, None]:
yield from thread_exception_runtest_hook()
@pytest.hookimpl(hookwrapper=True, tryfirst=True)
def pytest_runtest_call() -> Generator[None, None, None]:
yield from thread_exception_runtest_hook()
@pytest.hookimpl(hookwrapper=True, tryfirst=True)
def pytest_runtest_teardown() -> Generator[None, None, None]:
yield from thread_exception_runtest_hook()
|