diff options
author | AlexSm <alex@ydb.tech> | 2024-03-05 10:40:59 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-03-05 12:40:59 +0300 |
commit | 1ac13c847b5358faba44dbb638a828e24369467b (patch) | |
tree | 07672b4dd3604ad3dee540a02c6494cb7d10dc3d /contrib/tools/python3/Lib/multiprocessing/managers.py | |
parent | ffcca3e7f7958ddc6487b91d3df8c01054bd0638 (diff) | |
download | ydb-1ac13c847b5358faba44dbb638a828e24369467b.tar.gz |
Library import 16 (#2433)
Co-authored-by: robot-piglet <robot-piglet@yandex-team.com>
Co-authored-by: deshevoy <deshevoy@yandex-team.com>
Co-authored-by: robot-contrib <robot-contrib@yandex-team.com>
Co-authored-by: thegeorg <thegeorg@yandex-team.com>
Co-authored-by: robot-ya-builder <robot-ya-builder@yandex-team.com>
Co-authored-by: svidyuk <svidyuk@yandex-team.com>
Co-authored-by: shadchin <shadchin@yandex-team.com>
Co-authored-by: robot-ratatosk <robot-ratatosk@yandex-team.com>
Co-authored-by: innokentii <innokentii@yandex-team.com>
Co-authored-by: arkady-e1ppa <arkady-e1ppa@yandex-team.com>
Co-authored-by: snermolaev <snermolaev@yandex-team.com>
Co-authored-by: dimdim11 <dimdim11@yandex-team.com>
Co-authored-by: kickbutt <kickbutt@yandex-team.com>
Co-authored-by: abdullinsaid <abdullinsaid@yandex-team.com>
Co-authored-by: korsunandrei <korsunandrei@yandex-team.com>
Co-authored-by: petrk <petrk@yandex-team.com>
Co-authored-by: miroslav2 <miroslav2@yandex-team.com>
Co-authored-by: serjflint <serjflint@yandex-team.com>
Co-authored-by: akhropov <akhropov@yandex-team.com>
Co-authored-by: prettyboy <prettyboy@yandex-team.com>
Co-authored-by: ilikepugs <ilikepugs@yandex-team.com>
Co-authored-by: hiddenpath <hiddenpath@yandex-team.com>
Co-authored-by: mikhnenko <mikhnenko@yandex-team.com>
Co-authored-by: spreis <spreis@yandex-team.com>
Co-authored-by: andreyshspb <andreyshspb@yandex-team.com>
Co-authored-by: dimaandreev <dimaandreev@yandex-team.com>
Co-authored-by: rashid <rashid@yandex-team.com>
Co-authored-by: robot-ydb-importer <robot-ydb-importer@yandex-team.com>
Co-authored-by: r-vetrov <r-vetrov@yandex-team.com>
Co-authored-by: ypodlesov <ypodlesov@yandex-team.com>
Co-authored-by: zaverden <zaverden@yandex-team.com>
Co-authored-by: vpozdyayev <vpozdyayev@yandex-team.com>
Co-authored-by: robot-cozmo <robot-cozmo@yandex-team.com>
Co-authored-by: v-korovin <v-korovin@yandex-team.com>
Co-authored-by: arikon <arikon@yandex-team.com>
Co-authored-by: khoden <khoden@yandex-team.com>
Co-authored-by: psydmm <psydmm@yandex-team.com>
Co-authored-by: robot-javacom <robot-javacom@yandex-team.com>
Co-authored-by: dtorilov <dtorilov@yandex-team.com>
Co-authored-by: sennikovmv <sennikovmv@yandex-team.com>
Co-authored-by: hcpp <hcpp@ydb.tech>
Diffstat (limited to 'contrib/tools/python3/Lib/multiprocessing/managers.py')
-rw-r--r-- | contrib/tools/python3/Lib/multiprocessing/managers.py | 1380 |
1 files changed, 1380 insertions, 0 deletions
diff --git a/contrib/tools/python3/Lib/multiprocessing/managers.py b/contrib/tools/python3/Lib/multiprocessing/managers.py new file mode 100644 index 0000000000..75d9c18c20 --- /dev/null +++ b/contrib/tools/python3/Lib/multiprocessing/managers.py @@ -0,0 +1,1380 @@ +# +# Module providing manager classes for dealing +# with shared objects +# +# multiprocessing/managers.py +# +# Copyright (c) 2006-2008, R Oudkerk +# Licensed to PSF under a Contributor Agreement. +# + +__all__ = [ 'BaseManager', 'SyncManager', 'BaseProxy', 'Token' ] + +# +# Imports +# + +import sys +import threading +import signal +import array +import queue +import time +import types +import os +from os import getpid + +from traceback import format_exc + +from . import connection +from .context import reduction, get_spawning_popen, ProcessError +from . import pool +from . import process +from . import util +from . import get_context +try: + from . import shared_memory +except ImportError: + HAS_SHMEM = False +else: + HAS_SHMEM = True + __all__.append('SharedMemoryManager') + +# +# Register some things for pickling +# + +def reduce_array(a): + return array.array, (a.typecode, a.tobytes()) +reduction.register(array.array, reduce_array) + +view_types = [type(getattr({}, name)()) for name in ('items','keys','values')] +def rebuild_as_list(obj): + return list, (list(obj),) +for view_type in view_types: + reduction.register(view_type, rebuild_as_list) +del view_type, view_types + +# +# Type for identifying shared objects +# + +class Token(object): + ''' + Type to uniquely identify a shared object + ''' + __slots__ = ('typeid', 'address', 'id') + + def __init__(self, typeid, address, id): + (self.typeid, self.address, self.id) = (typeid, address, id) + + def __getstate__(self): + return (self.typeid, self.address, self.id) + + def __setstate__(self, state): + (self.typeid, self.address, self.id) = state + + def __repr__(self): + return '%s(typeid=%r, address=%r, id=%r)' % \ + (self.__class__.__name__, self.typeid, self.address, self.id) + +# +# Function for communication with a manager's server process +# + +def dispatch(c, id, methodname, args=(), kwds={}): + ''' + Send a message to manager using connection `c` and return response + ''' + c.send((id, methodname, args, kwds)) + kind, result = c.recv() + if kind == '#RETURN': + return result + raise convert_to_error(kind, result) + +def convert_to_error(kind, result): + if kind == '#ERROR': + return result + elif kind in ('#TRACEBACK', '#UNSERIALIZABLE'): + if not isinstance(result, str): + raise TypeError( + "Result {0!r} (kind '{1}') type is {2}, not str".format( + result, kind, type(result))) + if kind == '#UNSERIALIZABLE': + return RemoteError('Unserializable message: %s\n' % result) + else: + return RemoteError(result) + else: + return ValueError('Unrecognized message type {!r}'.format(kind)) + +class RemoteError(Exception): + def __str__(self): + return ('\n' + '-'*75 + '\n' + str(self.args[0]) + '-'*75) + +# +# Functions for finding the method names of an object +# + +def all_methods(obj): + ''' + Return a list of names of methods of `obj` + ''' + temp = [] + for name in dir(obj): + func = getattr(obj, name) + if callable(func): + temp.append(name) + return temp + +def public_methods(obj): + ''' + Return a list of names of methods of `obj` which do not start with '_' + ''' + return [name for name in all_methods(obj) if name[0] != '_'] + +# +# Server which is run in a process controlled by a manager +# + +class Server(object): + ''' + Server class which runs in a process controlled by a manager object + ''' + public = ['shutdown', 'create', 'accept_connection', 'get_methods', + 'debug_info', 'number_of_objects', 'dummy', 'incref', 'decref'] + + def __init__(self, registry, address, authkey, serializer): + if not isinstance(authkey, bytes): + raise TypeError( + "Authkey {0!r} is type {1!s}, not bytes".format( + authkey, type(authkey))) + self.registry = registry + self.authkey = process.AuthenticationString(authkey) + Listener, Client = listener_client[serializer] + + # do authentication later + self.listener = Listener(address=address, backlog=128) + self.address = self.listener.address + + self.id_to_obj = {'0': (None, ())} + self.id_to_refcount = {} + self.id_to_local_proxy_obj = {} + self.mutex = threading.Lock() + + def serve_forever(self): + ''' + Run the server forever + ''' + self.stop_event = threading.Event() + process.current_process()._manager_server = self + try: + accepter = threading.Thread(target=self.accepter) + accepter.daemon = True + accepter.start() + try: + while not self.stop_event.is_set(): + self.stop_event.wait(1) + except (KeyboardInterrupt, SystemExit): + pass + finally: + if sys.stdout != sys.__stdout__: # what about stderr? + util.debug('resetting stdout, stderr') + sys.stdout = sys.__stdout__ + sys.stderr = sys.__stderr__ + sys.exit(0) + + def accepter(self): + while True: + try: + c = self.listener.accept() + except OSError: + continue + t = threading.Thread(target=self.handle_request, args=(c,)) + t.daemon = True + t.start() + + def _handle_request(self, c): + request = None + try: + connection.deliver_challenge(c, self.authkey) + connection.answer_challenge(c, self.authkey) + request = c.recv() + ignore, funcname, args, kwds = request + assert funcname in self.public, '%r unrecognized' % funcname + func = getattr(self, funcname) + except Exception: + msg = ('#TRACEBACK', format_exc()) + else: + try: + result = func(c, *args, **kwds) + except Exception: + msg = ('#TRACEBACK', format_exc()) + else: + msg = ('#RETURN', result) + + try: + c.send(msg) + except Exception as e: + try: + c.send(('#TRACEBACK', format_exc())) + except Exception: + pass + util.info('Failure to send message: %r', msg) + util.info(' ... request was %r', request) + util.info(' ... exception was %r', e) + + def handle_request(self, conn): + ''' + Handle a new connection + ''' + try: + self._handle_request(conn) + except SystemExit: + # Server.serve_client() calls sys.exit(0) on EOF + pass + finally: + conn.close() + + def serve_client(self, conn): + ''' + Handle requests from the proxies in a particular process/thread + ''' + util.debug('starting server thread to service %r', + threading.current_thread().name) + + recv = conn.recv + send = conn.send + id_to_obj = self.id_to_obj + + while not self.stop_event.is_set(): + + try: + methodname = obj = None + request = recv() + ident, methodname, args, kwds = request + try: + obj, exposed, gettypeid = id_to_obj[ident] + except KeyError as ke: + try: + obj, exposed, gettypeid = \ + self.id_to_local_proxy_obj[ident] + except KeyError: + raise ke + + if methodname not in exposed: + raise AttributeError( + 'method %r of %r object is not in exposed=%r' % + (methodname, type(obj), exposed) + ) + + function = getattr(obj, methodname) + + try: + res = function(*args, **kwds) + except Exception as e: + msg = ('#ERROR', e) + else: + typeid = gettypeid and gettypeid.get(methodname, None) + if typeid: + rident, rexposed = self.create(conn, typeid, res) + token = Token(typeid, self.address, rident) + msg = ('#PROXY', (rexposed, token)) + else: + msg = ('#RETURN', res) + + except AttributeError: + if methodname is None: + msg = ('#TRACEBACK', format_exc()) + else: + try: + fallback_func = self.fallback_mapping[methodname] + result = fallback_func( + self, conn, ident, obj, *args, **kwds + ) + msg = ('#RETURN', result) + except Exception: + msg = ('#TRACEBACK', format_exc()) + + except EOFError: + util.debug('got EOF -- exiting thread serving %r', + threading.current_thread().name) + sys.exit(0) + + except Exception: + msg = ('#TRACEBACK', format_exc()) + + try: + try: + send(msg) + except Exception: + send(('#UNSERIALIZABLE', format_exc())) + except Exception as e: + util.info('exception in thread serving %r', + threading.current_thread().name) + util.info(' ... message was %r', msg) + util.info(' ... exception was %r', e) + conn.close() + sys.exit(1) + + def fallback_getvalue(self, conn, ident, obj): + return obj + + def fallback_str(self, conn, ident, obj): + return str(obj) + + def fallback_repr(self, conn, ident, obj): + return repr(obj) + + fallback_mapping = { + '__str__':fallback_str, + '__repr__':fallback_repr, + '#GETVALUE':fallback_getvalue + } + + def dummy(self, c): + pass + + def debug_info(self, c): + ''' + Return some info --- useful to spot problems with refcounting + ''' + # Perhaps include debug info about 'c'? + with self.mutex: + result = [] + keys = list(self.id_to_refcount.keys()) + keys.sort() + for ident in keys: + if ident != '0': + result.append(' %s: refcount=%s\n %s' % + (ident, self.id_to_refcount[ident], + str(self.id_to_obj[ident][0])[:75])) + return '\n'.join(result) + + def number_of_objects(self, c): + ''' + Number of shared objects + ''' + # Doesn't use (len(self.id_to_obj) - 1) as we shouldn't count ident='0' + return len(self.id_to_refcount) + + def shutdown(self, c): + ''' + Shutdown this process + ''' + try: + util.debug('manager received shutdown message') + c.send(('#RETURN', None)) + except: + import traceback + traceback.print_exc() + finally: + self.stop_event.set() + + def create(self, c, typeid, /, *args, **kwds): + ''' + Create a new shared object and return its id + ''' + with self.mutex: + callable, exposed, method_to_typeid, proxytype = \ + self.registry[typeid] + + if callable is None: + if kwds or (len(args) != 1): + raise ValueError( + "Without callable, must have one non-keyword argument") + obj = args[0] + else: + obj = callable(*args, **kwds) + + if exposed is None: + exposed = public_methods(obj) + if method_to_typeid is not None: + if not isinstance(method_to_typeid, dict): + raise TypeError( + "Method_to_typeid {0!r}: type {1!s}, not dict".format( + method_to_typeid, type(method_to_typeid))) + exposed = list(exposed) + list(method_to_typeid) + + ident = '%x' % id(obj) # convert to string because xmlrpclib + # only has 32 bit signed integers + util.debug('%r callable returned object with id %r', typeid, ident) + + self.id_to_obj[ident] = (obj, set(exposed), method_to_typeid) + if ident not in self.id_to_refcount: + self.id_to_refcount[ident] = 0 + + self.incref(c, ident) + return ident, tuple(exposed) + + def get_methods(self, c, token): + ''' + Return the methods of the shared object indicated by token + ''' + return tuple(self.id_to_obj[token.id][1]) + + def accept_connection(self, c, name): + ''' + Spawn a new thread to serve this connection + ''' + threading.current_thread().name = name + c.send(('#RETURN', None)) + self.serve_client(c) + + def incref(self, c, ident): + with self.mutex: + try: + self.id_to_refcount[ident] += 1 + except KeyError as ke: + # If no external references exist but an internal (to the + # manager) still does and a new external reference is created + # from it, restore the manager's tracking of it from the + # previously stashed internal ref. + if ident in self.id_to_local_proxy_obj: + self.id_to_refcount[ident] = 1 + self.id_to_obj[ident] = \ + self.id_to_local_proxy_obj[ident] + util.debug('Server re-enabled tracking & INCREF %r', ident) + else: + raise ke + + def decref(self, c, ident): + if ident not in self.id_to_refcount and \ + ident in self.id_to_local_proxy_obj: + util.debug('Server DECREF skipping %r', ident) + return + + with self.mutex: + if self.id_to_refcount[ident] <= 0: + raise AssertionError( + "Id {0!s} ({1!r}) has refcount {2:n}, not 1+".format( + ident, self.id_to_obj[ident], + self.id_to_refcount[ident])) + self.id_to_refcount[ident] -= 1 + if self.id_to_refcount[ident] == 0: + del self.id_to_refcount[ident] + + if ident not in self.id_to_refcount: + # Two-step process in case the object turns out to contain other + # proxy objects (e.g. a managed list of managed lists). + # Otherwise, deleting self.id_to_obj[ident] would trigger the + # deleting of the stored value (another managed object) which would + # in turn attempt to acquire the mutex that is already held here. + self.id_to_obj[ident] = (None, (), None) # thread-safe + util.debug('disposing of obj with id %r', ident) + with self.mutex: + del self.id_to_obj[ident] + + +# +# Class to represent state of a manager +# + +class State(object): + __slots__ = ['value'] + INITIAL = 0 + STARTED = 1 + SHUTDOWN = 2 + +# +# Mapping from serializer name to Listener and Client types +# + +listener_client = { + 'pickle' : (connection.Listener, connection.Client), + 'xmlrpclib' : (connection.XmlListener, connection.XmlClient) + } + +# +# Definition of BaseManager +# + +class BaseManager(object): + ''' + Base class for managers + ''' + _registry = {} + _Server = Server + + def __init__(self, address=None, authkey=None, serializer='pickle', + ctx=None, *, shutdown_timeout=1.0): + if authkey is None: + authkey = process.current_process().authkey + self._address = address # XXX not final address if eg ('', 0) + self._authkey = process.AuthenticationString(authkey) + self._state = State() + self._state.value = State.INITIAL + self._serializer = serializer + self._Listener, self._Client = listener_client[serializer] + self._ctx = ctx or get_context() + self._shutdown_timeout = shutdown_timeout + + def get_server(self): + ''' + Return server object with serve_forever() method and address attribute + ''' + if self._state.value != State.INITIAL: + if self._state.value == State.STARTED: + raise ProcessError("Already started server") + elif self._state.value == State.SHUTDOWN: + raise ProcessError("Manager has shut down") + else: + raise ProcessError( + "Unknown state {!r}".format(self._state.value)) + return Server(self._registry, self._address, + self._authkey, self._serializer) + + def connect(self): + ''' + Connect manager object to the server process + ''' + Listener, Client = listener_client[self._serializer] + conn = Client(self._address, authkey=self._authkey) + dispatch(conn, None, 'dummy') + self._state.value = State.STARTED + + def start(self, initializer=None, initargs=()): + ''' + Spawn a server process for this manager object + ''' + if self._state.value != State.INITIAL: + if self._state.value == State.STARTED: + raise ProcessError("Already started server") + elif self._state.value == State.SHUTDOWN: + raise ProcessError("Manager has shut down") + else: + raise ProcessError( + "Unknown state {!r}".format(self._state.value)) + + if initializer is not None and not callable(initializer): + raise TypeError('initializer must be a callable') + + # pipe over which we will retrieve address of server + reader, writer = connection.Pipe(duplex=False) + + # spawn process which runs a server + self._process = self._ctx.Process( + target=type(self)._run_server, + args=(self._registry, self._address, self._authkey, + self._serializer, writer, initializer, initargs), + ) + ident = ':'.join(str(i) for i in self._process._identity) + self._process.name = type(self).__name__ + '-' + ident + self._process.start() + + # get address of server + writer.close() + self._address = reader.recv() + reader.close() + + # register a finalizer + self._state.value = State.STARTED + self.shutdown = util.Finalize( + self, type(self)._finalize_manager, + args=(self._process, self._address, self._authkey, self._state, + self._Client, self._shutdown_timeout), + exitpriority=0 + ) + + @classmethod + def _run_server(cls, registry, address, authkey, serializer, writer, + initializer=None, initargs=()): + ''' + Create a server, report its address and run it + ''' + # bpo-36368: protect server process from KeyboardInterrupt signals + signal.signal(signal.SIGINT, signal.SIG_IGN) + + if initializer is not None: + initializer(*initargs) + + # create server + server = cls._Server(registry, address, authkey, serializer) + + # inform parent process of the server's address + writer.send(server.address) + writer.close() + + # run the manager + util.info('manager serving at %r', server.address) + server.serve_forever() + + def _create(self, typeid, /, *args, **kwds): + ''' + Create a new shared object; return the token and exposed tuple + ''' + assert self._state.value == State.STARTED, 'server not yet started' + conn = self._Client(self._address, authkey=self._authkey) + try: + id, exposed = dispatch(conn, None, 'create', (typeid,)+args, kwds) + finally: + conn.close() + return Token(typeid, self._address, id), exposed + + def join(self, timeout=None): + ''' + Join the manager process (if it has been spawned) + ''' + if self._process is not None: + self._process.join(timeout) + if not self._process.is_alive(): + self._process = None + + def _debug_info(self): + ''' + Return some info about the servers shared objects and connections + ''' + conn = self._Client(self._address, authkey=self._authkey) + try: + return dispatch(conn, None, 'debug_info') + finally: + conn.close() + + def _number_of_objects(self): + ''' + Return the number of shared objects + ''' + conn = self._Client(self._address, authkey=self._authkey) + try: + return dispatch(conn, None, 'number_of_objects') + finally: + conn.close() + + def __enter__(self): + if self._state.value == State.INITIAL: + self.start() + if self._state.value != State.STARTED: + if self._state.value == State.INITIAL: + raise ProcessError("Unable to start server") + elif self._state.value == State.SHUTDOWN: + raise ProcessError("Manager has shut down") + else: + raise ProcessError( + "Unknown state {!r}".format(self._state.value)) + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + self.shutdown() + + @staticmethod + def _finalize_manager(process, address, authkey, state, _Client, + shutdown_timeout): + ''' + Shutdown the manager process; will be registered as a finalizer + ''' + if process.is_alive(): + util.info('sending shutdown message to manager') + try: + conn = _Client(address, authkey=authkey) + try: + dispatch(conn, None, 'shutdown') + finally: + conn.close() + except Exception: + pass + + process.join(timeout=shutdown_timeout) + if process.is_alive(): + util.info('manager still alive') + if hasattr(process, 'terminate'): + util.info('trying to `terminate()` manager process') + process.terminate() + process.join(timeout=shutdown_timeout) + if process.is_alive(): + util.info('manager still alive after terminate') + process.kill() + process.join() + + state.value = State.SHUTDOWN + try: + del BaseProxy._address_to_local[address] + except KeyError: + pass + + @property + def address(self): + return self._address + + @classmethod + def register(cls, typeid, callable=None, proxytype=None, exposed=None, + method_to_typeid=None, create_method=True): + ''' + Register a typeid with the manager type + ''' + if '_registry' not in cls.__dict__: + cls._registry = cls._registry.copy() + + if proxytype is None: + proxytype = AutoProxy + + exposed = exposed or getattr(proxytype, '_exposed_', None) + + method_to_typeid = method_to_typeid or \ + getattr(proxytype, '_method_to_typeid_', None) + + if method_to_typeid: + for key, value in list(method_to_typeid.items()): # isinstance? + assert type(key) is str, '%r is not a string' % key + assert type(value) is str, '%r is not a string' % value + + cls._registry[typeid] = ( + callable, exposed, method_to_typeid, proxytype + ) + + if create_method: + def temp(self, /, *args, **kwds): + util.debug('requesting creation of a shared %r object', typeid) + token, exp = self._create(typeid, *args, **kwds) + proxy = proxytype( + token, self._serializer, manager=self, + authkey=self._authkey, exposed=exp + ) + conn = self._Client(token.address, authkey=self._authkey) + dispatch(conn, None, 'decref', (token.id,)) + return proxy + temp.__name__ = typeid + setattr(cls, typeid, temp) + +# +# Subclass of set which get cleared after a fork +# + +class ProcessLocalSet(set): + def __init__(self): + util.register_after_fork(self, lambda obj: obj.clear()) + def __reduce__(self): + return type(self), () + +# +# Definition of BaseProxy +# + +class BaseProxy(object): + ''' + A base for proxies of shared objects + ''' + _address_to_local = {} + _mutex = util.ForkAwareThreadLock() + + def __init__(self, token, serializer, manager=None, + authkey=None, exposed=None, incref=True, manager_owned=False): + with BaseProxy._mutex: + tls_idset = BaseProxy._address_to_local.get(token.address, None) + if tls_idset is None: + tls_idset = util.ForkAwareLocal(), ProcessLocalSet() + BaseProxy._address_to_local[token.address] = tls_idset + + # self._tls is used to record the connection used by this + # thread to communicate with the manager at token.address + self._tls = tls_idset[0] + + # self._idset is used to record the identities of all shared + # objects for which the current process owns references and + # which are in the manager at token.address + self._idset = tls_idset[1] + + self._token = token + self._id = self._token.id + self._manager = manager + self._serializer = serializer + self._Client = listener_client[serializer][1] + + # Should be set to True only when a proxy object is being created + # on the manager server; primary use case: nested proxy objects. + # RebuildProxy detects when a proxy is being created on the manager + # and sets this value appropriately. + self._owned_by_manager = manager_owned + + if authkey is not None: + self._authkey = process.AuthenticationString(authkey) + elif self._manager is not None: + self._authkey = self._manager._authkey + else: + self._authkey = process.current_process().authkey + + if incref: + self._incref() + + util.register_after_fork(self, BaseProxy._after_fork) + + def _connect(self): + util.debug('making connection to manager') + name = process.current_process().name + if threading.current_thread().name != 'MainThread': + name += '|' + threading.current_thread().name + conn = self._Client(self._token.address, authkey=self._authkey) + dispatch(conn, None, 'accept_connection', (name,)) + self._tls.connection = conn + + def _callmethod(self, methodname, args=(), kwds={}): + ''' + Try to call a method of the referent and return a copy of the result + ''' + try: + conn = self._tls.connection + except AttributeError: + util.debug('thread %r does not own a connection', + threading.current_thread().name) + self._connect() + conn = self._tls.connection + + conn.send((self._id, methodname, args, kwds)) + kind, result = conn.recv() + + if kind == '#RETURN': + return result + elif kind == '#PROXY': + exposed, token = result + proxytype = self._manager._registry[token.typeid][-1] + token.address = self._token.address + proxy = proxytype( + token, self._serializer, manager=self._manager, + authkey=self._authkey, exposed=exposed + ) + conn = self._Client(token.address, authkey=self._authkey) + dispatch(conn, None, 'decref', (token.id,)) + return proxy + raise convert_to_error(kind, result) + + def _getvalue(self): + ''' + Get a copy of the value of the referent + ''' + return self._callmethod('#GETVALUE') + + def _incref(self): + if self._owned_by_manager: + util.debug('owned_by_manager skipped INCREF of %r', self._token.id) + return + + conn = self._Client(self._token.address, authkey=self._authkey) + dispatch(conn, None, 'incref', (self._id,)) + util.debug('INCREF %r', self._token.id) + + self._idset.add(self._id) + + state = self._manager and self._manager._state + + self._close = util.Finalize( + self, BaseProxy._decref, + args=(self._token, self._authkey, state, + self._tls, self._idset, self._Client), + exitpriority=10 + ) + + @staticmethod + def _decref(token, authkey, state, tls, idset, _Client): + idset.discard(token.id) + + # check whether manager is still alive + if state is None or state.value == State.STARTED: + # tell manager this process no longer cares about referent + try: + util.debug('DECREF %r', token.id) + conn = _Client(token.address, authkey=authkey) + dispatch(conn, None, 'decref', (token.id,)) + except Exception as e: + util.debug('... decref failed %s', e) + + else: + util.debug('DECREF %r -- manager already shutdown', token.id) + + # check whether we can close this thread's connection because + # the process owns no more references to objects for this manager + if not idset and hasattr(tls, 'connection'): + util.debug('thread %r has no more proxies so closing conn', + threading.current_thread().name) + tls.connection.close() + del tls.connection + + def _after_fork(self): + self._manager = None + try: + self._incref() + except Exception as e: + # the proxy may just be for a manager which has shutdown + util.info('incref failed: %s' % e) + + def __reduce__(self): + kwds = {} + if get_spawning_popen() is not None: + kwds['authkey'] = self._authkey + + if getattr(self, '_isauto', False): + kwds['exposed'] = self._exposed_ + return (RebuildProxy, + (AutoProxy, self._token, self._serializer, kwds)) + else: + return (RebuildProxy, + (type(self), self._token, self._serializer, kwds)) + + def __deepcopy__(self, memo): + return self._getvalue() + + def __repr__(self): + return '<%s object, typeid %r at %#x>' % \ + (type(self).__name__, self._token.typeid, id(self)) + + def __str__(self): + ''' + Return representation of the referent (or a fall-back if that fails) + ''' + try: + return self._callmethod('__repr__') + except Exception: + return repr(self)[:-1] + "; '__str__()' failed>" + +# +# Function used for unpickling +# + +def RebuildProxy(func, token, serializer, kwds): + ''' + Function used for unpickling proxy objects. + ''' + server = getattr(process.current_process(), '_manager_server', None) + if server and server.address == token.address: + util.debug('Rebuild a proxy owned by manager, token=%r', token) + kwds['manager_owned'] = True + if token.id not in server.id_to_local_proxy_obj: + server.id_to_local_proxy_obj[token.id] = \ + server.id_to_obj[token.id] + incref = ( + kwds.pop('incref', True) and + not getattr(process.current_process(), '_inheriting', False) + ) + return func(token, serializer, incref=incref, **kwds) + +# +# Functions to create proxies and proxy types +# + +def MakeProxyType(name, exposed, _cache={}): + ''' + Return a proxy type whose methods are given by `exposed` + ''' + exposed = tuple(exposed) + try: + return _cache[(name, exposed)] + except KeyError: + pass + + dic = {} + + for meth in exposed: + exec('''def %s(self, /, *args, **kwds): + return self._callmethod(%r, args, kwds)''' % (meth, meth), dic) + + ProxyType = type(name, (BaseProxy,), dic) + ProxyType._exposed_ = exposed + _cache[(name, exposed)] = ProxyType + return ProxyType + + +def AutoProxy(token, serializer, manager=None, authkey=None, + exposed=None, incref=True, manager_owned=False): + ''' + Return an auto-proxy for `token` + ''' + _Client = listener_client[serializer][1] + + if exposed is None: + conn = _Client(token.address, authkey=authkey) + try: + exposed = dispatch(conn, None, 'get_methods', (token,)) + finally: + conn.close() + + if authkey is None and manager is not None: + authkey = manager._authkey + if authkey is None: + authkey = process.current_process().authkey + + ProxyType = MakeProxyType('AutoProxy[%s]' % token.typeid, exposed) + proxy = ProxyType(token, serializer, manager=manager, authkey=authkey, + incref=incref, manager_owned=manager_owned) + proxy._isauto = True + return proxy + +# +# Types/callables which we will register with SyncManager +# + +class Namespace(object): + def __init__(self, /, **kwds): + self.__dict__.update(kwds) + def __repr__(self): + items = list(self.__dict__.items()) + temp = [] + for name, value in items: + if not name.startswith('_'): + temp.append('%s=%r' % (name, value)) + temp.sort() + return '%s(%s)' % (self.__class__.__name__, ', '.join(temp)) + +class Value(object): + def __init__(self, typecode, value, lock=True): + self._typecode = typecode + self._value = value + def get(self): + return self._value + def set(self, value): + self._value = value + def __repr__(self): + return '%s(%r, %r)'%(type(self).__name__, self._typecode, self._value) + value = property(get, set) + +def Array(typecode, sequence, lock=True): + return array.array(typecode, sequence) + +# +# Proxy types used by SyncManager +# + +class IteratorProxy(BaseProxy): + _exposed_ = ('__next__', 'send', 'throw', 'close') + def __iter__(self): + return self + def __next__(self, *args): + return self._callmethod('__next__', args) + def send(self, *args): + return self._callmethod('send', args) + def throw(self, *args): + return self._callmethod('throw', args) + def close(self, *args): + return self._callmethod('close', args) + + +class AcquirerProxy(BaseProxy): + _exposed_ = ('acquire', 'release') + def acquire(self, blocking=True, timeout=None): + args = (blocking,) if timeout is None else (blocking, timeout) + return self._callmethod('acquire', args) + def release(self): + return self._callmethod('release') + def __enter__(self): + return self._callmethod('acquire') + def __exit__(self, exc_type, exc_val, exc_tb): + return self._callmethod('release') + + +class ConditionProxy(AcquirerProxy): + _exposed_ = ('acquire', 'release', 'wait', 'notify', 'notify_all') + def wait(self, timeout=None): + return self._callmethod('wait', (timeout,)) + def notify(self, n=1): + return self._callmethod('notify', (n,)) + def notify_all(self): + return self._callmethod('notify_all') + def wait_for(self, predicate, timeout=None): + result = predicate() + if result: + return result + if timeout is not None: + endtime = time.monotonic() + timeout + else: + endtime = None + waittime = None + while not result: + if endtime is not None: + waittime = endtime - time.monotonic() + if waittime <= 0: + break + self.wait(waittime) + result = predicate() + return result + + +class EventProxy(BaseProxy): + _exposed_ = ('is_set', 'set', 'clear', 'wait') + def is_set(self): + return self._callmethod('is_set') + def set(self): + return self._callmethod('set') + def clear(self): + return self._callmethod('clear') + def wait(self, timeout=None): + return self._callmethod('wait', (timeout,)) + + +class BarrierProxy(BaseProxy): + _exposed_ = ('__getattribute__', 'wait', 'abort', 'reset') + def wait(self, timeout=None): + return self._callmethod('wait', (timeout,)) + def abort(self): + return self._callmethod('abort') + def reset(self): + return self._callmethod('reset') + @property + def parties(self): + return self._callmethod('__getattribute__', ('parties',)) + @property + def n_waiting(self): + return self._callmethod('__getattribute__', ('n_waiting',)) + @property + def broken(self): + return self._callmethod('__getattribute__', ('broken',)) + + +class NamespaceProxy(BaseProxy): + _exposed_ = ('__getattribute__', '__setattr__', '__delattr__') + def __getattr__(self, key): + if key[0] == '_': + return object.__getattribute__(self, key) + callmethod = object.__getattribute__(self, '_callmethod') + return callmethod('__getattribute__', (key,)) + def __setattr__(self, key, value): + if key[0] == '_': + return object.__setattr__(self, key, value) + callmethod = object.__getattribute__(self, '_callmethod') + return callmethod('__setattr__', (key, value)) + def __delattr__(self, key): + if key[0] == '_': + return object.__delattr__(self, key) + callmethod = object.__getattribute__(self, '_callmethod') + return callmethod('__delattr__', (key,)) + + +class ValueProxy(BaseProxy): + _exposed_ = ('get', 'set') + def get(self): + return self._callmethod('get') + def set(self, value): + return self._callmethod('set', (value,)) + value = property(get, set) + + __class_getitem__ = classmethod(types.GenericAlias) + + +BaseListProxy = MakeProxyType('BaseListProxy', ( + '__add__', '__contains__', '__delitem__', '__getitem__', '__len__', + '__mul__', '__reversed__', '__rmul__', '__setitem__', + 'append', 'count', 'extend', 'index', 'insert', 'pop', 'remove', + 'reverse', 'sort', '__imul__' + )) +class ListProxy(BaseListProxy): + def __iadd__(self, value): + self._callmethod('extend', (value,)) + return self + def __imul__(self, value): + self._callmethod('__imul__', (value,)) + return self + + +DictProxy = MakeProxyType('DictProxy', ( + '__contains__', '__delitem__', '__getitem__', '__iter__', '__len__', + '__setitem__', 'clear', 'copy', 'get', 'items', + 'keys', 'pop', 'popitem', 'setdefault', 'update', 'values' + )) +DictProxy._method_to_typeid_ = { + '__iter__': 'Iterator', + } + + +ArrayProxy = MakeProxyType('ArrayProxy', ( + '__len__', '__getitem__', '__setitem__' + )) + + +BasePoolProxy = MakeProxyType('PoolProxy', ( + 'apply', 'apply_async', 'close', 'imap', 'imap_unordered', 'join', + 'map', 'map_async', 'starmap', 'starmap_async', 'terminate', + )) +BasePoolProxy._method_to_typeid_ = { + 'apply_async': 'AsyncResult', + 'map_async': 'AsyncResult', + 'starmap_async': 'AsyncResult', + 'imap': 'Iterator', + 'imap_unordered': 'Iterator' + } +class PoolProxy(BasePoolProxy): + def __enter__(self): + return self + def __exit__(self, exc_type, exc_val, exc_tb): + self.terminate() + +# +# Definition of SyncManager +# + +class SyncManager(BaseManager): + ''' + Subclass of `BaseManager` which supports a number of shared object types. + + The types registered are those intended for the synchronization + of threads, plus `dict`, `list` and `Namespace`. + + The `multiprocessing.Manager()` function creates started instances of + this class. + ''' + +SyncManager.register('Queue', queue.Queue) +SyncManager.register('JoinableQueue', queue.Queue) +SyncManager.register('Event', threading.Event, EventProxy) +SyncManager.register('Lock', threading.Lock, AcquirerProxy) +SyncManager.register('RLock', threading.RLock, AcquirerProxy) +SyncManager.register('Semaphore', threading.Semaphore, AcquirerProxy) +SyncManager.register('BoundedSemaphore', threading.BoundedSemaphore, + AcquirerProxy) +SyncManager.register('Condition', threading.Condition, ConditionProxy) +SyncManager.register('Barrier', threading.Barrier, BarrierProxy) +SyncManager.register('Pool', pool.Pool, PoolProxy) +SyncManager.register('list', list, ListProxy) +SyncManager.register('dict', dict, DictProxy) +SyncManager.register('Value', Value, ValueProxy) +SyncManager.register('Array', Array, ArrayProxy) +SyncManager.register('Namespace', Namespace, NamespaceProxy) + +# types returned by methods of PoolProxy +SyncManager.register('Iterator', proxytype=IteratorProxy, create_method=False) +SyncManager.register('AsyncResult', create_method=False) + +# +# Definition of SharedMemoryManager and SharedMemoryServer +# + +if HAS_SHMEM: + class _SharedMemoryTracker: + "Manages one or more shared memory segments." + + def __init__(self, name, segment_names=[]): + self.shared_memory_context_name = name + self.segment_names = segment_names + + def register_segment(self, segment_name): + "Adds the supplied shared memory block name to tracker." + util.debug(f"Register segment {segment_name!r} in pid {getpid()}") + self.segment_names.append(segment_name) + + def destroy_segment(self, segment_name): + """Calls unlink() on the shared memory block with the supplied name + and removes it from the list of blocks being tracked.""" + util.debug(f"Destroy segment {segment_name!r} in pid {getpid()}") + self.segment_names.remove(segment_name) + segment = shared_memory.SharedMemory(segment_name) + segment.close() + segment.unlink() + + def unlink(self): + "Calls destroy_segment() on all tracked shared memory blocks." + for segment_name in self.segment_names[:]: + self.destroy_segment(segment_name) + + def __del__(self): + util.debug(f"Call {self.__class__.__name__}.__del__ in {getpid()}") + self.unlink() + + def __getstate__(self): + return (self.shared_memory_context_name, self.segment_names) + + def __setstate__(self, state): + self.__init__(*state) + + + class SharedMemoryServer(Server): + + public = Server.public + \ + ['track_segment', 'release_segment', 'list_segments'] + + def __init__(self, *args, **kwargs): + Server.__init__(self, *args, **kwargs) + address = self.address + # The address of Linux abstract namespaces can be bytes + if isinstance(address, bytes): + address = os.fsdecode(address) + self.shared_memory_context = \ + _SharedMemoryTracker(f"shm_{address}_{getpid()}") + util.debug(f"SharedMemoryServer started by pid {getpid()}") + + def create(self, c, typeid, /, *args, **kwargs): + """Create a new distributed-shared object (not backed by a shared + memory block) and return its id to be used in a Proxy Object.""" + # Unless set up as a shared proxy, don't make shared_memory_context + # a standard part of kwargs. This makes things easier for supplying + # simple functions. + if hasattr(self.registry[typeid][-1], "_shared_memory_proxy"): + kwargs['shared_memory_context'] = self.shared_memory_context + return Server.create(self, c, typeid, *args, **kwargs) + + def shutdown(self, c): + "Call unlink() on all tracked shared memory, terminate the Server." + self.shared_memory_context.unlink() + return Server.shutdown(self, c) + + def track_segment(self, c, segment_name): + "Adds the supplied shared memory block name to Server's tracker." + self.shared_memory_context.register_segment(segment_name) + + def release_segment(self, c, segment_name): + """Calls unlink() on the shared memory block with the supplied name + and removes it from the tracker instance inside the Server.""" + self.shared_memory_context.destroy_segment(segment_name) + + def list_segments(self, c): + """Returns a list of names of shared memory blocks that the Server + is currently tracking.""" + return self.shared_memory_context.segment_names + + + class SharedMemoryManager(BaseManager): + """Like SyncManager but uses SharedMemoryServer instead of Server. + + It provides methods for creating and returning SharedMemory instances + and for creating a list-like object (ShareableList) backed by shared + memory. It also provides methods that create and return Proxy Objects + that support synchronization across processes (i.e. multi-process-safe + locks and semaphores). + """ + + _Server = SharedMemoryServer + + def __init__(self, *args, **kwargs): + if os.name == "posix": + # bpo-36867: Ensure the resource_tracker is running before + # launching the manager process, so that concurrent + # shared_memory manipulation both in the manager and in the + # current process does not create two resource_tracker + # processes. + from . import resource_tracker + resource_tracker.ensure_running() + BaseManager.__init__(self, *args, **kwargs) + util.debug(f"{self.__class__.__name__} created by pid {getpid()}") + + def __del__(self): + util.debug(f"{self.__class__.__name__}.__del__ by pid {getpid()}") + + def get_server(self): + 'Better than monkeypatching for now; merge into Server ultimately' + if self._state.value != State.INITIAL: + if self._state.value == State.STARTED: + raise ProcessError("Already started SharedMemoryServer") + elif self._state.value == State.SHUTDOWN: + raise ProcessError("SharedMemoryManager has shut down") + else: + raise ProcessError( + "Unknown state {!r}".format(self._state.value)) + return self._Server(self._registry, self._address, + self._authkey, self._serializer) + + def SharedMemory(self, size): + """Returns a new SharedMemory instance with the specified size in + bytes, to be tracked by the manager.""" + with self._Client(self._address, authkey=self._authkey) as conn: + sms = shared_memory.SharedMemory(None, create=True, size=size) + try: + dispatch(conn, None, 'track_segment', (sms.name,)) + except BaseException as e: + sms.unlink() + raise e + return sms + + def ShareableList(self, sequence): + """Returns a new ShareableList instance populated with the values + from the input sequence, to be tracked by the manager.""" + with self._Client(self._address, authkey=self._authkey) as conn: + sl = shared_memory.ShareableList(sequence) + try: + dispatch(conn, None, 'track_segment', (sl.shm.name,)) + except BaseException as e: + sl.shm.unlink() + raise e + return sl |