aboutsummaryrefslogtreecommitdiffstats
path: root/contrib/tools/python3/src/Lib/unittest/async_case.py
blob: 65ca285d6968661945ce7c6bda95b23e10f26e45 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
import asyncio 
import inspect 
 
from .case import TestCase 
 
 
class IsolatedAsyncioTestCase(TestCase):
    """TestCase whose set-up, test method, tear-down and cleanups may be
    coroutines, all executed on a private asyncio event loop.

    A fresh event loop is created for every run()/debug() call and closed
    afterwards.  Override asyncSetUp()/asyncTearDown() for asynchronous
    fixtures and use addAsyncCleanup() to register coroutine cleanups.
    """
    # Names intentionally have a long prefix
    # to reduce the chance of clashing with user-defined attributes
    # from an inherited test case.
    #
    # The class doesn't call loop.run_until_complete(self.setUp()) and family
    # but uses a different approach:
    # 1. Create a long-running task that reads the self.setUp()
    #    awaitable from a queue along with a future.
    # 2. Await the awaitable that was passed in and set the result
    #    on the future object.
    # 3. Outer code puts the awaitable and the future object into the queue
    #    and waits for the future.
    # The trick is necessary because every run_until_complete() call
    # creates a new task with an embedded ContextVar context.
    # To share contextvars between setUp(), test and tearDown() we need to
    # execute them inside the same task.

    # Note: the test case modifies the event loop policy if the policy was not
    # instantiated yet.
    # asyncio.get_event_loop_policy() creates a default policy on demand but
    # never returns None.
    # I believe this is not an issue in user level tests, but python itself
    # should reset the policy in every test module
    # by calling asyncio.set_event_loop_policy(None) in tearDownModule().

    def __init__(self, methodName='runTest'):
        """Create the test case; the event loop itself is created lazily by
        run()/debug()."""
        super().__init__(methodName)
        self._asyncioTestLoop = None
        self._asyncioCallsQueue = None
        self._asyncioCallsTask = None

    async def asyncSetUp(self):
        """Asynchronous fixture hook, awaited right after setUp()."""
        pass

    async def asyncTearDown(self):
        """Asynchronous fixture hook, awaited right before tearDown()."""
        pass

    def addAsyncCleanup(self, func, /, *args, **kwargs):
        """Register *func* (expected to return an awaitable) as a cleanup.

        *args* and *kwargs* are passed to *func* when the cleanup runs.
        """
        # A trivial trampoline to addCleanup().
        # The function exists because it has different semantics
        # and signature:
        # addCleanup() accepts regular functions
        # but addAsyncCleanup() accepts coroutines.
        #
        # We intentionally don't add an inspect.iscoroutinefunction() check
        # for the func argument because there is no way
        # to check for an async function reliably:
        # 1. It can be "async def func()" itself
        # 2. A class can implement an "async def __call__()" method
        # 3. A regular "def func()" can return an awaitable object
        self.addCleanup(func, *args, **kwargs)

    def _callSetUp(self):
        """Run the synchronous setUp() and then await asyncSetUp()."""
        self.setUp()
        self._callAsync(self.asyncSetUp)

    def _callTestMethod(self, method):
        """Run the test method, awaiting its result if it is awaitable."""
        self._callMaybeAsync(method)

    def _callTearDown(self):
        """Await asyncTearDown() and then run the synchronous tearDown()."""
        self._callAsync(self.asyncTearDown)
        self.tearDown()

    def _callCleanup(self, function, *args, **kwargs):
        """Run one cleanup, awaiting its result if it is awaitable."""
        self._callMaybeAsync(function, *args, **kwargs)

    def _callAsync(self, func, /, *args, **kwargs):
        """Call *func*, which must return an awaitable, and return the
        awaited result.

        The awaitable is handed to the runner task through the calls queue so
        that it executes inside that task; an exception raised by the
        awaitable is re-raised here via the future.
        """
        assert self._asyncioTestLoop is not None, 'asyncio test loop is not initialized'
        ret = func(*args, **kwargs)
        assert inspect.isawaitable(ret), f'{func!r} returned non-awaitable'
        fut = self._asyncioTestLoop.create_future()
        self._asyncioCallsQueue.put_nowait((fut, ret))
        return self._asyncioTestLoop.run_until_complete(fut)

    def _callMaybeAsync(self, func, /, *args, **kwargs):
        """Call *func*; if the result is awaitable, await it in the runner
        task and return the awaited value, otherwise return it unchanged."""
        assert self._asyncioTestLoop is not None, 'asyncio test loop is not initialized'
        ret = func(*args, **kwargs)
        if inspect.isawaitable(ret):
            fut = self._asyncioTestLoop.create_future()
            self._asyncioCallsQueue.put_nowait((fut, ret))
            return self._asyncioTestLoop.run_until_complete(fut)
        else:
            return ret

    async def _asyncioLoopRunner(self, fut):
        """Long-running task body: await every queued awaitable in turn.

        *fut* is resolved as soon as the queue exists so the caller knows the
        runner is ready.  Queue items are ``(future, awaitable)`` pairs; the
        outcome of each awaitable is stored on its future.  A ``None`` item
        terminates the runner.
        """
        self._asyncioCallsQueue = queue = asyncio.Queue()
        fut.set_result(None)
        while True:
            query = await queue.get()
            queue.task_done()
            if query is None:
                return
            fut, awaitable = query
            try:
                ret = await awaitable
                if not fut.cancelled():
                    fut.set_result(ret)
            except (SystemExit, KeyboardInterrupt):
                raise
            except BaseException as ex:
                # BaseException (not Exception) so that CancelledError is
                # reported to the waiting caller instead of killing the runner.
                if not fut.cancelled():
                    fut.set_exception(ex)

    def _setupAsyncioLoop(self):
        """Create the private event loop, install it as the current loop and
        start the runner task, returning once the runner is ready."""
        assert self._asyncioTestLoop is None, 'asyncio test loop already initialized'
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        loop.set_debug(True)
        self._asyncioTestLoop = loop
        fut = loop.create_future()
        self._asyncioCallsTask = loop.create_task(self._asyncioLoopRunner(fut))
        loop.run_until_complete(fut)

    def _tearDownAsyncioLoop(self):
        """Stop the runner task, cancel leftover tasks, shut down async
        generators and close the event loop.

        The loop is always uninstalled and closed, even if one of the
        shutdown steps raises.
        """
        assert self._asyncioTestLoop is not None, 'asyncio test loop is not initialized'
        loop = self._asyncioTestLoop
        self._asyncioTestLoop = None

        try:
            # ask the runner task to exit and wait until it has seen the request
            self._asyncioCallsQueue.put_nowait(None)
            loop.run_until_complete(self._asyncioCallsQueue.join())

            # cancel all tasks
            to_cancel = asyncio.all_tasks(loop)
            if to_cancel:
                for task in to_cancel:
                    task.cancel()

                # No loop= argument: it was removed from gather() in
                # Python 3.10 and the loop is taken from the tasks anyway.
                loop.run_until_complete(
                    asyncio.gather(*to_cancel, return_exceptions=True))

                for task in to_cancel:
                    if task.cancelled():
                        continue
                    exc = task.exception()
                    if exc is not None:
                        loop.call_exception_handler({
                            'message': 'unhandled exception during test shutdown',
                            'exception': exc,
                            'task': task,
                        })

            # shutdown asyncgens -- must run even when there was no task
            # left to cancel
            loop.run_until_complete(loop.shutdown_asyncgens())
        finally:
            asyncio.set_event_loop(None)
            loop.close()

    def run(self, result=None):
        """Run the test inside a fresh event loop and return the result
        object produced by TestCase.run()."""
        self._setupAsyncioLoop()
        try:
            return super().run(result)
        finally:
            self._tearDownAsyncioLoop()

    def debug(self):
        """Run the test without collecting the result, inside a fresh event
        loop; exceptions propagate to the caller."""
        self._setupAsyncioLoop()
        try:
            super().debug()
        finally:
            # Tear the loop down even when the test raised, otherwise the
            # loop stays open and installed as the current event loop.
            self._tearDownAsyncioLoop()

    def __del__(self):
        # getattr(): TestCase.__init__() may raise before the attribute is
        # assigned, and __del__ still runs on the half-built instance.
        if getattr(self, '_asyncioTestLoop', None) is not None:
            self._tearDownAsyncioLoop()