aboutsummaryrefslogtreecommitdiffstats
path: root/contrib/tools/python3/Lib/asyncio/queues.py
diff options
context:
space:
mode:
authorthegeorg <thegeorg@yandex-team.com>2024-02-19 02:38:52 +0300
committerthegeorg <thegeorg@yandex-team.com>2024-02-19 02:50:43 +0300
commitd96fa07134c06472bfee6718b5cfd1679196fc99 (patch)
tree31ec344fa9d3ff8dc038692516b6438dfbdb8a2d /contrib/tools/python3/Lib/asyncio/queues.py
parent452cf9e068aef7110e35e654c5d47eb80111ef89 (diff)
downloadydb-d96fa07134c06472bfee6718b5cfd1679196fc99.tar.gz
Sync contrib/tools/python3 layout with upstream
* Move src/ subdir contents to the top of the layout * Rename self-written lib -> lib2 to avoid CaseFolding warning from the VCS * Regenerate contrib/libs/python proxy-headers accordingly 4ccc62ac1511abcf0fed14ccade38e984e088f1e
Diffstat (limited to 'contrib/tools/python3/Lib/asyncio/queues.py')
-rw-r--r--contrib/tools/python3/Lib/asyncio/queues.py244
1 files changed, 244 insertions, 0 deletions
diff --git a/contrib/tools/python3/Lib/asyncio/queues.py b/contrib/tools/python3/Lib/asyncio/queues.py
new file mode 100644
index 0000000000..a9656a6df5
--- /dev/null
+++ b/contrib/tools/python3/Lib/asyncio/queues.py
@@ -0,0 +1,244 @@
+__all__ = ('Queue', 'PriorityQueue', 'LifoQueue', 'QueueFull', 'QueueEmpty')
+
+import collections
+import heapq
+from types import GenericAlias
+
+from . import locks
+from . import mixins
+
+
+class QueueEmpty(Exception):
+ """Raised when Queue.get_nowait() is called on an empty Queue."""
+ pass
+
+
+class QueueFull(Exception):
+ """Raised when the Queue.put_nowait() method is called on a full Queue."""
+ pass
+
+
+class Queue(mixins._LoopBoundMixin):
+ """A queue, useful for coordinating producer and consumer coroutines.
+
+ If maxsize is less than or equal to zero, the queue size is infinite. If it
+ is an integer greater than 0, then "await put()" will block when the
+ queue reaches maxsize, until an item is removed by get().
+
+ Unlike the standard library Queue, you can reliably know this Queue's size
+ with qsize(), since your single-threaded asyncio application won't be
+ interrupted between calling qsize() and doing an operation on the Queue.
+ """
+
+ def __init__(self, maxsize=0):
+ self._maxsize = maxsize
+
+ # Futures.
+ self._getters = collections.deque()
+ # Futures.
+ self._putters = collections.deque()
+ self._unfinished_tasks = 0
+ self._finished = locks.Event()
+ self._finished.set()
+ self._init(maxsize)
+
+ # These three are overridable in subclasses.
+
+ def _init(self, maxsize):
+ self._queue = collections.deque()
+
+ def _get(self):
+ return self._queue.popleft()
+
+ def _put(self, item):
+ self._queue.append(item)
+
+ # End of the overridable methods.
+
+ def _wakeup_next(self, waiters):
+ # Wake up the next waiter (if any) that isn't cancelled.
+ while waiters:
+ waiter = waiters.popleft()
+ if not waiter.done():
+ waiter.set_result(None)
+ break
+
+ def __repr__(self):
+ return f'<{type(self).__name__} at {id(self):#x} {self._format()}>'
+
+ def __str__(self):
+ return f'<{type(self).__name__} {self._format()}>'
+
+ __class_getitem__ = classmethod(GenericAlias)
+
+ def _format(self):
+ result = f'maxsize={self._maxsize!r}'
+ if getattr(self, '_queue', None):
+ result += f' _queue={list(self._queue)!r}'
+ if self._getters:
+ result += f' _getters[{len(self._getters)}]'
+ if self._putters:
+ result += f' _putters[{len(self._putters)}]'
+ if self._unfinished_tasks:
+ result += f' tasks={self._unfinished_tasks}'
+ return result
+
+ def qsize(self):
+ """Number of items in the queue."""
+ return len(self._queue)
+
+ @property
+ def maxsize(self):
+ """Number of items allowed in the queue."""
+ return self._maxsize
+
+ def empty(self):
+ """Return True if the queue is empty, False otherwise."""
+ return not self._queue
+
+ def full(self):
+ """Return True if there are maxsize items in the queue.
+
+ Note: if the Queue was initialized with maxsize=0 (the default),
+ then full() is never True.
+ """
+ if self._maxsize <= 0:
+ return False
+ else:
+ return self.qsize() >= self._maxsize
+
+ async def put(self, item):
+ """Put an item into the queue.
+
+ Put an item into the queue. If the queue is full, wait until a free
+ slot is available before adding item.
+ """
+ while self.full():
+ putter = self._get_loop().create_future()
+ self._putters.append(putter)
+ try:
+ await putter
+ except:
+ putter.cancel() # Just in case putter is not done yet.
+ try:
+ # Clean self._putters from canceled putters.
+ self._putters.remove(putter)
+ except ValueError:
+ # The putter could be removed from self._putters by a
+ # previous get_nowait call.
+ pass
+ if not self.full() and not putter.cancelled():
+ # We were woken up by get_nowait(), but can't take
+ # the call. Wake up the next in line.
+ self._wakeup_next(self._putters)
+ raise
+ return self.put_nowait(item)
+
+ def put_nowait(self, item):
+ """Put an item into the queue without blocking.
+
+ If no free slot is immediately available, raise QueueFull.
+ """
+ if self.full():
+ raise QueueFull
+ self._put(item)
+ self._unfinished_tasks += 1
+ self._finished.clear()
+ self._wakeup_next(self._getters)
+
+ async def get(self):
+ """Remove and return an item from the queue.
+
+ If queue is empty, wait until an item is available.
+ """
+ while self.empty():
+ getter = self._get_loop().create_future()
+ self._getters.append(getter)
+ try:
+ await getter
+ except:
+ getter.cancel() # Just in case getter is not done yet.
+ try:
+ # Clean self._getters from canceled getters.
+ self._getters.remove(getter)
+ except ValueError:
+ # The getter could be removed from self._getters by a
+ # previous put_nowait call.
+ pass
+ if not self.empty() and not getter.cancelled():
+ # We were woken up by put_nowait(), but can't take
+ # the call. Wake up the next in line.
+ self._wakeup_next(self._getters)
+ raise
+ return self.get_nowait()
+
+ def get_nowait(self):
+ """Remove and return an item from the queue.
+
+ Return an item if one is immediately available, else raise QueueEmpty.
+ """
+ if self.empty():
+ raise QueueEmpty
+ item = self._get()
+ self._wakeup_next(self._putters)
+ return item
+
+ def task_done(self):
+ """Indicate that a formerly enqueued task is complete.
+
+ Used by queue consumers. For each get() used to fetch a task,
+ a subsequent call to task_done() tells the queue that the processing
+ on the task is complete.
+
+ If a join() is currently blocking, it will resume when all items have
+ been processed (meaning that a task_done() call was received for every
+ item that had been put() into the queue).
+
+ Raises ValueError if called more times than there were items placed in
+ the queue.
+ """
+ if self._unfinished_tasks <= 0:
+ raise ValueError('task_done() called too many times')
+ self._unfinished_tasks -= 1
+ if self._unfinished_tasks == 0:
+ self._finished.set()
+
+ async def join(self):
+ """Block until all items in the queue have been gotten and processed.
+
+ The count of unfinished tasks goes up whenever an item is added to the
+ queue. The count goes down whenever a consumer calls task_done() to
+ indicate that the item was retrieved and all work on it is complete.
+ When the count of unfinished tasks drops to zero, join() unblocks.
+ """
+ if self._unfinished_tasks > 0:
+ await self._finished.wait()
+
+
+class PriorityQueue(Queue):
+ """A subclass of Queue; retrieves entries in priority order (lowest first).
+
+ Entries are typically tuples of the form: (priority number, data).
+ """
+
+ def _init(self, maxsize):
+ self._queue = []
+
+ def _put(self, item, heappush=heapq.heappush):
+ heappush(self._queue, item)
+
+ def _get(self, heappop=heapq.heappop):
+ return heappop(self._queue)
+
+
+class LifoQueue(Queue):
+ """A subclass of Queue that retrieves most recently added entries first."""
+
+ def _init(self, maxsize):
+ self._queue = []
+
+ def _put(self, item):
+ self._queue.append(item)
+
+ def _get(self):
+ return self._queue.pop()