diff options
| author | monster <[email protected]> | 2022-07-07 14:41:37 +0300 |
|---|---|---|
| committer | monster <[email protected]> | 2022-07-07 14:41:37 +0300 |
| commit | 06e5c21a835c0e923506c4ff27929f34e00761c2 (patch) | |
| tree | 75efcbc6854ef9bd476eb8bf00cc5c900da436a2 /contrib/tools/python3/src/Lib/multiprocessing/synchronize.py | |
| parent | 03f024c4412e3aa613bb543cf1660176320ba8f4 (diff) | |
fix ya.make
Diffstat (limited to 'contrib/tools/python3/src/Lib/multiprocessing/synchronize.py')
| -rw-r--r-- | contrib/tools/python3/src/Lib/multiprocessing/synchronize.py | 394 |
1 files changed, 0 insertions, 394 deletions
diff --git a/contrib/tools/python3/src/Lib/multiprocessing/synchronize.py b/contrib/tools/python3/src/Lib/multiprocessing/synchronize.py deleted file mode 100644 index d0be48f1fd7..00000000000 --- a/contrib/tools/python3/src/Lib/multiprocessing/synchronize.py +++ /dev/null @@ -1,394 +0,0 @@ -# -# Module implementing synchronization primitives -# -# multiprocessing/synchronize.py -# -# Copyright (c) 2006-2008, R Oudkerk -# Licensed to PSF under a Contributor Agreement. -# - -__all__ = [ - 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore', 'Condition', 'Event' - ] - -import threading -import sys -import tempfile -import _multiprocessing -import time - -from . import context -from . import process -from . import util - -# Try to import the mp.synchronize module cleanly, if it fails -# raise ImportError for platforms lacking a working sem_open implementation. -# See issue 3770 -try: - from _multiprocessing import SemLock, sem_unlink -except (ImportError): - raise ImportError("This platform lacks a functioning sem_open" + - " implementation, therefore, the required" + - " synchronization primitives needed will not" + - " function, see issue 3770.") - -# -# Constants -# - -RECURSIVE_MUTEX, SEMAPHORE = list(range(2)) -SEM_VALUE_MAX = _multiprocessing.SemLock.SEM_VALUE_MAX - -# -# Base class for semaphores and mutexes; wraps `_multiprocessing.SemLock` -# - -class SemLock(object): - - _rand = tempfile._RandomNameSequence() - - def __init__(self, kind, value, maxvalue, *, ctx): - if ctx is None: - ctx = context._default_context.get_context() - name = ctx.get_start_method() - unlink_now = sys.platform == 'win32' or name == 'fork' - for i in range(100): - try: - sl = self._semlock = _multiprocessing.SemLock( - kind, value, maxvalue, self._make_name(), - unlink_now) - except FileExistsError: - pass - else: - break - else: - raise FileExistsError('cannot find name for semaphore') - - util.debug('created semlock with handle %s' % sl.handle) - self._make_methods() - - if sys.platform != 'win32': - def _after_fork(obj): - obj._semlock._after_fork() - util.register_after_fork(self, _after_fork) - - if self._semlock.name is not None: - # We only get here if we are on Unix with forking - # disabled. When the object is garbage collected or the - # process shuts down we unlink the semaphore name - from .resource_tracker import register - register(self._semlock.name, "semaphore") - util.Finalize(self, SemLock._cleanup, (self._semlock.name,), - exitpriority=0) - - @staticmethod - def _cleanup(name): - from .resource_tracker import unregister - sem_unlink(name) - unregister(name, "semaphore") - - def _make_methods(self): - self.acquire = self._semlock.acquire - self.release = self._semlock.release - - def __enter__(self): - return self._semlock.__enter__() - - def __exit__(self, *args): - return self._semlock.__exit__(*args) - - def __getstate__(self): - context.assert_spawning(self) - sl = self._semlock - if sys.platform == 'win32': - h = context.get_spawning_popen().duplicate_for_child(sl.handle) - else: - h = sl.handle - return (h, sl.kind, sl.maxvalue, sl.name) - - def __setstate__(self, state): - self._semlock = _multiprocessing.SemLock._rebuild(*state) - util.debug('recreated blocker with handle %r' % state[0]) - self._make_methods() - - @staticmethod - def _make_name(): - return '%s-%s' % (process.current_process()._config['semprefix'], - next(SemLock._rand)) - -# -# Semaphore -# - -class Semaphore(SemLock): - - def __init__(self, value=1, *, ctx): - SemLock.__init__(self, SEMAPHORE, value, SEM_VALUE_MAX, ctx=ctx) - - def get_value(self): - return self._semlock._get_value() - - def __repr__(self): - try: - value = self._semlock._get_value() - except Exception: - value = 'unknown' - return '<%s(value=%s)>' % (self.__class__.__name__, value) - -# -# Bounded semaphore -# - -class BoundedSemaphore(Semaphore): - - def __init__(self, value=1, *, ctx): - SemLock.__init__(self, SEMAPHORE, value, value, ctx=ctx) - - def __repr__(self): - try: - value = self._semlock._get_value() - except Exception: - value = 'unknown' - return '<%s(value=%s, maxvalue=%s)>' % \ - (self.__class__.__name__, value, self._semlock.maxvalue) - -# -# Non-recursive lock -# - -class Lock(SemLock): - - def __init__(self, *, ctx): - SemLock.__init__(self, SEMAPHORE, 1, 1, ctx=ctx) - - def __repr__(self): - try: - if self._semlock._is_mine(): - name = process.current_process().name - if threading.current_thread().name != 'MainThread': - name += '|' + threading.current_thread().name - elif self._semlock._get_value() == 1: - name = 'None' - elif self._semlock._count() > 0: - name = 'SomeOtherThread' - else: - name = 'SomeOtherProcess' - except Exception: - name = 'unknown' - return '<%s(owner=%s)>' % (self.__class__.__name__, name) - -# -# Recursive lock -# - -class RLock(SemLock): - - def __init__(self, *, ctx): - SemLock.__init__(self, RECURSIVE_MUTEX, 1, 1, ctx=ctx) - - def __repr__(self): - try: - if self._semlock._is_mine(): - name = process.current_process().name - if threading.current_thread().name != 'MainThread': - name += '|' + threading.current_thread().name - count = self._semlock._count() - elif self._semlock._get_value() == 1: - name, count = 'None', 0 - elif self._semlock._count() > 0: - name, count = 'SomeOtherThread', 'nonzero' - else: - name, count = 'SomeOtherProcess', 'nonzero' - except Exception: - name, count = 'unknown', 'unknown' - return '<%s(%s, %s)>' % (self.__class__.__name__, name, count) - -# -# Condition variable -# - -class Condition(object): - - def __init__(self, lock=None, *, ctx): - self._lock = lock or ctx.RLock() - self._sleeping_count = ctx.Semaphore(0) - self._woken_count = ctx.Semaphore(0) - self._wait_semaphore = ctx.Semaphore(0) - self._make_methods() - - def __getstate__(self): - context.assert_spawning(self) - return (self._lock, self._sleeping_count, - self._woken_count, self._wait_semaphore) - - def __setstate__(self, state): - (self._lock, self._sleeping_count, - self._woken_count, self._wait_semaphore) = state - self._make_methods() - - def __enter__(self): - return self._lock.__enter__() - - def __exit__(self, *args): - return self._lock.__exit__(*args) - - def _make_methods(self): - self.acquire = self._lock.acquire - self.release = self._lock.release - - def __repr__(self): - try: - num_waiters = (self._sleeping_count._semlock._get_value() - - self._woken_count._semlock._get_value()) - except Exception: - num_waiters = 'unknown' - return '<%s(%s, %s)>' % (self.__class__.__name__, self._lock, num_waiters) - - def wait(self, timeout=None): - assert self._lock._semlock._is_mine(), \ - 'must acquire() condition before using wait()' - - # indicate that this thread is going to sleep - self._sleeping_count.release() - - # release lock - count = self._lock._semlock._count() - for i in range(count): - self._lock.release() - - try: - # wait for notification or timeout - return self._wait_semaphore.acquire(True, timeout) - finally: - # indicate that this thread has woken - self._woken_count.release() - - # reacquire lock - for i in range(count): - self._lock.acquire() - - def notify(self, n=1): - assert self._lock._semlock._is_mine(), 'lock is not owned' - assert not self._wait_semaphore.acquire( - False), ('notify: Should not have been able to acquire ' - + '_wait_semaphore') - - # to take account of timeouts since last notify*() we subtract - # woken_count from sleeping_count and rezero woken_count - while self._woken_count.acquire(False): - res = self._sleeping_count.acquire(False) - assert res, ('notify: Bug in sleeping_count.acquire' - + '- res should not be False') - - sleepers = 0 - while sleepers < n and self._sleeping_count.acquire(False): - self._wait_semaphore.release() # wake up one sleeper - sleepers += 1 - - if sleepers: - for i in range(sleepers): - self._woken_count.acquire() # wait for a sleeper to wake - - # rezero wait_semaphore in case some timeouts just happened - while self._wait_semaphore.acquire(False): - pass - - def notify_all(self): - self.notify(n=sys.maxsize) - - def wait_for(self, predicate, timeout=None): - result = predicate() - if result: - return result - if timeout is not None: - endtime = time.monotonic() + timeout - else: - endtime = None - waittime = None - while not result: - if endtime is not None: - waittime = endtime - time.monotonic() - if waittime <= 0: - break - self.wait(waittime) - result = predicate() - return result - -# -# Event -# - -class Event(object): - - def __init__(self, *, ctx): - self._cond = ctx.Condition(ctx.Lock()) - self._flag = ctx.Semaphore(0) - - def is_set(self): - with self._cond: - if self._flag.acquire(False): - self._flag.release() - return True - return False - - def set(self): - with self._cond: - self._flag.acquire(False) - self._flag.release() - self._cond.notify_all() - - def clear(self): - with self._cond: - self._flag.acquire(False) - - def wait(self, timeout=None): - with self._cond: - if self._flag.acquire(False): - self._flag.release() - else: - self._cond.wait(timeout) - - if self._flag.acquire(False): - self._flag.release() - return True - return False - -# -# Barrier -# - -class Barrier(threading.Barrier): - - def __init__(self, parties, action=None, timeout=None, *, ctx): - import struct - from .heap import BufferWrapper - wrapper = BufferWrapper(struct.calcsize('i') * 2) - cond = ctx.Condition() - self.__setstate__((parties, action, timeout, cond, wrapper)) - self._state = 0 - self._count = 0 - - def __setstate__(self, state): - (self._parties, self._action, self._timeout, - self._cond, self._wrapper) = state - self._array = self._wrapper.create_memoryview().cast('i') - - def __getstate__(self): - return (self._parties, self._action, self._timeout, - self._cond, self._wrapper) - - @property - def _state(self): - return self._array[0] - - @_state.setter - def _state(self, value): - self._array[0] = value - - @property - def _count(self): - return self._array[1] - - @_count.setter - def _count(self, value): - self._array[1] = value |
