diff options
| author | orivej <[email protected]> | 2022-02-10 16:44:49 +0300 |
|---|---|---|
| committer | Daniil Cherednik <[email protected]> | 2022-02-10 16:44:49 +0300 |
| commit | 718c552901d703c502ccbefdfc3c9028d608b947 (patch) | |
| tree | 46534a98bbefcd7b1f3faa5b52c138ab27db75b7 /contrib/tools/python3/src/Lib/queue.py | |
| parent | e9656aae26e0358d5378e5b63dcac5c8dbe0e4d0 (diff) | |
Restoring authorship annotation for <[email protected]>. Commit 1 of 2.
Diffstat (limited to 'contrib/tools/python3/src/Lib/queue.py')
| -rw-r--r-- | contrib/tools/python3/src/Lib/queue.py | 640 |
1 files changed, 320 insertions, 320 deletions
diff --git a/contrib/tools/python3/src/Lib/queue.py b/contrib/tools/python3/src/Lib/queue.py index 10dbcbc18ec..13b8c263bf8 100644 --- a/contrib/tools/python3/src/Lib/queue.py +++ b/contrib/tools/python3/src/Lib/queue.py @@ -1,326 +1,326 @@ -'''A multi-producer, multi-consumer queue.''' - -import threading +'''A multi-producer, multi-consumer queue.''' + +import threading import types -from collections import deque -from heapq import heappush, heappop -from time import monotonic as time -try: - from _queue import SimpleQueue -except ImportError: - SimpleQueue = None - -__all__ = ['Empty', 'Full', 'Queue', 'PriorityQueue', 'LifoQueue', 'SimpleQueue'] - - -try: - from _queue import Empty +from collections import deque +from heapq import heappush, heappop +from time import monotonic as time +try: + from _queue import SimpleQueue +except ImportError: + SimpleQueue = None + +__all__ = ['Empty', 'Full', 'Queue', 'PriorityQueue', 'LifoQueue', 'SimpleQueue'] + + +try: + from _queue import Empty except ImportError: - class Empty(Exception): - 'Exception raised by Queue.get(block=0)/get_nowait().' - pass - -class Full(Exception): - 'Exception raised by Queue.put(block=0)/put_nowait().' - pass - - -class Queue: - '''Create a queue object with a given maximum size. - - If maxsize is <= 0, the queue size is infinite. - ''' - - def __init__(self, maxsize=0): - self.maxsize = maxsize - self._init(maxsize) - - # mutex must be held whenever the queue is mutating. All methods - # that acquire mutex must release it before returning. mutex - # is shared between the three conditions, so acquiring and - # releasing the conditions also acquires and releases mutex. - self.mutex = threading.Lock() - - # Notify not_empty whenever an item is added to the queue; a - # thread waiting to get is notified then. - self.not_empty = threading.Condition(self.mutex) - - # Notify not_full whenever an item is removed from the queue; - # a thread waiting to put is notified then. - self.not_full = threading.Condition(self.mutex) - - # Notify all_tasks_done whenever the number of unfinished tasks - # drops to zero; thread waiting to join() is notified to resume - self.all_tasks_done = threading.Condition(self.mutex) - self.unfinished_tasks = 0 - - def task_done(self): - '''Indicate that a formerly enqueued task is complete. - - Used by Queue consumer threads. For each get() used to fetch a task, - a subsequent call to task_done() tells the queue that the processing - on the task is complete. - - If a join() is currently blocking, it will resume when all items - have been processed (meaning that a task_done() call was received - for every item that had been put() into the queue). - - Raises a ValueError if called more times than there were items - placed in the queue. - ''' - with self.all_tasks_done: - unfinished = self.unfinished_tasks - 1 - if unfinished <= 0: - if unfinished < 0: - raise ValueError('task_done() called too many times') - self.all_tasks_done.notify_all() - self.unfinished_tasks = unfinished - - def join(self): - '''Blocks until all items in the Queue have been gotten and processed. - - The count of unfinished tasks goes up whenever an item is added to the - queue. The count goes down whenever a consumer thread calls task_done() - to indicate the item was retrieved and all work on it is complete. - - When the count of unfinished tasks drops to zero, join() unblocks. - ''' - with self.all_tasks_done: - while self.unfinished_tasks: - self.all_tasks_done.wait() - - def qsize(self): - '''Return the approximate size of the queue (not reliable!).''' - with self.mutex: - return self._qsize() - - def empty(self): - '''Return True if the queue is empty, False otherwise (not reliable!). - - This method is likely to be removed at some point. Use qsize() == 0 - as a direct substitute, but be aware that either approach risks a race - condition where a queue can grow before the result of empty() or - qsize() can be used. - - To create code that needs to wait for all queued tasks to be - completed, the preferred technique is to use the join() method. - ''' - with self.mutex: - return not self._qsize() - - def full(self): - '''Return True if the queue is full, False otherwise (not reliable!). - - This method is likely to be removed at some point. Use qsize() >= n - as a direct substitute, but be aware that either approach risks a race - condition where a queue can shrink before the result of full() or - qsize() can be used. - ''' - with self.mutex: - return 0 < self.maxsize <= self._qsize() - - def put(self, item, block=True, timeout=None): - '''Put an item into the queue. - - If optional args 'block' is true and 'timeout' is None (the default), - block if necessary until a free slot is available. If 'timeout' is - a non-negative number, it blocks at most 'timeout' seconds and raises - the Full exception if no free slot was available within that time. - Otherwise ('block' is false), put an item on the queue if a free slot - is immediately available, else raise the Full exception ('timeout' - is ignored in that case). - ''' - with self.not_full: - if self.maxsize > 0: - if not block: - if self._qsize() >= self.maxsize: - raise Full - elif timeout is None: - while self._qsize() >= self.maxsize: - self.not_full.wait() - elif timeout < 0: - raise ValueError("'timeout' must be a non-negative number") - else: - endtime = time() + timeout - while self._qsize() >= self.maxsize: - remaining = endtime - time() - if remaining <= 0.0: - raise Full - self.not_full.wait(remaining) - self._put(item) - self.unfinished_tasks += 1 - self.not_empty.notify() - - def get(self, block=True, timeout=None): - '''Remove and return an item from the queue. - - If optional args 'block' is true and 'timeout' is None (the default), - block if necessary until an item is available. If 'timeout' is - a non-negative number, it blocks at most 'timeout' seconds and raises - the Empty exception if no item was available within that time. - Otherwise ('block' is false), return an item if one is immediately - available, else raise the Empty exception ('timeout' is ignored - in that case). - ''' - with self.not_empty: - if not block: - if not self._qsize(): - raise Empty - elif timeout is None: - while not self._qsize(): - self.not_empty.wait() - elif timeout < 0: - raise ValueError("'timeout' must be a non-negative number") - else: - endtime = time() + timeout - while not self._qsize(): - remaining = endtime - time() - if remaining <= 0.0: - raise Empty - self.not_empty.wait(remaining) - item = self._get() - self.not_full.notify() - return item - - def put_nowait(self, item): - '''Put an item into the queue without blocking. - - Only enqueue the item if a free slot is immediately available. - Otherwise raise the Full exception. - ''' - return self.put(item, block=False) - - def get_nowait(self): - '''Remove and return an item from the queue without blocking. - - Only get an item if one is immediately available. Otherwise - raise the Empty exception. - ''' - return self.get(block=False) - - # Override these methods to implement other queue organizations - # (e.g. stack or priority queue). - # These will only be called with appropriate locks held - - # Initialize the queue representation - def _init(self, maxsize): - self.queue = deque() - - def _qsize(self): - return len(self.queue) - - # Put a new item in the queue - def _put(self, item): - self.queue.append(item) - - # Get an item from the queue - def _get(self): - return self.queue.popleft() - + class Empty(Exception): + 'Exception raised by Queue.get(block=0)/get_nowait().' + pass + +class Full(Exception): + 'Exception raised by Queue.put(block=0)/put_nowait().' + pass + + +class Queue: + '''Create a queue object with a given maximum size. + + If maxsize is <= 0, the queue size is infinite. + ''' + + def __init__(self, maxsize=0): + self.maxsize = maxsize + self._init(maxsize) + + # mutex must be held whenever the queue is mutating. All methods + # that acquire mutex must release it before returning. mutex + # is shared between the three conditions, so acquiring and + # releasing the conditions also acquires and releases mutex. + self.mutex = threading.Lock() + + # Notify not_empty whenever an item is added to the queue; a + # thread waiting to get is notified then. + self.not_empty = threading.Condition(self.mutex) + + # Notify not_full whenever an item is removed from the queue; + # a thread waiting to put is notified then. + self.not_full = threading.Condition(self.mutex) + + # Notify all_tasks_done whenever the number of unfinished tasks + # drops to zero; thread waiting to join() is notified to resume + self.all_tasks_done = threading.Condition(self.mutex) + self.unfinished_tasks = 0 + + def task_done(self): + '''Indicate that a formerly enqueued task is complete. + + Used by Queue consumer threads. For each get() used to fetch a task, + a subsequent call to task_done() tells the queue that the processing + on the task is complete. + + If a join() is currently blocking, it will resume when all items + have been processed (meaning that a task_done() call was received + for every item that had been put() into the queue). + + Raises a ValueError if called more times than there were items + placed in the queue. + ''' + with self.all_tasks_done: + unfinished = self.unfinished_tasks - 1 + if unfinished <= 0: + if unfinished < 0: + raise ValueError('task_done() called too many times') + self.all_tasks_done.notify_all() + self.unfinished_tasks = unfinished + + def join(self): + '''Blocks until all items in the Queue have been gotten and processed. + + The count of unfinished tasks goes up whenever an item is added to the + queue. The count goes down whenever a consumer thread calls task_done() + to indicate the item was retrieved and all work on it is complete. + + When the count of unfinished tasks drops to zero, join() unblocks. + ''' + with self.all_tasks_done: + while self.unfinished_tasks: + self.all_tasks_done.wait() + + def qsize(self): + '''Return the approximate size of the queue (not reliable!).''' + with self.mutex: + return self._qsize() + + def empty(self): + '''Return True if the queue is empty, False otherwise (not reliable!). + + This method is likely to be removed at some point. Use qsize() == 0 + as a direct substitute, but be aware that either approach risks a race + condition where a queue can grow before the result of empty() or + qsize() can be used. + + To create code that needs to wait for all queued tasks to be + completed, the preferred technique is to use the join() method. + ''' + with self.mutex: + return not self._qsize() + + def full(self): + '''Return True if the queue is full, False otherwise (not reliable!). + + This method is likely to be removed at some point. Use qsize() >= n + as a direct substitute, but be aware that either approach risks a race + condition where a queue can shrink before the result of full() or + qsize() can be used. + ''' + with self.mutex: + return 0 < self.maxsize <= self._qsize() + + def put(self, item, block=True, timeout=None): + '''Put an item into the queue. + + If optional args 'block' is true and 'timeout' is None (the default), + block if necessary until a free slot is available. If 'timeout' is + a non-negative number, it blocks at most 'timeout' seconds and raises + the Full exception if no free slot was available within that time. + Otherwise ('block' is false), put an item on the queue if a free slot + is immediately available, else raise the Full exception ('timeout' + is ignored in that case). + ''' + with self.not_full: + if self.maxsize > 0: + if not block: + if self._qsize() >= self.maxsize: + raise Full + elif timeout is None: + while self._qsize() >= self.maxsize: + self.not_full.wait() + elif timeout < 0: + raise ValueError("'timeout' must be a non-negative number") + else: + endtime = time() + timeout + while self._qsize() >= self.maxsize: + remaining = endtime - time() + if remaining <= 0.0: + raise Full + self.not_full.wait(remaining) + self._put(item) + self.unfinished_tasks += 1 + self.not_empty.notify() + + def get(self, block=True, timeout=None): + '''Remove and return an item from the queue. + + If optional args 'block' is true and 'timeout' is None (the default), + block if necessary until an item is available. If 'timeout' is + a non-negative number, it blocks at most 'timeout' seconds and raises + the Empty exception if no item was available within that time. + Otherwise ('block' is false), return an item if one is immediately + available, else raise the Empty exception ('timeout' is ignored + in that case). + ''' + with self.not_empty: + if not block: + if not self._qsize(): + raise Empty + elif timeout is None: + while not self._qsize(): + self.not_empty.wait() + elif timeout < 0: + raise ValueError("'timeout' must be a non-negative number") + else: + endtime = time() + timeout + while not self._qsize(): + remaining = endtime - time() + if remaining <= 0.0: + raise Empty + self.not_empty.wait(remaining) + item = self._get() + self.not_full.notify() + return item + + def put_nowait(self, item): + '''Put an item into the queue without blocking. + + Only enqueue the item if a free slot is immediately available. + Otherwise raise the Full exception. + ''' + return self.put(item, block=False) + + def get_nowait(self): + '''Remove and return an item from the queue without blocking. + + Only get an item if one is immediately available. Otherwise + raise the Empty exception. + ''' + return self.get(block=False) + + # Override these methods to implement other queue organizations + # (e.g. stack or priority queue). + # These will only be called with appropriate locks held + + # Initialize the queue representation + def _init(self, maxsize): + self.queue = deque() + + def _qsize(self): + return len(self.queue) + + # Put a new item in the queue + def _put(self, item): + self.queue.append(item) + + # Get an item from the queue + def _get(self): + return self.queue.popleft() + __class_getitem__ = classmethod(types.GenericAlias) + - -class PriorityQueue(Queue): - '''Variant of Queue that retrieves open entries in priority order (lowest first). - - Entries are typically tuples of the form: (priority number, data). - ''' - - def _init(self, maxsize): - self.queue = [] - - def _qsize(self): - return len(self.queue) - - def _put(self, item): - heappush(self.queue, item) - - def _get(self): - return heappop(self.queue) - - -class LifoQueue(Queue): - '''Variant of Queue that retrieves most recently added entries first.''' - - def _init(self, maxsize): - self.queue = [] - - def _qsize(self): - return len(self.queue) - - def _put(self, item): - self.queue.append(item) - - def _get(self): - return self.queue.pop() - - -class _PySimpleQueue: - '''Simple, unbounded FIFO queue. - - This pure Python implementation is not reentrant. - ''' - # Note: while this pure Python version provides fairness - # (by using a threading.Semaphore which is itself fair, being based - # on threading.Condition), fairness is not part of the API contract. - # This allows the C version to use a different implementation. - - def __init__(self): - self._queue = deque() - self._count = threading.Semaphore(0) - - def put(self, item, block=True, timeout=None): - '''Put the item on the queue. - - The optional 'block' and 'timeout' arguments are ignored, as this method - never blocks. They are provided for compatibility with the Queue class. - ''' - self._queue.append(item) - self._count.release() - - def get(self, block=True, timeout=None): - '''Remove and return an item from the queue. - - If optional args 'block' is true and 'timeout' is None (the default), - block if necessary until an item is available. If 'timeout' is - a non-negative number, it blocks at most 'timeout' seconds and raises - the Empty exception if no item was available within that time. - Otherwise ('block' is false), return an item if one is immediately - available, else raise the Empty exception ('timeout' is ignored - in that case). - ''' - if timeout is not None and timeout < 0: - raise ValueError("'timeout' must be a non-negative number") - if not self._count.acquire(block, timeout): - raise Empty - return self._queue.popleft() - - def put_nowait(self, item): - '''Put an item into the queue without blocking. - - This is exactly equivalent to `put(item)` and is only provided - for compatibility with the Queue class. - ''' - return self.put(item, block=False) - - def get_nowait(self): - '''Remove and return an item from the queue without blocking. - - Only get an item if one is immediately available. Otherwise - raise the Empty exception. - ''' - return self.get(block=False) - - def empty(self): - '''Return True if the queue is empty, False otherwise (not reliable!).''' - return len(self._queue) == 0 - - def qsize(self): - '''Return the approximate size of the queue (not reliable!).''' - return len(self._queue) - +class PriorityQueue(Queue): + '''Variant of Queue that retrieves open entries in priority order (lowest first). + + Entries are typically tuples of the form: (priority number, data). + ''' + + def _init(self, maxsize): + self.queue = [] + + def _qsize(self): + return len(self.queue) + + def _put(self, item): + heappush(self.queue, item) + + def _get(self): + return heappop(self.queue) + + +class LifoQueue(Queue): + '''Variant of Queue that retrieves most recently added entries first.''' + + def _init(self, maxsize): + self.queue = [] + + def _qsize(self): + return len(self.queue) + + def _put(self, item): + self.queue.append(item) + + def _get(self): + return self.queue.pop() + + +class _PySimpleQueue: + '''Simple, unbounded FIFO queue. + + This pure Python implementation is not reentrant. + ''' + # Note: while this pure Python version provides fairness + # (by using a threading.Semaphore which is itself fair, being based + # on threading.Condition), fairness is not part of the API contract. + # This allows the C version to use a different implementation. + + def __init__(self): + self._queue = deque() + self._count = threading.Semaphore(0) + + def put(self, item, block=True, timeout=None): + '''Put the item on the queue. + + The optional 'block' and 'timeout' arguments are ignored, as this method + never blocks. They are provided for compatibility with the Queue class. + ''' + self._queue.append(item) + self._count.release() + + def get(self, block=True, timeout=None): + '''Remove and return an item from the queue. + + If optional args 'block' is true and 'timeout' is None (the default), + block if necessary until an item is available. If 'timeout' is + a non-negative number, it blocks at most 'timeout' seconds and raises + the Empty exception if no item was available within that time. + Otherwise ('block' is false), return an item if one is immediately + available, else raise the Empty exception ('timeout' is ignored + in that case). + ''' + if timeout is not None and timeout < 0: + raise ValueError("'timeout' must be a non-negative number") + if not self._count.acquire(block, timeout): + raise Empty + return self._queue.popleft() + + def put_nowait(self, item): + '''Put an item into the queue without blocking. + + This is exactly equivalent to `put(item)` and is only provided + for compatibility with the Queue class. + ''' + return self.put(item, block=False) + + def get_nowait(self): + '''Remove and return an item from the queue without blocking. + + Only get an item if one is immediately available. Otherwise + raise the Empty exception. + ''' + return self.get(block=False) + + def empty(self): + '''Return True if the queue is empty, False otherwise (not reliable!).''' + return len(self._queue) == 0 + + def qsize(self): + '''Return the approximate size of the queue (not reliable!).''' + return len(self._queue) + __class_getitem__ = classmethod(types.GenericAlias) + - -if SimpleQueue is None: - SimpleQueue = _PySimpleQueue +if SimpleQueue is None: + SimpleQueue = _PySimpleQueue |
