diff options
author | Devtools Arcadia <arcadia-devtools@yandex-team.ru> | 2022-02-07 18:08:42 +0300 |
---|---|---|
committer | Devtools Arcadia <arcadia-devtools@mous.vla.yp-c.yandex.net> | 2022-02-07 18:08:42 +0300 |
commit | 1110808a9d39d4b808aef724c861a2e1a38d2a69 (patch) | |
tree | e26c9fed0de5d9873cce7e00bc214573dc2195b7 /contrib/tools/python3/src/Lib/multiprocessing/context.py | |
download | ydb-1110808a9d39d4b808aef724c861a2e1a38d2a69.tar.gz |
intermediate changes
ref:cde9a383711a11544ce7e107a78147fb96cc4029
Diffstat (limited to 'contrib/tools/python3/src/Lib/multiprocessing/context.py')
-rw-r--r-- | contrib/tools/python3/src/Lib/multiprocessing/context.py | 362 |
1 files changed, 362 insertions, 0 deletions
diff --git a/contrib/tools/python3/src/Lib/multiprocessing/context.py b/contrib/tools/python3/src/Lib/multiprocessing/context.py new file mode 100644 index 0000000000..8d0525d5d6 --- /dev/null +++ b/contrib/tools/python3/src/Lib/multiprocessing/context.py @@ -0,0 +1,362 @@ +import os +import sys +import threading + +from . import process +from . import reduction + +__all__ = () + +# +# Exceptions +# + +class ProcessError(Exception): + pass + +class BufferTooShort(ProcessError): + pass + +class TimeoutError(ProcessError): + pass + +class AuthenticationError(ProcessError): + pass + +# +# Base type for contexts. Bound methods of an instance of this type are included in __all__ of __init__.py +# + +class BaseContext(object): + + ProcessError = ProcessError + BufferTooShort = BufferTooShort + TimeoutError = TimeoutError + AuthenticationError = AuthenticationError + + current_process = staticmethod(process.current_process) + parent_process = staticmethod(process.parent_process) + active_children = staticmethod(process.active_children) + + def cpu_count(self): + '''Returns the number of CPUs in the system''' + num = os.cpu_count() + if num is None: + raise NotImplementedError('cannot determine number of cpus') + else: + return num + + def Manager(self): + '''Returns a manager associated with a running server process + + The managers methods such as `Lock()`, `Condition()` and `Queue()` + can be used to create shared objects. + ''' + from .managers import SyncManager + m = SyncManager(ctx=self.get_context()) + m.start() + return m + + def Pipe(self, duplex=True): + '''Returns two connection object connected by a pipe''' + from .connection import Pipe + return Pipe(duplex) + + def Lock(self): + '''Returns a non-recursive lock object''' + from .synchronize import Lock + return Lock(ctx=self.get_context()) + + def RLock(self): + '''Returns a recursive lock object''' + from .synchronize import RLock + return RLock(ctx=self.get_context()) + + def Condition(self, lock=None): + '''Returns a condition object''' + from .synchronize import Condition + return Condition(lock, ctx=self.get_context()) + + def Semaphore(self, value=1): + '''Returns a semaphore object''' + from .synchronize import Semaphore + return Semaphore(value, ctx=self.get_context()) + + def BoundedSemaphore(self, value=1): + '''Returns a bounded semaphore object''' + from .synchronize import BoundedSemaphore + return BoundedSemaphore(value, ctx=self.get_context()) + + def Event(self): + '''Returns an event object''' + from .synchronize import Event + return Event(ctx=self.get_context()) + + def Barrier(self, parties, action=None, timeout=None): + '''Returns a barrier object''' + from .synchronize import Barrier + return Barrier(parties, action, timeout, ctx=self.get_context()) + + def Queue(self, maxsize=0): + '''Returns a queue object''' + from .queues import Queue + return Queue(maxsize, ctx=self.get_context()) + + def JoinableQueue(self, maxsize=0): + '''Returns a queue object''' + from .queues import JoinableQueue + return JoinableQueue(maxsize, ctx=self.get_context()) + + def SimpleQueue(self): + '''Returns a queue object''' + from .queues import SimpleQueue + return SimpleQueue(ctx=self.get_context()) + + def Pool(self, processes=None, initializer=None, initargs=(), + maxtasksperchild=None): + '''Returns a process pool object''' + from .pool import Pool + return Pool(processes, initializer, initargs, maxtasksperchild, + context=self.get_context()) + + def RawValue(self, typecode_or_type, *args): + '''Returns a shared object''' + from .sharedctypes import RawValue + return RawValue(typecode_or_type, *args) + + def RawArray(self, typecode_or_type, size_or_initializer): + '''Returns a shared array''' + from .sharedctypes import RawArray + return RawArray(typecode_or_type, size_or_initializer) + + def Value(self, typecode_or_type, *args, lock=True): + '''Returns a synchronized shared object''' + from .sharedctypes import Value + return Value(typecode_or_type, *args, lock=lock, + ctx=self.get_context()) + + def Array(self, typecode_or_type, size_or_initializer, *, lock=True): + '''Returns a synchronized shared array''' + from .sharedctypes import Array + return Array(typecode_or_type, size_or_initializer, lock=lock, + ctx=self.get_context()) + + def freeze_support(self): + '''Check whether this is a fake forked process in a frozen executable. + If so then run code specified by commandline and exit. + ''' + if sys.platform == 'win32' and getattr(sys, 'frozen', False): + from .spawn import freeze_support + freeze_support() + + def get_logger(self): + '''Return package logger -- if it does not already exist then + it is created. + ''' + from .util import get_logger + return get_logger() + + def log_to_stderr(self, level=None): + '''Turn on logging and add a handler which prints to stderr''' + from .util import log_to_stderr + return log_to_stderr(level) + + def allow_connection_pickling(self): + '''Install support for sending connections and sockets + between processes + ''' + # This is undocumented. In previous versions of multiprocessing + # its only effect was to make socket objects inheritable on Windows. + from . import connection + + def set_executable(self, executable): + '''Sets the path to a python.exe or pythonw.exe binary used to run + child processes instead of sys.executable when using the 'spawn' + start method. Useful for people embedding Python. + ''' + from .spawn import set_executable + set_executable(executable) + + def set_forkserver_preload(self, module_names): + '''Set list of module names to try to load in forkserver process. + This is really just a hint. + ''' + from .forkserver import set_forkserver_preload + set_forkserver_preload(module_names) + + def get_context(self, method=None): + if method is None: + return self + try: + ctx = _concrete_contexts[method] + except KeyError: + raise ValueError('cannot find context for %r' % method) from None + ctx._check_available() + return ctx + + def get_start_method(self, allow_none=False): + return self._name + + def set_start_method(self, method, force=False): + raise ValueError('cannot set start method of concrete context') + + @property + def reducer(self): + '''Controls how objects will be reduced to a form that can be + shared with other processes.''' + return globals().get('reduction') + + @reducer.setter + def reducer(self, reduction): + globals()['reduction'] = reduction + + def _check_available(self): + pass + +# +# Type of default context -- underlying context can be set at most once +# + +class Process(process.BaseProcess): + _start_method = None + @staticmethod + def _Popen(process_obj): + return _default_context.get_context().Process._Popen(process_obj) + +class DefaultContext(BaseContext): + Process = Process + + def __init__(self, context): + self._default_context = context + self._actual_context = None + + def get_context(self, method=None): + if method is None: + if self._actual_context is None: + self._actual_context = self._default_context + return self._actual_context + else: + return super().get_context(method) + + def set_start_method(self, method, force=False): + if self._actual_context is not None and not force: + raise RuntimeError('context has already been set') + if method is None and force: + self._actual_context = None + return + self._actual_context = self.get_context(method) + + def get_start_method(self, allow_none=False): + if self._actual_context is None: + if allow_none: + return None + self._actual_context = self._default_context + return self._actual_context._name + + def get_all_start_methods(self): + if sys.platform == 'win32': + return ['spawn'] + else: + methods = ['spawn', 'fork'] if sys.platform == 'darwin' else ['fork', 'spawn'] + if reduction.HAVE_SEND_HANDLE: + methods.append('forkserver') + return methods + + +# +# Context types for fixed start method +# + +if sys.platform != 'win32': + + class ForkProcess(process.BaseProcess): + _start_method = 'fork' + @staticmethod + def _Popen(process_obj): + from .popen_fork import Popen + return Popen(process_obj) + + class SpawnProcess(process.BaseProcess): + _start_method = 'spawn' + @staticmethod + def _Popen(process_obj): + from .popen_spawn_posix import Popen + return Popen(process_obj) + + class ForkServerProcess(process.BaseProcess): + _start_method = 'forkserver' + @staticmethod + def _Popen(process_obj): + from .popen_forkserver import Popen + return Popen(process_obj) + + class ForkContext(BaseContext): + _name = 'fork' + Process = ForkProcess + + class SpawnContext(BaseContext): + _name = 'spawn' + Process = SpawnProcess + + class ForkServerContext(BaseContext): + _name = 'forkserver' + Process = ForkServerProcess + def _check_available(self): + if not reduction.HAVE_SEND_HANDLE: + raise ValueError('forkserver start method not available') + + _concrete_contexts = { + 'fork': ForkContext(), + 'spawn': SpawnContext(), + 'forkserver': ForkServerContext(), + } + if sys.platform == 'darwin': + # bpo-33725: running arbitrary code after fork() is no longer reliable + # on macOS since macOS 10.14 (Mojave). Use spawn by default instead. + _default_context = DefaultContext(_concrete_contexts['spawn']) + else: + _default_context = DefaultContext(_concrete_contexts['fork']) + +else: + + class SpawnProcess(process.BaseProcess): + _start_method = 'spawn' + @staticmethod + def _Popen(process_obj): + from .popen_spawn_win32 import Popen + return Popen(process_obj) + + class SpawnContext(BaseContext): + _name = 'spawn' + Process = SpawnProcess + + _concrete_contexts = { + 'spawn': SpawnContext(), + } + _default_context = DefaultContext(_concrete_contexts['spawn']) + +# +# Force the start method +# + +def _force_start_method(method): + _default_context._actual_context = _concrete_contexts[method] + +# +# Check that the current thread is spawning a child process +# + +_tls = threading.local() + +def get_spawning_popen(): + return getattr(_tls, 'spawning_popen', None) + +def set_spawning_popen(popen): + _tls.spawning_popen = popen + +def assert_spawning(obj): + if get_spawning_popen() is None: + raise RuntimeError( + '%s objects should only be shared between processes' + ' through inheritance' % type(obj).__name__ + ) |