diff options
author | thegeorg <thegeorg@yandex-team.com> | 2024-02-19 02:38:52 +0300 |
---|---|---|
committer | thegeorg <thegeorg@yandex-team.com> | 2024-02-19 02:50:43 +0300 |
commit | d96fa07134c06472bfee6718b5cfd1679196fc99 (patch) | |
tree | 31ec344fa9d3ff8dc038692516b6438dfbdb8a2d /contrib/tools/python3/Lib/asyncio/runners.py | |
parent | 452cf9e068aef7110e35e654c5d47eb80111ef89 (diff) | |
download | ydb-d96fa07134c06472bfee6718b5cfd1679196fc99.tar.gz |
Sync contrib/tools/python3 layout with upstream
* Move src/ subdir contents to the top of the layout
* Rename self-written lib -> lib2 to avoid CaseFolding warning from the VCS
* Regenerate contrib/libs/python proxy-headers accordingly
4ccc62ac1511abcf0fed14ccade38e984e088f1e
Diffstat (limited to 'contrib/tools/python3/Lib/asyncio/runners.py')
-rw-r--r-- | contrib/tools/python3/Lib/asyncio/runners.py | 215 |
1 files changed, 215 insertions, 0 deletions
diff --git a/contrib/tools/python3/Lib/asyncio/runners.py b/contrib/tools/python3/Lib/asyncio/runners.py new file mode 100644 index 0000000000..1b89236599 --- /dev/null +++ b/contrib/tools/python3/Lib/asyncio/runners.py @@ -0,0 +1,215 @@ +__all__ = ('Runner', 'run') + +import contextvars +import enum +import functools +import threading +import signal +from . import coroutines +from . import events +from . import exceptions +from . import tasks +from . import constants + +class _State(enum.Enum): + CREATED = "created" + INITIALIZED = "initialized" + CLOSED = "closed" + + +class Runner: + """A context manager that controls event loop life cycle. + + The context manager always creates a new event loop, + allows to run async functions inside it, + and properly finalizes the loop at the context manager exit. + + If debug is True, the event loop will be run in debug mode. + If loop_factory is passed, it is used for new event loop creation. + + asyncio.run(main(), debug=True) + + is a shortcut for + + with asyncio.Runner(debug=True) as runner: + runner.run(main()) + + The run() method can be called multiple times within the runner's context. + + This can be useful for interactive console (e.g. IPython), + unittest runners, console tools, -- everywhere when async code + is called from existing sync framework and where the preferred single + asyncio.run() call doesn't work. + + """ + + # Note: the class is final, it is not intended for inheritance. + + def __init__(self, *, debug=None, loop_factory=None): + self._state = _State.CREATED + self._debug = debug + self._loop_factory = loop_factory + self._loop = None + self._context = None + self._interrupt_count = 0 + self._set_event_loop = False + + def __enter__(self): + self._lazy_init() + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + self.close() + + def close(self): + """Shutdown and close event loop.""" + if self._state is not _State.INITIALIZED: + return + try: + loop = self._loop + _cancel_all_tasks(loop) + loop.run_until_complete(loop.shutdown_asyncgens()) + loop.run_until_complete( + loop.shutdown_default_executor(constants.THREAD_JOIN_TIMEOUT)) + finally: + if self._set_event_loop: + events.set_event_loop(None) + loop.close() + self._loop = None + self._state = _State.CLOSED + + def get_loop(self): + """Return embedded event loop.""" + self._lazy_init() + return self._loop + + def run(self, coro, *, context=None): + """Run a coroutine inside the embedded event loop.""" + if not coroutines.iscoroutine(coro): + raise ValueError("a coroutine was expected, got {!r}".format(coro)) + + if events._get_running_loop() is not None: + # fail fast with short traceback + raise RuntimeError( + "Runner.run() cannot be called from a running event loop") + + self._lazy_init() + + if context is None: + context = self._context + task = self._loop.create_task(coro, context=context) + + if (threading.current_thread() is threading.main_thread() + and signal.getsignal(signal.SIGINT) is signal.default_int_handler + ): + sigint_handler = functools.partial(self._on_sigint, main_task=task) + try: + signal.signal(signal.SIGINT, sigint_handler) + except ValueError: + # `signal.signal` may throw if `threading.main_thread` does + # not support signals (e.g. embedded interpreter with signals + # not registered - see gh-91880) + sigint_handler = None + else: + sigint_handler = None + + self._interrupt_count = 0 + try: + return self._loop.run_until_complete(task) + except exceptions.CancelledError: + if self._interrupt_count > 0: + uncancel = getattr(task, "uncancel", None) + if uncancel is not None and uncancel() == 0: + raise KeyboardInterrupt() + raise # CancelledError + finally: + if (sigint_handler is not None + and signal.getsignal(signal.SIGINT) is sigint_handler + ): + signal.signal(signal.SIGINT, signal.default_int_handler) + + def _lazy_init(self): + if self._state is _State.CLOSED: + raise RuntimeError("Runner is closed") + if self._state is _State.INITIALIZED: + return + if self._loop_factory is None: + self._loop = events.new_event_loop() + if not self._set_event_loop: + # Call set_event_loop only once to avoid calling + # attach_loop multiple times on child watchers + events.set_event_loop(self._loop) + self._set_event_loop = True + else: + self._loop = self._loop_factory() + if self._debug is not None: + self._loop.set_debug(self._debug) + self._context = contextvars.copy_context() + self._state = _State.INITIALIZED + + def _on_sigint(self, signum, frame, main_task): + self._interrupt_count += 1 + if self._interrupt_count == 1 and not main_task.done(): + main_task.cancel() + # wakeup loop if it is blocked by select() with long timeout + self._loop.call_soon_threadsafe(lambda: None) + return + raise KeyboardInterrupt() + + +def run(main, *, debug=None, loop_factory=None): + """Execute the coroutine and return the result. + + This function runs the passed coroutine, taking care of + managing the asyncio event loop, finalizing asynchronous + generators and closing the default executor. + + This function cannot be called when another asyncio event loop is + running in the same thread. + + If debug is True, the event loop will be run in debug mode. + + This function always creates a new event loop and closes it at the end. + It should be used as a main entry point for asyncio programs, and should + ideally only be called once. + + The executor is given a timeout duration of 5 minutes to shutdown. + If the executor hasn't finished within that duration, a warning is + emitted and the executor is closed. + + Example: + + async def main(): + await asyncio.sleep(1) + print('hello') + + asyncio.run(main()) + """ + if events._get_running_loop() is not None: + # fail fast with short traceback + raise RuntimeError( + "asyncio.run() cannot be called from a running event loop") + + with Runner(debug=debug, loop_factory=loop_factory) as runner: + return runner.run(main) + + +def _cancel_all_tasks(loop): + to_cancel = tasks.all_tasks(loop) + if not to_cancel: + return + + for task in to_cancel: + task.cancel() + + loop.run_until_complete(tasks.gather(*to_cancel, return_exceptions=True)) + + for task in to_cancel: + if task.cancelled(): + continue + if task.exception() is not None: + loop.call_exception_handler({ + 'message': 'unhandled exception during asyncio.run() shutdown', + 'exception': task.exception(), + 'task': task, + }) |