aboutsummaryrefslogtreecommitdiffstats
path: root/contrib/python/Twisted/py3/twisted/spread
diff options
context:
space:
mode:
authorshmel1k <shmel1k@ydb.tech>2023-11-26 18:16:14 +0300
committershmel1k <shmel1k@ydb.tech>2023-11-26 18:43:30 +0300
commitb8cf9e88f4c5c64d9406af533d8948deb050d695 (patch)
tree218eb61fb3c3b96ec08b4d8cdfef383104a87d63 /contrib/python/Twisted/py3/twisted/spread
parent523f645a83a0ec97a0332dbc3863bb354c92a328 (diff)
downloadydb-b8cf9e88f4c5c64d9406af533d8948deb050d695.tar.gz
add kikimr_configure
Diffstat (limited to 'contrib/python/Twisted/py3/twisted/spread')
-rw-r--r--contrib/python/Twisted/py3/twisted/spread/__init__.py8
-rw-r--r--contrib/python/Twisted/py3/twisted/spread/banana.py403
-rw-r--r--contrib/python/Twisted/py3/twisted/spread/flavors.py651
-rw-r--r--contrib/python/Twisted/py3/twisted/spread/interfaces.py30
-rw-r--r--contrib/python/Twisted/py3/twisted/spread/jelly.py1092
-rw-r--r--contrib/python/Twisted/py3/twisted/spread/pb.py1674
-rw-r--r--contrib/python/Twisted/py3/twisted/spread/publish.py144
-rw-r--r--contrib/python/Twisted/py3/twisted/spread/util.py217
8 files changed, 4219 insertions, 0 deletions
diff --git a/contrib/python/Twisted/py3/twisted/spread/__init__.py b/contrib/python/Twisted/py3/twisted/spread/__init__.py
new file mode 100644
index 0000000000..ab3881055d
--- /dev/null
+++ b/contrib/python/Twisted/py3/twisted/spread/__init__.py
@@ -0,0 +1,8 @@
+# Copyright (c) Twisted Matrix Laboratories.
+# See LICENSE for details.
+
+"""
+Twisted Spread: Spreadable (Distributed) Computing.
+
+@author: Glyph Lefkowitz
+"""
diff --git a/contrib/python/Twisted/py3/twisted/spread/banana.py b/contrib/python/Twisted/py3/twisted/spread/banana.py
new file mode 100644
index 0000000000..ee54c2e2a9
--- /dev/null
+++ b/contrib/python/Twisted/py3/twisted/spread/banana.py
@@ -0,0 +1,403 @@
+# -*- test-case-name: twisted.spread.test.test_banana -*-
+# Copyright (c) Twisted Matrix Laboratories.
+# See LICENSE for details.
+
+"""
+Banana -- s-exp based protocol.
+
+Future Plans: This module is almost entirely stable. The same caveat applies
+to it as applies to L{twisted.spread.jelly}, however. Read its future plans
+for more details.
+
+@author: Glyph Lefkowitz
+"""
+
+
+import copy
+import struct
+from io import BytesIO
+
+from twisted.internet import protocol
+from twisted.persisted import styles
+from twisted.python import log
+from twisted.python.compat import iterbytes
+from twisted.python.reflect import fullyQualifiedName
+
+
+class BananaError(Exception):
+ pass
+
+
+def int2b128(integer, stream):
+ if integer == 0:
+ stream(b"\0")
+ return
+ assert integer > 0, "can only encode positive integers"
+ while integer:
+ stream(bytes((integer & 0x7F,)))
+ integer = integer >> 7
+
+
+def b1282int(st):
+ """
+ Convert an integer represented as a base 128 string into an L{int}.
+
+ @param st: The integer encoded in a byte string.
+ @type st: L{bytes}
+
+ @return: The integer value extracted from the byte string.
+ @rtype: L{int}
+ """
+ e = 1
+ i = 0
+ for char in iterbytes(st):
+ n = ord(char)
+ i += n * e
+ e <<= 7
+ return i
+
+
+# delimiter characters.
+LIST = b"\x80"
+INT = b"\x81"
+STRING = b"\x82"
+NEG = b"\x83"
+FLOAT = b"\x84"
+# "optional" -- these might be refused by a low-level implementation.
+LONGINT = b"\x85"
+LONGNEG = b"\x86"
+# really optional; this is part of the 'pb' vocabulary
+VOCAB = b"\x87"
+
+HIGH_BIT_SET = b"\x80"
+
+
+def setPrefixLimit(limit):
+ """
+ Set the limit on the prefix length for all Banana connections
+ established after this call.
+
+ The prefix length limit determines how many bytes of prefix a banana
+ decoder will allow before rejecting a potential object as too large.
+
+ @type limit: L{int}
+ @param limit: The number of bytes of prefix for banana to allow when
+ decoding.
+ """
+ global _PREFIX_LIMIT
+ _PREFIX_LIMIT = limit
+
+
+_PREFIX_LIMIT = None
+setPrefixLimit(64)
+
+SIZE_LIMIT = 640 * 1024 # 640k is all you'll ever need :-)
+
+
+class Banana(protocol.Protocol, styles.Ephemeral):
+ """
+ L{Banana} implements the I{Banana} s-expression protocol, client and
+ server.
+
+ @ivar knownDialects: These are the profiles supported by this Banana
+ implementation.
+ @type knownDialects: L{list} of L{bytes}
+ """
+
+ # The specification calls these profiles but this implementation calls them
+ # dialects instead.
+ knownDialects = [b"pb", b"none"]
+
+ prefixLimit = None
+ sizeLimit = SIZE_LIMIT
+
+ def setPrefixLimit(self, limit):
+ """
+ Set the prefix limit for decoding done by this protocol instance.
+
+ @see: L{setPrefixLimit}
+ """
+ self.prefixLimit = limit
+ self._smallestLongInt = -(2 ** (limit * 7)) + 1
+ self._smallestInt = -(2**31)
+ self._largestInt = 2**31 - 1
+ self._largestLongInt = 2 ** (limit * 7) - 1
+
+ def connectionReady(self):
+ """Surrogate for connectionMade
+ Called after protocol negotiation.
+ """
+
+ def _selectDialect(self, dialect):
+ self.currentDialect = dialect
+ self.connectionReady()
+
+ def callExpressionReceived(self, obj):
+ if self.currentDialect:
+ self.expressionReceived(obj)
+ else:
+ # this is the first message we've received
+ if self.isClient:
+ # if I'm a client I have to respond
+ for serverVer in obj:
+ if serverVer in self.knownDialects:
+ self.sendEncoded(serverVer)
+ self._selectDialect(serverVer)
+ break
+ else:
+ # I can't speak any of those dialects.
+ log.msg(
+ "The client doesn't speak any of the protocols "
+ "offered by the server: disconnecting."
+ )
+ self.transport.loseConnection()
+ else:
+ if obj in self.knownDialects:
+ self._selectDialect(obj)
+ else:
+ # the client just selected a protocol that I did not suggest.
+ log.msg(
+ "The client selected a protocol the server didn't "
+ "suggest and doesn't know: disconnecting."
+ )
+ self.transport.loseConnection()
+
+ def connectionMade(self):
+ self.setPrefixLimit(_PREFIX_LIMIT)
+ self.currentDialect = None
+ if not self.isClient:
+ self.sendEncoded(self.knownDialects)
+
+ def gotItem(self, item):
+ l = self.listStack
+ if l:
+ l[-1][1].append(item)
+ else:
+ self.callExpressionReceived(item)
+
+ buffer = b""
+
+ def dataReceived(self, chunk):
+ buffer = self.buffer + chunk
+ listStack = self.listStack
+ gotItem = self.gotItem
+ while buffer:
+ assert self.buffer != buffer, "This ain't right: {} {}".format(
+ repr(self.buffer),
+ repr(buffer),
+ )
+ self.buffer = buffer
+ pos = 0
+ for ch in iterbytes(buffer):
+ if ch >= HIGH_BIT_SET:
+ break
+ pos = pos + 1
+ else:
+ if pos > self.prefixLimit:
+ raise BananaError(
+ "Security precaution: more than %d bytes of prefix"
+ % (self.prefixLimit,)
+ )
+ return
+ num = buffer[:pos]
+ typebyte = buffer[pos : pos + 1]
+ rest = buffer[pos + 1 :]
+ if len(num) > self.prefixLimit:
+ raise BananaError(
+ "Security precaution: longer than %d bytes worth of prefix"
+ % (self.prefixLimit,)
+ )
+ if typebyte == LIST:
+ num = b1282int(num)
+ if num > SIZE_LIMIT:
+ raise BananaError("Security precaution: List too long.")
+ listStack.append((num, []))
+ buffer = rest
+ elif typebyte == STRING:
+ num = b1282int(num)
+ if num > SIZE_LIMIT:
+ raise BananaError("Security precaution: String too long.")
+ if len(rest) >= num:
+ buffer = rest[num:]
+ gotItem(rest[:num])
+ else:
+ return
+ elif typebyte == INT:
+ buffer = rest
+ num = b1282int(num)
+ gotItem(num)
+ elif typebyte == LONGINT:
+ buffer = rest
+ num = b1282int(num)
+ gotItem(num)
+ elif typebyte == LONGNEG:
+ buffer = rest
+ num = b1282int(num)
+ gotItem(-num)
+ elif typebyte == NEG:
+ buffer = rest
+ num = -b1282int(num)
+ gotItem(num)
+ elif typebyte == VOCAB:
+ buffer = rest
+ num = b1282int(num)
+ item = self.incomingVocabulary[num]
+ if self.currentDialect == b"pb":
+ # the sender issues VOCAB only for dialect pb
+ gotItem(item)
+ else:
+ raise NotImplementedError(f"Invalid item for pb protocol {item!r}")
+ elif typebyte == FLOAT:
+ if len(rest) >= 8:
+ buffer = rest[8:]
+ gotItem(struct.unpack("!d", rest[:8])[0])
+ else:
+ return
+ else:
+ raise NotImplementedError(f"Invalid Type Byte {typebyte!r}")
+ while listStack and (len(listStack[-1][1]) == listStack[-1][0]):
+ item = listStack.pop()[1]
+ gotItem(item)
+ self.buffer = b""
+
+ def expressionReceived(self, lst):
+ """Called when an expression (list, string, or int) is received."""
+ raise NotImplementedError()
+
+ outgoingVocabulary = {
+ # Jelly Data Types
+ b"None": 1,
+ b"class": 2,
+ b"dereference": 3,
+ b"reference": 4,
+ b"dictionary": 5,
+ b"function": 6,
+ b"instance": 7,
+ b"list": 8,
+ b"module": 9,
+ b"persistent": 10,
+ b"tuple": 11,
+ b"unpersistable": 12,
+ # PB Data Types
+ b"copy": 13,
+ b"cache": 14,
+ b"cached": 15,
+ b"remote": 16,
+ b"local": 17,
+ b"lcache": 18,
+ # PB Protocol Messages
+ b"version": 19,
+ b"login": 20,
+ b"password": 21,
+ b"challenge": 22,
+ b"logged_in": 23,
+ b"not_logged_in": 24,
+ b"cachemessage": 25,
+ b"message": 26,
+ b"answer": 27,
+ b"error": 28,
+ b"decref": 29,
+ b"decache": 30,
+ b"uncache": 31,
+ }
+
+ incomingVocabulary = {}
+ for k, v in outgoingVocabulary.items():
+ incomingVocabulary[v] = k
+
+ def __init__(self, isClient=1):
+ self.listStack = []
+ self.outgoingSymbols = copy.copy(self.outgoingVocabulary)
+ self.outgoingSymbolCount = 0
+ self.isClient = isClient
+
+ def sendEncoded(self, obj):
+ """
+ Send the encoded representation of the given object:
+
+ @param obj: An object to encode and send.
+
+ @raise BananaError: If the given object is not an instance of one of
+ the types supported by Banana.
+
+ @return: L{None}
+ """
+ encodeStream = BytesIO()
+ self._encode(obj, encodeStream.write)
+ value = encodeStream.getvalue()
+ self.transport.write(value)
+
+ def _encode(self, obj, write):
+ if isinstance(obj, (list, tuple)):
+ if len(obj) > SIZE_LIMIT:
+ raise BananaError("list/tuple is too long to send (%d)" % (len(obj),))
+ int2b128(len(obj), write)
+ write(LIST)
+ for elem in obj:
+ self._encode(elem, write)
+ elif isinstance(obj, int):
+ if obj < self._smallestLongInt or obj > self._largestLongInt:
+ raise BananaError("int is too large to send (%d)" % (obj,))
+ if obj < self._smallestInt:
+ int2b128(-obj, write)
+ write(LONGNEG)
+ elif obj < 0:
+ int2b128(-obj, write)
+ write(NEG)
+ elif obj <= self._largestInt:
+ int2b128(obj, write)
+ write(INT)
+ else:
+ int2b128(obj, write)
+ write(LONGINT)
+ elif isinstance(obj, float):
+ write(FLOAT)
+ write(struct.pack("!d", obj))
+ elif isinstance(obj, bytes):
+ # TODO: an API for extending banana...
+ if self.currentDialect == b"pb" and obj in self.outgoingSymbols:
+ symbolID = self.outgoingSymbols[obj]
+ int2b128(symbolID, write)
+ write(VOCAB)
+ else:
+ if len(obj) > SIZE_LIMIT:
+ raise BananaError(
+ "byte string is too long to send (%d)" % (len(obj),)
+ )
+ int2b128(len(obj), write)
+ write(STRING)
+ write(obj)
+ else:
+ raise BananaError(
+ "Banana cannot send {} objects: {!r}".format(
+ fullyQualifiedName(type(obj)), obj
+ )
+ )
+
+
+# For use from the interactive interpreter
+_i = Banana()
+_i.connectionMade()
+_i._selectDialect(b"none")
+
+
+def encode(lst):
+ """Encode a list s-expression."""
+ encodeStream = BytesIO()
+ _i.transport = encodeStream
+ _i.sendEncoded(lst)
+ return encodeStream.getvalue()
+
+
+def decode(st):
+ """
+ Decode a banana-encoded string.
+ """
+ l = []
+ _i.expressionReceived = l.append
+ try:
+ _i.dataReceived(st)
+ finally:
+ _i.buffer = b""
+ del _i.expressionReceived
+ return l[0]
diff --git a/contrib/python/Twisted/py3/twisted/spread/flavors.py b/contrib/python/Twisted/py3/twisted/spread/flavors.py
new file mode 100644
index 0000000000..ef98fee272
--- /dev/null
+++ b/contrib/python/Twisted/py3/twisted/spread/flavors.py
@@ -0,0 +1,651 @@
+# -*- test-case-name: twisted.spread.test.test_pb -*-
+# Copyright (c) Twisted Matrix Laboratories.
+# See LICENSE for details.
+
+"""
+This module represents flavors of remotely accessible objects.
+
+Currently this is only objects accessible through Perspective Broker, but will
+hopefully encompass all forms of remote access which can emulate subsets of PB
+(such as XMLRPC or SOAP).
+
+Future Plans: Optimization. Exploitation of new-style object model.
+Optimizations to this module should not affect external-use semantics at all,
+but may have a small impact on users who subclass and override methods.
+
+@author: Glyph Lefkowitz
+"""
+
+
+# NOTE: this module should NOT import pb; it is supposed to be a module which
+# abstractly defines remotely accessible types. Many of these types expect to
+# be serialized by Jelly, but they ought to be accessible through other
+# mechanisms (like XMLRPC)
+
+import sys
+
+from zope.interface import Interface, implementer
+
+from twisted.python import log, reflect
+from twisted.python.compat import cmp, comparable
+from .jelly import (
+ Jellyable,
+ Unjellyable,
+ _createBlank,
+ getInstanceState,
+ setInstanceState,
+ setUnjellyableFactoryForClass,
+ setUnjellyableForClass,
+ setUnjellyableForClassTree,
+ unjellyableRegistry,
+)
+
+# compatibility
+setCopierForClass = setUnjellyableForClass
+setCopierForClassTree = setUnjellyableForClassTree
+setFactoryForClass = setUnjellyableFactoryForClass
+copyTags = unjellyableRegistry
+
+copy_atom = b"copy"
+cache_atom = b"cache"
+cached_atom = b"cached"
+remote_atom = b"remote"
+
+
+class NoSuchMethod(AttributeError):
+ """Raised if there is no such remote method"""
+
+
+class IPBRoot(Interface):
+ """Factory for root Referenceable objects for PB servers."""
+
+ def rootObject(broker):
+ """Return root Referenceable for broker."""
+
+
+class Serializable(Jellyable):
+ """An object that can be passed remotely.
+
+ I am a style of object which can be serialized by Perspective
+ Broker. Objects which wish to be referenceable or copied remotely
+ have to subclass Serializable. However, clients of Perspective
+ Broker will probably not want to directly subclass Serializable; the
+ Flavors of transferable objects are listed below.
+
+ What it means to be \"Serializable\" is that an object can be
+ passed to or returned from a remote method. Certain basic types
+ (dictionaries, lists, tuples, numbers, strings) are serializable by
+ default; however, classes need to choose a specific serialization
+ style: L{Referenceable}, L{Viewable}, L{Copyable} or L{Cacheable}.
+
+ You may also pass C{[lists, dictionaries, tuples]} of L{Serializable}
+ instances to or return them from remote methods, as many levels deep
+ as you like.
+ """
+
+ def processUniqueID(self):
+ """Return an ID which uniquely represents this object for this process.
+
+ By default, this uses the 'id' builtin, but can be overridden to
+ indicate that two values are identity-equivalent (such as proxies
+ for the same object).
+ """
+
+ return id(self)
+
+
+class Referenceable(Serializable):
+ perspective = None
+ """I am an object sent remotely as a direct reference.
+
+ When one of my subclasses is sent as an argument to or returned
+ from a remote method call, I will be serialized by default as a
+ direct reference.
+
+ This means that the peer will be able to call methods on me;
+ a method call xxx() from my peer will be resolved to methods
+ of the name remote_xxx.
+ """
+
+ def remoteMessageReceived(self, broker, message, args, kw):
+ """A remote message has been received. Dispatch it appropriately.
+
+ The default implementation is to dispatch to a method called
+ 'remote_messagename' and call it with the same arguments.
+ """
+ args = broker.unserialize(args)
+ kw = broker.unserialize(kw)
+ # Need this to interoperate with Python 2 clients
+ # which may try to send use keywords where keys are of type
+ # bytes.
+ if [key for key in kw.keys() if isinstance(key, bytes)]:
+ kw = {k.decode("utf8"): v for k, v in kw.items()}
+
+ if not isinstance(message, str):
+ message = message.decode("utf8")
+
+ method = getattr(self, "remote_%s" % message, None)
+ if method is None:
+ raise NoSuchMethod(f"No such method: remote_{message}")
+ try:
+ state = method(*args, **kw)
+ except TypeError:
+ log.msg(f"{method} didn't accept {args} and {kw}")
+ raise
+ return broker.serialize(state, self.perspective)
+
+ def jellyFor(self, jellier):
+ """(internal)
+
+ Return a tuple which will be used as the s-expression to
+ serialize this to a peer.
+ """
+
+ return [b"remote", jellier.invoker.registerReference(self)]
+
+
+@implementer(IPBRoot)
+class Root(Referenceable):
+ """I provide a root object to L{pb.Broker}s for a L{pb.PBClientFactory} or
+ L{pb.PBServerFactory}.
+
+ When a factory produces a L{pb.Broker}, it supplies that
+ L{pb.Broker} with an object named \"root\". That object is obtained
+ by calling my rootObject method.
+ """
+
+ def rootObject(self, broker):
+ """A factory is requesting to publish me as a root object.
+
+ When a factory is sending me as the root object, this
+ method will be invoked to allow per-broker versions of an
+ object. By default I return myself.
+ """
+ return self
+
+
+class ViewPoint(Referenceable):
+ """
+ I act as an indirect reference to an object accessed through a
+ L{pb.IPerspective}.
+
+ Simply put, I combine an object with a perspective so that when a
+ peer calls methods on the object I refer to, the method will be
+ invoked with that perspective as a first argument, so that it can
+ know who is calling it.
+
+ While L{Viewable} objects will be converted to ViewPoints by default
+ when they are returned from or sent as arguments to a remote
+ method, any object may be manually proxied as well. (XXX: Now that
+ this class is no longer named C{Proxy}, this is the only occurrence
+ of the term 'proxied' in this docstring, and may be unclear.)
+
+ This can be useful when dealing with L{pb.IPerspective}s, L{Copyable}s,
+ and L{Cacheable}s. It is legal to implement a method as such on
+ a perspective::
+
+ | def perspective_getViewPointForOther(self, name):
+ | defr = self.service.getPerspectiveRequest(name)
+ | defr.addCallbacks(lambda x, self=self: ViewPoint(self, x), log.msg)
+ | return defr
+
+ This will allow you to have references to Perspective objects in two
+ different ways. One is through the initial 'attach' call -- each
+ peer will have a L{pb.RemoteReference} to their perspective directly. The
+ other is through this method; each peer can get a L{pb.RemoteReference} to
+ all other perspectives in the service; but that L{pb.RemoteReference} will
+ be to a L{ViewPoint}, not directly to the object.
+
+ The practical offshoot of this is that you can implement 2 varieties
+ of remotely callable methods on this Perspective; view_xxx and
+ C{perspective_xxx}. C{view_xxx} methods will follow the rules for
+ ViewPoint methods (see ViewPoint.L{remoteMessageReceived}), and
+ C{perspective_xxx} methods will follow the rules for Perspective
+ methods.
+ """
+
+ def __init__(self, perspective, object):
+ """Initialize me with a Perspective and an Object."""
+ self.perspective = perspective
+ self.object = object
+
+ def processUniqueID(self):
+ """Return an ID unique to a proxy for this perspective+object combination."""
+ return (id(self.perspective), id(self.object))
+
+ def remoteMessageReceived(self, broker, message, args, kw):
+ """A remote message has been received. Dispatch it appropriately.
+
+ The default implementation is to dispatch to a method called
+ 'C{view_messagename}' to my Object and call it on my object with
+ the same arguments, modified by inserting my Perspective as
+ the first argument.
+ """
+ args = broker.unserialize(args, self.perspective)
+ kw = broker.unserialize(kw, self.perspective)
+
+ if not isinstance(message, str):
+ message = message.decode("utf8")
+
+ method = getattr(self.object, "view_%s" % message)
+ try:
+ state = method(*(self.perspective,) + args, **kw)
+ except TypeError:
+ log.msg(f"{method} didn't accept {args} and {kw}")
+ raise
+ rv = broker.serialize(state, self.perspective, method, args, kw)
+ return rv
+
+
+class Viewable(Serializable):
+ """I will be converted to a L{ViewPoint} when passed to or returned from a remote method.
+
+ The beginning of a peer's interaction with a PB Service is always
+ through a perspective. However, if a C{perspective_xxx} method returns
+ a Viewable, it will be serialized to the peer as a response to that
+ method.
+ """
+
+ def jellyFor(self, jellier):
+ """Serialize a L{ViewPoint} for me and the perspective of the given broker."""
+ return ViewPoint(jellier.invoker.serializingPerspective, self).jellyFor(jellier)
+
+
+class Copyable(Serializable):
+ """Subclass me to get copied each time you are returned from or passed to a remote method.
+
+ When I am returned from or passed to a remote method call, I will be
+ converted into data via a set of callbacks (see my methods for more
+ info). That data will then be serialized using Jelly, and sent to
+ the peer.
+
+ The peer will then look up the type to represent this with; see
+ L{RemoteCopy} for details.
+ """
+
+ def getStateToCopy(self):
+ """Gather state to send when I am serialized for a peer.
+
+ I will default to returning self.__dict__. Override this to
+ customize this behavior.
+ """
+
+ return self.__dict__
+
+ def getStateToCopyFor(self, perspective):
+ """
+ Gather state to send when I am serialized for a particular
+ perspective.
+
+ I will default to calling L{getStateToCopy}. Override this to
+ customize this behavior.
+ """
+
+ return self.getStateToCopy()
+
+ def getTypeToCopy(self):
+ """Determine what type tag to send for me.
+
+ By default, send the string representation of my class
+ (package.module.Class); normally this is adequate, but
+ you may override this to change it.
+ """
+
+ return reflect.qual(self.__class__).encode("utf-8")
+
+ def getTypeToCopyFor(self, perspective):
+ """Determine what type tag to send for me.
+
+ By default, defer to self.L{getTypeToCopy}() normally this is
+ adequate, but you may override this to change it.
+ """
+
+ return self.getTypeToCopy()
+
+ def jellyFor(self, jellier):
+ """Assemble type tag and state to copy for this broker.
+
+ This will call L{getTypeToCopyFor} and L{getStateToCopy}, and
+ return an appropriate s-expression to represent me.
+ """
+
+ if jellier.invoker is None:
+ return getInstanceState(self, jellier)
+ p = jellier.invoker.serializingPerspective
+ t = self.getTypeToCopyFor(p)
+ state = self.getStateToCopyFor(p)
+ sxp = jellier.prepare(self)
+ sxp.extend([t, jellier.jelly(state)])
+ return jellier.preserve(self, sxp)
+
+
+class Cacheable(Copyable):
+ """A cached instance.
+
+ This means that it's copied; but there is some logic to make sure
+ that it's only copied once. Additionally, when state is retrieved,
+ it is passed a "proto-reference" to the state as it will exist on
+ the client.
+
+ XXX: The documentation for this class needs work, but it's the most
+ complex part of PB and it is inherently difficult to explain.
+ """
+
+ def getStateToCacheAndObserveFor(self, perspective, observer):
+ """
+ Get state to cache on the client and client-cache reference
+ to observe locally.
+
+ This is similar to getStateToCopyFor, but it additionally
+ passes in a reference to the client-side RemoteCache instance
+ that will be created when it is unserialized. This allows
+ Cacheable instances to keep their RemoteCaches up to date when
+ they change, such that no changes can occur between the point
+ at which the state is initially copied and the client receives
+ it that are not propagated.
+ """
+
+ return self.getStateToCopyFor(perspective)
+
+ def jellyFor(self, jellier):
+ """Return an appropriate tuple to serialize me.
+
+ Depending on whether this broker has cached me or not, this may
+ return either a full state or a reference to an existing cache.
+ """
+ if jellier.invoker is None:
+ return getInstanceState(self, jellier)
+ luid = jellier.invoker.cachedRemotelyAs(self, 1)
+ if luid is None:
+ luid = jellier.invoker.cacheRemotely(self)
+ p = jellier.invoker.serializingPerspective
+ type_ = self.getTypeToCopyFor(p)
+ observer = RemoteCacheObserver(jellier.invoker, self, p)
+ state = self.getStateToCacheAndObserveFor(p, observer)
+ l = jellier.prepare(self)
+ jstate = jellier.jelly(state)
+ l.extend([type_, luid, jstate])
+ return jellier.preserve(self, l)
+ else:
+ return cached_atom, luid
+
+ def stoppedObserving(self, perspective, observer):
+ """This method is called when a client has stopped observing me.
+
+ The 'observer' argument is the same as that passed in to
+ getStateToCacheAndObserveFor.
+ """
+
+
+class RemoteCopy(Unjellyable):
+ """I am a remote copy of a Copyable object.
+
+ When the state from a L{Copyable} object is received, an instance will
+ be created based on the copy tags table (see setUnjellyableForClass) and
+ sent the L{setCopyableState} message. I provide a reasonable default
+ implementation of that message; subclass me if you wish to serve as
+ a copier for remote data.
+
+ NOTE: copiers are invoked with no arguments. Do not implement a
+ constructor which requires args in a subclass of L{RemoteCopy}!
+ """
+
+ def setCopyableState(self, state):
+ """I will be invoked with the state to copy locally.
+
+ 'state' is the data returned from the remote object's
+ 'getStateToCopyFor' method, which will often be the remote
+ object's dictionary (or a filtered approximation of it depending
+ on my peer's perspective).
+ """
+ if not state:
+ state = {}
+ state = {
+ x.decode("utf8") if isinstance(x, bytes) else x: y for x, y in state.items()
+ }
+ self.__dict__ = state
+
+ def unjellyFor(self, unjellier, jellyList):
+ if unjellier.invoker is None:
+ return setInstanceState(self, unjellier, jellyList)
+ self.setCopyableState(unjellier.unjelly(jellyList[1]))
+ return self
+
+
+class RemoteCache(RemoteCopy, Serializable):
+ """A cache is a local representation of a remote L{Cacheable} object.
+
+ This represents the last known state of this object. It may
+ also have methods invoked on it -- in order to update caches,
+ the cached class generates a L{pb.RemoteReference} to this object as
+ it is originally sent.
+
+ Much like copy, I will be invoked with no arguments. Do not
+ implement a constructor that requires arguments in one of my
+ subclasses.
+ """
+
+ def remoteMessageReceived(self, broker, message, args, kw):
+ """A remote message has been received. Dispatch it appropriately.
+
+ The default implementation is to dispatch to a method called
+ 'C{observe_messagename}' and call it on my with the same arguments.
+ """
+ if not isinstance(message, str):
+ message = message.decode("utf8")
+
+ args = broker.unserialize(args)
+ kw = broker.unserialize(kw)
+ method = getattr(self, "observe_%s" % message)
+ try:
+ state = method(*args, **kw)
+ except TypeError:
+ log.msg(f"{method} didn't accept {args} and {kw}")
+ raise
+ return broker.serialize(state, None, method, args, kw)
+
+ def jellyFor(self, jellier):
+ """serialize me (only for the broker I'm for) as the original cached reference"""
+ if jellier.invoker is None:
+ return getInstanceState(self, jellier)
+ assert (
+ jellier.invoker is self.broker
+ ), "You cannot exchange cached proxies between brokers."
+ return b"lcache", self.luid
+
+ def unjellyFor(self, unjellier, jellyList):
+ if unjellier.invoker is None:
+ return setInstanceState(self, unjellier, jellyList)
+ self.broker = unjellier.invoker
+ self.luid = jellyList[1]
+ borgCopy = self._borgify()
+ # XXX questionable whether this was a good design idea...
+ init = getattr(borgCopy, "__init__", None)
+ if init:
+ init()
+ unjellier.invoker.cacheLocally(jellyList[1], self)
+ borgCopy.setCopyableState(unjellier.unjelly(jellyList[2]))
+ # Might have changed due to setCopyableState method; we'll assume that
+ # it's bad form to do so afterwards.
+ self.__dict__ = borgCopy.__dict__
+ # chomp, chomp -- some existing code uses "self.__dict__ =", some uses
+ # "__dict__.update". This is here in order to handle both cases.
+ self.broker = unjellier.invoker
+ self.luid = jellyList[1]
+ return borgCopy
+
+ ## def __really_del__(self):
+ ## """Final finalization call, made after all remote references have been lost.
+ ## """
+
+ def __cmp__(self, other):
+ """Compare me [to another RemoteCache."""
+ if isinstance(other, self.__class__):
+ return cmp(id(self.__dict__), id(other.__dict__))
+ else:
+ return cmp(id(self.__dict__), other)
+
+ def __hash__(self):
+ """Hash me."""
+ return int(id(self.__dict__) % sys.maxsize)
+
+ broker = None
+ luid = None
+
+ def __del__(self):
+ """Do distributed reference counting on finalize."""
+ try:
+ # log.msg( ' --- decache: %s %s' % (self, self.luid) )
+ if self.broker:
+ self.broker.decCacheRef(self.luid)
+ except BaseException:
+ log.deferr()
+
+ def _borgify(self):
+ """
+ Create a new object that shares its state (i.e. its C{__dict__}) and
+ type with this object, but does not share its identity.
+
+ This is an instance of U{the Borg design pattern
+ <https://code.activestate.com/recipes/66531/>} originally described by
+ Alex Martelli, but unlike the example given there, this is not a
+ replacement for a Singleton. Instead, it is for lifecycle tracking
+ (and distributed garbage collection). The purpose of these separate
+ objects is to have a separate object tracking each application-level
+ reference to the root L{RemoteCache} object being tracked by the
+ broker, and to have their C{__del__} methods be invoked.
+
+ This may be achievable via a weak value dictionary to track the root
+ L{RemoteCache} instances instead, but this implementation strategy
+ predates the availability of weak references in Python.
+
+ @return: The new instance.
+ @rtype: C{self.__class__}
+ """
+ blank = _createBlank(self.__class__)
+ blank.__dict__ = self.__dict__
+ return blank
+
+
+def unjellyCached(unjellier, unjellyList):
+ luid = unjellyList[1]
+ return unjellier.invoker.cachedLocallyAs(luid)._borgify()
+
+
+setUnjellyableForClass("cached", unjellyCached)
+
+
+def unjellyLCache(unjellier, unjellyList):
+ luid = unjellyList[1]
+ obj = unjellier.invoker.remotelyCachedForLUID(luid)
+ return obj
+
+
+setUnjellyableForClass("lcache", unjellyLCache)
+
+
+def unjellyLocal(unjellier, unjellyList):
+ obj = unjellier.invoker.localObjectForID(unjellyList[1])
+ return obj
+
+
+setUnjellyableForClass("local", unjellyLocal)
+
+
+@comparable
+class RemoteCacheMethod:
+ """A method on a reference to a L{RemoteCache}."""
+
+ def __init__(self, name, broker, cached, perspective):
+ """(internal) initialize."""
+ self.name = name
+ self.broker = broker
+ self.perspective = perspective
+ self.cached = cached
+
+ def __cmp__(self, other):
+ return cmp((self.name, self.broker, self.perspective, self.cached), other)
+
+ def __hash__(self):
+ return hash((self.name, self.broker, self.perspective, self.cached))
+
+ def __call__(self, *args, **kw):
+ """(internal) action method."""
+ cacheID = self.broker.cachedRemotelyAs(self.cached)
+ if cacheID is None:
+ from pb import ProtocolError # type: ignore[import]
+
+ raise ProtocolError(
+ "You can't call a cached method when the object hasn't been given to the peer yet."
+ )
+ return self.broker._sendMessage(
+ b"cache", self.perspective, cacheID, self.name, args, kw
+ )
+
+
+@comparable
+class RemoteCacheObserver:
+ """I am a reverse-reference to the peer's L{RemoteCache}.
+
+ I am generated automatically when a cache is serialized. I
+ represent a reference to the client's L{RemoteCache} object that
+ will represent a particular L{Cacheable}; I am the additional
+ object passed to getStateToCacheAndObserveFor.
+ """
+
+ def __init__(self, broker, cached, perspective):
+ """(internal) Initialize me.
+
+ @param broker: a L{pb.Broker} instance.
+
+ @param cached: a L{Cacheable} instance that this L{RemoteCacheObserver}
+ corresponds to.
+
+ @param perspective: a reference to the perspective who is observing this.
+ """
+
+ self.broker = broker
+ self.cached = cached
+ self.perspective = perspective
+
+ def __repr__(self) -> str:
+ return "<RemoteCacheObserver({}, {}, {}) at {}>".format(
+ self.broker,
+ self.cached,
+ self.perspective,
+ id(self),
+ )
+
+ def __hash__(self):
+ """Generate a hash unique to all L{RemoteCacheObserver}s for this broker/perspective/cached triplet"""
+
+ return (
+ (hash(self.broker) % 2**10)
+ + (hash(self.perspective) % 2**10)
+ + (hash(self.cached) % 2**10)
+ )
+
+ def __cmp__(self, other):
+ """Compare me to another L{RemoteCacheObserver}."""
+
+ return cmp((self.broker, self.perspective, self.cached), other)
+
+ def callRemote(self, _name, *args, **kw):
+ """(internal) action method."""
+ cacheID = self.broker.cachedRemotelyAs(self.cached)
+ if isinstance(_name, str):
+ _name = _name.encode("utf-8")
+ if cacheID is None:
+ from pb import ProtocolError
+
+ raise ProtocolError(
+ "You can't call a cached method when the "
+ "object hasn't been given to the peer yet."
+ )
+ return self.broker._sendMessage(
+ b"cache", self.perspective, cacheID, _name, args, kw
+ )
+
+ def remoteMethod(self, key):
+ """Get a L{pb.RemoteMethod} for this key."""
+ return RemoteCacheMethod(key, self.broker, self.cached, self.perspective)
diff --git a/contrib/python/Twisted/py3/twisted/spread/interfaces.py b/contrib/python/Twisted/py3/twisted/spread/interfaces.py
new file mode 100644
index 0000000000..4b8b28935b
--- /dev/null
+++ b/contrib/python/Twisted/py3/twisted/spread/interfaces.py
@@ -0,0 +1,30 @@
+# Copyright (c) Twisted Matrix Laboratories.
+# See LICENSE for details.
+
+"""
+Twisted Spread Interfaces.
+"""
+
+from zope.interface import Interface
+
+
+class IJellyable(Interface):
+ def jellyFor(jellier):
+ """
+ Jelly myself for jellier.
+ """
+
+
+class IUnjellyable(Interface):
+ def unjellyFor(jellier, jellyList):
+ """
+ Unjelly myself for the jellier.
+
+ @param jellier: A stateful object which exists for the lifetime of a
+ single call to L{unjelly}.
+
+ @param jellyList: The C{list} which represents the jellied state of the
+ object to be unjellied.
+
+ @return: The object which results from unjellying.
+ """
diff --git a/contrib/python/Twisted/py3/twisted/spread/jelly.py b/contrib/python/Twisted/py3/twisted/spread/jelly.py
new file mode 100644
index 0000000000..46cda17844
--- /dev/null
+++ b/contrib/python/Twisted/py3/twisted/spread/jelly.py
@@ -0,0 +1,1092 @@
+# -*- test-case-name: twisted.spread.test.test_jelly -*-
+# Copyright (c) Twisted Matrix Laboratories.
+# See LICENSE for details.
+
+"""
+S-expression-based persistence of python objects.
+
+It does something very much like L{Pickle<pickle>}; however, pickle's main goal
+seems to be efficiency (both in space and time); jelly's main goals are
+security, human readability, and portability to other environments.
+
+This is how Jelly converts various objects to s-expressions.
+
+Boolean::
+ True --> ['boolean', 'true']
+
+Integer::
+ 1 --> 1
+
+List::
+ [1, 2] --> ['list', 1, 2]
+
+String::
+ \"hello\" --> \"hello\"
+
+Float::
+ 2.3 --> 2.3
+
+Dictionary::
+ {'a': 1, 'b': 'c'} --> ['dictionary', ['b', 'c'], ['a', 1]]
+
+Module::
+ UserString --> ['module', 'UserString']
+
+Class::
+ UserString.UserString --> ['class', ['module', 'UserString'], 'UserString']
+
+Function::
+ string.join --> ['function', 'join', ['module', 'string']]
+
+Instance: s is an instance of UserString.UserString, with a __dict__
+{'data': 'hello'}::
+ [\"UserString.UserString\", ['dictionary', ['data', 'hello']]]
+
+Class Method: UserString.UserString.center::
+ ['method', 'center', ['None'], ['class', ['module', 'UserString'],
+ 'UserString']]
+
+Instance Method: s.center, where s is an instance of UserString.UserString::
+ ['method', 'center', ['instance', ['reference', 1, ['class',
+ ['module', 'UserString'], 'UserString']], ['dictionary', ['data', 'd']]],
+ ['dereference', 1]]
+
+The Python 2.x C{sets.Set} and C{sets.ImmutableSet} classes are
+serialized to the same thing as the builtin C{set} and C{frozenset}
+classes. (This is only relevant if you are communicating with a
+version of jelly running on an older version of Python.)
+
+@author: Glyph Lefkowitz
+
+"""
+
+import copy
+import datetime
+import decimal
+
+# System Imports
+import types
+import warnings
+from functools import reduce
+
+from zope.interface import implementer
+
+from incremental import Version
+
+from twisted.persisted.crefutil import (
+ NotKnown,
+ _Container,
+ _Dereference,
+ _DictKeyAndValue,
+ _InstanceMethod,
+ _Tuple,
+)
+
+# Twisted Imports
+from twisted.python.compat import nativeString
+from twisted.python.deprecate import deprecatedModuleAttribute
+from twisted.python.reflect import namedAny, namedObject, qual
+from twisted.spread.interfaces import IJellyable, IUnjellyable
+
+DictTypes = (dict,)
+
+None_atom = b"None" # N
+# code
+class_atom = b"class" # c
+module_atom = b"module" # m
+function_atom = b"function" # f
+
+# references
+dereference_atom = b"dereference" # D
+persistent_atom = b"persistent" # p
+reference_atom = b"reference" # r
+
+# mutable collections
+dictionary_atom = b"dictionary" # d
+list_atom = b"list" # l
+set_atom = b"set"
+
+# immutable collections
+# (assignment to __dict__ and __class__ still might go away!)
+tuple_atom = b"tuple" # t
+instance_atom = b"instance" # i
+frozenset_atom = b"frozenset"
+
+
+deprecatedModuleAttribute(
+ Version("Twisted", 15, 0, 0),
+ "instance_atom is unused within Twisted.",
+ "twisted.spread.jelly",
+ "instance_atom",
+)
+
+# errors
+unpersistable_atom = b"unpersistable" # u
+unjellyableRegistry = {}
+unjellyableFactoryRegistry = {}
+
+
+def _createBlank(cls):
+ """
+ Given an object, if that object is a type, return a new, blank instance
+ of that type which has not had C{__init__} called on it. If the object
+ is not a type, return L{None}.
+
+ @param cls: The type (or class) to create an instance of.
+ @type cls: L{type} or something else that cannot be
+ instantiated.
+
+ @return: a new blank instance or L{None} if C{cls} is not a class or type.
+ """
+ if isinstance(cls, type):
+ return cls.__new__(cls)
+
+
+def _newInstance(cls, state):
+ """
+ Make a new instance of a class without calling its __init__ method.
+
+ @param state: A C{dict} used to update C{inst.__dict__} either directly or
+ via C{__setstate__}, if available.
+
+ @return: A new instance of C{cls}.
+ """
+ instance = _createBlank(cls)
+
+ def defaultSetter(state):
+ if isinstance(state, dict):
+ instance.__dict__ = state or {}
+
+ setter = getattr(instance, "__setstate__", defaultSetter)
+ setter(state)
+ return instance
+
+
+def _maybeClass(classnamep):
+ isObject = isinstance(classnamep, type)
+
+ if isObject:
+ classnamep = qual(classnamep)
+
+ if not isinstance(classnamep, bytes):
+ classnamep = classnamep.encode("utf-8")
+
+ return classnamep
+
+
+def setUnjellyableForClass(classname, unjellyable):
+ """
+ Set which local class will represent a remote type.
+
+ If you have written a Copyable class that you expect your client to be
+ receiving, write a local "copy" class to represent it, then call::
+
+ jellier.setUnjellyableForClass('module.package.Class', MyCopier).
+
+ Call this at the module level immediately after its class
+ definition. MyCopier should be a subclass of RemoteCopy.
+
+ The classname may be a special tag returned by
+ 'Copyable.getTypeToCopyFor' rather than an actual classname.
+
+ This call is also for cached classes, since there will be no
+ overlap. The rules are the same.
+ """
+
+ global unjellyableRegistry
+ classname = _maybeClass(classname)
+ unjellyableRegistry[classname] = unjellyable
+ globalSecurity.allowTypes(classname)
+
+
+def setUnjellyableFactoryForClass(classname, copyFactory):
+ """
+ Set the factory to construct a remote instance of a type::
+
+ jellier.setUnjellyableFactoryForClass('module.package.Class', MyFactory)
+
+ Call this at the module level immediately after its class definition.
+ C{copyFactory} should return an instance or subclass of
+ L{RemoteCopy<pb.RemoteCopy>}.
+
+ Similar to L{setUnjellyableForClass} except it uses a factory instead
+ of creating an instance.
+ """
+
+ global unjellyableFactoryRegistry
+ classname = _maybeClass(classname)
+ unjellyableFactoryRegistry[classname] = copyFactory
+ globalSecurity.allowTypes(classname)
+
+
+def setUnjellyableForClassTree(module, baseClass, prefix=None):
+ """
+ Set all classes in a module derived from C{baseClass} as copiers for
+ a corresponding remote class.
+
+ When you have a hierarchy of Copyable (or Cacheable) classes on one
+ side, and a mirror structure of Copied (or RemoteCache) classes on the
+ other, use this to setUnjellyableForClass all your Copieds for the
+ Copyables.
+
+ Each copyTag (the \"classname\" argument to getTypeToCopyFor, and
+ what the Copyable's getTypeToCopyFor returns) is formed from
+ adding a prefix to the Copied's class name. The prefix defaults
+ to module.__name__. If you wish the copy tag to consist of solely
+ the classname, pass the empty string \'\'.
+
+ @param module: a module object from which to pull the Copied classes.
+ (passing sys.modules[__name__] might be useful)
+
+ @param baseClass: the base class from which all your Copied classes derive.
+
+ @param prefix: the string prefixed to classnames to form the
+ unjellyableRegistry.
+ """
+ if prefix is None:
+ prefix = module.__name__
+
+ if prefix:
+ prefix = "%s." % prefix
+
+ for name in dir(module):
+ loaded = getattr(module, name)
+ try:
+ yes = issubclass(loaded, baseClass)
+ except TypeError:
+ "It's not a class."
+ else:
+ if yes:
+ setUnjellyableForClass(f"{prefix}{name}", loaded)
+
+
+def getInstanceState(inst, jellier):
+ """
+ Utility method to default to 'normal' state rules in serialization.
+ """
+ if hasattr(inst, "__getstate__"):
+ state = inst.__getstate__()
+ else:
+ state = inst.__dict__
+ sxp = jellier.prepare(inst)
+ sxp.extend([qual(inst.__class__).encode("utf-8"), jellier.jelly(state)])
+ return jellier.preserve(inst, sxp)
+
+
+def setInstanceState(inst, unjellier, jellyList):
+ """
+ Utility method to default to 'normal' state rules in unserialization.
+ """
+ state = unjellier.unjelly(jellyList[1])
+ if hasattr(inst, "__setstate__"):
+ inst.__setstate__(state)
+ else:
+ inst.__dict__ = state
+ return inst
+
+
+class Unpersistable:
+ """
+ This is an instance of a class that comes back when something couldn't be
+ unpersisted.
+ """
+
+ def __init__(self, reason):
+ """
+ Initialize an unpersistable object with a descriptive C{reason} string.
+ """
+ self.reason = reason
+
+ def __repr__(self) -> str:
+ return "Unpersistable(%s)" % repr(self.reason)
+
+
+@implementer(IJellyable)
+class Jellyable:
+ """
+ Inherit from me to Jelly yourself directly with the `getStateFor'
+ convenience method.
+ """
+
+ def getStateFor(self, jellier):
+ return self.__dict__
+
+ def jellyFor(self, jellier):
+ """
+ @see: L{twisted.spread.interfaces.IJellyable.jellyFor}
+ """
+ sxp = jellier.prepare(self)
+ sxp.extend(
+ [
+ qual(self.__class__).encode("utf-8"),
+ jellier.jelly(self.getStateFor(jellier)),
+ ]
+ )
+ return jellier.preserve(self, sxp)
+
+
+@implementer(IUnjellyable)
+class Unjellyable:
+ """
+ Inherit from me to Unjelly yourself directly with the
+ C{setStateFor} convenience method.
+ """
+
+ def setStateFor(self, unjellier, state):
+ self.__dict__ = state
+
+ def unjellyFor(self, unjellier, jellyList):
+ """
+ Perform the inverse operation of L{Jellyable.jellyFor}.
+
+ @see: L{twisted.spread.interfaces.IUnjellyable.unjellyFor}
+ """
+ state = unjellier.unjelly(jellyList[1])
+ self.setStateFor(unjellier, state)
+ return self
+
+
+class _Jellier:
+ """
+ (Internal) This class manages state for a call to jelly()
+ """
+
+ def __init__(self, taster, persistentStore, invoker):
+ """
+ Initialize.
+ """
+ self.taster = taster
+ # `preserved' is a dict of previously seen instances.
+ self.preserved = {}
+ # `cooked' is a dict of previously backreferenced instances to their
+ # `ref' lists.
+ self.cooked = {}
+ self.cooker = {}
+ self._ref_id = 1
+ self.persistentStore = persistentStore
+ self.invoker = invoker
+
+ def _cook(self, object):
+ """
+ (internal) Backreference an object.
+
+ Notes on this method for the hapless future maintainer: If I've already
+ gone through the prepare/preserve cycle on the specified object (it is
+ being referenced after the serializer is \"done with\" it, e.g. this
+ reference is NOT circular), the copy-in-place of aList is relevant,
+ since the list being modified is the actual, pre-existing jelly
+ expression that was returned for that object. If not, it's technically
+ superfluous, since the value in self.preserved didn't need to be set,
+ but the invariant that self.preserved[id(object)] is a list is
+ convenient because that means we don't have to test and create it or
+ not create it here, creating fewer code-paths. that's why
+ self.preserved is always set to a list.
+
+ Sorry that this code is so hard to follow, but Python objects are
+ tricky to persist correctly. -glyph
+ """
+ aList = self.preserved[id(object)]
+ newList = copy.copy(aList)
+ # make a new reference ID
+ refid = self._ref_id
+ self._ref_id = self._ref_id + 1
+ # replace the old list in-place, so that we don't have to track the
+ # previous reference to it.
+ aList[:] = [reference_atom, refid, newList]
+ self.cooked[id(object)] = [dereference_atom, refid]
+ return aList
+
+ def prepare(self, object):
+ """
+ (internal) Create a list for persisting an object to. This will allow
+ backreferences to be made internal to the object. (circular
+ references).
+
+ The reason this needs to happen is that we don't generate an ID for
+ every object, so we won't necessarily know which ID the object will
+ have in the future. When it is 'cooked' ( see _cook ), it will be
+ assigned an ID, and the temporary placeholder list created here will be
+ modified in-place to create an expression that gives this object an ID:
+ [reference id# [object-jelly]].
+ """
+
+ # create a placeholder list to be preserved
+ self.preserved[id(object)] = []
+ # keep a reference to this object around, so it doesn't disappear!
+ # (This isn't always necessary, but for cases where the objects are
+ # dynamically generated by __getstate__ or getStateToCopyFor calls, it
+ # is; id() will return the same value for a different object if it gets
+ # garbage collected. This may be optimized later.)
+ self.cooker[id(object)] = object
+ return []
+
+ def preserve(self, object, sexp):
+ """
+ (internal) Mark an object's persistent list for later referral.
+ """
+ # if I've been cooked in the meanwhile,
+ if id(object) in self.cooked:
+ # replace the placeholder empty list with the real one
+ self.preserved[id(object)][2] = sexp
+ # but give this one back.
+ sexp = self.preserved[id(object)]
+ else:
+ self.preserved[id(object)] = sexp
+ return sexp
+
+ def _checkMutable(self, obj):
+ objId = id(obj)
+ if objId in self.cooked:
+ return self.cooked[objId]
+ if objId in self.preserved:
+ self._cook(obj)
+ return self.cooked[objId]
+
+ def jelly(self, obj):
+ if isinstance(obj, Jellyable):
+ preRef = self._checkMutable(obj)
+ if preRef:
+ return preRef
+ return obj.jellyFor(self)
+ objType = type(obj)
+ if self.taster.isTypeAllowed(qual(objType).encode("utf-8")):
+ # "Immutable" Types
+ if objType in (bytes, int, float):
+ return obj
+ elif isinstance(obj, types.MethodType):
+ aSelf = obj.__self__
+ aFunc = obj.__func__
+ aClass = aSelf.__class__
+ return [
+ b"method",
+ aFunc.__name__,
+ self.jelly(aSelf),
+ self.jelly(aClass),
+ ]
+ elif objType is str:
+ return [b"unicode", obj.encode("UTF-8")]
+ elif isinstance(obj, type(None)):
+ return [b"None"]
+ elif isinstance(obj, types.FunctionType):
+ return [b"function", obj.__module__ + "." + obj.__qualname__]
+ elif isinstance(obj, types.ModuleType):
+ return [b"module", obj.__name__]
+ elif objType is bool:
+ return [b"boolean", obj and b"true" or b"false"]
+ elif objType is datetime.datetime:
+ if obj.tzinfo:
+ raise NotImplementedError(
+ "Currently can't jelly datetime objects with tzinfo"
+ )
+ return [
+ b"datetime",
+ " ".join(
+ [
+ str(x)
+ for x in (
+ obj.year,
+ obj.month,
+ obj.day,
+ obj.hour,
+ obj.minute,
+ obj.second,
+ obj.microsecond,
+ )
+ ]
+ ).encode("utf-8"),
+ ]
+ elif objType is datetime.time:
+ if obj.tzinfo:
+ raise NotImplementedError(
+ "Currently can't jelly datetime objects with tzinfo"
+ )
+ return [
+ b"time",
+ " ".join(
+ [
+ str(x)
+ for x in (obj.hour, obj.minute, obj.second, obj.microsecond)
+ ]
+ ).encode("utf-8"),
+ ]
+ elif objType is datetime.date:
+ return [
+ b"date",
+ " ".join([str(x) for x in (obj.year, obj.month, obj.day)]).encode(
+ "utf-8"
+ ),
+ ]
+ elif objType is datetime.timedelta:
+ return [
+ b"timedelta",
+ " ".join(
+ [str(x) for x in (obj.days, obj.seconds, obj.microseconds)]
+ ).encode("utf-8"),
+ ]
+ elif issubclass(objType, type):
+ return [b"class", qual(obj).encode("utf-8")]
+ elif objType is decimal.Decimal:
+ return self.jelly_decimal(obj)
+ else:
+ preRef = self._checkMutable(obj)
+ if preRef:
+ return preRef
+ # "Mutable" Types
+ sxp = self.prepare(obj)
+ if objType is list:
+ sxp.extend(self._jellyIterable(list_atom, obj))
+ elif objType is tuple:
+ sxp.extend(self._jellyIterable(tuple_atom, obj))
+ elif objType in DictTypes:
+ sxp.append(dictionary_atom)
+ for key, val in obj.items():
+ sxp.append([self.jelly(key), self.jelly(val)])
+ elif objType is set:
+ sxp.extend(self._jellyIterable(set_atom, obj))
+ elif objType is frozenset:
+ sxp.extend(self._jellyIterable(frozenset_atom, obj))
+ else:
+ className = qual(obj.__class__).encode("utf-8")
+ persistent = None
+ if self.persistentStore:
+ persistent = self.persistentStore(obj, self)
+ if persistent is not None:
+ sxp.append(persistent_atom)
+ sxp.append(persistent)
+ elif self.taster.isClassAllowed(obj.__class__):
+ sxp.append(className)
+ if hasattr(obj, "__getstate__"):
+ state = obj.__getstate__()
+ else:
+ state = obj.__dict__
+ sxp.append(self.jelly(state))
+ else:
+ self.unpersistable(
+ "instance of class %s deemed insecure"
+ % qual(obj.__class__),
+ sxp,
+ )
+ return self.preserve(obj, sxp)
+ else:
+ raise InsecureJelly(f"Type not allowed for object: {objType} {obj}")
+
+ def _jellyIterable(self, atom, obj):
+ """
+ Jelly an iterable object.
+
+ @param atom: the identifier atom of the object.
+ @type atom: C{str}
+
+ @param obj: any iterable object.
+ @type obj: C{iterable}
+
+ @return: a generator of jellied data.
+ @rtype: C{generator}
+ """
+ yield atom
+ for item in obj:
+ yield self.jelly(item)
+
+ def jelly_decimal(self, d):
+ """
+ Jelly a decimal object.
+
+ @param d: a decimal object to serialize.
+ @type d: C{decimal.Decimal}
+
+ @return: jelly for the decimal object.
+ @rtype: C{list}
+ """
+ sign, guts, exponent = d.as_tuple()
+ value = reduce(lambda left, right: left * 10 + right, guts)
+ if sign:
+ value = -value
+ return [b"decimal", value, exponent]
+
+ def unpersistable(self, reason, sxp=None):
+ """
+ (internal) Returns an sexp: (unpersistable "reason"). Utility method
+ for making note that a particular object could not be serialized.
+ """
+ if sxp is None:
+ sxp = []
+ sxp.append(unpersistable_atom)
+ if isinstance(reason, str):
+ reason = reason.encode("utf-8")
+ sxp.append(reason)
+ return sxp
+
+
+class _Unjellier:
+ def __init__(self, taster, persistentLoad, invoker):
+ self.taster = taster
+ self.persistentLoad = persistentLoad
+ self.references = {}
+ self.postCallbacks = []
+ self.invoker = invoker
+
+ def unjellyFull(self, obj):
+ o = self.unjelly(obj)
+ for m in self.postCallbacks:
+ m()
+ return o
+
+ def _maybePostUnjelly(self, unjellied):
+ """
+ If the given object has support for the C{postUnjelly} hook, set it up
+ to be called at the end of deserialization.
+
+ @param unjellied: an object that has already been unjellied.
+
+ @return: C{unjellied}
+ """
+ if hasattr(unjellied, "postUnjelly"):
+ self.postCallbacks.append(unjellied.postUnjelly)
+ return unjellied
+
+ def unjelly(self, obj):
+ if type(obj) is not list:
+ return obj
+ jelTypeBytes = obj[0]
+ if not self.taster.isTypeAllowed(jelTypeBytes):
+ raise InsecureJelly(jelTypeBytes)
+ regClass = unjellyableRegistry.get(jelTypeBytes)
+ if regClass is not None:
+ method = getattr(_createBlank(regClass), "unjellyFor", regClass)
+ return self._maybePostUnjelly(method(self, obj))
+ regFactory = unjellyableFactoryRegistry.get(jelTypeBytes)
+ if regFactory is not None:
+ return self._maybePostUnjelly(regFactory(self.unjelly(obj[1])))
+
+ jelTypeText = nativeString(jelTypeBytes)
+ thunk = getattr(self, "_unjelly_%s" % jelTypeText, None)
+ if thunk is not None:
+ return thunk(obj[1:])
+ else:
+ nameSplit = jelTypeText.split(".")
+ modName = ".".join(nameSplit[:-1])
+ if not self.taster.isModuleAllowed(modName):
+ raise InsecureJelly(
+ f"Module {modName} not allowed (in type {jelTypeText})."
+ )
+ clz = namedObject(jelTypeText)
+ if not self.taster.isClassAllowed(clz):
+ raise InsecureJelly("Class %s not allowed." % jelTypeText)
+ return self._genericUnjelly(clz, obj[1])
+
+ def _genericUnjelly(self, cls, state):
+ """
+ Unjelly a type for which no specific unjellier is registered, but which
+ is nonetheless allowed.
+
+ @param cls: the class of the instance we are unjellying.
+ @type cls: L{type}
+
+ @param state: The jellied representation of the object's state; its
+ C{__dict__} unless it has a C{__setstate__} that takes something
+ else.
+ @type state: L{list}
+
+ @return: the new, unjellied instance.
+ """
+ return self._maybePostUnjelly(_newInstance(cls, self.unjelly(state)))
+
+ def _unjelly_None(self, exp):
+ return None
+
+ def _unjelly_unicode(self, exp):
+ return str(exp[0], "UTF-8")
+
+ def _unjelly_decimal(self, exp):
+ """
+ Unjelly decimal objects.
+ """
+ value = exp[0]
+ exponent = exp[1]
+ if value < 0:
+ sign = 1
+ else:
+ sign = 0
+ guts = decimal.Decimal(value).as_tuple()[1]
+ return decimal.Decimal((sign, guts, exponent))
+
+ def _unjelly_boolean(self, exp):
+ assert exp[0] in (b"true", b"false")
+ return exp[0] == b"true"
+
+ def _unjelly_datetime(self, exp):
+ return datetime.datetime(*map(int, exp[0].split()))
+
+ def _unjelly_date(self, exp):
+ return datetime.date(*map(int, exp[0].split()))
+
+ def _unjelly_time(self, exp):
+ return datetime.time(*map(int, exp[0].split()))
+
+ def _unjelly_timedelta(self, exp):
+ days, seconds, microseconds = map(int, exp[0].split())
+ return datetime.timedelta(days=days, seconds=seconds, microseconds=microseconds)
+
+ def unjellyInto(self, obj, loc, jel):
+ o = self.unjelly(jel)
+ if isinstance(o, NotKnown):
+ o.addDependant(obj, loc)
+ obj[loc] = o
+ return o
+
+ def _unjelly_dereference(self, lst):
+ refid = lst[0]
+ x = self.references.get(refid)
+ if x is not None:
+ return x
+ der = _Dereference(refid)
+ self.references[refid] = der
+ return der
+
+ def _unjelly_reference(self, lst):
+ refid = lst[0]
+ exp = lst[1]
+ o = self.unjelly(exp)
+ ref = self.references.get(refid)
+ if ref is None:
+ self.references[refid] = o
+ elif isinstance(ref, NotKnown):
+ ref.resolveDependants(o)
+ self.references[refid] = o
+ else:
+ assert 0, "Multiple references with same ID!"
+ return o
+
+ def _unjelly_tuple(self, lst):
+ l = list(range(len(lst)))
+ finished = 1
+ for elem in l:
+ if isinstance(self.unjellyInto(l, elem, lst[elem]), NotKnown):
+ finished = 0
+ if finished:
+ return tuple(l)
+ else:
+ return _Tuple(l)
+
+ def _unjelly_list(self, lst):
+ l = list(range(len(lst)))
+ for elem in l:
+ self.unjellyInto(l, elem, lst[elem])
+ return l
+
+ def _unjellySetOrFrozenset(self, lst, containerType):
+ """
+ Helper method to unjelly set or frozenset.
+
+ @param lst: the content of the set.
+ @type lst: C{list}
+
+ @param containerType: the type of C{set} to use.
+ """
+ l = list(range(len(lst)))
+ finished = True
+ for elem in l:
+ data = self.unjellyInto(l, elem, lst[elem])
+ if isinstance(data, NotKnown):
+ finished = False
+ if not finished:
+ return _Container(l, containerType)
+ else:
+ return containerType(l)
+
+ def _unjelly_set(self, lst):
+ """
+ Unjelly set using the C{set} builtin.
+ """
+ return self._unjellySetOrFrozenset(lst, set)
+
+ def _unjelly_frozenset(self, lst):
+ """
+ Unjelly frozenset using the C{frozenset} builtin.
+ """
+ return self._unjellySetOrFrozenset(lst, frozenset)
+
+ def _unjelly_dictionary(self, lst):
+ d = {}
+ for k, v in lst:
+ kvd = _DictKeyAndValue(d)
+ self.unjellyInto(kvd, 0, k)
+ self.unjellyInto(kvd, 1, v)
+ return d
+
+ def _unjelly_module(self, rest):
+ moduleName = nativeString(rest[0])
+ if type(moduleName) != str:
+ raise InsecureJelly("Attempted to unjelly a module with a non-string name.")
+ if not self.taster.isModuleAllowed(moduleName):
+ raise InsecureJelly(f"Attempted to unjelly module named {moduleName!r}")
+ mod = __import__(moduleName, {}, {}, "x")
+ return mod
+
+ def _unjelly_class(self, rest):
+ cname = nativeString(rest[0])
+ clist = cname.split(nativeString("."))
+ modName = nativeString(".").join(clist[:-1])
+ if not self.taster.isModuleAllowed(modName):
+ raise InsecureJelly("module %s not allowed" % modName)
+ klaus = namedObject(cname)
+ objType = type(klaus)
+ if objType is not type:
+ raise InsecureJelly(
+ "class %r unjellied to something that isn't a class: %r"
+ % (cname, klaus)
+ )
+ if not self.taster.isClassAllowed(klaus):
+ raise InsecureJelly("class not allowed: %s" % qual(klaus))
+ return klaus
+
+ def _unjelly_function(self, rest):
+ fname = nativeString(rest[0])
+ modSplit = fname.split(nativeString("."))
+ modName = nativeString(".").join(modSplit[:-1])
+ if not self.taster.isModuleAllowed(modName):
+ raise InsecureJelly("Module not allowed: %s" % modName)
+ # XXX do I need an isFunctionAllowed?
+ function = namedAny(fname)
+ return function
+
+ def _unjelly_persistent(self, rest):
+ if self.persistentLoad:
+ pload = self.persistentLoad(rest[0], self)
+ return pload
+ else:
+ return Unpersistable("Persistent callback not found")
+
+ def _unjelly_instance(self, rest):
+ """
+ (internal) Unjelly an instance.
+
+ Called to handle the deprecated I{instance} token.
+
+ @param rest: The s-expression representing the instance.
+
+ @return: The unjellied instance.
+ """
+ warnings.warn_explicit(
+ "Unjelly support for the instance atom is deprecated since "
+ "Twisted 15.0.0. Upgrade peer for modern instance support.",
+ category=DeprecationWarning,
+ filename="",
+ lineno=0,
+ )
+
+ clz = self.unjelly(rest[0])
+ return self._genericUnjelly(clz, rest[1])
+
+ def _unjelly_unpersistable(self, rest):
+ return Unpersistable(f"Unpersistable data: {rest[0]}")
+
+ def _unjelly_method(self, rest):
+ """
+ (internal) Unjelly a method.
+ """
+ im_name = rest[0]
+ im_self = self.unjelly(rest[1])
+ im_class = self.unjelly(rest[2])
+ if not isinstance(im_class, type):
+ raise InsecureJelly("Method found with non-class class.")
+ if im_name in im_class.__dict__:
+ if im_self is None:
+ im = getattr(im_class, im_name)
+ elif isinstance(im_self, NotKnown):
+ im = _InstanceMethod(im_name, im_self, im_class)
+ else:
+ im = types.MethodType(
+ im_class.__dict__[im_name], im_self, *([im_class] * (False))
+ )
+ else:
+ raise TypeError("instance method changed")
+ return im
+
+
+#### Published Interface.
+
+
+class InsecureJelly(Exception):
+ """
+ This exception will be raised when a jelly is deemed `insecure'; e.g. it
+ contains a type, class, or module disallowed by the specified `taster'
+ """
+
+
+class DummySecurityOptions:
+ """
+ DummySecurityOptions() -> insecure security options
+ Dummy security options -- this class will allow anything.
+ """
+
+ def isModuleAllowed(self, moduleName):
+ """
+ DummySecurityOptions.isModuleAllowed(moduleName) -> boolean
+ returns 1 if a module by that name is allowed, 0 otherwise
+ """
+ return 1
+
+ def isClassAllowed(self, klass):
+ """
+ DummySecurityOptions.isClassAllowed(class) -> boolean
+ Assumes the module has already been allowed. Returns 1 if the given
+ class is allowed, 0 otherwise.
+ """
+ return 1
+
+ def isTypeAllowed(self, typeName):
+ """
+ DummySecurityOptions.isTypeAllowed(typeName) -> boolean
+ Returns 1 if the given type is allowed, 0 otherwise.
+ """
+ return 1
+
+
+class SecurityOptions:
+ """
+ This will by default disallow everything, except for 'none'.
+ """
+
+ basicTypes = [
+ "dictionary",
+ "list",
+ "tuple",
+ "reference",
+ "dereference",
+ "unpersistable",
+ "persistent",
+ "long_int",
+ "long",
+ "dict",
+ ]
+
+ def __init__(self):
+ """
+ SecurityOptions() initialize.
+ """
+ # I don't believe any of these types can ever pose a security hazard,
+ # except perhaps "reference"...
+ self.allowedTypes = {
+ b"None": 1,
+ b"bool": 1,
+ b"boolean": 1,
+ b"string": 1,
+ b"str": 1,
+ b"int": 1,
+ b"float": 1,
+ b"datetime": 1,
+ b"time": 1,
+ b"date": 1,
+ b"timedelta": 1,
+ b"NoneType": 1,
+ b"unicode": 1,
+ b"decimal": 1,
+ b"set": 1,
+ b"frozenset": 1,
+ }
+ self.allowedModules = {}
+ self.allowedClasses = {}
+
+ def allowBasicTypes(self):
+ """
+ Allow all `basic' types. (Dictionary and list. Int, string, and float
+ are implicitly allowed.)
+ """
+ self.allowTypes(*self.basicTypes)
+
+ def allowTypes(self, *types):
+ """
+ SecurityOptions.allowTypes(typeString): Allow a particular type, by its
+ name.
+ """
+ for typ in types:
+ if isinstance(typ, str):
+ typ = typ.encode("utf-8")
+ if not isinstance(typ, bytes):
+ typ = qual(typ)
+ self.allowedTypes[typ] = 1
+
+ def allowInstancesOf(self, *classes):
+ """
+ SecurityOptions.allowInstances(klass, klass, ...): allow instances
+ of the specified classes
+
+ This will also allow the 'instance', 'class' (renamed 'classobj' in
+ Python 2.3), and 'module' types, as well as basic types.
+ """
+ self.allowBasicTypes()
+ self.allowTypes("instance", "class", "classobj", "module")
+ for klass in classes:
+ self.allowTypes(qual(klass))
+ self.allowModules(klass.__module__)
+ self.allowedClasses[klass] = 1
+
+ def allowModules(self, *modules):
+ """
+ SecurityOptions.allowModules(module, module, ...): allow modules by
+ name. This will also allow the 'module' type.
+ """
+ for module in modules:
+ if type(module) == types.ModuleType:
+ module = module.__name__
+
+ if not isinstance(module, bytes):
+ module = module.encode("utf-8")
+
+ self.allowedModules[module] = 1
+
+ def isModuleAllowed(self, moduleName):
+ """
+ SecurityOptions.isModuleAllowed(moduleName) -> boolean
+ returns 1 if a module by that name is allowed, 0 otherwise
+ """
+ if not isinstance(moduleName, bytes):
+ moduleName = moduleName.encode("utf-8")
+
+ return moduleName in self.allowedModules
+
+ def isClassAllowed(self, klass):
+ """
+ SecurityOptions.isClassAllowed(class) -> boolean
+ Assumes the module has already been allowed. Returns 1 if the given
+ class is allowed, 0 otherwise.
+ """
+ return klass in self.allowedClasses
+
+ def isTypeAllowed(self, typeName):
+ """
+ SecurityOptions.isTypeAllowed(typeName) -> boolean
+ Returns 1 if the given type is allowed, 0 otherwise.
+ """
+ if not isinstance(typeName, bytes):
+ typeName = typeName.encode("utf-8")
+
+ return typeName in self.allowedTypes or b"." in typeName
+
+
+globalSecurity = SecurityOptions()
+globalSecurity.allowBasicTypes()
+
+
+def jelly(object, taster=DummySecurityOptions(), persistentStore=None, invoker=None):
+ """
+ Serialize to s-expression.
+
+ Returns a list which is the serialized representation of an object. An
+ optional 'taster' argument takes a SecurityOptions and will mark any
+ insecure objects as unpersistable rather than serializing them.
+ """
+ return _Jellier(taster, persistentStore, invoker).jelly(object)
+
+
+def unjelly(sexp, taster=DummySecurityOptions(), persistentLoad=None, invoker=None):
+ """
+ Unserialize from s-expression.
+
+ Takes a list that was the result from a call to jelly() and unserializes
+ an arbitrary object from it. The optional 'taster' argument, an instance
+ of SecurityOptions, will cause an InsecureJelly exception to be raised if a
+ disallowed type, module, or class attempted to unserialize.
+ """
+ return _Unjellier(taster, persistentLoad, invoker).unjellyFull(sexp)
diff --git a/contrib/python/Twisted/py3/twisted/spread/pb.py b/contrib/python/Twisted/py3/twisted/spread/pb.py
new file mode 100644
index 0000000000..dcf545015d
--- /dev/null
+++ b/contrib/python/Twisted/py3/twisted/spread/pb.py
@@ -0,0 +1,1674 @@
+# -*- test-case-name: twisted.spread.test.test_pb -*-
+# Copyright (c) Twisted Matrix Laboratories.
+# See LICENSE for details.
+
+"""
+Perspective Broker
+
+\"This isn\'t a professional opinion, but it's probably got enough
+internet to kill you.\" --glyph
+
+Introduction
+============
+
+This is a broker for proxies for and copies of objects. It provides a
+translucent interface layer to those proxies.
+
+The protocol is not opaque, because it provides objects which represent the
+remote proxies and require no context (server references, IDs) to operate on.
+
+It is not transparent because it does I{not} attempt to make remote objects
+behave identically, or even similarly, to local objects. Method calls are
+invoked asynchronously, and specific rules are applied when serializing
+arguments.
+
+To get started, begin with L{PBClientFactory} and L{PBServerFactory}.
+
+@author: Glyph Lefkowitz
+"""
+
+
+import random
+from hashlib import md5
+
+from zope.interface import Interface, implementer
+
+from twisted.cred.credentials import (
+ Anonymous,
+ IAnonymous,
+ ICredentials,
+ IUsernameHashedPassword,
+)
+from twisted.cred.portal import Portal
+from twisted.internet import defer, protocol
+from twisted.persisted import styles
+
+# Twisted Imports
+from twisted.python import failure, log, reflect
+from twisted.python.compat import cmp, comparable
+from twisted.python.components import registerAdapter
+from twisted.spread import banana
+
+# These three are backwards compatibility aliases for the previous three.
+# Ultimately they should be deprecated. -exarkun
+from twisted.spread.flavors import (
+ Cacheable,
+ Copyable,
+ IPBRoot,
+ Jellyable,
+ NoSuchMethod,
+ Referenceable,
+ RemoteCache,
+ RemoteCacheObserver,
+ RemoteCopy,
+ Root,
+ Serializable,
+ Viewable,
+ ViewPoint,
+ copyTags,
+ setCopierForClass,
+ setCopierForClassTree,
+ setFactoryForClass,
+ setUnjellyableFactoryForClass,
+ setUnjellyableForClass,
+ setUnjellyableForClassTree,
+)
+from twisted.spread.interfaces import IJellyable, IUnjellyable
+from twisted.spread.jelly import _newInstance, globalSecurity, jelly, unjelly
+
+MAX_BROKER_REFS = 1024
+
+portno = 8787
+
+
+class ProtocolError(Exception):
+ """
+ This error is raised when an invalid protocol statement is received.
+ """
+
+
+class DeadReferenceError(ProtocolError):
+ """
+ This error is raised when a method is called on a dead reference (one whose
+ broker has been disconnected).
+ """
+
+
+class Error(Exception):
+ """
+ This error can be raised to generate known error conditions.
+
+ When a PB callable method (perspective_, remote_, view_) raises
+ this error, it indicates that a traceback should not be printed,
+ but instead, the string representation of the exception should be
+ sent.
+ """
+
+
+class RemoteError(Exception):
+ """
+ This class is used to wrap a string-ified exception from the remote side to
+ be able to reraise it. (Raising string exceptions is no longer possible in
+ Python 2.6+)
+
+ The value of this exception will be a str() representation of the remote
+ value.
+
+ @ivar remoteType: The full import path of the exception class which was
+ raised on the remote end.
+ @type remoteType: C{str}
+
+ @ivar remoteTraceback: The remote traceback.
+ @type remoteTraceback: C{str}
+
+ @note: It's not possible to include the remoteTraceback if this exception is
+ thrown into a generator. It must be accessed as an attribute.
+ """
+
+ def __init__(self, remoteType, value, remoteTraceback):
+ Exception.__init__(self, value)
+ self.remoteType = remoteType
+ self.remoteTraceback = remoteTraceback
+
+
+@comparable
+class RemoteMethod:
+ """
+ This is a translucent reference to a remote message.
+ """
+
+ def __init__(self, obj, name):
+ """
+ Initialize with a L{RemoteReference} and the name of this message.
+ """
+ self.obj = obj
+ self.name = name
+
+ def __cmp__(self, other):
+ return cmp((self.obj, self.name), other)
+
+ def __hash__(self):
+ return hash((self.obj, self.name))
+
+ def __call__(self, *args, **kw):
+ """
+ Asynchronously invoke a remote method.
+ """
+ return self.obj.broker._sendMessage(
+ b"",
+ self.obj.perspective,
+ self.obj.luid,
+ self.name.encode("utf-8"),
+ args,
+ kw,
+ )
+
+
+class PBConnectionLost(Exception):
+ pass
+
+
+class IPerspective(Interface):
+ """
+ per*spec*tive, n. : The relationship of aspects of a subject to each
+ other and to a whole: 'a perspective of history'; 'a need to view
+ the problem in the proper perspective'.
+
+ This is a Perspective Broker-specific wrapper for an avatar. That
+ is to say, a PB-published view on to the business logic for the
+ system's concept of a 'user'.
+
+ The concept of attached/detached is no longer implemented by the
+ framework. The realm is expected to implement such semantics if
+ needed.
+ """
+
+ def perspectiveMessageReceived(broker, message, args, kwargs):
+ """
+ This method is called when a network message is received.
+
+ @arg broker: The Perspective Broker.
+
+ @type message: str
+ @arg message: The name of the method called by the other end.
+
+ @type args: list in jelly format
+ @arg args: The arguments that were passed by the other end. It
+ is recommend that you use the `unserialize' method of the
+ broker to decode this.
+
+ @type kwargs: dict in jelly format
+ @arg kwargs: The keyword arguments that were passed by the
+ other end. It is recommended that you use the
+ `unserialize' method of the broker to decode this.
+
+ @rtype: A jelly list.
+ @return: It is recommended that you use the `serialize' method
+ of the broker on whatever object you need to return to
+ generate the return value.
+ """
+
+
+@implementer(IPerspective)
+class Avatar:
+ """
+ A default IPerspective implementor.
+
+ This class is intended to be subclassed, and a realm should return
+ an instance of such a subclass when IPerspective is requested of
+ it.
+
+ A peer requesting a perspective will receive only a
+ L{RemoteReference} to a pb.Avatar. When a method is called on
+ that L{RemoteReference}, it will translate to a method on the
+ remote perspective named 'perspective_methodname'. (For more
+ information on invoking methods on other objects, see
+ L{flavors.ViewPoint}.)
+ """
+
+ def perspectiveMessageReceived(self, broker, message, args, kw):
+ """
+ This method is called when a network message is received.
+
+ This will call::
+
+ self.perspective_%(message)s(*broker.unserialize(args),
+ **broker.unserialize(kw))
+
+ to handle the method; subclasses of Avatar are expected to
+ implement methods using this naming convention.
+ """
+
+ args = broker.unserialize(args, self)
+ kw = broker.unserialize(kw, self)
+ method = getattr(self, "perspective_%s" % message)
+ try:
+ state = method(*args, **kw)
+ except TypeError:
+ log.msg(f"{method} didn't accept {args} and {kw}")
+ raise
+ return broker.serialize(state, self, method, args, kw)
+
+
+class AsReferenceable(Referenceable):
+ """
+ A reference directed towards another object.
+ """
+
+ def __init__(self, object, messageType="remote"):
+ self.remoteMessageReceived = getattr(object, messageType + "MessageReceived")
+
+
+@implementer(IUnjellyable)
+@comparable
+class RemoteReference(Serializable, styles.Ephemeral):
+ """
+ A translucent reference to a remote object.
+
+ I may be a reference to a L{flavors.ViewPoint}, a
+ L{flavors.Referenceable}, or an L{IPerspective} implementer (e.g.,
+ pb.Avatar). From the client's perspective, it is not possible to
+ tell which except by convention.
+
+ I am a \"translucent\" reference because although no additional
+ bookkeeping overhead is given to the application programmer for
+ manipulating a reference, return values are asynchronous.
+
+ See also L{twisted.internet.defer}.
+
+ @ivar broker: The broker I am obtained through.
+ @type broker: L{Broker}
+ """
+
+ def __init__(self, perspective, broker, luid, doRefCount):
+ """(internal) Initialize me with a broker and a locally-unique ID.
+
+ The ID is unique only to the particular Perspective Broker
+ instance.
+ """
+ self.luid = luid
+ self.broker = broker
+ self.doRefCount = doRefCount
+ self.perspective = perspective
+ self.disconnectCallbacks = []
+
+ def notifyOnDisconnect(self, callback):
+ """
+ Register a callback to be called if our broker gets disconnected.
+
+ @param callback: a callable which will be called with one
+ argument, this instance.
+ """
+ assert callable(callback)
+ self.disconnectCallbacks.append(callback)
+ if len(self.disconnectCallbacks) == 1:
+ self.broker.notifyOnDisconnect(self._disconnected)
+
+ def dontNotifyOnDisconnect(self, callback):
+ """
+ Remove a callback that was registered with notifyOnDisconnect.
+
+ @param callback: a callable
+ """
+ self.disconnectCallbacks.remove(callback)
+ if not self.disconnectCallbacks:
+ self.broker.dontNotifyOnDisconnect(self._disconnected)
+
+ def _disconnected(self):
+ """
+ Called if we are disconnected and have callbacks registered.
+ """
+ for callback in self.disconnectCallbacks:
+ callback(self)
+ self.disconnectCallbacks = None
+
+ def jellyFor(self, jellier):
+ """
+ If I am being sent back to where I came from, serialize as a local backreference.
+ """
+ if jellier.invoker:
+ assert (
+ self.broker == jellier.invoker
+ ), "Can't send references to brokers other than their own."
+ return b"local", self.luid
+ else:
+ return b"unpersistable", "References cannot be serialized"
+
+ def unjellyFor(self, unjellier, unjellyList):
+ self.__init__(
+ unjellier.invoker.unserializingPerspective,
+ unjellier.invoker,
+ unjellyList[1],
+ 1,
+ )
+ return self
+
+ def callRemote(self, _name, *args, **kw):
+ """
+ Asynchronously invoke a remote method.
+
+ @type _name: L{str}
+ @param _name: the name of the remote method to invoke
+ @param args: arguments to serialize for the remote function
+ @param kw: keyword arguments to serialize for the remote function.
+ @rtype: L{twisted.internet.defer.Deferred}
+ @returns: a Deferred which will be fired when the result of
+ this remote call is received.
+ """
+ if not isinstance(_name, bytes):
+ _name = _name.encode("utf8")
+
+ # Note that we use '_name' instead of 'name' so the user can call
+ # remote methods with 'name' as a keyword parameter, like this:
+ # ref.callRemote("getPeopleNamed", count=12, name="Bob")
+ return self.broker._sendMessage(
+ b"", self.perspective, self.luid, _name, args, kw
+ )
+
+ def remoteMethod(self, key):
+ """
+
+ @param key: The key.
+ @return: A L{RemoteMethod} for this key.
+ """
+ return RemoteMethod(self, key)
+
+ def __cmp__(self, other):
+ """
+
+ @param other: another L{RemoteReference} to compare me to.
+ """
+ if isinstance(other, RemoteReference):
+ if other.broker == self.broker:
+ return cmp(self.luid, other.luid)
+ return cmp(self.broker, other)
+
+ def __hash__(self):
+ """
+ Hash me.
+ """
+ return self.luid
+
+ def __del__(self):
+ """
+ Do distributed reference counting on finalization.
+ """
+ if self.doRefCount:
+ self.broker.sendDecRef(self.luid)
+
+
+setUnjellyableForClass("remote", RemoteReference)
+
+
+class Local:
+ """
+ (internal) A reference to a local object.
+ """
+
+ def __init__(self, object, perspective=None):
+ """
+ Initialize.
+ """
+ self.object = object
+ self.perspective = perspective
+ self.refcount = 1
+
+ def __repr__(self) -> str:
+ return f"<pb.Local {self.object!r} ref:{self.refcount}>"
+
+ def incref(self):
+ """
+ Increment the reference count.
+
+ @return: the reference count after incrementing
+ """
+ self.refcount = self.refcount + 1
+ return self.refcount
+
+ def decref(self):
+ """
+ Decrement the reference count.
+
+ @return: the reference count after decrementing
+ """
+ self.refcount = self.refcount - 1
+ return self.refcount
+
+
+# Failure
+class CopyableFailure(failure.Failure, Copyable):
+ """
+ A L{flavors.RemoteCopy} and L{flavors.Copyable} version of
+ L{twisted.python.failure.Failure} for serialization.
+ """
+
+ unsafeTracebacks = 0
+
+ def getStateToCopy(self):
+ """
+ Collect state related to the exception which occurred, discarding
+ state which cannot reasonably be serialized.
+ """
+ state = self.__dict__.copy()
+ state["tb"] = None
+ state["frames"] = []
+ state["stack"] = []
+ state["value"] = str(self.value) # Exception instance
+ if isinstance(self.type, bytes):
+ state["type"] = self.type
+ else:
+ state["type"] = reflect.qual(self.type).encode("utf-8") # Exception class
+ if self.unsafeTracebacks:
+ state["traceback"] = self.getTraceback()
+ else:
+ state["traceback"] = "Traceback unavailable\n"
+ return state
+
+
+class CopiedFailure(RemoteCopy, failure.Failure):
+ """
+ A L{CopiedFailure} is a L{pb.RemoteCopy} of a L{failure.Failure}
+ transferred via PB.
+
+ @ivar type: The full import path of the exception class which was raised on
+ the remote end.
+ @type type: C{str}
+
+ @ivar value: A str() representation of the remote value.
+ @type value: L{CopiedFailure} or C{str}
+
+ @ivar traceback: The remote traceback.
+ @type traceback: C{str}
+ """
+
+ def printTraceback(self, file=None, elideFrameworkCode=0, detail="default"):
+ if file is None:
+ file = log.logfile
+ failureType = self.type
+ if not isinstance(failureType, str):
+ failureType = failureType.decode("utf-8")
+ file.write("Traceback from remote host -- ")
+ file.write(failureType + ": " + self.value)
+ file.write("\n")
+
+ def throwExceptionIntoGenerator(self, g):
+ """
+ Throw the original exception into the given generator, preserving
+ traceback information if available. In the case of a L{CopiedFailure}
+ where the exception type is a string, a L{pb.RemoteError} is thrown
+ instead.
+
+ @return: The next value yielded from the generator.
+ @raise StopIteration: If there are no more values in the generator.
+ @raise RemoteError: The wrapped remote exception.
+ """
+ return g.throw(RemoteError(self.type, self.value, self.traceback))
+
+ printBriefTraceback = printTraceback
+ printDetailedTraceback = printTraceback
+
+
+setUnjellyableForClass(CopyableFailure, CopiedFailure)
+
+
+def failure2Copyable(fail, unsafeTracebacks=0):
+ f = _newInstance(CopyableFailure, fail.__dict__)
+ f.unsafeTracebacks = unsafeTracebacks
+ return f
+
+
+class Broker(banana.Banana):
+ """
+ I am a broker for objects.
+ """
+
+ version = 6
+ username = None
+ factory = None
+
+ def __init__(self, isClient=1, security=globalSecurity):
+ banana.Banana.__init__(self, isClient)
+ self.disconnected = 0
+ self.disconnects = []
+ self.failures = []
+ self.connects = []
+ self.localObjects = {}
+ self.security = security
+ self.pageProducers = []
+ self.currentRequestID = 0
+ self.currentLocalID = 0
+ self.unserializingPerspective = None
+ # Some terms:
+ # PUID: process unique ID; return value of id() function. type "int".
+ # LUID: locally unique ID; an ID unique to an object mapped over this
+ # connection. type "int"
+ # GUID: (not used yet) globally unique ID; an ID for an object which
+ # may be on a redirected or meta server. Type as yet undecided.
+ # Dictionary mapping LUIDs to local objects.
+ # set above to allow root object to be assigned before connection is made
+ # self.localObjects = {}
+ # Dictionary mapping PUIDs to LUIDs.
+ self.luids = {}
+ # Dictionary mapping LUIDs to local (remotely cached) objects. Remotely
+ # cached means that they're objects which originate here, and were
+ # copied remotely.
+ self.remotelyCachedObjects = {}
+ # Dictionary mapping PUIDs to (cached) LUIDs
+ self.remotelyCachedLUIDs = {}
+ # Dictionary mapping (remote) LUIDs to (locally cached) objects.
+ self.locallyCachedObjects = {}
+ self.waitingForAnswers = {}
+
+ # Mapping from LUIDs to weakref objects with callbacks for performing
+ # any local cleanup which may be necessary for the corresponding
+ # object once it no longer exists.
+ self._localCleanup = {}
+
+ def resumeProducing(self):
+ """
+ Called when the consumer attached to me runs out of buffer.
+ """
+ # Go backwards over the list so we can remove indexes from it as we go
+ for pageridx in range(len(self.pageProducers) - 1, -1, -1):
+ pager = self.pageProducers[pageridx]
+ pager.sendNextPage()
+ if not pager.stillPaging():
+ del self.pageProducers[pageridx]
+ if not self.pageProducers:
+ self.transport.unregisterProducer()
+
+ def pauseProducing(self):
+ # Streaming producer method; not necessary to implement.
+ pass
+
+ def stopProducing(self):
+ # Streaming producer method; not necessary to implement.
+ pass
+
+ def registerPageProducer(self, pager):
+ self.pageProducers.append(pager)
+ if len(self.pageProducers) == 1:
+ self.transport.registerProducer(self, 0)
+
+ def expressionReceived(self, sexp):
+ """
+ Evaluate an expression as it's received.
+ """
+ if isinstance(sexp, list):
+ command = sexp[0]
+
+ if not isinstance(command, str):
+ command = command.decode("utf8")
+
+ methodName = "proto_%s" % command
+ method = getattr(self, methodName, None)
+
+ if method:
+ method(*sexp[1:])
+ else:
+ self.sendCall(b"didNotUnderstand", command)
+ else:
+ raise ProtocolError("Non-list expression received.")
+
+ def proto_version(self, vnum):
+ """
+ Protocol message: (version version-number)
+
+ Check to make sure that both ends of the protocol are speaking
+ the same version dialect.
+
+ @param vnum: The version number.
+ """
+
+ if vnum != self.version:
+ raise ProtocolError(f"Version Incompatibility: {self.version} {vnum}")
+
+ def sendCall(self, *exp):
+ """
+ Utility method to send an expression to the other side of the connection.
+
+ @param exp: The expression.
+ """
+ self.sendEncoded(exp)
+
+ def proto_didNotUnderstand(self, command):
+ """
+ Respond to stock 'C{didNotUnderstand}' message.
+
+ Log the command that was not understood and continue. (Note:
+ this will probably be changed to close the connection or raise
+ an exception in the future.)
+
+ @param command: The command to log.
+ """
+ log.msg("Didn't understand command: %r" % command)
+
+ def connectionReady(self):
+ """
+ Initialize. Called after Banana negotiation is done.
+ """
+ self.sendCall(b"version", self.version)
+ for notifier in self.connects:
+ try:
+ notifier()
+ except BaseException:
+ log.deferr()
+ self.connects = None
+ self.factory.clientConnectionMade(self)
+
+ def connectionFailed(self):
+ # XXX should never get called anymore? check!
+ for notifier in self.failures:
+ try:
+ notifier()
+ except BaseException:
+ log.deferr()
+ self.failures = None
+
+ waitingForAnswers = None
+
+ def connectionLost(self, reason):
+ """
+ The connection was lost.
+
+ @param reason: message to put in L{failure.Failure}
+ """
+ self.disconnected = 1
+ # Nuke potential circular references.
+ self.luids = None
+ if self.waitingForAnswers:
+ for d in self.waitingForAnswers.values():
+ try:
+ d.errback(failure.Failure(PBConnectionLost(reason)))
+ except BaseException:
+ log.deferr()
+ # Assure all Cacheable.stoppedObserving are called
+ for lobj in self.remotelyCachedObjects.values():
+ cacheable = lobj.object
+ perspective = lobj.perspective
+ try:
+ cacheable.stoppedObserving(
+ perspective, RemoteCacheObserver(self, cacheable, perspective)
+ )
+ except BaseException:
+ log.deferr()
+ # Loop on a copy to prevent notifiers to mixup
+ # the list by calling dontNotifyOnDisconnect
+ for notifier in self.disconnects[:]:
+ try:
+ notifier()
+ except BaseException:
+ log.deferr()
+ self.disconnects = None
+ self.waitingForAnswers = None
+ self.localSecurity = None
+ self.remoteSecurity = None
+ self.remotelyCachedObjects = None
+ self.remotelyCachedLUIDs = None
+ self.locallyCachedObjects = None
+ self.localObjects = None
+
+ def notifyOnDisconnect(self, notifier):
+ """
+
+ @param notifier: callback to call when the Broker disconnects.
+ """
+ assert callable(notifier)
+ self.disconnects.append(notifier)
+
+ def notifyOnFail(self, notifier):
+ """
+
+ @param notifier: callback to call if the Broker fails to connect.
+ """
+ assert callable(notifier)
+ self.failures.append(notifier)
+
+ def notifyOnConnect(self, notifier):
+ """
+
+ @param notifier: callback to call when the Broker connects.
+ """
+ assert callable(notifier)
+ if self.connects is None:
+ try:
+ notifier()
+ except BaseException:
+ log.err()
+ else:
+ self.connects.append(notifier)
+
+ def dontNotifyOnDisconnect(self, notifier):
+ """
+
+ @param notifier: callback to remove from list of disconnect callbacks.
+ """
+ try:
+ self.disconnects.remove(notifier)
+ except ValueError:
+ pass
+
+ def localObjectForID(self, luid):
+ """
+ Get a local object for a locally unique ID.
+
+ @return: An object previously stored with L{registerReference} or
+ L{None} if there is no object which corresponds to the given
+ identifier.
+ """
+ if isinstance(luid, str):
+ luid = luid.encode("utf8")
+
+ lob = self.localObjects.get(luid)
+ if lob is None:
+ return
+ return lob.object
+
+ maxBrokerRefsViolations = 0
+
+ def registerReference(self, object):
+ """
+ Store a persistent reference to a local object and map its
+ id() to a generated, session-unique ID.
+
+ @param object: a local object
+ @return: the generated ID
+ """
+
+ assert object is not None
+ puid = object.processUniqueID()
+ luid = self.luids.get(puid)
+ if luid is None:
+ if len(self.localObjects) > MAX_BROKER_REFS:
+ self.maxBrokerRefsViolations = self.maxBrokerRefsViolations + 1
+ if self.maxBrokerRefsViolations > 3:
+ self.transport.loseConnection()
+ raise Error("Maximum PB reference count exceeded. " "Goodbye.")
+ raise Error("Maximum PB reference count exceeded.")
+
+ luid = self.newLocalID()
+ self.localObjects[luid] = Local(object)
+ self.luids[puid] = luid
+ else:
+ self.localObjects[luid].incref()
+ return luid
+
+ def setNameForLocal(self, name, object):
+ """
+ Store a special (string) ID for this object.
+
+ This is how you specify a 'base' set of objects that the remote
+ protocol can connect to.
+
+ @param name: An ID.
+ @param object: The object.
+ """
+ if isinstance(name, str):
+ name = name.encode("utf8")
+
+ assert object is not None
+ self.localObjects[name] = Local(object)
+
+ def remoteForName(self, name):
+ """
+ Returns an object from the remote name mapping.
+
+ Note that this does not check the validity of the name, only
+ creates a translucent reference for it.
+
+ @param name: The name to look up.
+ @return: An object which maps to the name.
+ """
+ if isinstance(name, str):
+ name = name.encode("utf8")
+
+ return RemoteReference(None, self, name, 0)
+
+ def cachedRemotelyAs(self, instance, incref=0):
+ """
+
+ @param instance: The instance to look up.
+ @param incref: Flag to specify whether to increment the
+ reference.
+ @return: An ID that says what this instance is cached as
+ remotely, or L{None} if it's not.
+ """
+
+ puid = instance.processUniqueID()
+ luid = self.remotelyCachedLUIDs.get(puid)
+ if (luid is not None) and (incref):
+ self.remotelyCachedObjects[luid].incref()
+ return luid
+
+ def remotelyCachedForLUID(self, luid):
+ """
+
+ @param luid: The LUID to look up.
+ @return: An instance which is cached remotely.
+ """
+ return self.remotelyCachedObjects[luid].object
+
+ def cacheRemotely(self, instance):
+ """
+ XXX
+
+ @return: A new LUID.
+ """
+ puid = instance.processUniqueID()
+ luid = self.newLocalID()
+ if len(self.remotelyCachedObjects) > MAX_BROKER_REFS:
+ self.maxBrokerRefsViolations = self.maxBrokerRefsViolations + 1
+ if self.maxBrokerRefsViolations > 3:
+ self.transport.loseConnection()
+ raise Error("Maximum PB cache count exceeded. " "Goodbye.")
+ raise Error("Maximum PB cache count exceeded.")
+
+ self.remotelyCachedLUIDs[puid] = luid
+ # This table may not be necessary -- for now, it's to make sure that no
+ # monkey business happens with id(instance)
+ self.remotelyCachedObjects[luid] = Local(instance, self.serializingPerspective)
+ return luid
+
+ def cacheLocally(self, cid, instance):
+ """(internal)
+
+ Store a non-filled-out cached instance locally.
+ """
+ self.locallyCachedObjects[cid] = instance
+
+ def cachedLocallyAs(self, cid):
+ instance = self.locallyCachedObjects[cid]
+ return instance
+
+ def serialize(self, object, perspective=None, method=None, args=None, kw=None):
+ """
+ Jelly an object according to the remote security rules for this broker.
+
+ @param object: The object to jelly.
+ @param perspective: The perspective.
+ @param method: The method.
+ @param args: Arguments.
+ @param kw: Keyword arguments.
+ """
+
+ if isinstance(object, defer.Deferred):
+ object.addCallbacks(
+ self.serialize,
+ lambda x: x,
+ callbackKeywords={
+ "perspective": perspective,
+ "method": method,
+ "args": args,
+ "kw": kw,
+ },
+ )
+ return object
+
+ # XXX This call is NOT REENTRANT and testing for reentrancy is just
+ # crazy, so it likely won't be. Don't ever write methods that call the
+ # broker's serialize() method recursively (e.g. sending a method call
+ # from within a getState (this causes concurrency problems anyway so
+ # you really, really shouldn't do it))
+
+ self.serializingPerspective = perspective
+ self.jellyMethod = method
+ self.jellyArgs = args
+ self.jellyKw = kw
+ try:
+ return jelly(object, self.security, None, self)
+ finally:
+ self.serializingPerspective = None
+ self.jellyMethod = None
+ self.jellyArgs = None
+ self.jellyKw = None
+
+ def unserialize(self, sexp, perspective=None):
+ """
+ Unjelly an sexp according to the local security rules for this broker.
+
+ @param sexp: The object to unjelly.
+ @param perspective: The perspective.
+ """
+
+ self.unserializingPerspective = perspective
+ try:
+ return unjelly(sexp, self.security, None, self)
+ finally:
+ self.unserializingPerspective = None
+
+ def newLocalID(self):
+ """
+
+ @return: A newly generated LUID.
+ """
+ self.currentLocalID = self.currentLocalID + 1
+ return self.currentLocalID
+
+ def newRequestID(self):
+ """
+
+ @return: A newly generated request ID.
+ """
+ self.currentRequestID = self.currentRequestID + 1
+ return self.currentRequestID
+
+ def _sendMessage(self, prefix, perspective, objectID, message, args, kw):
+ pbc = None
+ pbe = None
+ answerRequired = 1
+ if "pbcallback" in kw:
+ pbc = kw["pbcallback"]
+ del kw["pbcallback"]
+ if "pberrback" in kw:
+ pbe = kw["pberrback"]
+ del kw["pberrback"]
+ if "pbanswer" in kw:
+ assert (not pbe) and (not pbc), "You can't specify a no-answer requirement."
+ answerRequired = kw["pbanswer"]
+ del kw["pbanswer"]
+ if self.disconnected:
+ raise DeadReferenceError("Calling Stale Broker")
+ try:
+ netArgs = self.serialize(args, perspective=perspective, method=message)
+ netKw = self.serialize(kw, perspective=perspective, method=message)
+ except BaseException:
+ return defer.fail(failure.Failure())
+ requestID = self.newRequestID()
+ if answerRequired:
+ rval = defer.Deferred()
+ self.waitingForAnswers[requestID] = rval
+ if pbc or pbe:
+ log.msg('warning! using deprecated "pbcallback"')
+ rval.addCallbacks(pbc, pbe)
+ else:
+ rval = None
+ self.sendCall(
+ prefix + b"message",
+ requestID,
+ objectID,
+ message,
+ answerRequired,
+ netArgs,
+ netKw,
+ )
+ return rval
+
+ def proto_message(
+ self, requestID, objectID, message, answerRequired, netArgs, netKw
+ ):
+ self._recvMessage(
+ self.localObjectForID,
+ requestID,
+ objectID,
+ message,
+ answerRequired,
+ netArgs,
+ netKw,
+ )
+
+ def proto_cachemessage(
+ self, requestID, objectID, message, answerRequired, netArgs, netKw
+ ):
+ self._recvMessage(
+ self.cachedLocallyAs,
+ requestID,
+ objectID,
+ message,
+ answerRequired,
+ netArgs,
+ netKw,
+ )
+
+ def _recvMessage(
+ self,
+ findObjMethod,
+ requestID,
+ objectID,
+ message,
+ answerRequired,
+ netArgs,
+ netKw,
+ ):
+ """
+ Received a message-send.
+
+ Look up message based on object, unserialize the arguments, and
+ invoke it with args, and send an 'answer' or 'error' response.
+
+ @param findObjMethod: A callable which takes C{objectID} as argument.
+ @param requestID: The requiest ID.
+ @param objectID: The object ID.
+ @param message: The message.
+ @param answerRequired:
+ @param netArgs: Arguments.
+ @param netKw: Keyword arguments.
+ """
+ if not isinstance(message, str):
+ message = message.decode("utf8")
+
+ try:
+ object = findObjMethod(objectID)
+ if object is None:
+ raise Error("Invalid Object ID")
+ netResult = object.remoteMessageReceived(self, message, netArgs, netKw)
+ except Error as e:
+ if answerRequired:
+ # If the error is Jellyable or explicitly allowed via our
+ # security options, send it back and let the code on the
+ # other end deal with unjellying. If it isn't Jellyable,
+ # wrap it in a CopyableFailure, which ensures it can be
+ # unjellied on the other end. We have to do this because
+ # all errors must be sent back.
+ if isinstance(e, Jellyable) or self.security.isClassAllowed(
+ e.__class__
+ ):
+ self._sendError(e, requestID)
+ else:
+ self._sendError(CopyableFailure(e), requestID)
+ except BaseException:
+ if answerRequired:
+ log.msg("Peer will receive following PB traceback:", isError=True)
+ f = CopyableFailure()
+ self._sendError(f, requestID)
+ log.err()
+ else:
+ if answerRequired:
+ if isinstance(netResult, defer.Deferred):
+ args = (requestID,)
+ netResult.addCallbacks(
+ self._sendAnswer,
+ self._sendFailureOrError,
+ callbackArgs=args,
+ errbackArgs=args,
+ )
+ # XXX Should this be done somewhere else?
+ else:
+ self._sendAnswer(netResult, requestID)
+
+ def _sendAnswer(self, netResult, requestID):
+ """
+ (internal) Send an answer to a previously sent message.
+
+ @param netResult: The answer.
+ @param requestID: The request ID.
+ """
+ self.sendCall(b"answer", requestID, netResult)
+
+ def proto_answer(self, requestID, netResult):
+ """
+ (internal) Got an answer to a previously sent message.
+
+ Look up the appropriate callback and call it.
+
+ @param requestID: The request ID.
+ @param netResult: The answer.
+ """
+ d = self.waitingForAnswers[requestID]
+ del self.waitingForAnswers[requestID]
+ d.callback(self.unserialize(netResult))
+
+ def _sendFailureOrError(self, fail, requestID):
+ """
+ Call L{_sendError} or L{_sendFailure}, depending on whether C{fail}
+ represents an L{Error} subclass or not.
+
+ @param fail: The failure.
+ @param requestID: The request ID.
+ """
+ if fail.check(Error) is None:
+ self._sendFailure(fail, requestID)
+ else:
+ self._sendError(fail, requestID)
+
+ def _sendFailure(self, fail, requestID):
+ """
+ Log error and then send it.
+
+ @param fail: The failure.
+ @param requestID: The request ID.
+ """
+ log.msg("Peer will receive following PB traceback:")
+ log.err(fail)
+ self._sendError(fail, requestID)
+
+ def _sendError(self, fail, requestID):
+ """
+ (internal) Send an error for a previously sent message.
+
+ @param fail: The failure.
+ @param requestID: The request ID.
+ """
+ if isinstance(fail, failure.Failure):
+ # If the failures value is jellyable or allowed through security,
+ # send the value
+ if isinstance(fail.value, Jellyable) or self.security.isClassAllowed(
+ fail.value.__class__
+ ):
+ fail = fail.value
+ elif not isinstance(fail, CopyableFailure):
+ fail = failure2Copyable(fail, self.factory.unsafeTracebacks)
+ if isinstance(fail, CopyableFailure):
+ fail.unsafeTracebacks = self.factory.unsafeTracebacks
+ self.sendCall(b"error", requestID, self.serialize(fail))
+
+ def proto_error(self, requestID, fail):
+ """
+ (internal) Deal with an error.
+
+ @param requestID: The request ID.
+ @param fail: The failure.
+ """
+ d = self.waitingForAnswers[requestID]
+ del self.waitingForAnswers[requestID]
+ d.errback(self.unserialize(fail))
+
+ def sendDecRef(self, objectID):
+ """
+ (internal) Send a DECREF directive.
+
+ @param objectID: The object ID.
+ """
+ self.sendCall(b"decref", objectID)
+
+ def proto_decref(self, objectID):
+ """
+ (internal) Decrement the reference count of an object.
+
+ If the reference count is zero, it will free the reference to this
+ object.
+
+ @param objectID: The object ID.
+ """
+ if isinstance(objectID, str):
+ objectID = objectID.encode("utf8")
+ refs = self.localObjects[objectID].decref()
+ if refs == 0:
+ puid = self.localObjects[objectID].object.processUniqueID()
+ del self.luids[puid]
+ del self.localObjects[objectID]
+ self._localCleanup.pop(puid, lambda: None)()
+
+ def decCacheRef(self, objectID):
+ """
+ (internal) Send a DECACHE directive.
+
+ @param objectID: The object ID.
+ """
+ self.sendCall(b"decache", objectID)
+
+ def proto_decache(self, objectID):
+ """
+ (internal) Decrement the reference count of a cached object.
+
+ If the reference count is zero, free the reference, then send an
+ 'uncached' directive.
+
+ @param objectID: The object ID.
+ """
+ refs = self.remotelyCachedObjects[objectID].decref()
+ # log.msg('decaching: %s #refs: %s' % (objectID, refs))
+ if refs == 0:
+ lobj = self.remotelyCachedObjects[objectID]
+ cacheable = lobj.object
+ perspective = lobj.perspective
+ # TODO: force_decache needs to be able to force-invalidate a
+ # cacheable reference.
+ try:
+ cacheable.stoppedObserving(
+ perspective, RemoteCacheObserver(self, cacheable, perspective)
+ )
+ except BaseException:
+ log.deferr()
+ puid = cacheable.processUniqueID()
+ del self.remotelyCachedLUIDs[puid]
+ del self.remotelyCachedObjects[objectID]
+ self.sendCall(b"uncache", objectID)
+
+ def proto_uncache(self, objectID):
+ """
+ (internal) Tell the client it is now OK to uncache an object.
+
+ @param objectID: The object ID.
+ """
+ # log.msg("uncaching locally %d" % objectID)
+ obj = self.locallyCachedObjects[objectID]
+ obj.broker = None
+ ## def reallyDel(obj=obj):
+ ## obj.__really_del__()
+ ## obj.__del__ = reallyDel
+ del self.locallyCachedObjects[objectID]
+
+
+def respond(challenge, password):
+ """
+ Respond to a challenge.
+
+ This is useful for challenge/response authentication.
+
+ @param challenge: A challenge.
+ @param password: A password.
+ @return: The password hashed twice.
+ """
+ m = md5()
+ m.update(password)
+ hashedPassword = m.digest()
+ m = md5()
+ m.update(hashedPassword)
+ m.update(challenge)
+ doubleHashedPassword = m.digest()
+ return doubleHashedPassword
+
+
+def challenge():
+ """
+
+ @return: Some random data.
+ """
+ crap = bytes(random.randint(65, 90) for x in range(random.randrange(15, 25)))
+ crap = md5(crap).digest()
+ return crap
+
+
+class PBClientFactory(protocol.ClientFactory):
+ """
+ Client factory for PB brokers.
+
+ As with all client factories, use with reactor.connectTCP/SSL/etc..
+ getPerspective and getRootObject can be called either before or
+ after the connect.
+ """
+
+ protocol = Broker
+ unsafeTracebacks = False
+
+ def __init__(self, unsafeTracebacks=False, security=globalSecurity):
+ """
+ @param unsafeTracebacks: if set, tracebacks for exceptions will be sent
+ over the wire.
+ @type unsafeTracebacks: C{bool}
+
+ @param security: security options used by the broker, default to
+ C{globalSecurity}.
+ @type security: L{twisted.spread.jelly.SecurityOptions}
+ """
+ self.unsafeTracebacks = unsafeTracebacks
+ self.security = security
+ self._reset()
+
+ def buildProtocol(self, addr):
+ """
+ Build the broker instance, passing the security options to it.
+ """
+ p = self.protocol(isClient=True, security=self.security)
+ p.factory = self
+ return p
+
+ def _reset(self):
+ self.rootObjectRequests = [] # list of deferred
+ self._broker = None
+ self._root = None
+
+ def _failAll(self, reason):
+ deferreds = self.rootObjectRequests
+ self._reset()
+ for d in deferreds:
+ d.errback(reason)
+
+ def clientConnectionFailed(self, connector, reason):
+ self._failAll(reason)
+
+ def clientConnectionLost(self, connector, reason, reconnecting=0):
+ """
+ Reconnecting subclasses should call with reconnecting=1.
+ """
+ if reconnecting:
+ # Any pending requests will go to next connection attempt
+ # so we don't fail them.
+ self._broker = None
+ self._root = None
+ else:
+ self._failAll(reason)
+
+ def clientConnectionMade(self, broker):
+ self._broker = broker
+ self._root = broker.remoteForName("root")
+ ds = self.rootObjectRequests
+ self.rootObjectRequests = []
+ for d in ds:
+ d.callback(self._root)
+
+ def getRootObject(self):
+ """
+ Get root object of remote PB server.
+
+ @return: Deferred of the root object.
+ """
+ if self._broker and not self._broker.disconnected:
+ return defer.succeed(self._root)
+ d = defer.Deferred()
+ self.rootObjectRequests.append(d)
+ return d
+
+ def disconnect(self):
+ """
+ If the factory is connected, close the connection.
+
+ Note that if you set up the factory to reconnect, you will need to
+ implement extra logic to prevent automatic reconnection after this
+ is called.
+ """
+ if self._broker:
+ self._broker.transport.loseConnection()
+
+ def _cbSendUsername(self, root, username, password, client):
+ return root.callRemote("login", username).addCallback(
+ self._cbResponse, password, client
+ )
+
+ def _cbResponse(self, challenges, password, client):
+ challenge, challenger = challenges
+ return challenger.callRemote("respond", respond(challenge, password), client)
+
+ def _cbLoginAnonymous(self, root, client):
+ """
+ Attempt an anonymous login on the given remote root object.
+
+ @type root: L{RemoteReference}
+ @param root: The object on which to attempt the login, most likely
+ returned by a call to L{PBClientFactory.getRootObject}.
+
+ @param client: A jellyable object which will be used as the I{mind}
+ parameter for the login attempt.
+
+ @rtype: L{Deferred}
+ @return: A L{Deferred} which will be called back with a
+ L{RemoteReference} to an avatar when anonymous login succeeds, or
+ which will errback if anonymous login fails.
+ """
+ return root.callRemote("loginAnonymous", client)
+
+ def login(self, credentials, client=None):
+ """
+ Login and get perspective from remote PB server.
+
+ Currently the following credentials are supported::
+
+ L{twisted.cred.credentials.IUsernamePassword}
+ L{twisted.cred.credentials.IAnonymous}
+
+ @rtype: L{Deferred}
+ @return: A L{Deferred} which will be called back with a
+ L{RemoteReference} for the avatar logged in to, or which will
+ errback if login fails.
+ """
+ d = self.getRootObject()
+
+ if IAnonymous.providedBy(credentials):
+ d.addCallback(self._cbLoginAnonymous, client)
+ else:
+ d.addCallback(
+ self._cbSendUsername, credentials.username, credentials.password, client
+ )
+ return d
+
+
+class PBServerFactory(protocol.ServerFactory):
+ """
+ Server factory for perspective broker.
+
+ Login is done using a Portal object, whose realm is expected to return
+ avatars implementing IPerspective. The credential checkers in the portal
+ should accept IUsernameHashedPassword or IUsernameMD5Password.
+
+ Alternatively, any object providing or adaptable to L{IPBRoot} can be
+ used instead of a portal to provide the root object of the PB server.
+ """
+
+ unsafeTracebacks = False
+
+ # object broker factory
+ protocol = Broker
+
+ def __init__(self, root, unsafeTracebacks=False, security=globalSecurity):
+ """
+ @param root: factory providing the root Referenceable used by the broker.
+ @type root: object providing or adaptable to L{IPBRoot}.
+
+ @param unsafeTracebacks: if set, tracebacks for exceptions will be sent
+ over the wire.
+ @type unsafeTracebacks: C{bool}
+
+ @param security: security options used by the broker, default to
+ C{globalSecurity}.
+ @type security: L{twisted.spread.jelly.SecurityOptions}
+ """
+ self.root = IPBRoot(root)
+ self.unsafeTracebacks = unsafeTracebacks
+ self.security = security
+
+ def buildProtocol(self, addr):
+ """
+ Return a Broker attached to the factory (as the service provider).
+ """
+ proto = self.protocol(isClient=False, security=self.security)
+ proto.factory = self
+ proto.setNameForLocal("root", self.root.rootObject(proto))
+ return proto
+
+ def clientConnectionMade(self, protocol):
+ # XXX does this method make any sense?
+ pass
+
+
+class IUsernameMD5Password(ICredentials):
+ """
+ I encapsulate a username and a hashed password.
+
+ This credential is used for username/password over PB. CredentialCheckers
+ which check this kind of credential must store the passwords in plaintext
+ form or as a MD5 digest.
+
+ @type username: C{str} or C{Deferred}
+ @ivar username: The username associated with these credentials.
+ """
+
+ def checkPassword(password):
+ """
+ Validate these credentials against the correct password.
+
+ @type password: C{str}
+ @param password: The correct, plaintext password against which to
+ check.
+
+ @rtype: C{bool} or L{Deferred}
+ @return: C{True} if the credentials represented by this object match the
+ given password, C{False} if they do not, or a L{Deferred} which will
+ be called back with one of these values.
+ """
+
+ def checkMD5Password(password):
+ """
+ Validate these credentials against the correct MD5 digest of the
+ password.
+
+ @type password: C{str}
+ @param password: The correct MD5 digest of a password against which to
+ check.
+
+ @rtype: C{bool} or L{Deferred}
+ @return: C{True} if the credentials represented by this object match the
+ given digest, C{False} if they do not, or a L{Deferred} which will
+ be called back with one of these values.
+ """
+
+
+@implementer(IPBRoot)
+class _PortalRoot:
+ """
+ Root object, used to login to portal.
+ """
+
+ def __init__(self, portal):
+ self.portal = portal
+
+ def rootObject(self, broker):
+ return _PortalWrapper(self.portal, broker)
+
+
+registerAdapter(_PortalRoot, Portal, IPBRoot)
+
+
+class _JellyableAvatarMixin:
+ """
+ Helper class for code which deals with avatars which PB must be capable of
+ sending to a peer.
+ """
+
+ def _cbLogin(self, result):
+ """
+ Ensure that the avatar to be returned to the client is jellyable and
+ set up disconnection notification to call the realm's logout object.
+ """
+ (interface, avatar, logout) = result
+ if not IJellyable.providedBy(avatar):
+ avatar = AsReferenceable(avatar, "perspective")
+
+ puid = avatar.processUniqueID()
+
+ # only call logout once, whether the connection is dropped (disconnect)
+ # or a logout occurs (cleanup), and be careful to drop the reference to
+ # it in either case
+ logout = [logout]
+
+ def maybeLogout():
+ if not logout:
+ return
+ fn = logout[0]
+ del logout[0]
+ fn()
+
+ self.broker._localCleanup[puid] = maybeLogout
+ self.broker.notifyOnDisconnect(maybeLogout)
+
+ return avatar
+
+
+class _PortalWrapper(Referenceable, _JellyableAvatarMixin):
+ """
+ Root Referenceable object, used to login to portal.
+ """
+
+ def __init__(self, portal, broker):
+ self.portal = portal
+ self.broker = broker
+
+ def remote_login(self, username):
+ """
+ Start of username/password login.
+
+ @param username: The username.
+ """
+ c = challenge()
+ return c, _PortalAuthChallenger(self.portal, self.broker, username, c)
+
+ def remote_loginAnonymous(self, mind):
+ """
+ Attempt an anonymous login.
+
+ @param mind: An object to use as the mind parameter to the portal login
+ call (possibly None).
+
+ @rtype: L{Deferred}
+ @return: A Deferred which will be called back with an avatar when login
+ succeeds or which will be errbacked if login fails somehow.
+ """
+ d = self.portal.login(Anonymous(), mind, IPerspective)
+ d.addCallback(self._cbLogin)
+ return d
+
+
+@implementer(IUsernameHashedPassword, IUsernameMD5Password)
+class _PortalAuthChallenger(Referenceable, _JellyableAvatarMixin):
+ """
+ Called with response to password challenge.
+ """
+
+ def __init__(self, portal, broker, username, challenge):
+ self.portal = portal
+ self.broker = broker
+ self.username = username
+ self.challenge = challenge
+
+ def remote_respond(self, response, mind):
+ self.response = response
+ d = self.portal.login(self, mind, IPerspective)
+ d.addCallback(self._cbLogin)
+ return d
+
+ def checkPassword(self, password):
+ """
+ L{IUsernameHashedPassword}
+
+ @param password: The password.
+ @return: L{_PortalAuthChallenger.checkMD5Password}
+ """
+ return self.checkMD5Password(md5(password).digest())
+
+ def checkMD5Password(self, md5Password):
+ """
+ L{IUsernameMD5Password}
+
+ @param md5Password:
+ @rtype: L{bool}
+ @return: L{True} if password matches.
+ """
+ md = md5()
+ md.update(md5Password)
+ md.update(self.challenge)
+ correct = md.digest()
+ return self.response == correct
+
+
+__all__ = [
+ # Everything from flavors is exposed publicly here.
+ "IPBRoot",
+ "Serializable",
+ "Referenceable",
+ "NoSuchMethod",
+ "Root",
+ "ViewPoint",
+ "Viewable",
+ "Copyable",
+ "Jellyable",
+ "Cacheable",
+ "RemoteCopy",
+ "RemoteCache",
+ "RemoteCacheObserver",
+ "copyTags",
+ "setUnjellyableForClass",
+ "setUnjellyableFactoryForClass",
+ "setUnjellyableForClassTree",
+ "setCopierForClass",
+ "setFactoryForClass",
+ "setCopierForClassTree",
+ "MAX_BROKER_REFS",
+ "portno",
+ "ProtocolError",
+ "DeadReferenceError",
+ "Error",
+ "PBConnectionLost",
+ "RemoteMethod",
+ "IPerspective",
+ "Avatar",
+ "AsReferenceable",
+ "RemoteReference",
+ "CopyableFailure",
+ "CopiedFailure",
+ "failure2Copyable",
+ "Broker",
+ "respond",
+ "challenge",
+ "PBClientFactory",
+ "PBServerFactory",
+ "IUsernameMD5Password",
+]
diff --git a/contrib/python/Twisted/py3/twisted/spread/publish.py b/contrib/python/Twisted/py3/twisted/spread/publish.py
new file mode 100644
index 0000000000..de525083e5
--- /dev/null
+++ b/contrib/python/Twisted/py3/twisted/spread/publish.py
@@ -0,0 +1,144 @@
+# -*- test-case-name: twisted.spread.test.test_pb -*-
+# Copyright (c) Twisted Matrix Laboratories.
+# See LICENSE for details.
+
+"""
+Persistently cached objects for PB.
+
+Maintainer: Glyph Lefkowitz
+
+Future Plans: None known.
+"""
+
+
+import time
+
+from twisted.internet import defer
+from twisted.spread import banana, flavors, jelly
+
+
+class Publishable(flavors.Cacheable):
+ """An object whose cached state persists across sessions."""
+
+ def __init__(self, publishedID):
+ self.republish()
+ self.publishedID = publishedID
+
+ def republish(self):
+ """Set the timestamp to current and (TODO) update all observers."""
+ self.timestamp = time.time()
+
+ def view_getStateToPublish(self, perspective):
+ "(internal)"
+ return self.getStateToPublishFor(perspective)
+
+ def getStateToPublishFor(self, perspective):
+ """Implement me to special-case your state for a perspective."""
+ return self.getStateToPublish()
+
+ def getStateToPublish(self):
+ """Implement me to return state to copy as part of the publish phase."""
+ raise NotImplementedError("%s.getStateToPublishFor" % self.__class__)
+
+ def getStateToCacheAndObserveFor(self, perspective, observer):
+ """Get all necessary metadata to keep a clientside cache."""
+ if perspective:
+ pname = perspective.perspectiveName
+ sname = perspective.getService().serviceName
+ else:
+ pname = "None"
+ sname = "None"
+
+ return {
+ "remote": flavors.ViewPoint(perspective, self),
+ "publishedID": self.publishedID,
+ "perspective": pname,
+ "service": sname,
+ "timestamp": self.timestamp,
+ }
+
+
+class RemotePublished(flavors.RemoteCache):
+ """The local representation of remote Publishable object."""
+
+ isActivated = 0
+ _wasCleanWhenLoaded = 0
+
+ def getFileName(self, ext="pub"):
+ return "{}-{}-{}.{}".format(
+ self.service,
+ self.perspective,
+ str(self.publishedID),
+ ext,
+ )
+
+ def setCopyableState(self, state):
+ self.__dict__.update(state)
+ self._activationListeners = []
+ try:
+ with open(self.getFileName(), "rb") as dataFile:
+ data = dataFile.read()
+ except OSError:
+ recent = 0
+ else:
+ newself = jelly.unjelly(banana.decode(data))
+ recent = newself.timestamp == self.timestamp
+ if recent:
+ self._cbGotUpdate(newself.__dict__)
+ self._wasCleanWhenLoaded = 1
+ else:
+ self.remote.callRemote("getStateToPublish").addCallbacks(self._cbGotUpdate)
+
+ def __getstate__(self):
+ other = self.__dict__.copy()
+ # Remove PB-specific attributes
+ del other["broker"]
+ del other["remote"]
+ del other["luid"]
+ # remove my own runtime-tracking stuff
+ del other["_activationListeners"]
+ del other["isActivated"]
+ return other
+
+ def _cbGotUpdate(self, newState):
+ self.__dict__.update(newState)
+ self.isActivated = 1
+ # send out notifications
+ for listener in self._activationListeners:
+ listener(self)
+ self._activationListeners = []
+ self.activated()
+ with open(self.getFileName(), "wb") as dataFile:
+ dataFile.write(banana.encode(jelly.jelly(self)))
+
+ def activated(self):
+ """Implement this method if you want to be notified when your
+ publishable subclass is activated.
+ """
+
+ def callWhenActivated(self, callback):
+ """Externally register for notification when this publishable has received all relevant data."""
+ if self.isActivated:
+ callback(self)
+ else:
+ self._activationListeners.append(callback)
+
+
+def whenReady(d):
+ """
+ Wrap a deferred returned from a pb method in another deferred that
+ expects a RemotePublished as a result. This will allow you to wait until
+ the result is really available.
+
+ Idiomatic usage would look like::
+
+ publish.whenReady(serverObject.getMeAPublishable()).addCallback(lookAtThePublishable)
+ """
+ d2 = defer.Deferred()
+ d.addCallbacks(_pubReady, d2.errback, callbackArgs=(d2,))
+ return d2
+
+
+def _pubReady(result, d2):
+ "(internal)"
+ result.callWhenActivated(d2.callback)
diff --git a/contrib/python/Twisted/py3/twisted/spread/util.py b/contrib/python/Twisted/py3/twisted/spread/util.py
new file mode 100644
index 0000000000..5e9c81624f
--- /dev/null
+++ b/contrib/python/Twisted/py3/twisted/spread/util.py
@@ -0,0 +1,217 @@
+# -*- test-case-name: twisted.test.test_pb -*-
+
+# Copyright (c) Twisted Matrix Laboratories.
+# See LICENSE for details.
+
+
+"""
+Utility classes for spread.
+"""
+
+from zope.interface import implementer
+
+from twisted.internet import defer, interfaces
+from twisted.protocols import basic
+from twisted.python.failure import Failure
+from twisted.spread import pb
+
+
+class LocalMethod:
+ def __init__(self, local, name):
+ self.local = local
+ self.name = name
+
+ def __call__(self, *args, **kw):
+ return self.local.callRemote(self.name, *args, **kw)
+
+
+class LocalAsRemote:
+ """
+ A class useful for emulating the effects of remote behavior locally.
+ """
+
+ reportAllTracebacks = 1
+
+ def callRemote(self, name, *args, **kw):
+ """
+ Call a specially-designated local method.
+
+ self.callRemote('x') will first try to invoke a method named
+ sync_x and return its result (which should probably be a
+ Deferred). Second, it will look for a method called async_x,
+ which will be called and then have its result (or Failure)
+ automatically wrapped in a Deferred.
+ """
+ if hasattr(self, "sync_" + name):
+ return getattr(self, "sync_" + name)(*args, **kw)
+ try:
+ method = getattr(self, "async_" + name)
+ return defer.succeed(method(*args, **kw))
+ except BaseException:
+ f = Failure()
+ if self.reportAllTracebacks:
+ f.printTraceback()
+ return defer.fail(f)
+
+ def remoteMethod(self, name):
+ return LocalMethod(self, name)
+
+
+class LocalAsyncForwarder:
+ """
+ A class useful for forwarding a locally-defined interface.
+ """
+
+ def __init__(self, forwarded, interfaceClass, failWhenNotImplemented=0):
+ assert interfaceClass.providedBy(forwarded)
+ self.forwarded = forwarded
+ self.interfaceClass = interfaceClass
+ self.failWhenNotImplemented = failWhenNotImplemented
+
+ def _callMethod(self, method, *args, **kw):
+ return getattr(self.forwarded, method)(*args, **kw)
+
+ def callRemote(self, method, *args, **kw):
+ if self.interfaceClass.queryDescriptionFor(method):
+ result = defer.maybeDeferred(self._callMethod, method, *args, **kw)
+ return result
+ elif self.failWhenNotImplemented:
+ return defer.fail(
+ Failure(NotImplementedError, "No Such Method in Interface: %s" % method)
+ )
+ else:
+ return defer.succeed(None)
+
+
+class Pager:
+ """
+ I am an object which pages out information.
+ """
+
+ def __init__(self, collector, callback=None, *args, **kw):
+ """
+ Create a pager with a Reference to a remote collector and
+ an optional callable to invoke upon completion.
+ """
+ if callable(callback):
+ self.callback = callback
+ self.callbackArgs = args
+ self.callbackKeyword = kw
+ else:
+ self.callback = None
+ self._stillPaging = 1
+ self.collector = collector
+ collector.broker.registerPageProducer(self)
+
+ def stillPaging(self):
+ """
+ (internal) Method called by Broker.
+ """
+ if not self._stillPaging:
+ self.collector.callRemote("endedPaging", pbanswer=False)
+ if self.callback is not None:
+ self.callback(*self.callbackArgs, **self.callbackKeyword)
+ return self._stillPaging
+
+ def sendNextPage(self):
+ """
+ (internal) Method called by Broker.
+ """
+ self.collector.callRemote("gotPage", self.nextPage(), pbanswer=False)
+
+ def nextPage(self):
+ """
+ Override this to return an object to be sent to my collector.
+ """
+ raise NotImplementedError()
+
+ def stopPaging(self):
+ """
+ Call this when you're done paging.
+ """
+ self._stillPaging = 0
+
+
+class StringPager(Pager):
+ """
+ A simple pager that splits a string into chunks.
+ """
+
+ def __init__(self, collector, st, chunkSize=8192, callback=None, *args, **kw):
+ self.string = st
+ self.pointer = 0
+ self.chunkSize = chunkSize
+ Pager.__init__(self, collector, callback, *args, **kw)
+
+ def nextPage(self):
+ val = self.string[self.pointer : self.pointer + self.chunkSize]
+ self.pointer += self.chunkSize
+ if self.pointer >= len(self.string):
+ self.stopPaging()
+ return val
+
+
+@implementer(interfaces.IConsumer)
+class FilePager(Pager):
+ """
+ Reads a file in chunks and sends the chunks as they come.
+ """
+
+ def __init__(self, collector, fd, callback=None, *args, **kw):
+ self.chunks = []
+ Pager.__init__(self, collector, callback, *args, **kw)
+ self.startProducing(fd)
+
+ def startProducing(self, fd):
+ self.deferred = basic.FileSender().beginFileTransfer(fd, self)
+ self.deferred.addBoth(lambda x: self.stopPaging())
+
+ def registerProducer(self, producer, streaming):
+ self.producer = producer
+ if not streaming:
+ self.producer.resumeProducing()
+
+ def unregisterProducer(self):
+ self.producer = None
+
+ def write(self, chunk):
+ self.chunks.append(chunk)
+
+ def sendNextPage(self):
+ """
+ Get the first chunk read and send it to collector.
+ """
+ if not self.chunks:
+ return
+ val = self.chunks.pop(0)
+ self.producer.resumeProducing()
+ self.collector.callRemote("gotPage", val, pbanswer=False)
+
+
+# Utility paging stuff.
+class CallbackPageCollector(pb.Referenceable):
+ """
+ I receive pages from the peer. You may instantiate a Pager with a
+ remote reference to me. I will call the callback with a list of pages
+ once they are all received.
+ """
+
+ def __init__(self, callback):
+ self.pages = []
+ self.callback = callback
+
+ def remote_gotPage(self, page):
+ self.pages.append(page)
+
+ def remote_endedPaging(self):
+ self.callback(self.pages)
+
+
+def getAllPages(referenceable, methodName, *args, **kw):
+ """
+ A utility method that will call a remote method which expects a
+ PageCollector as the first argument.
+ """
+ d = defer.Deferred()
+ referenceable.callRemote(methodName, CallbackPageCollector(d.callback), *args, **kw)
+ return d