diff options
author | Devtools Arcadia <arcadia-devtools@yandex-team.ru> | 2022-02-07 18:08:42 +0300 |
---|---|---|
committer | Devtools Arcadia <arcadia-devtools@mous.vla.yp-c.yandex.net> | 2022-02-07 18:08:42 +0300 |
commit | 1110808a9d39d4b808aef724c861a2e1a38d2a69 (patch) | |
tree | e26c9fed0de5d9873cce7e00bc214573dc2195b7 /contrib/python/prompt-toolkit/py3/prompt_toolkit/eventloop/async_generator.py | |
download | ydb-1110808a9d39d4b808aef724c861a2e1a38d2a69.tar.gz |
intermediate changes
ref:cde9a383711a11544ce7e107a78147fb96cc4029
Diffstat (limited to 'contrib/python/prompt-toolkit/py3/prompt_toolkit/eventloop/async_generator.py')
-rw-r--r-- | contrib/python/prompt-toolkit/py3/prompt_toolkit/eventloop/async_generator.py | 74 |
1 files changed, 74 insertions, 0 deletions
diff --git a/contrib/python/prompt-toolkit/py3/prompt_toolkit/eventloop/async_generator.py b/contrib/python/prompt-toolkit/py3/prompt_toolkit/eventloop/async_generator.py new file mode 100644 index 0000000000..7ab3c28673 --- /dev/null +++ b/contrib/python/prompt-toolkit/py3/prompt_toolkit/eventloop/async_generator.py @@ -0,0 +1,74 @@ +""" +Implementation for async generators. +""" +from asyncio import Queue +from typing import AsyncGenerator, Callable, Iterable, TypeVar, Union + +from .utils import get_event_loop, run_in_executor_with_context + +__all__ = [ + "generator_to_async_generator", +] + + +_T = TypeVar("_T") + + +class _Done: + pass + + +async def generator_to_async_generator( + get_iterable: Callable[[], Iterable[_T]] +) -> AsyncGenerator[_T, None]: + """ + Turn a generator or iterable into an async generator. + + This works by running the generator in a background thread. + + :param get_iterable: Function that returns a generator or iterable when + called. + """ + quitting = False + _done = _Done() + q: Queue[Union[_T, _Done]] = Queue() + loop = get_event_loop() + + def runner() -> None: + """ + Consume the generator in background thread. + When items are received, they'll be pushed to the queue. + """ + try: + for item in get_iterable(): + # When this async generator was cancelled (closed), stop this + # thread. + if quitting: + break + + loop.call_soon_threadsafe(q.put_nowait, item) + + finally: + loop.call_soon_threadsafe(q.put_nowait, _done) + + # Start background thread. + runner_f = run_in_executor_with_context(runner) + + try: + while True: + item = await q.get() + if isinstance(item, _Done): + break + else: + yield item + finally: + # When this async generator is closed (GeneratorExit exception, stop + # the background thread as well. - we don't need that anymore.) + quitting = True + + # Wait for the background thread to finish. (should happen right after + # the next item is yielded). If we don't do this, and the event loop + # gets closed before the runner is done, then we'll get a + # `RuntimeError: Event loop is closed` exception printed to stdout that + # we can't handle. + await runner_f |