aboutsummaryrefslogtreecommitdiffstats
path: root/contrib/python/prompt-toolkit/py3/prompt_toolkit/eventloop/async_generator.py
blob: 7ab3c286738fd2b229b019ef554b9e067d901841 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
"""
Implementation for async generators.
"""
from asyncio import Queue
from typing import AsyncGenerator, Callable, Iterable, TypeVar, Union

from .utils import get_event_loop, run_in_executor_with_context

# Names exported by `from ... import *`; only the converter function is
# public, the helpers below are module-private.
__all__ = [
    "generator_to_async_generator",
]


# Type of the items produced by the wrapped iterable; the async generator
# yields items of this same type.
_T = TypeVar("_T")


class _Done:
    """Sentinel type: an instance placed on the queue marks end of iteration."""


async def generator_to_async_generator(
    get_iterable: Callable[[], Iterable[_T]]
) -> AsyncGenerator[_T, None]:
    """
    Expose a synchronous generator or iterable as an async generator.

    The iterable is consumed in a background thread. Every item it produces
    is handed over to the event loop through a queue and re-yielded from this
    async generator.

    :param get_iterable: Zero-argument callable that returns the generator or
        iterable to consume. It is called from the background thread, not
        from the event loop.
    """
    stop_requested = False
    end_marker = _Done()
    pending: Queue[Union[_T, _Done]] = Queue()
    event_loop = get_event_loop()

    def enqueue(value: Union[_T, _Done]) -> None:
        """
        Schedule `value` to be put on the queue from within the event loop.
        """
        # The queue is only ever touched from the event loop: the producer
        # thread asks the loop to perform the `put_nowait` on its behalf.
        event_loop.call_soon_threadsafe(pending.put_nowait, value)

    def produce() -> None:
        """
        Drain the iterable in the background thread, forwarding each item.
        """
        try:
            for value in get_iterable():
                # The consumer side has been closed or cancelled; nobody is
                # going to read further items, so stop iterating.
                if stop_requested:
                    break

                enqueue(value)

        finally:
            # Always signal the end, also when the iterable raised, so the
            # consumer loop below never waits forever.
            enqueue(end_marker)

    # Kick off the producer in the background.
    producer_future = run_in_executor_with_context(produce)

    try:
        while True:
            received = await pending.get()
            if isinstance(received, _Done):
                return
            yield received
    finally:
        # Reached on normal exhaustion as well as when this async generator
        # is closed early (GeneratorExit): tell the producer thread it can
        # stop, since its output is no longer needed.
        stop_requested = True

        # Wait until the producer has really finished (it notices the flag
        # once the iterable hands it the next item). Without this, the event
        # loop could be closed while the thread is still running, which
        # results in an unhandleable `RuntimeError: Event loop is closed`
        # being printed.
        await producer_future