diff options
author | shadchin <shadchin@yandex-team.ru> | 2022-02-10 16:44:39 +0300 |
---|---|---|
committer | Daniil Cherednik <dcherednik@yandex-team.ru> | 2022-02-10 16:44:39 +0300 |
commit | e9656aae26e0358d5378e5b63dcac5c8dbe0e4d0 (patch) | |
tree | 64175d5cadab313b3e7039ebaa06c5bc3295e274 /contrib/python/prompt-toolkit/py3/prompt_toolkit/eventloop/async_generator.py | |
parent | 2598ef1d0aee359b4b6d5fdd1758916d5907d04f (diff) | |
download | ydb-e9656aae26e0358d5378e5b63dcac5c8dbe0e4d0.tar.gz |
Restoring authorship annotation for <shadchin@yandex-team.ru>. Commit 2 of 2.
Diffstat (limited to 'contrib/python/prompt-toolkit/py3/prompt_toolkit/eventloop/async_generator.py')
-rw-r--r-- | contrib/python/prompt-toolkit/py3/prompt_toolkit/eventloop/async_generator.py | 148 |
1 files changed, 74 insertions, 74 deletions
diff --git a/contrib/python/prompt-toolkit/py3/prompt_toolkit/eventloop/async_generator.py b/contrib/python/prompt-toolkit/py3/prompt_toolkit/eventloop/async_generator.py index dd520d1bc1..7ab3c28673 100644 --- a/contrib/python/prompt-toolkit/py3/prompt_toolkit/eventloop/async_generator.py +++ b/contrib/python/prompt-toolkit/py3/prompt_toolkit/eventloop/async_generator.py @@ -1,74 +1,74 @@ -""" -Implementation for async generators. -""" -from asyncio import Queue -from typing import AsyncGenerator, Callable, Iterable, TypeVar, Union - -from .utils import get_event_loop, run_in_executor_with_context - -__all__ = [ - "generator_to_async_generator", -] - - -_T = TypeVar("_T") - - -class _Done: - pass - - -async def generator_to_async_generator( - get_iterable: Callable[[], Iterable[_T]] -) -> AsyncGenerator[_T, None]: - """ - Turn a generator or iterable into an async generator. - - This works by running the generator in a background thread. - - :param get_iterable: Function that returns a generator or iterable when - called. - """ - quitting = False - _done = _Done() - q: Queue[Union[_T, _Done]] = Queue() - loop = get_event_loop() - - def runner() -> None: - """ - Consume the generator in background thread. - When items are received, they'll be pushed to the queue. - """ - try: - for item in get_iterable(): - # When this async generator was cancelled (closed), stop this - # thread. - if quitting: - break - - loop.call_soon_threadsafe(q.put_nowait, item) - - finally: - loop.call_soon_threadsafe(q.put_nowait, _done) - - # Start background thread. - runner_f = run_in_executor_with_context(runner) - - try: - while True: - item = await q.get() - if isinstance(item, _Done): - break - else: - yield item - finally: - # When this async generator is closed (GeneratorExit exception, stop - # the background thread as well. - we don't need that anymore.) - quitting = True - - # Wait for the background thread to finish. (should happen right after - # the next item is yielded). If we don't do this, and the event loop - # gets closed before the runner is done, then we'll get a - # `RuntimeError: Event loop is closed` exception printed to stdout that - # we can't handle. - await runner_f +""" +Implementation for async generators. +""" +from asyncio import Queue +from typing import AsyncGenerator, Callable, Iterable, TypeVar, Union + +from .utils import get_event_loop, run_in_executor_with_context + +__all__ = [ + "generator_to_async_generator", +] + + +_T = TypeVar("_T") + + +class _Done: + pass + + +async def generator_to_async_generator( + get_iterable: Callable[[], Iterable[_T]] +) -> AsyncGenerator[_T, None]: + """ + Turn a generator or iterable into an async generator. + + This works by running the generator in a background thread. + + :param get_iterable: Function that returns a generator or iterable when + called. + """ + quitting = False + _done = _Done() + q: Queue[Union[_T, _Done]] = Queue() + loop = get_event_loop() + + def runner() -> None: + """ + Consume the generator in background thread. + When items are received, they'll be pushed to the queue. + """ + try: + for item in get_iterable(): + # When this async generator was cancelled (closed), stop this + # thread. + if quitting: + break + + loop.call_soon_threadsafe(q.put_nowait, item) + + finally: + loop.call_soon_threadsafe(q.put_nowait, _done) + + # Start background thread. + runner_f = run_in_executor_with_context(runner) + + try: + while True: + item = await q.get() + if isinstance(item, _Done): + break + else: + yield item + finally: + # When this async generator is closed (GeneratorExit exception, stop + # the background thread as well. - we don't need that anymore.) + quitting = True + + # Wait for the background thread to finish. (should happen right after + # the next item is yielded). If we don't do this, and the event loop + # gets closed before the runner is done, then we'll get a + # `RuntimeError: Event loop is closed` exception printed to stdout that + # we can't handle. + await runner_f |