summaryrefslogtreecommitdiffstats
path: root/contrib/tools/python3/Lib/queue.py
diff options
context:
space:
mode:
Diffstat (limited to 'contrib/tools/python3/Lib/queue.py')
-rw-r--r--contrib/tools/python3/Lib/queue.py59
1 files changed, 58 insertions, 1 deletions
diff --git a/contrib/tools/python3/Lib/queue.py b/contrib/tools/python3/Lib/queue.py
index 55f50088460..c0b35987654 100644
--- a/contrib/tools/python3/Lib/queue.py
+++ b/contrib/tools/python3/Lib/queue.py
@@ -10,7 +10,15 @@ try:
except ImportError:
SimpleQueue = None
-__all__ = ['Empty', 'Full', 'Queue', 'PriorityQueue', 'LifoQueue', 'SimpleQueue']
+__all__ = [
+ 'Empty',
+ 'Full',
+ 'ShutDown',
+ 'Queue',
+ 'PriorityQueue',
+ 'LifoQueue',
+ 'SimpleQueue',
+]
try:
@@ -25,6 +33,10 @@ class Full(Exception):
pass
+class ShutDown(Exception):
+ '''Raised when put/get with shut-down queue.'''
+
+
class Queue:
'''Create a queue object with a given maximum size.
@@ -54,6 +66,9 @@ class Queue:
self.all_tasks_done = threading.Condition(self.mutex)
self.unfinished_tasks = 0
+ # Queue shutdown state
+ self.is_shutdown = False
+
def task_done(self):
'''Indicate that a formerly enqueued task is complete.
@@ -129,8 +144,12 @@ class Queue:
Otherwise ('block' is false), put an item on the queue if a free slot
is immediately available, else raise the Full exception ('timeout'
is ignored in that case).
+
+ Raises ShutDown if the queue has been shut down.
'''
with self.not_full:
+ if self.is_shutdown:
+ raise ShutDown
if self.maxsize > 0:
if not block:
if self._qsize() >= self.maxsize:
@@ -138,6 +157,8 @@ class Queue:
elif timeout is None:
while self._qsize() >= self.maxsize:
self.not_full.wait()
+ if self.is_shutdown:
+ raise ShutDown
elif timeout < 0:
raise ValueError("'timeout' must be a non-negative number")
else:
@@ -147,6 +168,8 @@ class Queue:
if remaining <= 0.0:
raise Full
self.not_full.wait(remaining)
+ if self.is_shutdown:
+ raise ShutDown
self._put(item)
self.unfinished_tasks += 1
self.not_empty.notify()
@@ -161,14 +184,21 @@ class Queue:
Otherwise ('block' is false), return an item if one is immediately
available, else raise the Empty exception ('timeout' is ignored
in that case).
+
+ Raises ShutDown if the queue has been shut down and is empty,
+ or if the queue has been shut down immediately.
'''
with self.not_empty:
+ if self.is_shutdown and not self._qsize():
+ raise ShutDown
if not block:
if not self._qsize():
raise Empty
elif timeout is None:
while not self._qsize():
self.not_empty.wait()
+ if self.is_shutdown and not self._qsize():
+ raise ShutDown
elif timeout < 0:
raise ValueError("'timeout' must be a non-negative number")
else:
@@ -178,6 +208,8 @@ class Queue:
if remaining <= 0.0:
raise Empty
self.not_empty.wait(remaining)
+ if self.is_shutdown and not self._qsize():
+ raise ShutDown
item = self._get()
self.not_full.notify()
return item
@@ -198,6 +230,31 @@ class Queue:
'''
return self.get(block=False)
+ def shutdown(self, immediate=False):
+ '''Shut-down the queue, making queue gets and puts raise ShutDown.
+
+ By default, gets will only raise once the queue is empty. Set
+ 'immediate' to True to make gets raise immediately instead.
+
+ All blocked callers of put() and get() will be unblocked.
+
+ If 'immediate', the queue is drained and unfinished tasks
+ is reduced by the number of drained tasks. If unfinished tasks
+ is reduced to zero, callers of Queue.join are unblocked.
+ '''
+ with self.mutex:
+ self.is_shutdown = True
+ if immediate:
+ while self._qsize():
+ self._get()
+ if self.unfinished_tasks > 0:
+ self.unfinished_tasks -= 1
+ # release all blocked threads in `join()`
+ self.all_tasks_done.notify_all()
+ # All getters need to re-check queue-empty to raise ShutDown
+ self.not_empty.notify_all()
+ self.not_full.notify_all()
+
# Override these methods to implement other queue organizations
# (e.g. stack or priority queue).
# These will only be called with appropriate locks held