diff options
author | orivej <orivej@yandex-team.ru> | 2022-02-10 16:45:01 +0300 |
---|---|---|
committer | Daniil Cherednik <dcherednik@yandex-team.ru> | 2022-02-10 16:45:01 +0300 |
commit | 2d37894b1b037cf24231090eda8589bbb44fb6fc (patch) | |
tree | be835aa92c6248212e705f25388ebafcf84bc7a1 /contrib/tools/python3/src/Lib/multiprocessing/managers.py | |
parent | 718c552901d703c502ccbefdfc3c9028d608b947 (diff) | |
download | ydb-2d37894b1b037cf24231090eda8589bbb44fb6fc.tar.gz |
Restoring authorship annotation for <orivej@yandex-team.ru>. Commit 2 of 2.
Diffstat (limited to 'contrib/tools/python3/src/Lib/multiprocessing/managers.py')
-rw-r--r-- | contrib/tools/python3/src/Lib/multiprocessing/managers.py | 2380 |
1 files changed, 1190 insertions, 1190 deletions
diff --git a/contrib/tools/python3/src/Lib/multiprocessing/managers.py b/contrib/tools/python3/src/Lib/multiprocessing/managers.py index 263dcf53fc4..dfa566c6fc3 100644 --- a/contrib/tools/python3/src/Lib/multiprocessing/managers.py +++ b/contrib/tools/python3/src/Lib/multiprocessing/managers.py @@ -1,37 +1,37 @@ -# +# # Module providing manager classes for dealing -# with shared objects -# -# multiprocessing/managers.py -# -# Copyright (c) 2006-2008, R Oudkerk -# Licensed to PSF under a Contributor Agreement. -# - +# with shared objects +# +# multiprocessing/managers.py +# +# Copyright (c) 2006-2008, R Oudkerk +# Licensed to PSF under a Contributor Agreement. +# + __all__ = [ 'BaseManager', 'SyncManager', 'BaseProxy', 'Token' ] - -# -# Imports -# - -import sys -import threading + +# +# Imports +# + +import sys +import threading import signal -import array -import queue -import time +import array +import queue +import time import types import os from os import getpid - -from traceback import format_exc - -from . import connection -from .context import reduction, get_spawning_popen, ProcessError -from . import pool -from . import process -from . import util -from . import get_context + +from traceback import format_exc + +from . import connection +from .context import reduction, get_spawning_popen, ProcessError +from . import pool +from . import process +from . import util +from . import get_context try: from . import shared_memory except ImportError: @@ -39,1183 +39,1183 @@ except ImportError: else: HAS_SHMEM = True __all__.append('SharedMemoryManager') - -# -# Register some things for pickling -# - -def reduce_array(a): - return array.array, (a.typecode, a.tobytes()) -reduction.register(array.array, reduce_array) - -view_types = [type(getattr({}, name)()) for name in ('items','keys','values')] -if view_types[0] is not list: # only needed in Py3.0 - def rebuild_as_list(obj): - return list, (list(obj),) - for view_type in view_types: - reduction.register(view_type, rebuild_as_list) - -# -# Type for identifying shared objects -# - -class Token(object): - ''' + +# +# Register some things for pickling +# + +def reduce_array(a): + return array.array, (a.typecode, a.tobytes()) +reduction.register(array.array, reduce_array) + +view_types = [type(getattr({}, name)()) for name in ('items','keys','values')] +if view_types[0] is not list: # only needed in Py3.0 + def rebuild_as_list(obj): + return list, (list(obj),) + for view_type in view_types: + reduction.register(view_type, rebuild_as_list) + +# +# Type for identifying shared objects +# + +class Token(object): + ''' Type to uniquely identify a shared object - ''' - __slots__ = ('typeid', 'address', 'id') - - def __init__(self, typeid, address, id): - (self.typeid, self.address, self.id) = (typeid, address, id) - - def __getstate__(self): - return (self.typeid, self.address, self.id) - - def __setstate__(self, state): - (self.typeid, self.address, self.id) = state - - def __repr__(self): - return '%s(typeid=%r, address=%r, id=%r)' % \ - (self.__class__.__name__, self.typeid, self.address, self.id) - -# -# Function for communication with a manager's server process -# - -def dispatch(c, id, methodname, args=(), kwds={}): - ''' - Send a message to manager using connection `c` and return response - ''' - c.send((id, methodname, args, kwds)) - kind, result = c.recv() - if kind == '#RETURN': - return result - raise convert_to_error(kind, result) - -def convert_to_error(kind, result): - if kind == '#ERROR': - return result - elif kind in ('#TRACEBACK', '#UNSERIALIZABLE'): - if not isinstance(result, str): - raise TypeError( - "Result {0!r} (kind '{1}') type is {2}, not str".format( - result, kind, type(result))) - if kind == '#UNSERIALIZABLE': - return RemoteError('Unserializable message: %s\n' % result) - else: - return RemoteError(result) - else: - return ValueError('Unrecognized message type {!r}'.format(kind)) - -class RemoteError(Exception): - def __str__(self): - return ('\n' + '-'*75 + '\n' + str(self.args[0]) + '-'*75) - -# -# Functions for finding the method names of an object -# - -def all_methods(obj): - ''' - Return a list of names of methods of `obj` - ''' - temp = [] - for name in dir(obj): - func = getattr(obj, name) - if callable(func): - temp.append(name) - return temp - -def public_methods(obj): - ''' - Return a list of names of methods of `obj` which do not start with '_' - ''' - return [name for name in all_methods(obj) if name[0] != '_'] - -# -# Server which is run in a process controlled by a manager -# - -class Server(object): - ''' - Server class which runs in a process controlled by a manager object - ''' - public = ['shutdown', 'create', 'accept_connection', 'get_methods', - 'debug_info', 'number_of_objects', 'dummy', 'incref', 'decref'] - - def __init__(self, registry, address, authkey, serializer): - if not isinstance(authkey, bytes): - raise TypeError( - "Authkey {0!r} is type {1!s}, not bytes".format( - authkey, type(authkey))) - self.registry = registry - self.authkey = process.AuthenticationString(authkey) - Listener, Client = listener_client[serializer] - - # do authentication later - self.listener = Listener(address=address, backlog=16) - self.address = self.listener.address - - self.id_to_obj = {'0': (None, ())} - self.id_to_refcount = {} - self.id_to_local_proxy_obj = {} - self.mutex = threading.Lock() - - def serve_forever(self): - ''' - Run the server forever - ''' - self.stop_event = threading.Event() - process.current_process()._manager_server = self - try: - accepter = threading.Thread(target=self.accepter) - accepter.daemon = True - accepter.start() - try: - while not self.stop_event.is_set(): - self.stop_event.wait(1) - except (KeyboardInterrupt, SystemExit): - pass - finally: - if sys.stdout != sys.__stdout__: # what about stderr? - util.debug('resetting stdout, stderr') - sys.stdout = sys.__stdout__ - sys.stderr = sys.__stderr__ - sys.exit(0) - - def accepter(self): - while True: - try: - c = self.listener.accept() - except OSError: - continue - t = threading.Thread(target=self.handle_request, args=(c,)) - t.daemon = True - t.start() - - def handle_request(self, c): - ''' - Handle a new connection - ''' - funcname = result = request = None - try: - connection.deliver_challenge(c, self.authkey) - connection.answer_challenge(c, self.authkey) - request = c.recv() - ignore, funcname, args, kwds = request - assert funcname in self.public, '%r unrecognized' % funcname - func = getattr(self, funcname) - except Exception: - msg = ('#TRACEBACK', format_exc()) - else: - try: - result = func(c, *args, **kwds) - except Exception: - msg = ('#TRACEBACK', format_exc()) - else: - msg = ('#RETURN', result) - try: - c.send(msg) - except Exception as e: - try: - c.send(('#TRACEBACK', format_exc())) - except Exception: - pass - util.info('Failure to send message: %r', msg) - util.info(' ... request was %r', request) - util.info(' ... exception was %r', e) - - c.close() - - def serve_client(self, conn): - ''' - Handle requests from the proxies in a particular process/thread - ''' - util.debug('starting server thread to service %r', - threading.current_thread().name) - - recv = conn.recv - send = conn.send - id_to_obj = self.id_to_obj - - while not self.stop_event.is_set(): - - try: - methodname = obj = None - request = recv() - ident, methodname, args, kwds = request - try: - obj, exposed, gettypeid = id_to_obj[ident] - except KeyError as ke: - try: - obj, exposed, gettypeid = \ - self.id_to_local_proxy_obj[ident] + ''' + __slots__ = ('typeid', 'address', 'id') + + def __init__(self, typeid, address, id): + (self.typeid, self.address, self.id) = (typeid, address, id) + + def __getstate__(self): + return (self.typeid, self.address, self.id) + + def __setstate__(self, state): + (self.typeid, self.address, self.id) = state + + def __repr__(self): + return '%s(typeid=%r, address=%r, id=%r)' % \ + (self.__class__.__name__, self.typeid, self.address, self.id) + +# +# Function for communication with a manager's server process +# + +def dispatch(c, id, methodname, args=(), kwds={}): + ''' + Send a message to manager using connection `c` and return response + ''' + c.send((id, methodname, args, kwds)) + kind, result = c.recv() + if kind == '#RETURN': + return result + raise convert_to_error(kind, result) + +def convert_to_error(kind, result): + if kind == '#ERROR': + return result + elif kind in ('#TRACEBACK', '#UNSERIALIZABLE'): + if not isinstance(result, str): + raise TypeError( + "Result {0!r} (kind '{1}') type is {2}, not str".format( + result, kind, type(result))) + if kind == '#UNSERIALIZABLE': + return RemoteError('Unserializable message: %s\n' % result) + else: + return RemoteError(result) + else: + return ValueError('Unrecognized message type {!r}'.format(kind)) + +class RemoteError(Exception): + def __str__(self): + return ('\n' + '-'*75 + '\n' + str(self.args[0]) + '-'*75) + +# +# Functions for finding the method names of an object +# + +def all_methods(obj): + ''' + Return a list of names of methods of `obj` + ''' + temp = [] + for name in dir(obj): + func = getattr(obj, name) + if callable(func): + temp.append(name) + return temp + +def public_methods(obj): + ''' + Return a list of names of methods of `obj` which do not start with '_' + ''' + return [name for name in all_methods(obj) if name[0] != '_'] + +# +# Server which is run in a process controlled by a manager +# + +class Server(object): + ''' + Server class which runs in a process controlled by a manager object + ''' + public = ['shutdown', 'create', 'accept_connection', 'get_methods', + 'debug_info', 'number_of_objects', 'dummy', 'incref', 'decref'] + + def __init__(self, registry, address, authkey, serializer): + if not isinstance(authkey, bytes): + raise TypeError( + "Authkey {0!r} is type {1!s}, not bytes".format( + authkey, type(authkey))) + self.registry = registry + self.authkey = process.AuthenticationString(authkey) + Listener, Client = listener_client[serializer] + + # do authentication later + self.listener = Listener(address=address, backlog=16) + self.address = self.listener.address + + self.id_to_obj = {'0': (None, ())} + self.id_to_refcount = {} + self.id_to_local_proxy_obj = {} + self.mutex = threading.Lock() + + def serve_forever(self): + ''' + Run the server forever + ''' + self.stop_event = threading.Event() + process.current_process()._manager_server = self + try: + accepter = threading.Thread(target=self.accepter) + accepter.daemon = True + accepter.start() + try: + while not self.stop_event.is_set(): + self.stop_event.wait(1) + except (KeyboardInterrupt, SystemExit): + pass + finally: + if sys.stdout != sys.__stdout__: # what about stderr? + util.debug('resetting stdout, stderr') + sys.stdout = sys.__stdout__ + sys.stderr = sys.__stderr__ + sys.exit(0) + + def accepter(self): + while True: + try: + c = self.listener.accept() + except OSError: + continue + t = threading.Thread(target=self.handle_request, args=(c,)) + t.daemon = True + t.start() + + def handle_request(self, c): + ''' + Handle a new connection + ''' + funcname = result = request = None + try: + connection.deliver_challenge(c, self.authkey) + connection.answer_challenge(c, self.authkey) + request = c.recv() + ignore, funcname, args, kwds = request + assert funcname in self.public, '%r unrecognized' % funcname + func = getattr(self, funcname) + except Exception: + msg = ('#TRACEBACK', format_exc()) + else: + try: + result = func(c, *args, **kwds) + except Exception: + msg = ('#TRACEBACK', format_exc()) + else: + msg = ('#RETURN', result) + try: + c.send(msg) + except Exception as e: + try: + c.send(('#TRACEBACK', format_exc())) + except Exception: + pass + util.info('Failure to send message: %r', msg) + util.info(' ... request was %r', request) + util.info(' ... exception was %r', e) + + c.close() + + def serve_client(self, conn): + ''' + Handle requests from the proxies in a particular process/thread + ''' + util.debug('starting server thread to service %r', + threading.current_thread().name) + + recv = conn.recv + send = conn.send + id_to_obj = self.id_to_obj + + while not self.stop_event.is_set(): + + try: + methodname = obj = None + request = recv() + ident, methodname, args, kwds = request + try: + obj, exposed, gettypeid = id_to_obj[ident] + except KeyError as ke: + try: + obj, exposed, gettypeid = \ + self.id_to_local_proxy_obj[ident] except KeyError: - raise ke - - if methodname not in exposed: - raise AttributeError( - 'method %r of %r object is not in exposed=%r' % - (methodname, type(obj), exposed) - ) - - function = getattr(obj, methodname) - - try: - res = function(*args, **kwds) - except Exception as e: - msg = ('#ERROR', e) - else: - typeid = gettypeid and gettypeid.get(methodname, None) - if typeid: - rident, rexposed = self.create(conn, typeid, res) - token = Token(typeid, self.address, rident) - msg = ('#PROXY', (rexposed, token)) - else: - msg = ('#RETURN', res) - - except AttributeError: - if methodname is None: - msg = ('#TRACEBACK', format_exc()) - else: - try: - fallback_func = self.fallback_mapping[methodname] - result = fallback_func( - self, conn, ident, obj, *args, **kwds - ) - msg = ('#RETURN', result) - except Exception: - msg = ('#TRACEBACK', format_exc()) - - except EOFError: - util.debug('got EOF -- exiting thread serving %r', - threading.current_thread().name) - sys.exit(0) - - except Exception: - msg = ('#TRACEBACK', format_exc()) - - try: - try: - send(msg) + raise ke + + if methodname not in exposed: + raise AttributeError( + 'method %r of %r object is not in exposed=%r' % + (methodname, type(obj), exposed) + ) + + function = getattr(obj, methodname) + + try: + res = function(*args, **kwds) + except Exception as e: + msg = ('#ERROR', e) + else: + typeid = gettypeid and gettypeid.get(methodname, None) + if typeid: + rident, rexposed = self.create(conn, typeid, res) + token = Token(typeid, self.address, rident) + msg = ('#PROXY', (rexposed, token)) + else: + msg = ('#RETURN', res) + + except AttributeError: + if methodname is None: + msg = ('#TRACEBACK', format_exc()) + else: + try: + fallback_func = self.fallback_mapping[methodname] + result = fallback_func( + self, conn, ident, obj, *args, **kwds + ) + msg = ('#RETURN', result) + except Exception: + msg = ('#TRACEBACK', format_exc()) + + except EOFError: + util.debug('got EOF -- exiting thread serving %r', + threading.current_thread().name) + sys.exit(0) + + except Exception: + msg = ('#TRACEBACK', format_exc()) + + try: + try: + send(msg) except Exception: - send(('#UNSERIALIZABLE', format_exc())) - except Exception as e: - util.info('exception in thread serving %r', - threading.current_thread().name) - util.info(' ... message was %r', msg) - util.info(' ... exception was %r', e) - conn.close() - sys.exit(1) - - def fallback_getvalue(self, conn, ident, obj): - return obj - - def fallback_str(self, conn, ident, obj): - return str(obj) - - def fallback_repr(self, conn, ident, obj): - return repr(obj) - - fallback_mapping = { - '__str__':fallback_str, - '__repr__':fallback_repr, - '#GETVALUE':fallback_getvalue - } - - def dummy(self, c): - pass - - def debug_info(self, c): - ''' - Return some info --- useful to spot problems with refcounting - ''' - # Perhaps include debug info about 'c'? - with self.mutex: - result = [] - keys = list(self.id_to_refcount.keys()) - keys.sort() - for ident in keys: - if ident != '0': - result.append(' %s: refcount=%s\n %s' % - (ident, self.id_to_refcount[ident], - str(self.id_to_obj[ident][0])[:75])) - return '\n'.join(result) - - def number_of_objects(self, c): - ''' - Number of shared objects - ''' - # Doesn't use (len(self.id_to_obj) - 1) as we shouldn't count ident='0' - return len(self.id_to_refcount) - - def shutdown(self, c): - ''' - Shutdown this process - ''' - try: - util.debug('manager received shutdown message') - c.send(('#RETURN', None)) - except: - import traceback - traceback.print_exc() - finally: - self.stop_event.set() - + send(('#UNSERIALIZABLE', format_exc())) + except Exception as e: + util.info('exception in thread serving %r', + threading.current_thread().name) + util.info(' ... message was %r', msg) + util.info(' ... exception was %r', e) + conn.close() + sys.exit(1) + + def fallback_getvalue(self, conn, ident, obj): + return obj + + def fallback_str(self, conn, ident, obj): + return str(obj) + + def fallback_repr(self, conn, ident, obj): + return repr(obj) + + fallback_mapping = { + '__str__':fallback_str, + '__repr__':fallback_repr, + '#GETVALUE':fallback_getvalue + } + + def dummy(self, c): + pass + + def debug_info(self, c): + ''' + Return some info --- useful to spot problems with refcounting + ''' + # Perhaps include debug info about 'c'? + with self.mutex: + result = [] + keys = list(self.id_to_refcount.keys()) + keys.sort() + for ident in keys: + if ident != '0': + result.append(' %s: refcount=%s\n %s' % + (ident, self.id_to_refcount[ident], + str(self.id_to_obj[ident][0])[:75])) + return '\n'.join(result) + + def number_of_objects(self, c): + ''' + Number of shared objects + ''' + # Doesn't use (len(self.id_to_obj) - 1) as we shouldn't count ident='0' + return len(self.id_to_refcount) + + def shutdown(self, c): + ''' + Shutdown this process + ''' + try: + util.debug('manager received shutdown message') + c.send(('#RETURN', None)) + except: + import traceback + traceback.print_exc() + finally: + self.stop_event.set() + def create(self, c, typeid, /, *args, **kwds): - ''' - Create a new shared object and return its id - ''' - with self.mutex: - callable, exposed, method_to_typeid, proxytype = \ - self.registry[typeid] - - if callable is None: - if kwds or (len(args) != 1): - raise ValueError( - "Without callable, must have one non-keyword argument") - obj = args[0] - else: - obj = callable(*args, **kwds) - - if exposed is None: - exposed = public_methods(obj) - if method_to_typeid is not None: - if not isinstance(method_to_typeid, dict): - raise TypeError( - "Method_to_typeid {0!r}: type {1!s}, not dict".format( - method_to_typeid, type(method_to_typeid))) - exposed = list(exposed) + list(method_to_typeid) - - ident = '%x' % id(obj) # convert to string because xmlrpclib - # only has 32 bit signed integers - util.debug('%r callable returned object with id %r', typeid, ident) - - self.id_to_obj[ident] = (obj, set(exposed), method_to_typeid) - if ident not in self.id_to_refcount: - self.id_to_refcount[ident] = 0 - - self.incref(c, ident) - return ident, tuple(exposed) - - def get_methods(self, c, token): - ''' - Return the methods of the shared object indicated by token - ''' - return tuple(self.id_to_obj[token.id][1]) - - def accept_connection(self, c, name): - ''' - Spawn a new thread to serve this connection - ''' - threading.current_thread().name = name - c.send(('#RETURN', None)) - self.serve_client(c) - - def incref(self, c, ident): - with self.mutex: - try: - self.id_to_refcount[ident] += 1 - except KeyError as ke: - # If no external references exist but an internal (to the - # manager) still does and a new external reference is created - # from it, restore the manager's tracking of it from the - # previously stashed internal ref. - if ident in self.id_to_local_proxy_obj: - self.id_to_refcount[ident] = 1 - self.id_to_obj[ident] = \ - self.id_to_local_proxy_obj[ident] - obj, exposed, gettypeid = self.id_to_obj[ident] - util.debug('Server re-enabled tracking & INCREF %r', ident) - else: - raise ke - - def decref(self, c, ident): - if ident not in self.id_to_refcount and \ - ident in self.id_to_local_proxy_obj: - util.debug('Server DECREF skipping %r', ident) - return - - with self.mutex: - if self.id_to_refcount[ident] <= 0: - raise AssertionError( - "Id {0!s} ({1!r}) has refcount {2:n}, not 1+".format( - ident, self.id_to_obj[ident], - self.id_to_refcount[ident])) - self.id_to_refcount[ident] -= 1 - if self.id_to_refcount[ident] == 0: - del self.id_to_refcount[ident] - - if ident not in self.id_to_refcount: - # Two-step process in case the object turns out to contain other - # proxy objects (e.g. a managed list of managed lists). - # Otherwise, deleting self.id_to_obj[ident] would trigger the - # deleting of the stored value (another managed object) which would - # in turn attempt to acquire the mutex that is already held here. - self.id_to_obj[ident] = (None, (), None) # thread-safe - util.debug('disposing of obj with id %r', ident) - with self.mutex: - del self.id_to_obj[ident] - - -# -# Class to represent state of a manager -# - -class State(object): - __slots__ = ['value'] - INITIAL = 0 - STARTED = 1 - SHUTDOWN = 2 - -# -# Mapping from serializer name to Listener and Client types -# - -listener_client = { - 'pickle' : (connection.Listener, connection.Client), - 'xmlrpclib' : (connection.XmlListener, connection.XmlClient) - } - -# -# Definition of BaseManager -# - -class BaseManager(object): - ''' - Base class for managers - ''' - _registry = {} - _Server = Server - - def __init__(self, address=None, authkey=None, serializer='pickle', - ctx=None): - if authkey is None: - authkey = process.current_process().authkey - self._address = address # XXX not final address if eg ('', 0) - self._authkey = process.AuthenticationString(authkey) - self._state = State() - self._state.value = State.INITIAL - self._serializer = serializer - self._Listener, self._Client = listener_client[serializer] - self._ctx = ctx or get_context() - - def get_server(self): - ''' - Return server object with serve_forever() method and address attribute - ''' - if self._state.value != State.INITIAL: - if self._state.value == State.STARTED: - raise ProcessError("Already started server") - elif self._state.value == State.SHUTDOWN: - raise ProcessError("Manager has shut down") - else: - raise ProcessError( - "Unknown state {!r}".format(self._state.value)) - return Server(self._registry, self._address, - self._authkey, self._serializer) - - def connect(self): - ''' - Connect manager object to the server process - ''' - Listener, Client = listener_client[self._serializer] - conn = Client(self._address, authkey=self._authkey) - dispatch(conn, None, 'dummy') - self._state.value = State.STARTED - - def start(self, initializer=None, initargs=()): - ''' - Spawn a server process for this manager object - ''' - if self._state.value != State.INITIAL: - if self._state.value == State.STARTED: - raise ProcessError("Already started server") - elif self._state.value == State.SHUTDOWN: - raise ProcessError("Manager has shut down") - else: - raise ProcessError( - "Unknown state {!r}".format(self._state.value)) - - if initializer is not None and not callable(initializer): - raise TypeError('initializer must be a callable') - - # pipe over which we will retrieve address of server - reader, writer = connection.Pipe(duplex=False) - - # spawn process which runs a server - self._process = self._ctx.Process( - target=type(self)._run_server, - args=(self._registry, self._address, self._authkey, - self._serializer, writer, initializer, initargs), - ) - ident = ':'.join(str(i) for i in self._process._identity) - self._process.name = type(self).__name__ + '-' + ident - self._process.start() - - # get address of server - writer.close() - self._address = reader.recv() - reader.close() - - # register a finalizer - self._state.value = State.STARTED - self.shutdown = util.Finalize( - self, type(self)._finalize_manager, - args=(self._process, self._address, self._authkey, - self._state, self._Client), - exitpriority=0 - ) - - @classmethod - def _run_server(cls, registry, address, authkey, serializer, writer, - initializer=None, initargs=()): - ''' - Create a server, report its address and run it - ''' + ''' + Create a new shared object and return its id + ''' + with self.mutex: + callable, exposed, method_to_typeid, proxytype = \ + self.registry[typeid] + + if callable is None: + if kwds or (len(args) != 1): + raise ValueError( + "Without callable, must have one non-keyword argument") + obj = args[0] + else: + obj = callable(*args, **kwds) + + if exposed is None: + exposed = public_methods(obj) + if method_to_typeid is not None: + if not isinstance(method_to_typeid, dict): + raise TypeError( + "Method_to_typeid {0!r}: type {1!s}, not dict".format( + method_to_typeid, type(method_to_typeid))) + exposed = list(exposed) + list(method_to_typeid) + + ident = '%x' % id(obj) # convert to string because xmlrpclib + # only has 32 bit signed integers + util.debug('%r callable returned object with id %r', typeid, ident) + + self.id_to_obj[ident] = (obj, set(exposed), method_to_typeid) + if ident not in self.id_to_refcount: + self.id_to_refcount[ident] = 0 + + self.incref(c, ident) + return ident, tuple(exposed) + + def get_methods(self, c, token): + ''' + Return the methods of the shared object indicated by token + ''' + return tuple(self.id_to_obj[token.id][1]) + + def accept_connection(self, c, name): + ''' + Spawn a new thread to serve this connection + ''' + threading.current_thread().name = name + c.send(('#RETURN', None)) + self.serve_client(c) + + def incref(self, c, ident): + with self.mutex: + try: + self.id_to_refcount[ident] += 1 + except KeyError as ke: + # If no external references exist but an internal (to the + # manager) still does and a new external reference is created + # from it, restore the manager's tracking of it from the + # previously stashed internal ref. + if ident in self.id_to_local_proxy_obj: + self.id_to_refcount[ident] = 1 + self.id_to_obj[ident] = \ + self.id_to_local_proxy_obj[ident] + obj, exposed, gettypeid = self.id_to_obj[ident] + util.debug('Server re-enabled tracking & INCREF %r', ident) + else: + raise ke + + def decref(self, c, ident): + if ident not in self.id_to_refcount and \ + ident in self.id_to_local_proxy_obj: + util.debug('Server DECREF skipping %r', ident) + return + + with self.mutex: + if self.id_to_refcount[ident] <= 0: + raise AssertionError( + "Id {0!s} ({1!r}) has refcount {2:n}, not 1+".format( + ident, self.id_to_obj[ident], + self.id_to_refcount[ident])) + self.id_to_refcount[ident] -= 1 + if self.id_to_refcount[ident] == 0: + del self.id_to_refcount[ident] + + if ident not in self.id_to_refcount: + # Two-step process in case the object turns out to contain other + # proxy objects (e.g. a managed list of managed lists). + # Otherwise, deleting self.id_to_obj[ident] would trigger the + # deleting of the stored value (another managed object) which would + # in turn attempt to acquire the mutex that is already held here. + self.id_to_obj[ident] = (None, (), None) # thread-safe + util.debug('disposing of obj with id %r', ident) + with self.mutex: + del self.id_to_obj[ident] + + +# +# Class to represent state of a manager +# + +class State(object): + __slots__ = ['value'] + INITIAL = 0 + STARTED = 1 + SHUTDOWN = 2 + +# +# Mapping from serializer name to Listener and Client types +# + +listener_client = { + 'pickle' : (connection.Listener, connection.Client), + 'xmlrpclib' : (connection.XmlListener, connection.XmlClient) + } + +# +# Definition of BaseManager +# + +class BaseManager(object): + ''' + Base class for managers + ''' + _registry = {} + _Server = Server + + def __init__(self, address=None, authkey=None, serializer='pickle', + ctx=None): + if authkey is None: + authkey = process.current_process().authkey + self._address = address # XXX not final address if eg ('', 0) + self._authkey = process.AuthenticationString(authkey) + self._state = State() + self._state.value = State.INITIAL + self._serializer = serializer + self._Listener, self._Client = listener_client[serializer] + self._ctx = ctx or get_context() + + def get_server(self): + ''' + Return server object with serve_forever() method and address attribute + ''' + if self._state.value != State.INITIAL: + if self._state.value == State.STARTED: + raise ProcessError("Already started server") + elif self._state.value == State.SHUTDOWN: + raise ProcessError("Manager has shut down") + else: + raise ProcessError( + "Unknown state {!r}".format(self._state.value)) + return Server(self._registry, self._address, + self._authkey, self._serializer) + + def connect(self): + ''' + Connect manager object to the server process + ''' + Listener, Client = listener_client[self._serializer] + conn = Client(self._address, authkey=self._authkey) + dispatch(conn, None, 'dummy') + self._state.value = State.STARTED + + def start(self, initializer=None, initargs=()): + ''' + Spawn a server process for this manager object + ''' + if self._state.value != State.INITIAL: + if self._state.value == State.STARTED: + raise ProcessError("Already started server") + elif self._state.value == State.SHUTDOWN: + raise ProcessError("Manager has shut down") + else: + raise ProcessError( + "Unknown state {!r}".format(self._state.value)) + + if initializer is not None and not callable(initializer): + raise TypeError('initializer must be a callable') + + # pipe over which we will retrieve address of server + reader, writer = connection.Pipe(duplex=False) + + # spawn process which runs a server + self._process = self._ctx.Process( + target=type(self)._run_server, + args=(self._registry, self._address, self._authkey, + self._serializer, writer, initializer, initargs), + ) + ident = ':'.join(str(i) for i in self._process._identity) + self._process.name = type(self).__name__ + '-' + ident + self._process.start() + + # get address of server + writer.close() + self._address = reader.recv() + reader.close() + + # register a finalizer + self._state.value = State.STARTED + self.shutdown = util.Finalize( + self, type(self)._finalize_manager, + args=(self._process, self._address, self._authkey, + self._state, self._Client), + exitpriority=0 + ) + + @classmethod + def _run_server(cls, registry, address, authkey, serializer, writer, + initializer=None, initargs=()): + ''' + Create a server, report its address and run it + ''' # bpo-36368: protect server process from KeyboardInterrupt signals signal.signal(signal.SIGINT, signal.SIG_IGN) - if initializer is not None: - initializer(*initargs) - - # create server - server = cls._Server(registry, address, authkey, serializer) - - # inform parent process of the server's address - writer.send(server.address) - writer.close() - - # run the manager - util.info('manager serving at %r', server.address) - server.serve_forever() - + if initializer is not None: + initializer(*initargs) + + # create server + server = cls._Server(registry, address, authkey, serializer) + + # inform parent process of the server's address + writer.send(server.address) + writer.close() + + # run the manager + util.info('manager serving at %r', server.address) + server.serve_forever() + def _create(self, typeid, /, *args, **kwds): - ''' - Create a new shared object; return the token and exposed tuple - ''' - assert self._state.value == State.STARTED, 'server not yet started' - conn = self._Client(self._address, authkey=self._authkey) - try: - id, exposed = dispatch(conn, None, 'create', (typeid,)+args, kwds) - finally: - conn.close() - return Token(typeid, self._address, id), exposed - - def join(self, timeout=None): - ''' - Join the manager process (if it has been spawned) - ''' - if self._process is not None: - self._process.join(timeout) - if not self._process.is_alive(): - self._process = None - - def _debug_info(self): - ''' - Return some info about the servers shared objects and connections - ''' - conn = self._Client(self._address, authkey=self._authkey) - try: - return dispatch(conn, None, 'debug_info') - finally: - conn.close() - - def _number_of_objects(self): - ''' - Return the number of shared objects - ''' - conn = self._Client(self._address, authkey=self._authkey) - try: - return dispatch(conn, None, 'number_of_objects') - finally: - conn.close() - - def __enter__(self): - if self._state.value == State.INITIAL: - self.start() - if self._state.value != State.STARTED: - if self._state.value == State.INITIAL: - raise ProcessError("Unable to start server") - elif self._state.value == State.SHUTDOWN: - raise ProcessError("Manager has shut down") - else: - raise ProcessError( - "Unknown state {!r}".format(self._state.value)) - return self - - def __exit__(self, exc_type, exc_val, exc_tb): - self.shutdown() - - @staticmethod - def _finalize_manager(process, address, authkey, state, _Client): - ''' - Shutdown the manager process; will be registered as a finalizer - ''' - if process.is_alive(): - util.info('sending shutdown message to manager') - try: - conn = _Client(address, authkey=authkey) - try: - dispatch(conn, None, 'shutdown') - finally: - conn.close() - except Exception: - pass - - process.join(timeout=1.0) - if process.is_alive(): - util.info('manager still alive') - if hasattr(process, 'terminate'): - util.info('trying to `terminate()` manager process') - process.terminate() - process.join(timeout=0.1) - if process.is_alive(): - util.info('manager still alive after terminate') - - state.value = State.SHUTDOWN - try: - del BaseProxy._address_to_local[address] - except KeyError: - pass - - @property - def address(self): - return self._address - - @classmethod - def register(cls, typeid, callable=None, proxytype=None, exposed=None, - method_to_typeid=None, create_method=True): - ''' - Register a typeid with the manager type - ''' - if '_registry' not in cls.__dict__: - cls._registry = cls._registry.copy() - - if proxytype is None: - proxytype = AutoProxy - - exposed = exposed or getattr(proxytype, '_exposed_', None) - - method_to_typeid = method_to_typeid or \ - getattr(proxytype, '_method_to_typeid_', None) - - if method_to_typeid: - for key, value in list(method_to_typeid.items()): # isinstance? - assert type(key) is str, '%r is not a string' % key - assert type(value) is str, '%r is not a string' % value - - cls._registry[typeid] = ( - callable, exposed, method_to_typeid, proxytype - ) - - if create_method: + ''' + Create a new shared object; return the token and exposed tuple + ''' + assert self._state.value == State.STARTED, 'server not yet started' + conn = self._Client(self._address, authkey=self._authkey) + try: + id, exposed = dispatch(conn, None, 'create', (typeid,)+args, kwds) + finally: + conn.close() + return Token(typeid, self._address, id), exposed + + def join(self, timeout=None): + ''' + Join the manager process (if it has been spawned) + ''' + if self._process is not None: + self._process.join(timeout) + if not self._process.is_alive(): + self._process = None + + def _debug_info(self): + ''' + Return some info about the servers shared objects and connections + ''' + conn = self._Client(self._address, authkey=self._authkey) + try: + return dispatch(conn, None, 'debug_info') + finally: + conn.close() + + def _number_of_objects(self): + ''' + Return the number of shared objects + ''' + conn = self._Client(self._address, authkey=self._authkey) + try: + return dispatch(conn, None, 'number_of_objects') + finally: + conn.close() + + def __enter__(self): + if self._state.value == State.INITIAL: + self.start() + if self._state.value != State.STARTED: + if self._state.value == State.INITIAL: + raise ProcessError("Unable to start server") + elif self._state.value == State.SHUTDOWN: + raise ProcessError("Manager has shut down") + else: + raise ProcessError( + "Unknown state {!r}".format(self._state.value)) + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + self.shutdown() + + @staticmethod + def _finalize_manager(process, address, authkey, state, _Client): + ''' + Shutdown the manager process; will be registered as a finalizer + ''' + if process.is_alive(): + util.info('sending shutdown message to manager') + try: + conn = _Client(address, authkey=authkey) + try: + dispatch(conn, None, 'shutdown') + finally: + conn.close() + except Exception: + pass + + process.join(timeout=1.0) + if process.is_alive(): + util.info('manager still alive') + if hasattr(process, 'terminate'): + util.info('trying to `terminate()` manager process') + process.terminate() + process.join(timeout=0.1) + if process.is_alive(): + util.info('manager still alive after terminate') + + state.value = State.SHUTDOWN + try: + del BaseProxy._address_to_local[address] + except KeyError: + pass + + @property + def address(self): + return self._address + + @classmethod + def register(cls, typeid, callable=None, proxytype=None, exposed=None, + method_to_typeid=None, create_method=True): + ''' + Register a typeid with the manager type + ''' + if '_registry' not in cls.__dict__: + cls._registry = cls._registry.copy() + + if proxytype is None: + proxytype = AutoProxy + + exposed = exposed or getattr(proxytype, '_exposed_', None) + + method_to_typeid = method_to_typeid or \ + getattr(proxytype, '_method_to_typeid_', None) + + if method_to_typeid: + for key, value in list(method_to_typeid.items()): # isinstance? + assert type(key) is str, '%r is not a string' % key + assert type(value) is str, '%r is not a string' % value + + cls._registry[typeid] = ( + callable, exposed, method_to_typeid, proxytype + ) + + if create_method: def temp(self, /, *args, **kwds): - util.debug('requesting creation of a shared %r object', typeid) - token, exp = self._create(typeid, *args, **kwds) - proxy = proxytype( - token, self._serializer, manager=self, - authkey=self._authkey, exposed=exp - ) - conn = self._Client(token.address, authkey=self._authkey) - dispatch(conn, None, 'decref', (token.id,)) - return proxy - temp.__name__ = typeid - setattr(cls, typeid, temp) - -# -# Subclass of set which get cleared after a fork -# - -class ProcessLocalSet(set): - def __init__(self): - util.register_after_fork(self, lambda obj: obj.clear()) - def __reduce__(self): - return type(self), () - -# -# Definition of BaseProxy -# - -class BaseProxy(object): - ''' - A base for proxies of shared objects - ''' - _address_to_local = {} - _mutex = util.ForkAwareThreadLock() - - def __init__(self, token, serializer, manager=None, - authkey=None, exposed=None, incref=True, manager_owned=False): - with BaseProxy._mutex: - tls_idset = BaseProxy._address_to_local.get(token.address, None) - if tls_idset is None: - tls_idset = util.ForkAwareLocal(), ProcessLocalSet() - BaseProxy._address_to_local[token.address] = tls_idset - - # self._tls is used to record the connection used by this - # thread to communicate with the manager at token.address - self._tls = tls_idset[0] - - # self._idset is used to record the identities of all shared - # objects for which the current process owns references and - # which are in the manager at token.address - self._idset = tls_idset[1] - - self._token = token - self._id = self._token.id - self._manager = manager - self._serializer = serializer - self._Client = listener_client[serializer][1] - - # Should be set to True only when a proxy object is being created - # on the manager server; primary use case: nested proxy objects. - # RebuildProxy detects when a proxy is being created on the manager - # and sets this value appropriately. - self._owned_by_manager = manager_owned - - if authkey is not None: - self._authkey = process.AuthenticationString(authkey) - elif self._manager is not None: - self._authkey = self._manager._authkey - else: - self._authkey = process.current_process().authkey - - if incref: - self._incref() - - util.register_after_fork(self, BaseProxy._after_fork) - - def _connect(self): - util.debug('making connection to manager') - name = process.current_process().name - if threading.current_thread().name != 'MainThread': - name += '|' + threading.current_thread().name - conn = self._Client(self._token.address, authkey=self._authkey) - dispatch(conn, None, 'accept_connection', (name,)) - self._tls.connection = conn - - def _callmethod(self, methodname, args=(), kwds={}): - ''' + util.debug('requesting creation of a shared %r object', typeid) + token, exp = self._create(typeid, *args, **kwds) + proxy = proxytype( + token, self._serializer, manager=self, + authkey=self._authkey, exposed=exp + ) + conn = self._Client(token.address, authkey=self._authkey) + dispatch(conn, None, 'decref', (token.id,)) + return proxy + temp.__name__ = typeid + setattr(cls, typeid, temp) + +# +# Subclass of set which get cleared after a fork +# + +class ProcessLocalSet(set): + def __init__(self): + util.register_after_fork(self, lambda obj: obj.clear()) + def __reduce__(self): + return type(self), () + +# +# Definition of BaseProxy +# + +class BaseProxy(object): + ''' + A base for proxies of shared objects + ''' + _address_to_local = {} + _mutex = util.ForkAwareThreadLock() + + def __init__(self, token, serializer, manager=None, + authkey=None, exposed=None, incref=True, manager_owned=False): + with BaseProxy._mutex: + tls_idset = BaseProxy._address_to_local.get(token.address, None) + if tls_idset is None: + tls_idset = util.ForkAwareLocal(), ProcessLocalSet() + BaseProxy._address_to_local[token.address] = tls_idset + + # self._tls is used to record the connection used by this + # thread to communicate with the manager at token.address + self._tls = tls_idset[0] + + # self._idset is used to record the identities of all shared + # objects for which the current process owns references and + # which are in the manager at token.address + self._idset = tls_idset[1] + + self._token = token + self._id = self._token.id + self._manager = manager + self._serializer = serializer + self._Client = listener_client[serializer][1] + + # Should be set to True only when a proxy object is being created + # on the manager server; primary use case: nested proxy objects. + # RebuildProxy detects when a proxy is being created on the manager + # and sets this value appropriately. + self._owned_by_manager = manager_owned + + if authkey is not None: + self._authkey = process.AuthenticationString(authkey) + elif self._manager is not None: + self._authkey = self._manager._authkey + else: + self._authkey = process.current_process().authkey + + if incref: + self._incref() + + util.register_after_fork(self, BaseProxy._after_fork) + + def _connect(self): + util.debug('making connection to manager') + name = process.current_process().name + if threading.current_thread().name != 'MainThread': + name += '|' + threading.current_thread().name + conn = self._Client(self._token.address, authkey=self._authkey) + dispatch(conn, None, 'accept_connection', (name,)) + self._tls.connection = conn + + def _callmethod(self, methodname, args=(), kwds={}): + ''' Try to call a method of the referent and return a copy of the result - ''' - try: - conn = self._tls.connection - except AttributeError: - util.debug('thread %r does not own a connection', - threading.current_thread().name) - self._connect() - conn = self._tls.connection - - conn.send((self._id, methodname, args, kwds)) - kind, result = conn.recv() - - if kind == '#RETURN': - return result - elif kind == '#PROXY': - exposed, token = result - proxytype = self._manager._registry[token.typeid][-1] - token.address = self._token.address - proxy = proxytype( - token, self._serializer, manager=self._manager, - authkey=self._authkey, exposed=exposed - ) - conn = self._Client(token.address, authkey=self._authkey) - dispatch(conn, None, 'decref', (token.id,)) - return proxy - raise convert_to_error(kind, result) - - def _getvalue(self): - ''' - Get a copy of the value of the referent - ''' - return self._callmethod('#GETVALUE') - - def _incref(self): - if self._owned_by_manager: - util.debug('owned_by_manager skipped INCREF of %r', self._token.id) - return - - conn = self._Client(self._token.address, authkey=self._authkey) - dispatch(conn, None, 'incref', (self._id,)) - util.debug('INCREF %r', self._token.id) - - self._idset.add(self._id) - - state = self._manager and self._manager._state - - self._close = util.Finalize( - self, BaseProxy._decref, - args=(self._token, self._authkey, state, - self._tls, self._idset, self._Client), - exitpriority=10 - ) - - @staticmethod - def _decref(token, authkey, state, tls, idset, _Client): - idset.discard(token.id) - - # check whether manager is still alive - if state is None or state.value == State.STARTED: - # tell manager this process no longer cares about referent - try: - util.debug('DECREF %r', token.id) - conn = _Client(token.address, authkey=authkey) - dispatch(conn, None, 'decref', (token.id,)) - except Exception as e: - util.debug('... decref failed %s', e) - - else: - util.debug('DECREF %r -- manager already shutdown', token.id) - - # check whether we can close this thread's connection because - # the process owns no more references to objects for this manager - if not idset and hasattr(tls, 'connection'): - util.debug('thread %r has no more proxies so closing conn', - threading.current_thread().name) - tls.connection.close() - del tls.connection - - def _after_fork(self): - self._manager = None - try: - self._incref() - except Exception as e: - # the proxy may just be for a manager which has shutdown - util.info('incref failed: %s' % e) - - def __reduce__(self): - kwds = {} - if get_spawning_popen() is not None: - kwds['authkey'] = self._authkey - - if getattr(self, '_isauto', False): - kwds['exposed'] = self._exposed_ - return (RebuildProxy, - (AutoProxy, self._token, self._serializer, kwds)) - else: - return (RebuildProxy, - (type(self), self._token, self._serializer, kwds)) - - def __deepcopy__(self, memo): - return self._getvalue() - - def __repr__(self): - return '<%s object, typeid %r at %#x>' % \ - (type(self).__name__, self._token.typeid, id(self)) - - def __str__(self): - ''' - Return representation of the referent (or a fall-back if that fails) - ''' - try: - return self._callmethod('__repr__') - except Exception: - return repr(self)[:-1] + "; '__str__()' failed>" - -# -# Function used for unpickling -# - -def RebuildProxy(func, token, serializer, kwds): - ''' - Function used for unpickling proxy objects. - ''' - server = getattr(process.current_process(), '_manager_server', None) - if server and server.address == token.address: - util.debug('Rebuild a proxy owned by manager, token=%r', token) - kwds['manager_owned'] = True - if token.id not in server.id_to_local_proxy_obj: - server.id_to_local_proxy_obj[token.id] = \ - server.id_to_obj[token.id] - incref = ( - kwds.pop('incref', True) and - not getattr(process.current_process(), '_inheriting', False) - ) - return func(token, serializer, incref=incref, **kwds) - -# -# Functions to create proxies and proxy types -# - -def MakeProxyType(name, exposed, _cache={}): - ''' - Return a proxy type whose methods are given by `exposed` - ''' - exposed = tuple(exposed) - try: - return _cache[(name, exposed)] - except KeyError: - pass - - dic = {} - - for meth in exposed: + ''' + try: + conn = self._tls.connection + except AttributeError: + util.debug('thread %r does not own a connection', + threading.current_thread().name) + self._connect() + conn = self._tls.connection + + conn.send((self._id, methodname, args, kwds)) + kind, result = conn.recv() + + if kind == '#RETURN': + return result + elif kind == '#PROXY': + exposed, token = result + proxytype = self._manager._registry[token.typeid][-1] + token.address = self._token.address + proxy = proxytype( + token, self._serializer, manager=self._manager, + authkey=self._authkey, exposed=exposed + ) + conn = self._Client(token.address, authkey=self._authkey) + dispatch(conn, None, 'decref', (token.id,)) + return proxy + raise convert_to_error(kind, result) + + def _getvalue(self): + ''' + Get a copy of the value of the referent + ''' + return self._callmethod('#GETVALUE') + + def _incref(self): + if self._owned_by_manager: + util.debug('owned_by_manager skipped INCREF of %r', self._token.id) + return + + conn = self._Client(self._token.address, authkey=self._authkey) + dispatch(conn, None, 'incref', (self._id,)) + util.debug('INCREF %r', self._token.id) + + self._idset.add(self._id) + + state = self._manager and self._manager._state + + self._close = util.Finalize( + self, BaseProxy._decref, + args=(self._token, self._authkey, state, + self._tls, self._idset, self._Client), + exitpriority=10 + ) + + @staticmethod + def _decref(token, authkey, state, tls, idset, _Client): + idset.discard(token.id) + + # check whether manager is still alive + if state is None or state.value == State.STARTED: + # tell manager this process no longer cares about referent + try: + util.debug('DECREF %r', token.id) + conn = _Client(token.address, authkey=authkey) + dispatch(conn, None, 'decref', (token.id,)) + except Exception as e: + util.debug('... decref failed %s', e) + + else: + util.debug('DECREF %r -- manager already shutdown', token.id) + + # check whether we can close this thread's connection because + # the process owns no more references to objects for this manager + if not idset and hasattr(tls, 'connection'): + util.debug('thread %r has no more proxies so closing conn', + threading.current_thread().name) + tls.connection.close() + del tls.connection + + def _after_fork(self): + self._manager = None + try: + self._incref() + except Exception as e: + # the proxy may just be for a manager which has shutdown + util.info('incref failed: %s' % e) + + def __reduce__(self): + kwds = {} + if get_spawning_popen() is not None: + kwds['authkey'] = self._authkey + + if getattr(self, '_isauto', False): + kwds['exposed'] = self._exposed_ + return (RebuildProxy, + (AutoProxy, self._token, self._serializer, kwds)) + else: + return (RebuildProxy, + (type(self), self._token, self._serializer, kwds)) + + def __deepcopy__(self, memo): + return self._getvalue() + + def __repr__(self): + return '<%s object, typeid %r at %#x>' % \ + (type(self).__name__, self._token.typeid, id(self)) + + def __str__(self): + ''' + Return representation of the referent (or a fall-back if that fails) + ''' + try: + return self._callmethod('__repr__') + except Exception: + return repr(self)[:-1] + "; '__str__()' failed>" + +# +# Function used for unpickling +# + +def RebuildProxy(func, token, serializer, kwds): + ''' + Function used for unpickling proxy objects. + ''' + server = getattr(process.current_process(), '_manager_server', None) + if server and server.address == token.address: + util.debug('Rebuild a proxy owned by manager, token=%r', token) + kwds['manager_owned'] = True + if token.id not in server.id_to_local_proxy_obj: + server.id_to_local_proxy_obj[token.id] = \ + server.id_to_obj[token.id] + incref = ( + kwds.pop('incref', True) and + not getattr(process.current_process(), '_inheriting', False) + ) + return func(token, serializer, incref=incref, **kwds) + +# +# Functions to create proxies and proxy types +# + +def MakeProxyType(name, exposed, _cache={}): + ''' + Return a proxy type whose methods are given by `exposed` + ''' + exposed = tuple(exposed) + try: + return _cache[(name, exposed)] + except KeyError: + pass + + dic = {} + + for meth in exposed: exec('''def %s(self, /, *args, **kwds): - return self._callmethod(%r, args, kwds)''' % (meth, meth), dic) - - ProxyType = type(name, (BaseProxy,), dic) - ProxyType._exposed_ = exposed - _cache[(name, exposed)] = ProxyType - return ProxyType - - -def AutoProxy(token, serializer, manager=None, authkey=None, + return self._callmethod(%r, args, kwds)''' % (meth, meth), dic) + + ProxyType = type(name, (BaseProxy,), dic) + ProxyType._exposed_ = exposed + _cache[(name, exposed)] = ProxyType + return ProxyType + + +def AutoProxy(token, serializer, manager=None, authkey=None, exposed=None, incref=True, manager_owned=False): - ''' - Return an auto-proxy for `token` - ''' - _Client = listener_client[serializer][1] - - if exposed is None: - conn = _Client(token.address, authkey=authkey) - try: - exposed = dispatch(conn, None, 'get_methods', (token,)) - finally: - conn.close() - - if authkey is None and manager is not None: - authkey = manager._authkey - if authkey is None: - authkey = process.current_process().authkey - - ProxyType = MakeProxyType('AutoProxy[%s]' % token.typeid, exposed) - proxy = ProxyType(token, serializer, manager=manager, authkey=authkey, + ''' + Return an auto-proxy for `token` + ''' + _Client = listener_client[serializer][1] + + if exposed is None: + conn = _Client(token.address, authkey=authkey) + try: + exposed = dispatch(conn, None, 'get_methods', (token,)) + finally: + conn.close() + + if authkey is None and manager is not None: + authkey = manager._authkey + if authkey is None: + authkey = process.current_process().authkey + + ProxyType = MakeProxyType('AutoProxy[%s]' % token.typeid, exposed) + proxy = ProxyType(token, serializer, manager=manager, authkey=authkey, incref=incref, manager_owned=manager_owned) - proxy._isauto = True - return proxy - -# -# Types/callables which we will register with SyncManager -# - -class Namespace(object): + proxy._isauto = True + return proxy + +# +# Types/callables which we will register with SyncManager +# + +class Namespace(object): def __init__(self, /, **kwds): - self.__dict__.update(kwds) - def __repr__(self): - items = list(self.__dict__.items()) - temp = [] - for name, value in items: - if not name.startswith('_'): - temp.append('%s=%r' % (name, value)) - temp.sort() - return '%s(%s)' % (self.__class__.__name__, ', '.join(temp)) - -class Value(object): - def __init__(self, typecode, value, lock=True): - self._typecode = typecode - self._value = value - def get(self): - return self._value - def set(self, value): - self._value = value - def __repr__(self): - return '%s(%r, %r)'%(type(self).__name__, self._typecode, self._value) - value = property(get, set) - -def Array(typecode, sequence, lock=True): - return array.array(typecode, sequence) - -# -# Proxy types used by SyncManager -# - -class IteratorProxy(BaseProxy): - _exposed_ = ('__next__', 'send', 'throw', 'close') - def __iter__(self): - return self - def __next__(self, *args): - return self._callmethod('__next__', args) - def send(self, *args): - return self._callmethod('send', args) - def throw(self, *args): - return self._callmethod('throw', args) - def close(self, *args): - return self._callmethod('close', args) - - -class AcquirerProxy(BaseProxy): - _exposed_ = ('acquire', 'release') - def acquire(self, blocking=True, timeout=None): - args = (blocking,) if timeout is None else (blocking, timeout) - return self._callmethod('acquire', args) - def release(self): - return self._callmethod('release') - def __enter__(self): - return self._callmethod('acquire') - def __exit__(self, exc_type, exc_val, exc_tb): - return self._callmethod('release') - - -class ConditionProxy(AcquirerProxy): - _exposed_ = ('acquire', 'release', 'wait', 'notify', 'notify_all') - def wait(self, timeout=None): - return self._callmethod('wait', (timeout,)) - def notify(self, n=1): - return self._callmethod('notify', (n,)) - def notify_all(self): - return self._callmethod('notify_all') - def wait_for(self, predicate, timeout=None): - result = predicate() - if result: - return result - if timeout is not None: - endtime = time.monotonic() + timeout - else: - endtime = None - waittime = None - while not result: - if endtime is not None: - waittime = endtime - time.monotonic() - if waittime <= 0: - break - self.wait(waittime) - result = predicate() - return result - - -class EventProxy(BaseProxy): - _exposed_ = ('is_set', 'set', 'clear', 'wait') - def is_set(self): - return self._callmethod('is_set') - def set(self): - return self._callmethod('set') - def clear(self): - return self._callmethod('clear') - def wait(self, timeout=None): - return self._callmethod('wait', (timeout,)) - - -class BarrierProxy(BaseProxy): - _exposed_ = ('__getattribute__', 'wait', 'abort', 'reset') - def wait(self, timeout=None): - return self._callmethod('wait', (timeout,)) - def abort(self): - return self._callmethod('abort') - def reset(self): - return self._callmethod('reset') - @property - def parties(self): - return self._callmethod('__getattribute__', ('parties',)) - @property - def n_waiting(self): - return self._callmethod('__getattribute__', ('n_waiting',)) - @property - def broken(self): - return self._callmethod('__getattribute__', ('broken',)) - - -class NamespaceProxy(BaseProxy): - _exposed_ = ('__getattribute__', '__setattr__', '__delattr__') - def __getattr__(self, key): - if key[0] == '_': - return object.__getattribute__(self, key) - callmethod = object.__getattribute__(self, '_callmethod') - return callmethod('__getattribute__', (key,)) - def __setattr__(self, key, value): - if key[0] == '_': - return object.__setattr__(self, key, value) - callmethod = object.__getattribute__(self, '_callmethod') - return callmethod('__setattr__', (key, value)) - def __delattr__(self, key): - if key[0] == '_': - return object.__delattr__(self, key) - callmethod = object.__getattribute__(self, '_callmethod') - return callmethod('__delattr__', (key,)) - - -class ValueProxy(BaseProxy): - _exposed_ = ('get', 'set') - def get(self): - return self._callmethod('get') - def set(self, value): - return self._callmethod('set', (value,)) - value = property(get, set) - + self.__dict__.update(kwds) + def __repr__(self): + items = list(self.__dict__.items()) + temp = [] + for name, value in items: + if not name.startswith('_'): + temp.append('%s=%r' % (name, value)) + temp.sort() + return '%s(%s)' % (self.__class__.__name__, ', '.join(temp)) + +class Value(object): + def __init__(self, typecode, value, lock=True): + self._typecode = typecode + self._value = value + def get(self): + return self._value + def set(self, value): + self._value = value + def __repr__(self): + return '%s(%r, %r)'%(type(self).__name__, self._typecode, self._value) + value = property(get, set) + +def Array(typecode, sequence, lock=True): + return array.array(typecode, sequence) + +# +# Proxy types used by SyncManager +# + +class IteratorProxy(BaseProxy): + _exposed_ = ('__next__', 'send', 'throw', 'close') + def __iter__(self): + return self + def __next__(self, *args): + return self._callmethod('__next__', args) + def send(self, *args): + return self._callmethod('send', args) + def throw(self, *args): + return self._callmethod('throw', args) + def close(self, *args): + return self._callmethod('close', args) + + +class AcquirerProxy(BaseProxy): + _exposed_ = ('acquire', 'release') + def acquire(self, blocking=True, timeout=None): + args = (blocking,) if timeout is None else (blocking, timeout) + return self._callmethod('acquire', args) + def release(self): + return self._callmethod('release') + def __enter__(self): + return self._callmethod('acquire') + def __exit__(self, exc_type, exc_val, exc_tb): + return self._callmethod('release') + + +class ConditionProxy(AcquirerProxy): + _exposed_ = ('acquire', 'release', 'wait', 'notify', 'notify_all') + def wait(self, timeout=None): + return self._callmethod('wait', (timeout,)) + def notify(self, n=1): + return self._callmethod('notify', (n,)) + def notify_all(self): + return self._callmethod('notify_all') + def wait_for(self, predicate, timeout=None): + result = predicate() + if result: + return result + if timeout is not None: + endtime = time.monotonic() + timeout + else: + endtime = None + waittime = None + while not result: + if endtime is not None: + waittime = endtime - time.monotonic() + if waittime <= 0: + break + self.wait(waittime) + result = predicate() + return result + + +class EventProxy(BaseProxy): + _exposed_ = ('is_set', 'set', 'clear', 'wait') + def is_set(self): + return self._callmethod('is_set') + def set(self): + return self._callmethod('set') + def clear(self): + return self._callmethod('clear') + def wait(self, timeout=None): + return self._callmethod('wait', (timeout,)) + + +class BarrierProxy(BaseProxy): + _exposed_ = ('__getattribute__', 'wait', 'abort', 'reset') + def wait(self, timeout=None): + return self._callmethod('wait', (timeout,)) + def abort(self): + return self._callmethod('abort') + def reset(self): + return self._callmethod('reset') + @property + def parties(self): + return self._callmethod('__getattribute__', ('parties',)) + @property + def n_waiting(self): + return self._callmethod('__getattribute__', ('n_waiting',)) + @property + def broken(self): + return self._callmethod('__getattribute__', ('broken',)) + + +class NamespaceProxy(BaseProxy): + _exposed_ = ('__getattribute__', '__setattr__', '__delattr__') + def __getattr__(self, key): + if key[0] == '_': + return object.__getattribute__(self, key) + callmethod = object.__getattribute__(self, '_callmethod') + return callmethod('__getattribute__', (key,)) + def __setattr__(self, key, value): + if key[0] == '_': + return object.__setattr__(self, key, value) + callmethod = object.__getattribute__(self, '_callmethod') + return callmethod('__setattr__', (key, value)) + def __delattr__(self, key): + if key[0] == '_': + return object.__delattr__(self, key) + callmethod = object.__getattribute__(self, '_callmethod') + return callmethod('__delattr__', (key,)) + + +class ValueProxy(BaseProxy): + _exposed_ = ('get', 'set') + def get(self): + return self._callmethod('get') + def set(self, value): + return self._callmethod('set', (value,)) + value = property(get, set) + __class_getitem__ = classmethod(types.GenericAlias) - - -BaseListProxy = MakeProxyType('BaseListProxy', ( - '__add__', '__contains__', '__delitem__', '__getitem__', '__len__', - '__mul__', '__reversed__', '__rmul__', '__setitem__', - 'append', 'count', 'extend', 'index', 'insert', 'pop', 'remove', - 'reverse', 'sort', '__imul__' - )) -class ListProxy(BaseListProxy): - def __iadd__(self, value): - self._callmethod('extend', (value,)) - return self - def __imul__(self, value): - self._callmethod('__imul__', (value,)) - return self - - -DictProxy = MakeProxyType('DictProxy', ( - '__contains__', '__delitem__', '__getitem__', '__iter__', '__len__', - '__setitem__', 'clear', 'copy', 'get', 'items', - 'keys', 'pop', 'popitem', 'setdefault', 'update', 'values' - )) -DictProxy._method_to_typeid_ = { - '__iter__': 'Iterator', - } - - -ArrayProxy = MakeProxyType('ArrayProxy', ( - '__len__', '__getitem__', '__setitem__' - )) - - -BasePoolProxy = MakeProxyType('PoolProxy', ( - 'apply', 'apply_async', 'close', 'imap', 'imap_unordered', 'join', - 'map', 'map_async', 'starmap', 'starmap_async', 'terminate', - )) -BasePoolProxy._method_to_typeid_ = { - 'apply_async': 'AsyncResult', - 'map_async': 'AsyncResult', - 'starmap_async': 'AsyncResult', - 'imap': 'Iterator', - 'imap_unordered': 'Iterator' - } -class PoolProxy(BasePoolProxy): - def __enter__(self): - return self - def __exit__(self, exc_type, exc_val, exc_tb): - self.terminate() - -# -# Definition of SyncManager -# - -class SyncManager(BaseManager): - ''' - Subclass of `BaseManager` which supports a number of shared object types. - - The types registered are those intended for the synchronization - of threads, plus `dict`, `list` and `Namespace`. - - The `multiprocessing.Manager()` function creates started instances of - this class. - ''' - -SyncManager.register('Queue', queue.Queue) -SyncManager.register('JoinableQueue', queue.Queue) -SyncManager.register('Event', threading.Event, EventProxy) -SyncManager.register('Lock', threading.Lock, AcquirerProxy) -SyncManager.register('RLock', threading.RLock, AcquirerProxy) -SyncManager.register('Semaphore', threading.Semaphore, AcquirerProxy) -SyncManager.register('BoundedSemaphore', threading.BoundedSemaphore, - AcquirerProxy) -SyncManager.register('Condition', threading.Condition, ConditionProxy) -SyncManager.register('Barrier', threading.Barrier, BarrierProxy) -SyncManager.register('Pool', pool.Pool, PoolProxy) -SyncManager.register('list', list, ListProxy) -SyncManager.register('dict', dict, DictProxy) -SyncManager.register('Value', Value, ValueProxy) -SyncManager.register('Array', Array, ArrayProxy) -SyncManager.register('Namespace', Namespace, NamespaceProxy) - -# types returned by methods of PoolProxy -SyncManager.register('Iterator', proxytype=IteratorProxy, create_method=False) -SyncManager.register('AsyncResult', create_method=False) + + +BaseListProxy = MakeProxyType('BaseListProxy', ( + '__add__', '__contains__', '__delitem__', '__getitem__', '__len__', + '__mul__', '__reversed__', '__rmul__', '__setitem__', + 'append', 'count', 'extend', 'index', 'insert', 'pop', 'remove', + 'reverse', 'sort', '__imul__' + )) +class ListProxy(BaseListProxy): + def __iadd__(self, value): + self._callmethod('extend', (value,)) + return self + def __imul__(self, value): + self._callmethod('__imul__', (value,)) + return self + + +DictProxy = MakeProxyType('DictProxy', ( + '__contains__', '__delitem__', '__getitem__', '__iter__', '__len__', + '__setitem__', 'clear', 'copy', 'get', 'items', + 'keys', 'pop', 'popitem', 'setdefault', 'update', 'values' + )) +DictProxy._method_to_typeid_ = { + '__iter__': 'Iterator', + } + + +ArrayProxy = MakeProxyType('ArrayProxy', ( + '__len__', '__getitem__', '__setitem__' + )) + + +BasePoolProxy = MakeProxyType('PoolProxy', ( + 'apply', 'apply_async', 'close', 'imap', 'imap_unordered', 'join', + 'map', 'map_async', 'starmap', 'starmap_async', 'terminate', + )) +BasePoolProxy._method_to_typeid_ = { + 'apply_async': 'AsyncResult', + 'map_async': 'AsyncResult', + 'starmap_async': 'AsyncResult', + 'imap': 'Iterator', + 'imap_unordered': 'Iterator' + } +class PoolProxy(BasePoolProxy): + def __enter__(self): + return self + def __exit__(self, exc_type, exc_val, exc_tb): + self.terminate() + +# +# Definition of SyncManager +# + +class SyncManager(BaseManager): + ''' + Subclass of `BaseManager` which supports a number of shared object types. + + The types registered are those intended for the synchronization + of threads, plus `dict`, `list` and `Namespace`. + + The `multiprocessing.Manager()` function creates started instances of + this class. + ''' + +SyncManager.register('Queue', queue.Queue) +SyncManager.register('JoinableQueue', queue.Queue) +SyncManager.register('Event', threading.Event, EventProxy) +SyncManager.register('Lock', threading.Lock, AcquirerProxy) +SyncManager.register('RLock', threading.RLock, AcquirerProxy) +SyncManager.register('Semaphore', threading.Semaphore, AcquirerProxy) +SyncManager.register('BoundedSemaphore', threading.BoundedSemaphore, + AcquirerProxy) +SyncManager.register('Condition', threading.Condition, ConditionProxy) +SyncManager.register('Barrier', threading.Barrier, BarrierProxy) +SyncManager.register('Pool', pool.Pool, PoolProxy) +SyncManager.register('list', list, ListProxy) +SyncManager.register('dict', dict, DictProxy) +SyncManager.register('Value', Value, ValueProxy) +SyncManager.register('Array', Array, ArrayProxy) +SyncManager.register('Namespace', Namespace, NamespaceProxy) + +# types returned by methods of PoolProxy +SyncManager.register('Iterator', proxytype=IteratorProxy, create_method=False) +SyncManager.register('AsyncResult', create_method=False) # # Definition of SharedMemoryManager and SharedMemoryServer |