aboutsummaryrefslogtreecommitdiffstats
path: root/contrib/python/prompt-toolkit/py3/prompt_toolkit/eventloop/async_generator.py
diff options
context:
space:
mode:
authorDevtools Arcadia <arcadia-devtools@yandex-team.ru>2022-02-07 18:08:42 +0300
committerDevtools Arcadia <arcadia-devtools@mous.vla.yp-c.yandex.net>2022-02-07 18:08:42 +0300
commit1110808a9d39d4b808aef724c861a2e1a38d2a69 (patch)
treee26c9fed0de5d9873cce7e00bc214573dc2195b7 /contrib/python/prompt-toolkit/py3/prompt_toolkit/eventloop/async_generator.py
downloadydb-1110808a9d39d4b808aef724c861a2e1a38d2a69.tar.gz
intermediate changes
ref:cde9a383711a11544ce7e107a78147fb96cc4029
Diffstat (limited to 'contrib/python/prompt-toolkit/py3/prompt_toolkit/eventloop/async_generator.py')
-rw-r--r--contrib/python/prompt-toolkit/py3/prompt_toolkit/eventloop/async_generator.py74
1 files changed, 74 insertions, 0 deletions
diff --git a/contrib/python/prompt-toolkit/py3/prompt_toolkit/eventloop/async_generator.py b/contrib/python/prompt-toolkit/py3/prompt_toolkit/eventloop/async_generator.py
new file mode 100644
index 0000000000..7ab3c28673
--- /dev/null
+++ b/contrib/python/prompt-toolkit/py3/prompt_toolkit/eventloop/async_generator.py
@@ -0,0 +1,74 @@
+"""
+Implementation for async generators.
+"""
+from asyncio import Queue
+from typing import AsyncGenerator, Callable, Iterable, TypeVar, Union
+
+from .utils import get_event_loop, run_in_executor_with_context
+
+__all__ = [
+ "generator_to_async_generator",
+]
+
+
+_T = TypeVar("_T")
+
+
+class _Done:
+ pass
+
+
+async def generator_to_async_generator(
+ get_iterable: Callable[[], Iterable[_T]]
+) -> AsyncGenerator[_T, None]:
+ """
+ Turn a generator or iterable into an async generator.
+
+ This works by running the generator in a background thread.
+
+ :param get_iterable: Function that returns a generator or iterable when
+ called.
+ """
+ quitting = False
+ _done = _Done()
+ q: Queue[Union[_T, _Done]] = Queue()
+ loop = get_event_loop()
+
+ def runner() -> None:
+ """
+ Consume the generator in background thread.
+ When items are received, they'll be pushed to the queue.
+ """
+ try:
+ for item in get_iterable():
+ # When this async generator was cancelled (closed), stop this
+ # thread.
+ if quitting:
+ break
+
+ loop.call_soon_threadsafe(q.put_nowait, item)
+
+ finally:
+ loop.call_soon_threadsafe(q.put_nowait, _done)
+
+ # Start background thread.
+ runner_f = run_in_executor_with_context(runner)
+
+ try:
+ while True:
+ item = await q.get()
+ if isinstance(item, _Done):
+ break
+ else:
+ yield item
+ finally:
+ # When this async generator is closed (GeneratorExit exception, stop
+ # the background thread as well. - we don't need that anymore.)
+ quitting = True
+
+ # Wait for the background thread to finish. (should happen right after
+ # the next item is yielded). If we don't do this, and the event loop
+ # gets closed before the runner is done, then we'll get a
+ # `RuntimeError: Event loop is closed` exception printed to stdout that
+ # we can't handle.
+ await runner_f