diff options
author | thegeorg <thegeorg@yandex-team.com> | 2024-02-19 02:38:52 +0300 |
---|---|---|
committer | thegeorg <thegeorg@yandex-team.com> | 2024-02-19 02:50:43 +0300 |
commit | d96fa07134c06472bfee6718b5cfd1679196fc99 (patch) | |
tree | 31ec344fa9d3ff8dc038692516b6438dfbdb8a2d /contrib/tools/python3/Lib/asyncio/queues.py | |
parent | 452cf9e068aef7110e35e654c5d47eb80111ef89 (diff) | |
download | ydb-d96fa07134c06472bfee6718b5cfd1679196fc99.tar.gz |
Sync contrib/tools/python3 layout with upstream
* Move src/ subdir contents to the top of the layout
* Rename self-written lib -> lib2 to avoid CaseFolding warning from the VCS
* Regenerate contrib/libs/python proxy-headers accordingly
4ccc62ac1511abcf0fed14ccade38e984e088f1e
Diffstat (limited to 'contrib/tools/python3/Lib/asyncio/queues.py')
-rw-r--r-- | contrib/tools/python3/Lib/asyncio/queues.py | 244 |
1 files changed, 244 insertions, 0 deletions
diff --git a/contrib/tools/python3/Lib/asyncio/queues.py b/contrib/tools/python3/Lib/asyncio/queues.py new file mode 100644 index 0000000000..a9656a6df5 --- /dev/null +++ b/contrib/tools/python3/Lib/asyncio/queues.py @@ -0,0 +1,244 @@ +__all__ = ('Queue', 'PriorityQueue', 'LifoQueue', 'QueueFull', 'QueueEmpty') + +import collections +import heapq +from types import GenericAlias + +from . import locks +from . import mixins + + +class QueueEmpty(Exception): + """Raised when Queue.get_nowait() is called on an empty Queue.""" + pass + + +class QueueFull(Exception): + """Raised when the Queue.put_nowait() method is called on a full Queue.""" + pass + + +class Queue(mixins._LoopBoundMixin): + """A queue, useful for coordinating producer and consumer coroutines. + + If maxsize is less than or equal to zero, the queue size is infinite. If it + is an integer greater than 0, then "await put()" will block when the + queue reaches maxsize, until an item is removed by get(). + + Unlike the standard library Queue, you can reliably know this Queue's size + with qsize(), since your single-threaded asyncio application won't be + interrupted between calling qsize() and doing an operation on the Queue. + """ + + def __init__(self, maxsize=0): + self._maxsize = maxsize + + # Futures. + self._getters = collections.deque() + # Futures. + self._putters = collections.deque() + self._unfinished_tasks = 0 + self._finished = locks.Event() + self._finished.set() + self._init(maxsize) + + # These three are overridable in subclasses. + + def _init(self, maxsize): + self._queue = collections.deque() + + def _get(self): + return self._queue.popleft() + + def _put(self, item): + self._queue.append(item) + + # End of the overridable methods. + + def _wakeup_next(self, waiters): + # Wake up the next waiter (if any) that isn't cancelled. + while waiters: + waiter = waiters.popleft() + if not waiter.done(): + waiter.set_result(None) + break + + def __repr__(self): + return f'<{type(self).__name__} at {id(self):#x} {self._format()}>' + + def __str__(self): + return f'<{type(self).__name__} {self._format()}>' + + __class_getitem__ = classmethod(GenericAlias) + + def _format(self): + result = f'maxsize={self._maxsize!r}' + if getattr(self, '_queue', None): + result += f' _queue={list(self._queue)!r}' + if self._getters: + result += f' _getters[{len(self._getters)}]' + if self._putters: + result += f' _putters[{len(self._putters)}]' + if self._unfinished_tasks: + result += f' tasks={self._unfinished_tasks}' + return result + + def qsize(self): + """Number of items in the queue.""" + return len(self._queue) + + @property + def maxsize(self): + """Number of items allowed in the queue.""" + return self._maxsize + + def empty(self): + """Return True if the queue is empty, False otherwise.""" + return not self._queue + + def full(self): + """Return True if there are maxsize items in the queue. + + Note: if the Queue was initialized with maxsize=0 (the default), + then full() is never True. + """ + if self._maxsize <= 0: + return False + else: + return self.qsize() >= self._maxsize + + async def put(self, item): + """Put an item into the queue. + + Put an item into the queue. If the queue is full, wait until a free + slot is available before adding item. + """ + while self.full(): + putter = self._get_loop().create_future() + self._putters.append(putter) + try: + await putter + except: + putter.cancel() # Just in case putter is not done yet. + try: + # Clean self._putters from canceled putters. + self._putters.remove(putter) + except ValueError: + # The putter could be removed from self._putters by a + # previous get_nowait call. + pass + if not self.full() and not putter.cancelled(): + # We were woken up by get_nowait(), but can't take + # the call. Wake up the next in line. + self._wakeup_next(self._putters) + raise + return self.put_nowait(item) + + def put_nowait(self, item): + """Put an item into the queue without blocking. + + If no free slot is immediately available, raise QueueFull. + """ + if self.full(): + raise QueueFull + self._put(item) + self._unfinished_tasks += 1 + self._finished.clear() + self._wakeup_next(self._getters) + + async def get(self): + """Remove and return an item from the queue. + + If queue is empty, wait until an item is available. + """ + while self.empty(): + getter = self._get_loop().create_future() + self._getters.append(getter) + try: + await getter + except: + getter.cancel() # Just in case getter is not done yet. + try: + # Clean self._getters from canceled getters. + self._getters.remove(getter) + except ValueError: + # The getter could be removed from self._getters by a + # previous put_nowait call. + pass + if not self.empty() and not getter.cancelled(): + # We were woken up by put_nowait(), but can't take + # the call. Wake up the next in line. + self._wakeup_next(self._getters) + raise + return self.get_nowait() + + def get_nowait(self): + """Remove and return an item from the queue. + + Return an item if one is immediately available, else raise QueueEmpty. + """ + if self.empty(): + raise QueueEmpty + item = self._get() + self._wakeup_next(self._putters) + return item + + def task_done(self): + """Indicate that a formerly enqueued task is complete. + + Used by queue consumers. For each get() used to fetch a task, + a subsequent call to task_done() tells the queue that the processing + on the task is complete. + + If a join() is currently blocking, it will resume when all items have + been processed (meaning that a task_done() call was received for every + item that had been put() into the queue). + + Raises ValueError if called more times than there were items placed in + the queue. + """ + if self._unfinished_tasks <= 0: + raise ValueError('task_done() called too many times') + self._unfinished_tasks -= 1 + if self._unfinished_tasks == 0: + self._finished.set() + + async def join(self): + """Block until all items in the queue have been gotten and processed. + + The count of unfinished tasks goes up whenever an item is added to the + queue. The count goes down whenever a consumer calls task_done() to + indicate that the item was retrieved and all work on it is complete. + When the count of unfinished tasks drops to zero, join() unblocks. + """ + if self._unfinished_tasks > 0: + await self._finished.wait() + + +class PriorityQueue(Queue): + """A subclass of Queue; retrieves entries in priority order (lowest first). + + Entries are typically tuples of the form: (priority number, data). + """ + + def _init(self, maxsize): + self._queue = [] + + def _put(self, item, heappush=heapq.heappush): + heappush(self._queue, item) + + def _get(self, heappop=heapq.heappop): + return heappop(self._queue) + + +class LifoQueue(Queue): + """A subclass of Queue that retrieves most recently added entries first.""" + + def _init(self, maxsize): + self._queue = [] + + def _put(self, item): + self._queue.append(item) + + def _get(self): + return self._queue.pop() |