diff options
author | shmel1k <shmel1k@ydb.tech> | 2023-11-26 18:16:14 +0300 |
---|---|---|
committer | shmel1k <shmel1k@ydb.tech> | 2023-11-26 18:43:30 +0300 |
commit | b8cf9e88f4c5c64d9406af533d8948deb050d695 (patch) | |
tree | 218eb61fb3c3b96ec08b4d8cdfef383104a87d63 /contrib/python/Twisted/py3/twisted/spread | |
parent | 523f645a83a0ec97a0332dbc3863bb354c92a328 (diff) | |
download | ydb-b8cf9e88f4c5c64d9406af533d8948deb050d695.tar.gz |
add kikimr_configure
Diffstat (limited to 'contrib/python/Twisted/py3/twisted/spread')
-rw-r--r-- | contrib/python/Twisted/py3/twisted/spread/__init__.py | 8 | ||||
-rw-r--r-- | contrib/python/Twisted/py3/twisted/spread/banana.py | 403 | ||||
-rw-r--r-- | contrib/python/Twisted/py3/twisted/spread/flavors.py | 651 | ||||
-rw-r--r-- | contrib/python/Twisted/py3/twisted/spread/interfaces.py | 30 | ||||
-rw-r--r-- | contrib/python/Twisted/py3/twisted/spread/jelly.py | 1092 | ||||
-rw-r--r-- | contrib/python/Twisted/py3/twisted/spread/pb.py | 1674 | ||||
-rw-r--r-- | contrib/python/Twisted/py3/twisted/spread/publish.py | 144 | ||||
-rw-r--r-- | contrib/python/Twisted/py3/twisted/spread/util.py | 217 |
8 files changed, 4219 insertions, 0 deletions
diff --git a/contrib/python/Twisted/py3/twisted/spread/__init__.py b/contrib/python/Twisted/py3/twisted/spread/__init__.py new file mode 100644 index 0000000000..ab3881055d --- /dev/null +++ b/contrib/python/Twisted/py3/twisted/spread/__init__.py @@ -0,0 +1,8 @@ +# Copyright (c) Twisted Matrix Laboratories. +# See LICENSE for details. + +""" +Twisted Spread: Spreadable (Distributed) Computing. + +@author: Glyph Lefkowitz +""" diff --git a/contrib/python/Twisted/py3/twisted/spread/banana.py b/contrib/python/Twisted/py3/twisted/spread/banana.py new file mode 100644 index 0000000000..ee54c2e2a9 --- /dev/null +++ b/contrib/python/Twisted/py3/twisted/spread/banana.py @@ -0,0 +1,403 @@ +# -*- test-case-name: twisted.spread.test.test_banana -*- +# Copyright (c) Twisted Matrix Laboratories. +# See LICENSE for details. + +""" +Banana -- s-exp based protocol. + +Future Plans: This module is almost entirely stable. The same caveat applies +to it as applies to L{twisted.spread.jelly}, however. Read its future plans +for more details. + +@author: Glyph Lefkowitz +""" + + +import copy +import struct +from io import BytesIO + +from twisted.internet import protocol +from twisted.persisted import styles +from twisted.python import log +from twisted.python.compat import iterbytes +from twisted.python.reflect import fullyQualifiedName + + +class BananaError(Exception): + pass + + +def int2b128(integer, stream): + if integer == 0: + stream(b"\0") + return + assert integer > 0, "can only encode positive integers" + while integer: + stream(bytes((integer & 0x7F,))) + integer = integer >> 7 + + +def b1282int(st): + """ + Convert an integer represented as a base 128 string into an L{int}. + + @param st: The integer encoded in a byte string. + @type st: L{bytes} + + @return: The integer value extracted from the byte string. + @rtype: L{int} + """ + e = 1 + i = 0 + for char in iterbytes(st): + n = ord(char) + i += n * e + e <<= 7 + return i + + +# delimiter characters. +LIST = b"\x80" +INT = b"\x81" +STRING = b"\x82" +NEG = b"\x83" +FLOAT = b"\x84" +# "optional" -- these might be refused by a low-level implementation. +LONGINT = b"\x85" +LONGNEG = b"\x86" +# really optional; this is part of the 'pb' vocabulary +VOCAB = b"\x87" + +HIGH_BIT_SET = b"\x80" + + +def setPrefixLimit(limit): + """ + Set the limit on the prefix length for all Banana connections + established after this call. + + The prefix length limit determines how many bytes of prefix a banana + decoder will allow before rejecting a potential object as too large. + + @type limit: L{int} + @param limit: The number of bytes of prefix for banana to allow when + decoding. + """ + global _PREFIX_LIMIT + _PREFIX_LIMIT = limit + + +_PREFIX_LIMIT = None +setPrefixLimit(64) + +SIZE_LIMIT = 640 * 1024 # 640k is all you'll ever need :-) + + +class Banana(protocol.Protocol, styles.Ephemeral): + """ + L{Banana} implements the I{Banana} s-expression protocol, client and + server. + + @ivar knownDialects: These are the profiles supported by this Banana + implementation. + @type knownDialects: L{list} of L{bytes} + """ + + # The specification calls these profiles but this implementation calls them + # dialects instead. + knownDialects = [b"pb", b"none"] + + prefixLimit = None + sizeLimit = SIZE_LIMIT + + def setPrefixLimit(self, limit): + """ + Set the prefix limit for decoding done by this protocol instance. + + @see: L{setPrefixLimit} + """ + self.prefixLimit = limit + self._smallestLongInt = -(2 ** (limit * 7)) + 1 + self._smallestInt = -(2**31) + self._largestInt = 2**31 - 1 + self._largestLongInt = 2 ** (limit * 7) - 1 + + def connectionReady(self): + """Surrogate for connectionMade + Called after protocol negotiation. + """ + + def _selectDialect(self, dialect): + self.currentDialect = dialect + self.connectionReady() + + def callExpressionReceived(self, obj): + if self.currentDialect: + self.expressionReceived(obj) + else: + # this is the first message we've received + if self.isClient: + # if I'm a client I have to respond + for serverVer in obj: + if serverVer in self.knownDialects: + self.sendEncoded(serverVer) + self._selectDialect(serverVer) + break + else: + # I can't speak any of those dialects. + log.msg( + "The client doesn't speak any of the protocols " + "offered by the server: disconnecting." + ) + self.transport.loseConnection() + else: + if obj in self.knownDialects: + self._selectDialect(obj) + else: + # the client just selected a protocol that I did not suggest. + log.msg( + "The client selected a protocol the server didn't " + "suggest and doesn't know: disconnecting." + ) + self.transport.loseConnection() + + def connectionMade(self): + self.setPrefixLimit(_PREFIX_LIMIT) + self.currentDialect = None + if not self.isClient: + self.sendEncoded(self.knownDialects) + + def gotItem(self, item): + l = self.listStack + if l: + l[-1][1].append(item) + else: + self.callExpressionReceived(item) + + buffer = b"" + + def dataReceived(self, chunk): + buffer = self.buffer + chunk + listStack = self.listStack + gotItem = self.gotItem + while buffer: + assert self.buffer != buffer, "This ain't right: {} {}".format( + repr(self.buffer), + repr(buffer), + ) + self.buffer = buffer + pos = 0 + for ch in iterbytes(buffer): + if ch >= HIGH_BIT_SET: + break + pos = pos + 1 + else: + if pos > self.prefixLimit: + raise BananaError( + "Security precaution: more than %d bytes of prefix" + % (self.prefixLimit,) + ) + return + num = buffer[:pos] + typebyte = buffer[pos : pos + 1] + rest = buffer[pos + 1 :] + if len(num) > self.prefixLimit: + raise BananaError( + "Security precaution: longer than %d bytes worth of prefix" + % (self.prefixLimit,) + ) + if typebyte == LIST: + num = b1282int(num) + if num > SIZE_LIMIT: + raise BananaError("Security precaution: List too long.") + listStack.append((num, [])) + buffer = rest + elif typebyte == STRING: + num = b1282int(num) + if num > SIZE_LIMIT: + raise BananaError("Security precaution: String too long.") + if len(rest) >= num: + buffer = rest[num:] + gotItem(rest[:num]) + else: + return + elif typebyte == INT: + buffer = rest + num = b1282int(num) + gotItem(num) + elif typebyte == LONGINT: + buffer = rest + num = b1282int(num) + gotItem(num) + elif typebyte == LONGNEG: + buffer = rest + num = b1282int(num) + gotItem(-num) + elif typebyte == NEG: + buffer = rest + num = -b1282int(num) + gotItem(num) + elif typebyte == VOCAB: + buffer = rest + num = b1282int(num) + item = self.incomingVocabulary[num] + if self.currentDialect == b"pb": + # the sender issues VOCAB only for dialect pb + gotItem(item) + else: + raise NotImplementedError(f"Invalid item for pb protocol {item!r}") + elif typebyte == FLOAT: + if len(rest) >= 8: + buffer = rest[8:] + gotItem(struct.unpack("!d", rest[:8])[0]) + else: + return + else: + raise NotImplementedError(f"Invalid Type Byte {typebyte!r}") + while listStack and (len(listStack[-1][1]) == listStack[-1][0]): + item = listStack.pop()[1] + gotItem(item) + self.buffer = b"" + + def expressionReceived(self, lst): + """Called when an expression (list, string, or int) is received.""" + raise NotImplementedError() + + outgoingVocabulary = { + # Jelly Data Types + b"None": 1, + b"class": 2, + b"dereference": 3, + b"reference": 4, + b"dictionary": 5, + b"function": 6, + b"instance": 7, + b"list": 8, + b"module": 9, + b"persistent": 10, + b"tuple": 11, + b"unpersistable": 12, + # PB Data Types + b"copy": 13, + b"cache": 14, + b"cached": 15, + b"remote": 16, + b"local": 17, + b"lcache": 18, + # PB Protocol Messages + b"version": 19, + b"login": 20, + b"password": 21, + b"challenge": 22, + b"logged_in": 23, + b"not_logged_in": 24, + b"cachemessage": 25, + b"message": 26, + b"answer": 27, + b"error": 28, + b"decref": 29, + b"decache": 30, + b"uncache": 31, + } + + incomingVocabulary = {} + for k, v in outgoingVocabulary.items(): + incomingVocabulary[v] = k + + def __init__(self, isClient=1): + self.listStack = [] + self.outgoingSymbols = copy.copy(self.outgoingVocabulary) + self.outgoingSymbolCount = 0 + self.isClient = isClient + + def sendEncoded(self, obj): + """ + Send the encoded representation of the given object: + + @param obj: An object to encode and send. + + @raise BananaError: If the given object is not an instance of one of + the types supported by Banana. + + @return: L{None} + """ + encodeStream = BytesIO() + self._encode(obj, encodeStream.write) + value = encodeStream.getvalue() + self.transport.write(value) + + def _encode(self, obj, write): + if isinstance(obj, (list, tuple)): + if len(obj) > SIZE_LIMIT: + raise BananaError("list/tuple is too long to send (%d)" % (len(obj),)) + int2b128(len(obj), write) + write(LIST) + for elem in obj: + self._encode(elem, write) + elif isinstance(obj, int): + if obj < self._smallestLongInt or obj > self._largestLongInt: + raise BananaError("int is too large to send (%d)" % (obj,)) + if obj < self._smallestInt: + int2b128(-obj, write) + write(LONGNEG) + elif obj < 0: + int2b128(-obj, write) + write(NEG) + elif obj <= self._largestInt: + int2b128(obj, write) + write(INT) + else: + int2b128(obj, write) + write(LONGINT) + elif isinstance(obj, float): + write(FLOAT) + write(struct.pack("!d", obj)) + elif isinstance(obj, bytes): + # TODO: an API for extending banana... + if self.currentDialect == b"pb" and obj in self.outgoingSymbols: + symbolID = self.outgoingSymbols[obj] + int2b128(symbolID, write) + write(VOCAB) + else: + if len(obj) > SIZE_LIMIT: + raise BananaError( + "byte string is too long to send (%d)" % (len(obj),) + ) + int2b128(len(obj), write) + write(STRING) + write(obj) + else: + raise BananaError( + "Banana cannot send {} objects: {!r}".format( + fullyQualifiedName(type(obj)), obj + ) + ) + + +# For use from the interactive interpreter +_i = Banana() +_i.connectionMade() +_i._selectDialect(b"none") + + +def encode(lst): + """Encode a list s-expression.""" + encodeStream = BytesIO() + _i.transport = encodeStream + _i.sendEncoded(lst) + return encodeStream.getvalue() + + +def decode(st): + """ + Decode a banana-encoded string. + """ + l = [] + _i.expressionReceived = l.append + try: + _i.dataReceived(st) + finally: + _i.buffer = b"" + del _i.expressionReceived + return l[0] diff --git a/contrib/python/Twisted/py3/twisted/spread/flavors.py b/contrib/python/Twisted/py3/twisted/spread/flavors.py new file mode 100644 index 0000000000..ef98fee272 --- /dev/null +++ b/contrib/python/Twisted/py3/twisted/spread/flavors.py @@ -0,0 +1,651 @@ +# -*- test-case-name: twisted.spread.test.test_pb -*- +# Copyright (c) Twisted Matrix Laboratories. +# See LICENSE for details. + +""" +This module represents flavors of remotely accessible objects. + +Currently this is only objects accessible through Perspective Broker, but will +hopefully encompass all forms of remote access which can emulate subsets of PB +(such as XMLRPC or SOAP). + +Future Plans: Optimization. Exploitation of new-style object model. +Optimizations to this module should not affect external-use semantics at all, +but may have a small impact on users who subclass and override methods. + +@author: Glyph Lefkowitz +""" + + +# NOTE: this module should NOT import pb; it is supposed to be a module which +# abstractly defines remotely accessible types. Many of these types expect to +# be serialized by Jelly, but they ought to be accessible through other +# mechanisms (like XMLRPC) + +import sys + +from zope.interface import Interface, implementer + +from twisted.python import log, reflect +from twisted.python.compat import cmp, comparable +from .jelly import ( + Jellyable, + Unjellyable, + _createBlank, + getInstanceState, + setInstanceState, + setUnjellyableFactoryForClass, + setUnjellyableForClass, + setUnjellyableForClassTree, + unjellyableRegistry, +) + +# compatibility +setCopierForClass = setUnjellyableForClass +setCopierForClassTree = setUnjellyableForClassTree +setFactoryForClass = setUnjellyableFactoryForClass +copyTags = unjellyableRegistry + +copy_atom = b"copy" +cache_atom = b"cache" +cached_atom = b"cached" +remote_atom = b"remote" + + +class NoSuchMethod(AttributeError): + """Raised if there is no such remote method""" + + +class IPBRoot(Interface): + """Factory for root Referenceable objects for PB servers.""" + + def rootObject(broker): + """Return root Referenceable for broker.""" + + +class Serializable(Jellyable): + """An object that can be passed remotely. + + I am a style of object which can be serialized by Perspective + Broker. Objects which wish to be referenceable or copied remotely + have to subclass Serializable. However, clients of Perspective + Broker will probably not want to directly subclass Serializable; the + Flavors of transferable objects are listed below. + + What it means to be \"Serializable\" is that an object can be + passed to or returned from a remote method. Certain basic types + (dictionaries, lists, tuples, numbers, strings) are serializable by + default; however, classes need to choose a specific serialization + style: L{Referenceable}, L{Viewable}, L{Copyable} or L{Cacheable}. + + You may also pass C{[lists, dictionaries, tuples]} of L{Serializable} + instances to or return them from remote methods, as many levels deep + as you like. + """ + + def processUniqueID(self): + """Return an ID which uniquely represents this object for this process. + + By default, this uses the 'id' builtin, but can be overridden to + indicate that two values are identity-equivalent (such as proxies + for the same object). + """ + + return id(self) + + +class Referenceable(Serializable): + perspective = None + """I am an object sent remotely as a direct reference. + + When one of my subclasses is sent as an argument to or returned + from a remote method call, I will be serialized by default as a + direct reference. + + This means that the peer will be able to call methods on me; + a method call xxx() from my peer will be resolved to methods + of the name remote_xxx. + """ + + def remoteMessageReceived(self, broker, message, args, kw): + """A remote message has been received. Dispatch it appropriately. + + The default implementation is to dispatch to a method called + 'remote_messagename' and call it with the same arguments. + """ + args = broker.unserialize(args) + kw = broker.unserialize(kw) + # Need this to interoperate with Python 2 clients + # which may try to send use keywords where keys are of type + # bytes. + if [key for key in kw.keys() if isinstance(key, bytes)]: + kw = {k.decode("utf8"): v for k, v in kw.items()} + + if not isinstance(message, str): + message = message.decode("utf8") + + method = getattr(self, "remote_%s" % message, None) + if method is None: + raise NoSuchMethod(f"No such method: remote_{message}") + try: + state = method(*args, **kw) + except TypeError: + log.msg(f"{method} didn't accept {args} and {kw}") + raise + return broker.serialize(state, self.perspective) + + def jellyFor(self, jellier): + """(internal) + + Return a tuple which will be used as the s-expression to + serialize this to a peer. + """ + + return [b"remote", jellier.invoker.registerReference(self)] + + +@implementer(IPBRoot) +class Root(Referenceable): + """I provide a root object to L{pb.Broker}s for a L{pb.PBClientFactory} or + L{pb.PBServerFactory}. + + When a factory produces a L{pb.Broker}, it supplies that + L{pb.Broker} with an object named \"root\". That object is obtained + by calling my rootObject method. + """ + + def rootObject(self, broker): + """A factory is requesting to publish me as a root object. + + When a factory is sending me as the root object, this + method will be invoked to allow per-broker versions of an + object. By default I return myself. + """ + return self + + +class ViewPoint(Referenceable): + """ + I act as an indirect reference to an object accessed through a + L{pb.IPerspective}. + + Simply put, I combine an object with a perspective so that when a + peer calls methods on the object I refer to, the method will be + invoked with that perspective as a first argument, so that it can + know who is calling it. + + While L{Viewable} objects will be converted to ViewPoints by default + when they are returned from or sent as arguments to a remote + method, any object may be manually proxied as well. (XXX: Now that + this class is no longer named C{Proxy}, this is the only occurrence + of the term 'proxied' in this docstring, and may be unclear.) + + This can be useful when dealing with L{pb.IPerspective}s, L{Copyable}s, + and L{Cacheable}s. It is legal to implement a method as such on + a perspective:: + + | def perspective_getViewPointForOther(self, name): + | defr = self.service.getPerspectiveRequest(name) + | defr.addCallbacks(lambda x, self=self: ViewPoint(self, x), log.msg) + | return defr + + This will allow you to have references to Perspective objects in two + different ways. One is through the initial 'attach' call -- each + peer will have a L{pb.RemoteReference} to their perspective directly. The + other is through this method; each peer can get a L{pb.RemoteReference} to + all other perspectives in the service; but that L{pb.RemoteReference} will + be to a L{ViewPoint}, not directly to the object. + + The practical offshoot of this is that you can implement 2 varieties + of remotely callable methods on this Perspective; view_xxx and + C{perspective_xxx}. C{view_xxx} methods will follow the rules for + ViewPoint methods (see ViewPoint.L{remoteMessageReceived}), and + C{perspective_xxx} methods will follow the rules for Perspective + methods. + """ + + def __init__(self, perspective, object): + """Initialize me with a Perspective and an Object.""" + self.perspective = perspective + self.object = object + + def processUniqueID(self): + """Return an ID unique to a proxy for this perspective+object combination.""" + return (id(self.perspective), id(self.object)) + + def remoteMessageReceived(self, broker, message, args, kw): + """A remote message has been received. Dispatch it appropriately. + + The default implementation is to dispatch to a method called + 'C{view_messagename}' to my Object and call it on my object with + the same arguments, modified by inserting my Perspective as + the first argument. + """ + args = broker.unserialize(args, self.perspective) + kw = broker.unserialize(kw, self.perspective) + + if not isinstance(message, str): + message = message.decode("utf8") + + method = getattr(self.object, "view_%s" % message) + try: + state = method(*(self.perspective,) + args, **kw) + except TypeError: + log.msg(f"{method} didn't accept {args} and {kw}") + raise + rv = broker.serialize(state, self.perspective, method, args, kw) + return rv + + +class Viewable(Serializable): + """I will be converted to a L{ViewPoint} when passed to or returned from a remote method. + + The beginning of a peer's interaction with a PB Service is always + through a perspective. However, if a C{perspective_xxx} method returns + a Viewable, it will be serialized to the peer as a response to that + method. + """ + + def jellyFor(self, jellier): + """Serialize a L{ViewPoint} for me and the perspective of the given broker.""" + return ViewPoint(jellier.invoker.serializingPerspective, self).jellyFor(jellier) + + +class Copyable(Serializable): + """Subclass me to get copied each time you are returned from or passed to a remote method. + + When I am returned from or passed to a remote method call, I will be + converted into data via a set of callbacks (see my methods for more + info). That data will then be serialized using Jelly, and sent to + the peer. + + The peer will then look up the type to represent this with; see + L{RemoteCopy} for details. + """ + + def getStateToCopy(self): + """Gather state to send when I am serialized for a peer. + + I will default to returning self.__dict__. Override this to + customize this behavior. + """ + + return self.__dict__ + + def getStateToCopyFor(self, perspective): + """ + Gather state to send when I am serialized for a particular + perspective. + + I will default to calling L{getStateToCopy}. Override this to + customize this behavior. + """ + + return self.getStateToCopy() + + def getTypeToCopy(self): + """Determine what type tag to send for me. + + By default, send the string representation of my class + (package.module.Class); normally this is adequate, but + you may override this to change it. + """ + + return reflect.qual(self.__class__).encode("utf-8") + + def getTypeToCopyFor(self, perspective): + """Determine what type tag to send for me. + + By default, defer to self.L{getTypeToCopy}() normally this is + adequate, but you may override this to change it. + """ + + return self.getTypeToCopy() + + def jellyFor(self, jellier): + """Assemble type tag and state to copy for this broker. + + This will call L{getTypeToCopyFor} and L{getStateToCopy}, and + return an appropriate s-expression to represent me. + """ + + if jellier.invoker is None: + return getInstanceState(self, jellier) + p = jellier.invoker.serializingPerspective + t = self.getTypeToCopyFor(p) + state = self.getStateToCopyFor(p) + sxp = jellier.prepare(self) + sxp.extend([t, jellier.jelly(state)]) + return jellier.preserve(self, sxp) + + +class Cacheable(Copyable): + """A cached instance. + + This means that it's copied; but there is some logic to make sure + that it's only copied once. Additionally, when state is retrieved, + it is passed a "proto-reference" to the state as it will exist on + the client. + + XXX: The documentation for this class needs work, but it's the most + complex part of PB and it is inherently difficult to explain. + """ + + def getStateToCacheAndObserveFor(self, perspective, observer): + """ + Get state to cache on the client and client-cache reference + to observe locally. + + This is similar to getStateToCopyFor, but it additionally + passes in a reference to the client-side RemoteCache instance + that will be created when it is unserialized. This allows + Cacheable instances to keep their RemoteCaches up to date when + they change, such that no changes can occur between the point + at which the state is initially copied and the client receives + it that are not propagated. + """ + + return self.getStateToCopyFor(perspective) + + def jellyFor(self, jellier): + """Return an appropriate tuple to serialize me. + + Depending on whether this broker has cached me or not, this may + return either a full state or a reference to an existing cache. + """ + if jellier.invoker is None: + return getInstanceState(self, jellier) + luid = jellier.invoker.cachedRemotelyAs(self, 1) + if luid is None: + luid = jellier.invoker.cacheRemotely(self) + p = jellier.invoker.serializingPerspective + type_ = self.getTypeToCopyFor(p) + observer = RemoteCacheObserver(jellier.invoker, self, p) + state = self.getStateToCacheAndObserveFor(p, observer) + l = jellier.prepare(self) + jstate = jellier.jelly(state) + l.extend([type_, luid, jstate]) + return jellier.preserve(self, l) + else: + return cached_atom, luid + + def stoppedObserving(self, perspective, observer): + """This method is called when a client has stopped observing me. + + The 'observer' argument is the same as that passed in to + getStateToCacheAndObserveFor. + """ + + +class RemoteCopy(Unjellyable): + """I am a remote copy of a Copyable object. + + When the state from a L{Copyable} object is received, an instance will + be created based on the copy tags table (see setUnjellyableForClass) and + sent the L{setCopyableState} message. I provide a reasonable default + implementation of that message; subclass me if you wish to serve as + a copier for remote data. + + NOTE: copiers are invoked with no arguments. Do not implement a + constructor which requires args in a subclass of L{RemoteCopy}! + """ + + def setCopyableState(self, state): + """I will be invoked with the state to copy locally. + + 'state' is the data returned from the remote object's + 'getStateToCopyFor' method, which will often be the remote + object's dictionary (or a filtered approximation of it depending + on my peer's perspective). + """ + if not state: + state = {} + state = { + x.decode("utf8") if isinstance(x, bytes) else x: y for x, y in state.items() + } + self.__dict__ = state + + def unjellyFor(self, unjellier, jellyList): + if unjellier.invoker is None: + return setInstanceState(self, unjellier, jellyList) + self.setCopyableState(unjellier.unjelly(jellyList[1])) + return self + + +class RemoteCache(RemoteCopy, Serializable): + """A cache is a local representation of a remote L{Cacheable} object. + + This represents the last known state of this object. It may + also have methods invoked on it -- in order to update caches, + the cached class generates a L{pb.RemoteReference} to this object as + it is originally sent. + + Much like copy, I will be invoked with no arguments. Do not + implement a constructor that requires arguments in one of my + subclasses. + """ + + def remoteMessageReceived(self, broker, message, args, kw): + """A remote message has been received. Dispatch it appropriately. + + The default implementation is to dispatch to a method called + 'C{observe_messagename}' and call it on my with the same arguments. + """ + if not isinstance(message, str): + message = message.decode("utf8") + + args = broker.unserialize(args) + kw = broker.unserialize(kw) + method = getattr(self, "observe_%s" % message) + try: + state = method(*args, **kw) + except TypeError: + log.msg(f"{method} didn't accept {args} and {kw}") + raise + return broker.serialize(state, None, method, args, kw) + + def jellyFor(self, jellier): + """serialize me (only for the broker I'm for) as the original cached reference""" + if jellier.invoker is None: + return getInstanceState(self, jellier) + assert ( + jellier.invoker is self.broker + ), "You cannot exchange cached proxies between brokers." + return b"lcache", self.luid + + def unjellyFor(self, unjellier, jellyList): + if unjellier.invoker is None: + return setInstanceState(self, unjellier, jellyList) + self.broker = unjellier.invoker + self.luid = jellyList[1] + borgCopy = self._borgify() + # XXX questionable whether this was a good design idea... + init = getattr(borgCopy, "__init__", None) + if init: + init() + unjellier.invoker.cacheLocally(jellyList[1], self) + borgCopy.setCopyableState(unjellier.unjelly(jellyList[2])) + # Might have changed due to setCopyableState method; we'll assume that + # it's bad form to do so afterwards. + self.__dict__ = borgCopy.__dict__ + # chomp, chomp -- some existing code uses "self.__dict__ =", some uses + # "__dict__.update". This is here in order to handle both cases. + self.broker = unjellier.invoker + self.luid = jellyList[1] + return borgCopy + + ## def __really_del__(self): + ## """Final finalization call, made after all remote references have been lost. + ## """ + + def __cmp__(self, other): + """Compare me [to another RemoteCache.""" + if isinstance(other, self.__class__): + return cmp(id(self.__dict__), id(other.__dict__)) + else: + return cmp(id(self.__dict__), other) + + def __hash__(self): + """Hash me.""" + return int(id(self.__dict__) % sys.maxsize) + + broker = None + luid = None + + def __del__(self): + """Do distributed reference counting on finalize.""" + try: + # log.msg( ' --- decache: %s %s' % (self, self.luid) ) + if self.broker: + self.broker.decCacheRef(self.luid) + except BaseException: + log.deferr() + + def _borgify(self): + """ + Create a new object that shares its state (i.e. its C{__dict__}) and + type with this object, but does not share its identity. + + This is an instance of U{the Borg design pattern + <https://code.activestate.com/recipes/66531/>} originally described by + Alex Martelli, but unlike the example given there, this is not a + replacement for a Singleton. Instead, it is for lifecycle tracking + (and distributed garbage collection). The purpose of these separate + objects is to have a separate object tracking each application-level + reference to the root L{RemoteCache} object being tracked by the + broker, and to have their C{__del__} methods be invoked. + + This may be achievable via a weak value dictionary to track the root + L{RemoteCache} instances instead, but this implementation strategy + predates the availability of weak references in Python. + + @return: The new instance. + @rtype: C{self.__class__} + """ + blank = _createBlank(self.__class__) + blank.__dict__ = self.__dict__ + return blank + + +def unjellyCached(unjellier, unjellyList): + luid = unjellyList[1] + return unjellier.invoker.cachedLocallyAs(luid)._borgify() + + +setUnjellyableForClass("cached", unjellyCached) + + +def unjellyLCache(unjellier, unjellyList): + luid = unjellyList[1] + obj = unjellier.invoker.remotelyCachedForLUID(luid) + return obj + + +setUnjellyableForClass("lcache", unjellyLCache) + + +def unjellyLocal(unjellier, unjellyList): + obj = unjellier.invoker.localObjectForID(unjellyList[1]) + return obj + + +setUnjellyableForClass("local", unjellyLocal) + + +@comparable +class RemoteCacheMethod: + """A method on a reference to a L{RemoteCache}.""" + + def __init__(self, name, broker, cached, perspective): + """(internal) initialize.""" + self.name = name + self.broker = broker + self.perspective = perspective + self.cached = cached + + def __cmp__(self, other): + return cmp((self.name, self.broker, self.perspective, self.cached), other) + + def __hash__(self): + return hash((self.name, self.broker, self.perspective, self.cached)) + + def __call__(self, *args, **kw): + """(internal) action method.""" + cacheID = self.broker.cachedRemotelyAs(self.cached) + if cacheID is None: + from pb import ProtocolError # type: ignore[import] + + raise ProtocolError( + "You can't call a cached method when the object hasn't been given to the peer yet." + ) + return self.broker._sendMessage( + b"cache", self.perspective, cacheID, self.name, args, kw + ) + + +@comparable +class RemoteCacheObserver: + """I am a reverse-reference to the peer's L{RemoteCache}. + + I am generated automatically when a cache is serialized. I + represent a reference to the client's L{RemoteCache} object that + will represent a particular L{Cacheable}; I am the additional + object passed to getStateToCacheAndObserveFor. + """ + + def __init__(self, broker, cached, perspective): + """(internal) Initialize me. + + @param broker: a L{pb.Broker} instance. + + @param cached: a L{Cacheable} instance that this L{RemoteCacheObserver} + corresponds to. + + @param perspective: a reference to the perspective who is observing this. + """ + + self.broker = broker + self.cached = cached + self.perspective = perspective + + def __repr__(self) -> str: + return "<RemoteCacheObserver({}, {}, {}) at {}>".format( + self.broker, + self.cached, + self.perspective, + id(self), + ) + + def __hash__(self): + """Generate a hash unique to all L{RemoteCacheObserver}s for this broker/perspective/cached triplet""" + + return ( + (hash(self.broker) % 2**10) + + (hash(self.perspective) % 2**10) + + (hash(self.cached) % 2**10) + ) + + def __cmp__(self, other): + """Compare me to another L{RemoteCacheObserver}.""" + + return cmp((self.broker, self.perspective, self.cached), other) + + def callRemote(self, _name, *args, **kw): + """(internal) action method.""" + cacheID = self.broker.cachedRemotelyAs(self.cached) + if isinstance(_name, str): + _name = _name.encode("utf-8") + if cacheID is None: + from pb import ProtocolError + + raise ProtocolError( + "You can't call a cached method when the " + "object hasn't been given to the peer yet." + ) + return self.broker._sendMessage( + b"cache", self.perspective, cacheID, _name, args, kw + ) + + def remoteMethod(self, key): + """Get a L{pb.RemoteMethod} for this key.""" + return RemoteCacheMethod(key, self.broker, self.cached, self.perspective) diff --git a/contrib/python/Twisted/py3/twisted/spread/interfaces.py b/contrib/python/Twisted/py3/twisted/spread/interfaces.py new file mode 100644 index 0000000000..4b8b28935b --- /dev/null +++ b/contrib/python/Twisted/py3/twisted/spread/interfaces.py @@ -0,0 +1,30 @@ +# Copyright (c) Twisted Matrix Laboratories. +# See LICENSE for details. + +""" +Twisted Spread Interfaces. +""" + +from zope.interface import Interface + + +class IJellyable(Interface): + def jellyFor(jellier): + """ + Jelly myself for jellier. + """ + + +class IUnjellyable(Interface): + def unjellyFor(jellier, jellyList): + """ + Unjelly myself for the jellier. + + @param jellier: A stateful object which exists for the lifetime of a + single call to L{unjelly}. + + @param jellyList: The C{list} which represents the jellied state of the + object to be unjellied. + + @return: The object which results from unjellying. + """ diff --git a/contrib/python/Twisted/py3/twisted/spread/jelly.py b/contrib/python/Twisted/py3/twisted/spread/jelly.py new file mode 100644 index 0000000000..46cda17844 --- /dev/null +++ b/contrib/python/Twisted/py3/twisted/spread/jelly.py @@ -0,0 +1,1092 @@ +# -*- test-case-name: twisted.spread.test.test_jelly -*- +# Copyright (c) Twisted Matrix Laboratories. +# See LICENSE for details. + +""" +S-expression-based persistence of python objects. + +It does something very much like L{Pickle<pickle>}; however, pickle's main goal +seems to be efficiency (both in space and time); jelly's main goals are +security, human readability, and portability to other environments. + +This is how Jelly converts various objects to s-expressions. + +Boolean:: + True --> ['boolean', 'true'] + +Integer:: + 1 --> 1 + +List:: + [1, 2] --> ['list', 1, 2] + +String:: + \"hello\" --> \"hello\" + +Float:: + 2.3 --> 2.3 + +Dictionary:: + {'a': 1, 'b': 'c'} --> ['dictionary', ['b', 'c'], ['a', 1]] + +Module:: + UserString --> ['module', 'UserString'] + +Class:: + UserString.UserString --> ['class', ['module', 'UserString'], 'UserString'] + +Function:: + string.join --> ['function', 'join', ['module', 'string']] + +Instance: s is an instance of UserString.UserString, with a __dict__ +{'data': 'hello'}:: + [\"UserString.UserString\", ['dictionary', ['data', 'hello']]] + +Class Method: UserString.UserString.center:: + ['method', 'center', ['None'], ['class', ['module', 'UserString'], + 'UserString']] + +Instance Method: s.center, where s is an instance of UserString.UserString:: + ['method', 'center', ['instance', ['reference', 1, ['class', + ['module', 'UserString'], 'UserString']], ['dictionary', ['data', 'd']]], + ['dereference', 1]] + +The Python 2.x C{sets.Set} and C{sets.ImmutableSet} classes are +serialized to the same thing as the builtin C{set} and C{frozenset} +classes. (This is only relevant if you are communicating with a +version of jelly running on an older version of Python.) + +@author: Glyph Lefkowitz + +""" + +import copy +import datetime +import decimal + +# System Imports +import types +import warnings +from functools import reduce + +from zope.interface import implementer + +from incremental import Version + +from twisted.persisted.crefutil import ( + NotKnown, + _Container, + _Dereference, + _DictKeyAndValue, + _InstanceMethod, + _Tuple, +) + +# Twisted Imports +from twisted.python.compat import nativeString +from twisted.python.deprecate import deprecatedModuleAttribute +from twisted.python.reflect import namedAny, namedObject, qual +from twisted.spread.interfaces import IJellyable, IUnjellyable + +DictTypes = (dict,) + +None_atom = b"None" # N +# code +class_atom = b"class" # c +module_atom = b"module" # m +function_atom = b"function" # f + +# references +dereference_atom = b"dereference" # D +persistent_atom = b"persistent" # p +reference_atom = b"reference" # r + +# mutable collections +dictionary_atom = b"dictionary" # d +list_atom = b"list" # l +set_atom = b"set" + +# immutable collections +# (assignment to __dict__ and __class__ still might go away!) +tuple_atom = b"tuple" # t +instance_atom = b"instance" # i +frozenset_atom = b"frozenset" + + +deprecatedModuleAttribute( + Version("Twisted", 15, 0, 0), + "instance_atom is unused within Twisted.", + "twisted.spread.jelly", + "instance_atom", +) + +# errors +unpersistable_atom = b"unpersistable" # u +unjellyableRegistry = {} +unjellyableFactoryRegistry = {} + + +def _createBlank(cls): + """ + Given an object, if that object is a type, return a new, blank instance + of that type which has not had C{__init__} called on it. If the object + is not a type, return L{None}. + + @param cls: The type (or class) to create an instance of. + @type cls: L{type} or something else that cannot be + instantiated. + + @return: a new blank instance or L{None} if C{cls} is not a class or type. + """ + if isinstance(cls, type): + return cls.__new__(cls) + + +def _newInstance(cls, state): + """ + Make a new instance of a class without calling its __init__ method. + + @param state: A C{dict} used to update C{inst.__dict__} either directly or + via C{__setstate__}, if available. + + @return: A new instance of C{cls}. + """ + instance = _createBlank(cls) + + def defaultSetter(state): + if isinstance(state, dict): + instance.__dict__ = state or {} + + setter = getattr(instance, "__setstate__", defaultSetter) + setter(state) + return instance + + +def _maybeClass(classnamep): + isObject = isinstance(classnamep, type) + + if isObject: + classnamep = qual(classnamep) + + if not isinstance(classnamep, bytes): + classnamep = classnamep.encode("utf-8") + + return classnamep + + +def setUnjellyableForClass(classname, unjellyable): + """ + Set which local class will represent a remote type. + + If you have written a Copyable class that you expect your client to be + receiving, write a local "copy" class to represent it, then call:: + + jellier.setUnjellyableForClass('module.package.Class', MyCopier). + + Call this at the module level immediately after its class + definition. MyCopier should be a subclass of RemoteCopy. + + The classname may be a special tag returned by + 'Copyable.getTypeToCopyFor' rather than an actual classname. + + This call is also for cached classes, since there will be no + overlap. The rules are the same. + """ + + global unjellyableRegistry + classname = _maybeClass(classname) + unjellyableRegistry[classname] = unjellyable + globalSecurity.allowTypes(classname) + + +def setUnjellyableFactoryForClass(classname, copyFactory): + """ + Set the factory to construct a remote instance of a type:: + + jellier.setUnjellyableFactoryForClass('module.package.Class', MyFactory) + + Call this at the module level immediately after its class definition. + C{copyFactory} should return an instance or subclass of + L{RemoteCopy<pb.RemoteCopy>}. + + Similar to L{setUnjellyableForClass} except it uses a factory instead + of creating an instance. + """ + + global unjellyableFactoryRegistry + classname = _maybeClass(classname) + unjellyableFactoryRegistry[classname] = copyFactory + globalSecurity.allowTypes(classname) + + +def setUnjellyableForClassTree(module, baseClass, prefix=None): + """ + Set all classes in a module derived from C{baseClass} as copiers for + a corresponding remote class. + + When you have a hierarchy of Copyable (or Cacheable) classes on one + side, and a mirror structure of Copied (or RemoteCache) classes on the + other, use this to setUnjellyableForClass all your Copieds for the + Copyables. + + Each copyTag (the \"classname\" argument to getTypeToCopyFor, and + what the Copyable's getTypeToCopyFor returns) is formed from + adding a prefix to the Copied's class name. The prefix defaults + to module.__name__. If you wish the copy tag to consist of solely + the classname, pass the empty string \'\'. + + @param module: a module object from which to pull the Copied classes. + (passing sys.modules[__name__] might be useful) + + @param baseClass: the base class from which all your Copied classes derive. + + @param prefix: the string prefixed to classnames to form the + unjellyableRegistry. + """ + if prefix is None: + prefix = module.__name__ + + if prefix: + prefix = "%s." % prefix + + for name in dir(module): + loaded = getattr(module, name) + try: + yes = issubclass(loaded, baseClass) + except TypeError: + "It's not a class." + else: + if yes: + setUnjellyableForClass(f"{prefix}{name}", loaded) + + +def getInstanceState(inst, jellier): + """ + Utility method to default to 'normal' state rules in serialization. + """ + if hasattr(inst, "__getstate__"): + state = inst.__getstate__() + else: + state = inst.__dict__ + sxp = jellier.prepare(inst) + sxp.extend([qual(inst.__class__).encode("utf-8"), jellier.jelly(state)]) + return jellier.preserve(inst, sxp) + + +def setInstanceState(inst, unjellier, jellyList): + """ + Utility method to default to 'normal' state rules in unserialization. + """ + state = unjellier.unjelly(jellyList[1]) + if hasattr(inst, "__setstate__"): + inst.__setstate__(state) + else: + inst.__dict__ = state + return inst + + +class Unpersistable: + """ + This is an instance of a class that comes back when something couldn't be + unpersisted. + """ + + def __init__(self, reason): + """ + Initialize an unpersistable object with a descriptive C{reason} string. + """ + self.reason = reason + + def __repr__(self) -> str: + return "Unpersistable(%s)" % repr(self.reason) + + +@implementer(IJellyable) +class Jellyable: + """ + Inherit from me to Jelly yourself directly with the `getStateFor' + convenience method. + """ + + def getStateFor(self, jellier): + return self.__dict__ + + def jellyFor(self, jellier): + """ + @see: L{twisted.spread.interfaces.IJellyable.jellyFor} + """ + sxp = jellier.prepare(self) + sxp.extend( + [ + qual(self.__class__).encode("utf-8"), + jellier.jelly(self.getStateFor(jellier)), + ] + ) + return jellier.preserve(self, sxp) + + +@implementer(IUnjellyable) +class Unjellyable: + """ + Inherit from me to Unjelly yourself directly with the + C{setStateFor} convenience method. + """ + + def setStateFor(self, unjellier, state): + self.__dict__ = state + + def unjellyFor(self, unjellier, jellyList): + """ + Perform the inverse operation of L{Jellyable.jellyFor}. + + @see: L{twisted.spread.interfaces.IUnjellyable.unjellyFor} + """ + state = unjellier.unjelly(jellyList[1]) + self.setStateFor(unjellier, state) + return self + + +class _Jellier: + """ + (Internal) This class manages state for a call to jelly() + """ + + def __init__(self, taster, persistentStore, invoker): + """ + Initialize. + """ + self.taster = taster + # `preserved' is a dict of previously seen instances. + self.preserved = {} + # `cooked' is a dict of previously backreferenced instances to their + # `ref' lists. + self.cooked = {} + self.cooker = {} + self._ref_id = 1 + self.persistentStore = persistentStore + self.invoker = invoker + + def _cook(self, object): + """ + (internal) Backreference an object. + + Notes on this method for the hapless future maintainer: If I've already + gone through the prepare/preserve cycle on the specified object (it is + being referenced after the serializer is \"done with\" it, e.g. this + reference is NOT circular), the copy-in-place of aList is relevant, + since the list being modified is the actual, pre-existing jelly + expression that was returned for that object. If not, it's technically + superfluous, since the value in self.preserved didn't need to be set, + but the invariant that self.preserved[id(object)] is a list is + convenient because that means we don't have to test and create it or + not create it here, creating fewer code-paths. that's why + self.preserved is always set to a list. + + Sorry that this code is so hard to follow, but Python objects are + tricky to persist correctly. -glyph + """ + aList = self.preserved[id(object)] + newList = copy.copy(aList) + # make a new reference ID + refid = self._ref_id + self._ref_id = self._ref_id + 1 + # replace the old list in-place, so that we don't have to track the + # previous reference to it. + aList[:] = [reference_atom, refid, newList] + self.cooked[id(object)] = [dereference_atom, refid] + return aList + + def prepare(self, object): + """ + (internal) Create a list for persisting an object to. This will allow + backreferences to be made internal to the object. (circular + references). + + The reason this needs to happen is that we don't generate an ID for + every object, so we won't necessarily know which ID the object will + have in the future. When it is 'cooked' ( see _cook ), it will be + assigned an ID, and the temporary placeholder list created here will be + modified in-place to create an expression that gives this object an ID: + [reference id# [object-jelly]]. + """ + + # create a placeholder list to be preserved + self.preserved[id(object)] = [] + # keep a reference to this object around, so it doesn't disappear! + # (This isn't always necessary, but for cases where the objects are + # dynamically generated by __getstate__ or getStateToCopyFor calls, it + # is; id() will return the same value for a different object if it gets + # garbage collected. This may be optimized later.) + self.cooker[id(object)] = object + return [] + + def preserve(self, object, sexp): + """ + (internal) Mark an object's persistent list for later referral. + """ + # if I've been cooked in the meanwhile, + if id(object) in self.cooked: + # replace the placeholder empty list with the real one + self.preserved[id(object)][2] = sexp + # but give this one back. + sexp = self.preserved[id(object)] + else: + self.preserved[id(object)] = sexp + return sexp + + def _checkMutable(self, obj): + objId = id(obj) + if objId in self.cooked: + return self.cooked[objId] + if objId in self.preserved: + self._cook(obj) + return self.cooked[objId] + + def jelly(self, obj): + if isinstance(obj, Jellyable): + preRef = self._checkMutable(obj) + if preRef: + return preRef + return obj.jellyFor(self) + objType = type(obj) + if self.taster.isTypeAllowed(qual(objType).encode("utf-8")): + # "Immutable" Types + if objType in (bytes, int, float): + return obj + elif isinstance(obj, types.MethodType): + aSelf = obj.__self__ + aFunc = obj.__func__ + aClass = aSelf.__class__ + return [ + b"method", + aFunc.__name__, + self.jelly(aSelf), + self.jelly(aClass), + ] + elif objType is str: + return [b"unicode", obj.encode("UTF-8")] + elif isinstance(obj, type(None)): + return [b"None"] + elif isinstance(obj, types.FunctionType): + return [b"function", obj.__module__ + "." + obj.__qualname__] + elif isinstance(obj, types.ModuleType): + return [b"module", obj.__name__] + elif objType is bool: + return [b"boolean", obj and b"true" or b"false"] + elif objType is datetime.datetime: + if obj.tzinfo: + raise NotImplementedError( + "Currently can't jelly datetime objects with tzinfo" + ) + return [ + b"datetime", + " ".join( + [ + str(x) + for x in ( + obj.year, + obj.month, + obj.day, + obj.hour, + obj.minute, + obj.second, + obj.microsecond, + ) + ] + ).encode("utf-8"), + ] + elif objType is datetime.time: + if obj.tzinfo: + raise NotImplementedError( + "Currently can't jelly datetime objects with tzinfo" + ) + return [ + b"time", + " ".join( + [ + str(x) + for x in (obj.hour, obj.minute, obj.second, obj.microsecond) + ] + ).encode("utf-8"), + ] + elif objType is datetime.date: + return [ + b"date", + " ".join([str(x) for x in (obj.year, obj.month, obj.day)]).encode( + "utf-8" + ), + ] + elif objType is datetime.timedelta: + return [ + b"timedelta", + " ".join( + [str(x) for x in (obj.days, obj.seconds, obj.microseconds)] + ).encode("utf-8"), + ] + elif issubclass(objType, type): + return [b"class", qual(obj).encode("utf-8")] + elif objType is decimal.Decimal: + return self.jelly_decimal(obj) + else: + preRef = self._checkMutable(obj) + if preRef: + return preRef + # "Mutable" Types + sxp = self.prepare(obj) + if objType is list: + sxp.extend(self._jellyIterable(list_atom, obj)) + elif objType is tuple: + sxp.extend(self._jellyIterable(tuple_atom, obj)) + elif objType in DictTypes: + sxp.append(dictionary_atom) + for key, val in obj.items(): + sxp.append([self.jelly(key), self.jelly(val)]) + elif objType is set: + sxp.extend(self._jellyIterable(set_atom, obj)) + elif objType is frozenset: + sxp.extend(self._jellyIterable(frozenset_atom, obj)) + else: + className = qual(obj.__class__).encode("utf-8") + persistent = None + if self.persistentStore: + persistent = self.persistentStore(obj, self) + if persistent is not None: + sxp.append(persistent_atom) + sxp.append(persistent) + elif self.taster.isClassAllowed(obj.__class__): + sxp.append(className) + if hasattr(obj, "__getstate__"): + state = obj.__getstate__() + else: + state = obj.__dict__ + sxp.append(self.jelly(state)) + else: + self.unpersistable( + "instance of class %s deemed insecure" + % qual(obj.__class__), + sxp, + ) + return self.preserve(obj, sxp) + else: + raise InsecureJelly(f"Type not allowed for object: {objType} {obj}") + + def _jellyIterable(self, atom, obj): + """ + Jelly an iterable object. + + @param atom: the identifier atom of the object. + @type atom: C{str} + + @param obj: any iterable object. + @type obj: C{iterable} + + @return: a generator of jellied data. + @rtype: C{generator} + """ + yield atom + for item in obj: + yield self.jelly(item) + + def jelly_decimal(self, d): + """ + Jelly a decimal object. + + @param d: a decimal object to serialize. + @type d: C{decimal.Decimal} + + @return: jelly for the decimal object. + @rtype: C{list} + """ + sign, guts, exponent = d.as_tuple() + value = reduce(lambda left, right: left * 10 + right, guts) + if sign: + value = -value + return [b"decimal", value, exponent] + + def unpersistable(self, reason, sxp=None): + """ + (internal) Returns an sexp: (unpersistable "reason"). Utility method + for making note that a particular object could not be serialized. + """ + if sxp is None: + sxp = [] + sxp.append(unpersistable_atom) + if isinstance(reason, str): + reason = reason.encode("utf-8") + sxp.append(reason) + return sxp + + +class _Unjellier: + def __init__(self, taster, persistentLoad, invoker): + self.taster = taster + self.persistentLoad = persistentLoad + self.references = {} + self.postCallbacks = [] + self.invoker = invoker + + def unjellyFull(self, obj): + o = self.unjelly(obj) + for m in self.postCallbacks: + m() + return o + + def _maybePostUnjelly(self, unjellied): + """ + If the given object has support for the C{postUnjelly} hook, set it up + to be called at the end of deserialization. + + @param unjellied: an object that has already been unjellied. + + @return: C{unjellied} + """ + if hasattr(unjellied, "postUnjelly"): + self.postCallbacks.append(unjellied.postUnjelly) + return unjellied + + def unjelly(self, obj): + if type(obj) is not list: + return obj + jelTypeBytes = obj[0] + if not self.taster.isTypeAllowed(jelTypeBytes): + raise InsecureJelly(jelTypeBytes) + regClass = unjellyableRegistry.get(jelTypeBytes) + if regClass is not None: + method = getattr(_createBlank(regClass), "unjellyFor", regClass) + return self._maybePostUnjelly(method(self, obj)) + regFactory = unjellyableFactoryRegistry.get(jelTypeBytes) + if regFactory is not None: + return self._maybePostUnjelly(regFactory(self.unjelly(obj[1]))) + + jelTypeText = nativeString(jelTypeBytes) + thunk = getattr(self, "_unjelly_%s" % jelTypeText, None) + if thunk is not None: + return thunk(obj[1:]) + else: + nameSplit = jelTypeText.split(".") + modName = ".".join(nameSplit[:-1]) + if not self.taster.isModuleAllowed(modName): + raise InsecureJelly( + f"Module {modName} not allowed (in type {jelTypeText})." + ) + clz = namedObject(jelTypeText) + if not self.taster.isClassAllowed(clz): + raise InsecureJelly("Class %s not allowed." % jelTypeText) + return self._genericUnjelly(clz, obj[1]) + + def _genericUnjelly(self, cls, state): + """ + Unjelly a type for which no specific unjellier is registered, but which + is nonetheless allowed. + + @param cls: the class of the instance we are unjellying. + @type cls: L{type} + + @param state: The jellied representation of the object's state; its + C{__dict__} unless it has a C{__setstate__} that takes something + else. + @type state: L{list} + + @return: the new, unjellied instance. + """ + return self._maybePostUnjelly(_newInstance(cls, self.unjelly(state))) + + def _unjelly_None(self, exp): + return None + + def _unjelly_unicode(self, exp): + return str(exp[0], "UTF-8") + + def _unjelly_decimal(self, exp): + """ + Unjelly decimal objects. + """ + value = exp[0] + exponent = exp[1] + if value < 0: + sign = 1 + else: + sign = 0 + guts = decimal.Decimal(value).as_tuple()[1] + return decimal.Decimal((sign, guts, exponent)) + + def _unjelly_boolean(self, exp): + assert exp[0] in (b"true", b"false") + return exp[0] == b"true" + + def _unjelly_datetime(self, exp): + return datetime.datetime(*map(int, exp[0].split())) + + def _unjelly_date(self, exp): + return datetime.date(*map(int, exp[0].split())) + + def _unjelly_time(self, exp): + return datetime.time(*map(int, exp[0].split())) + + def _unjelly_timedelta(self, exp): + days, seconds, microseconds = map(int, exp[0].split()) + return datetime.timedelta(days=days, seconds=seconds, microseconds=microseconds) + + def unjellyInto(self, obj, loc, jel): + o = self.unjelly(jel) + if isinstance(o, NotKnown): + o.addDependant(obj, loc) + obj[loc] = o + return o + + def _unjelly_dereference(self, lst): + refid = lst[0] + x = self.references.get(refid) + if x is not None: + return x + der = _Dereference(refid) + self.references[refid] = der + return der + + def _unjelly_reference(self, lst): + refid = lst[0] + exp = lst[1] + o = self.unjelly(exp) + ref = self.references.get(refid) + if ref is None: + self.references[refid] = o + elif isinstance(ref, NotKnown): + ref.resolveDependants(o) + self.references[refid] = o + else: + assert 0, "Multiple references with same ID!" + return o + + def _unjelly_tuple(self, lst): + l = list(range(len(lst))) + finished = 1 + for elem in l: + if isinstance(self.unjellyInto(l, elem, lst[elem]), NotKnown): + finished = 0 + if finished: + return tuple(l) + else: + return _Tuple(l) + + def _unjelly_list(self, lst): + l = list(range(len(lst))) + for elem in l: + self.unjellyInto(l, elem, lst[elem]) + return l + + def _unjellySetOrFrozenset(self, lst, containerType): + """ + Helper method to unjelly set or frozenset. + + @param lst: the content of the set. + @type lst: C{list} + + @param containerType: the type of C{set} to use. + """ + l = list(range(len(lst))) + finished = True + for elem in l: + data = self.unjellyInto(l, elem, lst[elem]) + if isinstance(data, NotKnown): + finished = False + if not finished: + return _Container(l, containerType) + else: + return containerType(l) + + def _unjelly_set(self, lst): + """ + Unjelly set using the C{set} builtin. + """ + return self._unjellySetOrFrozenset(lst, set) + + def _unjelly_frozenset(self, lst): + """ + Unjelly frozenset using the C{frozenset} builtin. + """ + return self._unjellySetOrFrozenset(lst, frozenset) + + def _unjelly_dictionary(self, lst): + d = {} + for k, v in lst: + kvd = _DictKeyAndValue(d) + self.unjellyInto(kvd, 0, k) + self.unjellyInto(kvd, 1, v) + return d + + def _unjelly_module(self, rest): + moduleName = nativeString(rest[0]) + if type(moduleName) != str: + raise InsecureJelly("Attempted to unjelly a module with a non-string name.") + if not self.taster.isModuleAllowed(moduleName): + raise InsecureJelly(f"Attempted to unjelly module named {moduleName!r}") + mod = __import__(moduleName, {}, {}, "x") + return mod + + def _unjelly_class(self, rest): + cname = nativeString(rest[0]) + clist = cname.split(nativeString(".")) + modName = nativeString(".").join(clist[:-1]) + if not self.taster.isModuleAllowed(modName): + raise InsecureJelly("module %s not allowed" % modName) + klaus = namedObject(cname) + objType = type(klaus) + if objType is not type: + raise InsecureJelly( + "class %r unjellied to something that isn't a class: %r" + % (cname, klaus) + ) + if not self.taster.isClassAllowed(klaus): + raise InsecureJelly("class not allowed: %s" % qual(klaus)) + return klaus + + def _unjelly_function(self, rest): + fname = nativeString(rest[0]) + modSplit = fname.split(nativeString(".")) + modName = nativeString(".").join(modSplit[:-1]) + if not self.taster.isModuleAllowed(modName): + raise InsecureJelly("Module not allowed: %s" % modName) + # XXX do I need an isFunctionAllowed? + function = namedAny(fname) + return function + + def _unjelly_persistent(self, rest): + if self.persistentLoad: + pload = self.persistentLoad(rest[0], self) + return pload + else: + return Unpersistable("Persistent callback not found") + + def _unjelly_instance(self, rest): + """ + (internal) Unjelly an instance. + + Called to handle the deprecated I{instance} token. + + @param rest: The s-expression representing the instance. + + @return: The unjellied instance. + """ + warnings.warn_explicit( + "Unjelly support for the instance atom is deprecated since " + "Twisted 15.0.0. Upgrade peer for modern instance support.", + category=DeprecationWarning, + filename="", + lineno=0, + ) + + clz = self.unjelly(rest[0]) + return self._genericUnjelly(clz, rest[1]) + + def _unjelly_unpersistable(self, rest): + return Unpersistable(f"Unpersistable data: {rest[0]}") + + def _unjelly_method(self, rest): + """ + (internal) Unjelly a method. + """ + im_name = rest[0] + im_self = self.unjelly(rest[1]) + im_class = self.unjelly(rest[2]) + if not isinstance(im_class, type): + raise InsecureJelly("Method found with non-class class.") + if im_name in im_class.__dict__: + if im_self is None: + im = getattr(im_class, im_name) + elif isinstance(im_self, NotKnown): + im = _InstanceMethod(im_name, im_self, im_class) + else: + im = types.MethodType( + im_class.__dict__[im_name], im_self, *([im_class] * (False)) + ) + else: + raise TypeError("instance method changed") + return im + + +#### Published Interface. + + +class InsecureJelly(Exception): + """ + This exception will be raised when a jelly is deemed `insecure'; e.g. it + contains a type, class, or module disallowed by the specified `taster' + """ + + +class DummySecurityOptions: + """ + DummySecurityOptions() -> insecure security options + Dummy security options -- this class will allow anything. + """ + + def isModuleAllowed(self, moduleName): + """ + DummySecurityOptions.isModuleAllowed(moduleName) -> boolean + returns 1 if a module by that name is allowed, 0 otherwise + """ + return 1 + + def isClassAllowed(self, klass): + """ + DummySecurityOptions.isClassAllowed(class) -> boolean + Assumes the module has already been allowed. Returns 1 if the given + class is allowed, 0 otherwise. + """ + return 1 + + def isTypeAllowed(self, typeName): + """ + DummySecurityOptions.isTypeAllowed(typeName) -> boolean + Returns 1 if the given type is allowed, 0 otherwise. + """ + return 1 + + +class SecurityOptions: + """ + This will by default disallow everything, except for 'none'. + """ + + basicTypes = [ + "dictionary", + "list", + "tuple", + "reference", + "dereference", + "unpersistable", + "persistent", + "long_int", + "long", + "dict", + ] + + def __init__(self): + """ + SecurityOptions() initialize. + """ + # I don't believe any of these types can ever pose a security hazard, + # except perhaps "reference"... + self.allowedTypes = { + b"None": 1, + b"bool": 1, + b"boolean": 1, + b"string": 1, + b"str": 1, + b"int": 1, + b"float": 1, + b"datetime": 1, + b"time": 1, + b"date": 1, + b"timedelta": 1, + b"NoneType": 1, + b"unicode": 1, + b"decimal": 1, + b"set": 1, + b"frozenset": 1, + } + self.allowedModules = {} + self.allowedClasses = {} + + def allowBasicTypes(self): + """ + Allow all `basic' types. (Dictionary and list. Int, string, and float + are implicitly allowed.) + """ + self.allowTypes(*self.basicTypes) + + def allowTypes(self, *types): + """ + SecurityOptions.allowTypes(typeString): Allow a particular type, by its + name. + """ + for typ in types: + if isinstance(typ, str): + typ = typ.encode("utf-8") + if not isinstance(typ, bytes): + typ = qual(typ) + self.allowedTypes[typ] = 1 + + def allowInstancesOf(self, *classes): + """ + SecurityOptions.allowInstances(klass, klass, ...): allow instances + of the specified classes + + This will also allow the 'instance', 'class' (renamed 'classobj' in + Python 2.3), and 'module' types, as well as basic types. + """ + self.allowBasicTypes() + self.allowTypes("instance", "class", "classobj", "module") + for klass in classes: + self.allowTypes(qual(klass)) + self.allowModules(klass.__module__) + self.allowedClasses[klass] = 1 + + def allowModules(self, *modules): + """ + SecurityOptions.allowModules(module, module, ...): allow modules by + name. This will also allow the 'module' type. + """ + for module in modules: + if type(module) == types.ModuleType: + module = module.__name__ + + if not isinstance(module, bytes): + module = module.encode("utf-8") + + self.allowedModules[module] = 1 + + def isModuleAllowed(self, moduleName): + """ + SecurityOptions.isModuleAllowed(moduleName) -> boolean + returns 1 if a module by that name is allowed, 0 otherwise + """ + if not isinstance(moduleName, bytes): + moduleName = moduleName.encode("utf-8") + + return moduleName in self.allowedModules + + def isClassAllowed(self, klass): + """ + SecurityOptions.isClassAllowed(class) -> boolean + Assumes the module has already been allowed. Returns 1 if the given + class is allowed, 0 otherwise. + """ + return klass in self.allowedClasses + + def isTypeAllowed(self, typeName): + """ + SecurityOptions.isTypeAllowed(typeName) -> boolean + Returns 1 if the given type is allowed, 0 otherwise. + """ + if not isinstance(typeName, bytes): + typeName = typeName.encode("utf-8") + + return typeName in self.allowedTypes or b"." in typeName + + +globalSecurity = SecurityOptions() +globalSecurity.allowBasicTypes() + + +def jelly(object, taster=DummySecurityOptions(), persistentStore=None, invoker=None): + """ + Serialize to s-expression. + + Returns a list which is the serialized representation of an object. An + optional 'taster' argument takes a SecurityOptions and will mark any + insecure objects as unpersistable rather than serializing them. + """ + return _Jellier(taster, persistentStore, invoker).jelly(object) + + +def unjelly(sexp, taster=DummySecurityOptions(), persistentLoad=None, invoker=None): + """ + Unserialize from s-expression. + + Takes a list that was the result from a call to jelly() and unserializes + an arbitrary object from it. The optional 'taster' argument, an instance + of SecurityOptions, will cause an InsecureJelly exception to be raised if a + disallowed type, module, or class attempted to unserialize. + """ + return _Unjellier(taster, persistentLoad, invoker).unjellyFull(sexp) diff --git a/contrib/python/Twisted/py3/twisted/spread/pb.py b/contrib/python/Twisted/py3/twisted/spread/pb.py new file mode 100644 index 0000000000..dcf545015d --- /dev/null +++ b/contrib/python/Twisted/py3/twisted/spread/pb.py @@ -0,0 +1,1674 @@ +# -*- test-case-name: twisted.spread.test.test_pb -*- +# Copyright (c) Twisted Matrix Laboratories. +# See LICENSE for details. + +""" +Perspective Broker + +\"This isn\'t a professional opinion, but it's probably got enough +internet to kill you.\" --glyph + +Introduction +============ + +This is a broker for proxies for and copies of objects. It provides a +translucent interface layer to those proxies. + +The protocol is not opaque, because it provides objects which represent the +remote proxies and require no context (server references, IDs) to operate on. + +It is not transparent because it does I{not} attempt to make remote objects +behave identically, or even similarly, to local objects. Method calls are +invoked asynchronously, and specific rules are applied when serializing +arguments. + +To get started, begin with L{PBClientFactory} and L{PBServerFactory}. + +@author: Glyph Lefkowitz +""" + + +import random +from hashlib import md5 + +from zope.interface import Interface, implementer + +from twisted.cred.credentials import ( + Anonymous, + IAnonymous, + ICredentials, + IUsernameHashedPassword, +) +from twisted.cred.portal import Portal +from twisted.internet import defer, protocol +from twisted.persisted import styles + +# Twisted Imports +from twisted.python import failure, log, reflect +from twisted.python.compat import cmp, comparable +from twisted.python.components import registerAdapter +from twisted.spread import banana + +# These three are backwards compatibility aliases for the previous three. +# Ultimately they should be deprecated. -exarkun +from twisted.spread.flavors import ( + Cacheable, + Copyable, + IPBRoot, + Jellyable, + NoSuchMethod, + Referenceable, + RemoteCache, + RemoteCacheObserver, + RemoteCopy, + Root, + Serializable, + Viewable, + ViewPoint, + copyTags, + setCopierForClass, + setCopierForClassTree, + setFactoryForClass, + setUnjellyableFactoryForClass, + setUnjellyableForClass, + setUnjellyableForClassTree, +) +from twisted.spread.interfaces import IJellyable, IUnjellyable +from twisted.spread.jelly import _newInstance, globalSecurity, jelly, unjelly + +MAX_BROKER_REFS = 1024 + +portno = 8787 + + +class ProtocolError(Exception): + """ + This error is raised when an invalid protocol statement is received. + """ + + +class DeadReferenceError(ProtocolError): + """ + This error is raised when a method is called on a dead reference (one whose + broker has been disconnected). + """ + + +class Error(Exception): + """ + This error can be raised to generate known error conditions. + + When a PB callable method (perspective_, remote_, view_) raises + this error, it indicates that a traceback should not be printed, + but instead, the string representation of the exception should be + sent. + """ + + +class RemoteError(Exception): + """ + This class is used to wrap a string-ified exception from the remote side to + be able to reraise it. (Raising string exceptions is no longer possible in + Python 2.6+) + + The value of this exception will be a str() representation of the remote + value. + + @ivar remoteType: The full import path of the exception class which was + raised on the remote end. + @type remoteType: C{str} + + @ivar remoteTraceback: The remote traceback. + @type remoteTraceback: C{str} + + @note: It's not possible to include the remoteTraceback if this exception is + thrown into a generator. It must be accessed as an attribute. + """ + + def __init__(self, remoteType, value, remoteTraceback): + Exception.__init__(self, value) + self.remoteType = remoteType + self.remoteTraceback = remoteTraceback + + +@comparable +class RemoteMethod: + """ + This is a translucent reference to a remote message. + """ + + def __init__(self, obj, name): + """ + Initialize with a L{RemoteReference} and the name of this message. + """ + self.obj = obj + self.name = name + + def __cmp__(self, other): + return cmp((self.obj, self.name), other) + + def __hash__(self): + return hash((self.obj, self.name)) + + def __call__(self, *args, **kw): + """ + Asynchronously invoke a remote method. + """ + return self.obj.broker._sendMessage( + b"", + self.obj.perspective, + self.obj.luid, + self.name.encode("utf-8"), + args, + kw, + ) + + +class PBConnectionLost(Exception): + pass + + +class IPerspective(Interface): + """ + per*spec*tive, n. : The relationship of aspects of a subject to each + other and to a whole: 'a perspective of history'; 'a need to view + the problem in the proper perspective'. + + This is a Perspective Broker-specific wrapper for an avatar. That + is to say, a PB-published view on to the business logic for the + system's concept of a 'user'. + + The concept of attached/detached is no longer implemented by the + framework. The realm is expected to implement such semantics if + needed. + """ + + def perspectiveMessageReceived(broker, message, args, kwargs): + """ + This method is called when a network message is received. + + @arg broker: The Perspective Broker. + + @type message: str + @arg message: The name of the method called by the other end. + + @type args: list in jelly format + @arg args: The arguments that were passed by the other end. It + is recommend that you use the `unserialize' method of the + broker to decode this. + + @type kwargs: dict in jelly format + @arg kwargs: The keyword arguments that were passed by the + other end. It is recommended that you use the + `unserialize' method of the broker to decode this. + + @rtype: A jelly list. + @return: It is recommended that you use the `serialize' method + of the broker on whatever object you need to return to + generate the return value. + """ + + +@implementer(IPerspective) +class Avatar: + """ + A default IPerspective implementor. + + This class is intended to be subclassed, and a realm should return + an instance of such a subclass when IPerspective is requested of + it. + + A peer requesting a perspective will receive only a + L{RemoteReference} to a pb.Avatar. When a method is called on + that L{RemoteReference}, it will translate to a method on the + remote perspective named 'perspective_methodname'. (For more + information on invoking methods on other objects, see + L{flavors.ViewPoint}.) + """ + + def perspectiveMessageReceived(self, broker, message, args, kw): + """ + This method is called when a network message is received. + + This will call:: + + self.perspective_%(message)s(*broker.unserialize(args), + **broker.unserialize(kw)) + + to handle the method; subclasses of Avatar are expected to + implement methods using this naming convention. + """ + + args = broker.unserialize(args, self) + kw = broker.unserialize(kw, self) + method = getattr(self, "perspective_%s" % message) + try: + state = method(*args, **kw) + except TypeError: + log.msg(f"{method} didn't accept {args} and {kw}") + raise + return broker.serialize(state, self, method, args, kw) + + +class AsReferenceable(Referenceable): + """ + A reference directed towards another object. + """ + + def __init__(self, object, messageType="remote"): + self.remoteMessageReceived = getattr(object, messageType + "MessageReceived") + + +@implementer(IUnjellyable) +@comparable +class RemoteReference(Serializable, styles.Ephemeral): + """ + A translucent reference to a remote object. + + I may be a reference to a L{flavors.ViewPoint}, a + L{flavors.Referenceable}, or an L{IPerspective} implementer (e.g., + pb.Avatar). From the client's perspective, it is not possible to + tell which except by convention. + + I am a \"translucent\" reference because although no additional + bookkeeping overhead is given to the application programmer for + manipulating a reference, return values are asynchronous. + + See also L{twisted.internet.defer}. + + @ivar broker: The broker I am obtained through. + @type broker: L{Broker} + """ + + def __init__(self, perspective, broker, luid, doRefCount): + """(internal) Initialize me with a broker and a locally-unique ID. + + The ID is unique only to the particular Perspective Broker + instance. + """ + self.luid = luid + self.broker = broker + self.doRefCount = doRefCount + self.perspective = perspective + self.disconnectCallbacks = [] + + def notifyOnDisconnect(self, callback): + """ + Register a callback to be called if our broker gets disconnected. + + @param callback: a callable which will be called with one + argument, this instance. + """ + assert callable(callback) + self.disconnectCallbacks.append(callback) + if len(self.disconnectCallbacks) == 1: + self.broker.notifyOnDisconnect(self._disconnected) + + def dontNotifyOnDisconnect(self, callback): + """ + Remove a callback that was registered with notifyOnDisconnect. + + @param callback: a callable + """ + self.disconnectCallbacks.remove(callback) + if not self.disconnectCallbacks: + self.broker.dontNotifyOnDisconnect(self._disconnected) + + def _disconnected(self): + """ + Called if we are disconnected and have callbacks registered. + """ + for callback in self.disconnectCallbacks: + callback(self) + self.disconnectCallbacks = None + + def jellyFor(self, jellier): + """ + If I am being sent back to where I came from, serialize as a local backreference. + """ + if jellier.invoker: + assert ( + self.broker == jellier.invoker + ), "Can't send references to brokers other than their own." + return b"local", self.luid + else: + return b"unpersistable", "References cannot be serialized" + + def unjellyFor(self, unjellier, unjellyList): + self.__init__( + unjellier.invoker.unserializingPerspective, + unjellier.invoker, + unjellyList[1], + 1, + ) + return self + + def callRemote(self, _name, *args, **kw): + """ + Asynchronously invoke a remote method. + + @type _name: L{str} + @param _name: the name of the remote method to invoke + @param args: arguments to serialize for the remote function + @param kw: keyword arguments to serialize for the remote function. + @rtype: L{twisted.internet.defer.Deferred} + @returns: a Deferred which will be fired when the result of + this remote call is received. + """ + if not isinstance(_name, bytes): + _name = _name.encode("utf8") + + # Note that we use '_name' instead of 'name' so the user can call + # remote methods with 'name' as a keyword parameter, like this: + # ref.callRemote("getPeopleNamed", count=12, name="Bob") + return self.broker._sendMessage( + b"", self.perspective, self.luid, _name, args, kw + ) + + def remoteMethod(self, key): + """ + + @param key: The key. + @return: A L{RemoteMethod} for this key. + """ + return RemoteMethod(self, key) + + def __cmp__(self, other): + """ + + @param other: another L{RemoteReference} to compare me to. + """ + if isinstance(other, RemoteReference): + if other.broker == self.broker: + return cmp(self.luid, other.luid) + return cmp(self.broker, other) + + def __hash__(self): + """ + Hash me. + """ + return self.luid + + def __del__(self): + """ + Do distributed reference counting on finalization. + """ + if self.doRefCount: + self.broker.sendDecRef(self.luid) + + +setUnjellyableForClass("remote", RemoteReference) + + +class Local: + """ + (internal) A reference to a local object. + """ + + def __init__(self, object, perspective=None): + """ + Initialize. + """ + self.object = object + self.perspective = perspective + self.refcount = 1 + + def __repr__(self) -> str: + return f"<pb.Local {self.object!r} ref:{self.refcount}>" + + def incref(self): + """ + Increment the reference count. + + @return: the reference count after incrementing + """ + self.refcount = self.refcount + 1 + return self.refcount + + def decref(self): + """ + Decrement the reference count. + + @return: the reference count after decrementing + """ + self.refcount = self.refcount - 1 + return self.refcount + + +# Failure +class CopyableFailure(failure.Failure, Copyable): + """ + A L{flavors.RemoteCopy} and L{flavors.Copyable} version of + L{twisted.python.failure.Failure} for serialization. + """ + + unsafeTracebacks = 0 + + def getStateToCopy(self): + """ + Collect state related to the exception which occurred, discarding + state which cannot reasonably be serialized. + """ + state = self.__dict__.copy() + state["tb"] = None + state["frames"] = [] + state["stack"] = [] + state["value"] = str(self.value) # Exception instance + if isinstance(self.type, bytes): + state["type"] = self.type + else: + state["type"] = reflect.qual(self.type).encode("utf-8") # Exception class + if self.unsafeTracebacks: + state["traceback"] = self.getTraceback() + else: + state["traceback"] = "Traceback unavailable\n" + return state + + +class CopiedFailure(RemoteCopy, failure.Failure): + """ + A L{CopiedFailure} is a L{pb.RemoteCopy} of a L{failure.Failure} + transferred via PB. + + @ivar type: The full import path of the exception class which was raised on + the remote end. + @type type: C{str} + + @ivar value: A str() representation of the remote value. + @type value: L{CopiedFailure} or C{str} + + @ivar traceback: The remote traceback. + @type traceback: C{str} + """ + + def printTraceback(self, file=None, elideFrameworkCode=0, detail="default"): + if file is None: + file = log.logfile + failureType = self.type + if not isinstance(failureType, str): + failureType = failureType.decode("utf-8") + file.write("Traceback from remote host -- ") + file.write(failureType + ": " + self.value) + file.write("\n") + + def throwExceptionIntoGenerator(self, g): + """ + Throw the original exception into the given generator, preserving + traceback information if available. In the case of a L{CopiedFailure} + where the exception type is a string, a L{pb.RemoteError} is thrown + instead. + + @return: The next value yielded from the generator. + @raise StopIteration: If there are no more values in the generator. + @raise RemoteError: The wrapped remote exception. + """ + return g.throw(RemoteError(self.type, self.value, self.traceback)) + + printBriefTraceback = printTraceback + printDetailedTraceback = printTraceback + + +setUnjellyableForClass(CopyableFailure, CopiedFailure) + + +def failure2Copyable(fail, unsafeTracebacks=0): + f = _newInstance(CopyableFailure, fail.__dict__) + f.unsafeTracebacks = unsafeTracebacks + return f + + +class Broker(banana.Banana): + """ + I am a broker for objects. + """ + + version = 6 + username = None + factory = None + + def __init__(self, isClient=1, security=globalSecurity): + banana.Banana.__init__(self, isClient) + self.disconnected = 0 + self.disconnects = [] + self.failures = [] + self.connects = [] + self.localObjects = {} + self.security = security + self.pageProducers = [] + self.currentRequestID = 0 + self.currentLocalID = 0 + self.unserializingPerspective = None + # Some terms: + # PUID: process unique ID; return value of id() function. type "int". + # LUID: locally unique ID; an ID unique to an object mapped over this + # connection. type "int" + # GUID: (not used yet) globally unique ID; an ID for an object which + # may be on a redirected or meta server. Type as yet undecided. + # Dictionary mapping LUIDs to local objects. + # set above to allow root object to be assigned before connection is made + # self.localObjects = {} + # Dictionary mapping PUIDs to LUIDs. + self.luids = {} + # Dictionary mapping LUIDs to local (remotely cached) objects. Remotely + # cached means that they're objects which originate here, and were + # copied remotely. + self.remotelyCachedObjects = {} + # Dictionary mapping PUIDs to (cached) LUIDs + self.remotelyCachedLUIDs = {} + # Dictionary mapping (remote) LUIDs to (locally cached) objects. + self.locallyCachedObjects = {} + self.waitingForAnswers = {} + + # Mapping from LUIDs to weakref objects with callbacks for performing + # any local cleanup which may be necessary for the corresponding + # object once it no longer exists. + self._localCleanup = {} + + def resumeProducing(self): + """ + Called when the consumer attached to me runs out of buffer. + """ + # Go backwards over the list so we can remove indexes from it as we go + for pageridx in range(len(self.pageProducers) - 1, -1, -1): + pager = self.pageProducers[pageridx] + pager.sendNextPage() + if not pager.stillPaging(): + del self.pageProducers[pageridx] + if not self.pageProducers: + self.transport.unregisterProducer() + + def pauseProducing(self): + # Streaming producer method; not necessary to implement. + pass + + def stopProducing(self): + # Streaming producer method; not necessary to implement. + pass + + def registerPageProducer(self, pager): + self.pageProducers.append(pager) + if len(self.pageProducers) == 1: + self.transport.registerProducer(self, 0) + + def expressionReceived(self, sexp): + """ + Evaluate an expression as it's received. + """ + if isinstance(sexp, list): + command = sexp[0] + + if not isinstance(command, str): + command = command.decode("utf8") + + methodName = "proto_%s" % command + method = getattr(self, methodName, None) + + if method: + method(*sexp[1:]) + else: + self.sendCall(b"didNotUnderstand", command) + else: + raise ProtocolError("Non-list expression received.") + + def proto_version(self, vnum): + """ + Protocol message: (version version-number) + + Check to make sure that both ends of the protocol are speaking + the same version dialect. + + @param vnum: The version number. + """ + + if vnum != self.version: + raise ProtocolError(f"Version Incompatibility: {self.version} {vnum}") + + def sendCall(self, *exp): + """ + Utility method to send an expression to the other side of the connection. + + @param exp: The expression. + """ + self.sendEncoded(exp) + + def proto_didNotUnderstand(self, command): + """ + Respond to stock 'C{didNotUnderstand}' message. + + Log the command that was not understood and continue. (Note: + this will probably be changed to close the connection or raise + an exception in the future.) + + @param command: The command to log. + """ + log.msg("Didn't understand command: %r" % command) + + def connectionReady(self): + """ + Initialize. Called after Banana negotiation is done. + """ + self.sendCall(b"version", self.version) + for notifier in self.connects: + try: + notifier() + except BaseException: + log.deferr() + self.connects = None + self.factory.clientConnectionMade(self) + + def connectionFailed(self): + # XXX should never get called anymore? check! + for notifier in self.failures: + try: + notifier() + except BaseException: + log.deferr() + self.failures = None + + waitingForAnswers = None + + def connectionLost(self, reason): + """ + The connection was lost. + + @param reason: message to put in L{failure.Failure} + """ + self.disconnected = 1 + # Nuke potential circular references. + self.luids = None + if self.waitingForAnswers: + for d in self.waitingForAnswers.values(): + try: + d.errback(failure.Failure(PBConnectionLost(reason))) + except BaseException: + log.deferr() + # Assure all Cacheable.stoppedObserving are called + for lobj in self.remotelyCachedObjects.values(): + cacheable = lobj.object + perspective = lobj.perspective + try: + cacheable.stoppedObserving( + perspective, RemoteCacheObserver(self, cacheable, perspective) + ) + except BaseException: + log.deferr() + # Loop on a copy to prevent notifiers to mixup + # the list by calling dontNotifyOnDisconnect + for notifier in self.disconnects[:]: + try: + notifier() + except BaseException: + log.deferr() + self.disconnects = None + self.waitingForAnswers = None + self.localSecurity = None + self.remoteSecurity = None + self.remotelyCachedObjects = None + self.remotelyCachedLUIDs = None + self.locallyCachedObjects = None + self.localObjects = None + + def notifyOnDisconnect(self, notifier): + """ + + @param notifier: callback to call when the Broker disconnects. + """ + assert callable(notifier) + self.disconnects.append(notifier) + + def notifyOnFail(self, notifier): + """ + + @param notifier: callback to call if the Broker fails to connect. + """ + assert callable(notifier) + self.failures.append(notifier) + + def notifyOnConnect(self, notifier): + """ + + @param notifier: callback to call when the Broker connects. + """ + assert callable(notifier) + if self.connects is None: + try: + notifier() + except BaseException: + log.err() + else: + self.connects.append(notifier) + + def dontNotifyOnDisconnect(self, notifier): + """ + + @param notifier: callback to remove from list of disconnect callbacks. + """ + try: + self.disconnects.remove(notifier) + except ValueError: + pass + + def localObjectForID(self, luid): + """ + Get a local object for a locally unique ID. + + @return: An object previously stored with L{registerReference} or + L{None} if there is no object which corresponds to the given + identifier. + """ + if isinstance(luid, str): + luid = luid.encode("utf8") + + lob = self.localObjects.get(luid) + if lob is None: + return + return lob.object + + maxBrokerRefsViolations = 0 + + def registerReference(self, object): + """ + Store a persistent reference to a local object and map its + id() to a generated, session-unique ID. + + @param object: a local object + @return: the generated ID + """ + + assert object is not None + puid = object.processUniqueID() + luid = self.luids.get(puid) + if luid is None: + if len(self.localObjects) > MAX_BROKER_REFS: + self.maxBrokerRefsViolations = self.maxBrokerRefsViolations + 1 + if self.maxBrokerRefsViolations > 3: + self.transport.loseConnection() + raise Error("Maximum PB reference count exceeded. " "Goodbye.") + raise Error("Maximum PB reference count exceeded.") + + luid = self.newLocalID() + self.localObjects[luid] = Local(object) + self.luids[puid] = luid + else: + self.localObjects[luid].incref() + return luid + + def setNameForLocal(self, name, object): + """ + Store a special (string) ID for this object. + + This is how you specify a 'base' set of objects that the remote + protocol can connect to. + + @param name: An ID. + @param object: The object. + """ + if isinstance(name, str): + name = name.encode("utf8") + + assert object is not None + self.localObjects[name] = Local(object) + + def remoteForName(self, name): + """ + Returns an object from the remote name mapping. + + Note that this does not check the validity of the name, only + creates a translucent reference for it. + + @param name: The name to look up. + @return: An object which maps to the name. + """ + if isinstance(name, str): + name = name.encode("utf8") + + return RemoteReference(None, self, name, 0) + + def cachedRemotelyAs(self, instance, incref=0): + """ + + @param instance: The instance to look up. + @param incref: Flag to specify whether to increment the + reference. + @return: An ID that says what this instance is cached as + remotely, or L{None} if it's not. + """ + + puid = instance.processUniqueID() + luid = self.remotelyCachedLUIDs.get(puid) + if (luid is not None) and (incref): + self.remotelyCachedObjects[luid].incref() + return luid + + def remotelyCachedForLUID(self, luid): + """ + + @param luid: The LUID to look up. + @return: An instance which is cached remotely. + """ + return self.remotelyCachedObjects[luid].object + + def cacheRemotely(self, instance): + """ + XXX + + @return: A new LUID. + """ + puid = instance.processUniqueID() + luid = self.newLocalID() + if len(self.remotelyCachedObjects) > MAX_BROKER_REFS: + self.maxBrokerRefsViolations = self.maxBrokerRefsViolations + 1 + if self.maxBrokerRefsViolations > 3: + self.transport.loseConnection() + raise Error("Maximum PB cache count exceeded. " "Goodbye.") + raise Error("Maximum PB cache count exceeded.") + + self.remotelyCachedLUIDs[puid] = luid + # This table may not be necessary -- for now, it's to make sure that no + # monkey business happens with id(instance) + self.remotelyCachedObjects[luid] = Local(instance, self.serializingPerspective) + return luid + + def cacheLocally(self, cid, instance): + """(internal) + + Store a non-filled-out cached instance locally. + """ + self.locallyCachedObjects[cid] = instance + + def cachedLocallyAs(self, cid): + instance = self.locallyCachedObjects[cid] + return instance + + def serialize(self, object, perspective=None, method=None, args=None, kw=None): + """ + Jelly an object according to the remote security rules for this broker. + + @param object: The object to jelly. + @param perspective: The perspective. + @param method: The method. + @param args: Arguments. + @param kw: Keyword arguments. + """ + + if isinstance(object, defer.Deferred): + object.addCallbacks( + self.serialize, + lambda x: x, + callbackKeywords={ + "perspective": perspective, + "method": method, + "args": args, + "kw": kw, + }, + ) + return object + + # XXX This call is NOT REENTRANT and testing for reentrancy is just + # crazy, so it likely won't be. Don't ever write methods that call the + # broker's serialize() method recursively (e.g. sending a method call + # from within a getState (this causes concurrency problems anyway so + # you really, really shouldn't do it)) + + self.serializingPerspective = perspective + self.jellyMethod = method + self.jellyArgs = args + self.jellyKw = kw + try: + return jelly(object, self.security, None, self) + finally: + self.serializingPerspective = None + self.jellyMethod = None + self.jellyArgs = None + self.jellyKw = None + + def unserialize(self, sexp, perspective=None): + """ + Unjelly an sexp according to the local security rules for this broker. + + @param sexp: The object to unjelly. + @param perspective: The perspective. + """ + + self.unserializingPerspective = perspective + try: + return unjelly(sexp, self.security, None, self) + finally: + self.unserializingPerspective = None + + def newLocalID(self): + """ + + @return: A newly generated LUID. + """ + self.currentLocalID = self.currentLocalID + 1 + return self.currentLocalID + + def newRequestID(self): + """ + + @return: A newly generated request ID. + """ + self.currentRequestID = self.currentRequestID + 1 + return self.currentRequestID + + def _sendMessage(self, prefix, perspective, objectID, message, args, kw): + pbc = None + pbe = None + answerRequired = 1 + if "pbcallback" in kw: + pbc = kw["pbcallback"] + del kw["pbcallback"] + if "pberrback" in kw: + pbe = kw["pberrback"] + del kw["pberrback"] + if "pbanswer" in kw: + assert (not pbe) and (not pbc), "You can't specify a no-answer requirement." + answerRequired = kw["pbanswer"] + del kw["pbanswer"] + if self.disconnected: + raise DeadReferenceError("Calling Stale Broker") + try: + netArgs = self.serialize(args, perspective=perspective, method=message) + netKw = self.serialize(kw, perspective=perspective, method=message) + except BaseException: + return defer.fail(failure.Failure()) + requestID = self.newRequestID() + if answerRequired: + rval = defer.Deferred() + self.waitingForAnswers[requestID] = rval + if pbc or pbe: + log.msg('warning! using deprecated "pbcallback"') + rval.addCallbacks(pbc, pbe) + else: + rval = None + self.sendCall( + prefix + b"message", + requestID, + objectID, + message, + answerRequired, + netArgs, + netKw, + ) + return rval + + def proto_message( + self, requestID, objectID, message, answerRequired, netArgs, netKw + ): + self._recvMessage( + self.localObjectForID, + requestID, + objectID, + message, + answerRequired, + netArgs, + netKw, + ) + + def proto_cachemessage( + self, requestID, objectID, message, answerRequired, netArgs, netKw + ): + self._recvMessage( + self.cachedLocallyAs, + requestID, + objectID, + message, + answerRequired, + netArgs, + netKw, + ) + + def _recvMessage( + self, + findObjMethod, + requestID, + objectID, + message, + answerRequired, + netArgs, + netKw, + ): + """ + Received a message-send. + + Look up message based on object, unserialize the arguments, and + invoke it with args, and send an 'answer' or 'error' response. + + @param findObjMethod: A callable which takes C{objectID} as argument. + @param requestID: The requiest ID. + @param objectID: The object ID. + @param message: The message. + @param answerRequired: + @param netArgs: Arguments. + @param netKw: Keyword arguments. + """ + if not isinstance(message, str): + message = message.decode("utf8") + + try: + object = findObjMethod(objectID) + if object is None: + raise Error("Invalid Object ID") + netResult = object.remoteMessageReceived(self, message, netArgs, netKw) + except Error as e: + if answerRequired: + # If the error is Jellyable or explicitly allowed via our + # security options, send it back and let the code on the + # other end deal with unjellying. If it isn't Jellyable, + # wrap it in a CopyableFailure, which ensures it can be + # unjellied on the other end. We have to do this because + # all errors must be sent back. + if isinstance(e, Jellyable) or self.security.isClassAllowed( + e.__class__ + ): + self._sendError(e, requestID) + else: + self._sendError(CopyableFailure(e), requestID) + except BaseException: + if answerRequired: + log.msg("Peer will receive following PB traceback:", isError=True) + f = CopyableFailure() + self._sendError(f, requestID) + log.err() + else: + if answerRequired: + if isinstance(netResult, defer.Deferred): + args = (requestID,) + netResult.addCallbacks( + self._sendAnswer, + self._sendFailureOrError, + callbackArgs=args, + errbackArgs=args, + ) + # XXX Should this be done somewhere else? + else: + self._sendAnswer(netResult, requestID) + + def _sendAnswer(self, netResult, requestID): + """ + (internal) Send an answer to a previously sent message. + + @param netResult: The answer. + @param requestID: The request ID. + """ + self.sendCall(b"answer", requestID, netResult) + + def proto_answer(self, requestID, netResult): + """ + (internal) Got an answer to a previously sent message. + + Look up the appropriate callback and call it. + + @param requestID: The request ID. + @param netResult: The answer. + """ + d = self.waitingForAnswers[requestID] + del self.waitingForAnswers[requestID] + d.callback(self.unserialize(netResult)) + + def _sendFailureOrError(self, fail, requestID): + """ + Call L{_sendError} or L{_sendFailure}, depending on whether C{fail} + represents an L{Error} subclass or not. + + @param fail: The failure. + @param requestID: The request ID. + """ + if fail.check(Error) is None: + self._sendFailure(fail, requestID) + else: + self._sendError(fail, requestID) + + def _sendFailure(self, fail, requestID): + """ + Log error and then send it. + + @param fail: The failure. + @param requestID: The request ID. + """ + log.msg("Peer will receive following PB traceback:") + log.err(fail) + self._sendError(fail, requestID) + + def _sendError(self, fail, requestID): + """ + (internal) Send an error for a previously sent message. + + @param fail: The failure. + @param requestID: The request ID. + """ + if isinstance(fail, failure.Failure): + # If the failures value is jellyable or allowed through security, + # send the value + if isinstance(fail.value, Jellyable) or self.security.isClassAllowed( + fail.value.__class__ + ): + fail = fail.value + elif not isinstance(fail, CopyableFailure): + fail = failure2Copyable(fail, self.factory.unsafeTracebacks) + if isinstance(fail, CopyableFailure): + fail.unsafeTracebacks = self.factory.unsafeTracebacks + self.sendCall(b"error", requestID, self.serialize(fail)) + + def proto_error(self, requestID, fail): + """ + (internal) Deal with an error. + + @param requestID: The request ID. + @param fail: The failure. + """ + d = self.waitingForAnswers[requestID] + del self.waitingForAnswers[requestID] + d.errback(self.unserialize(fail)) + + def sendDecRef(self, objectID): + """ + (internal) Send a DECREF directive. + + @param objectID: The object ID. + """ + self.sendCall(b"decref", objectID) + + def proto_decref(self, objectID): + """ + (internal) Decrement the reference count of an object. + + If the reference count is zero, it will free the reference to this + object. + + @param objectID: The object ID. + """ + if isinstance(objectID, str): + objectID = objectID.encode("utf8") + refs = self.localObjects[objectID].decref() + if refs == 0: + puid = self.localObjects[objectID].object.processUniqueID() + del self.luids[puid] + del self.localObjects[objectID] + self._localCleanup.pop(puid, lambda: None)() + + def decCacheRef(self, objectID): + """ + (internal) Send a DECACHE directive. + + @param objectID: The object ID. + """ + self.sendCall(b"decache", objectID) + + def proto_decache(self, objectID): + """ + (internal) Decrement the reference count of a cached object. + + If the reference count is zero, free the reference, then send an + 'uncached' directive. + + @param objectID: The object ID. + """ + refs = self.remotelyCachedObjects[objectID].decref() + # log.msg('decaching: %s #refs: %s' % (objectID, refs)) + if refs == 0: + lobj = self.remotelyCachedObjects[objectID] + cacheable = lobj.object + perspective = lobj.perspective + # TODO: force_decache needs to be able to force-invalidate a + # cacheable reference. + try: + cacheable.stoppedObserving( + perspective, RemoteCacheObserver(self, cacheable, perspective) + ) + except BaseException: + log.deferr() + puid = cacheable.processUniqueID() + del self.remotelyCachedLUIDs[puid] + del self.remotelyCachedObjects[objectID] + self.sendCall(b"uncache", objectID) + + def proto_uncache(self, objectID): + """ + (internal) Tell the client it is now OK to uncache an object. + + @param objectID: The object ID. + """ + # log.msg("uncaching locally %d" % objectID) + obj = self.locallyCachedObjects[objectID] + obj.broker = None + ## def reallyDel(obj=obj): + ## obj.__really_del__() + ## obj.__del__ = reallyDel + del self.locallyCachedObjects[objectID] + + +def respond(challenge, password): + """ + Respond to a challenge. + + This is useful for challenge/response authentication. + + @param challenge: A challenge. + @param password: A password. + @return: The password hashed twice. + """ + m = md5() + m.update(password) + hashedPassword = m.digest() + m = md5() + m.update(hashedPassword) + m.update(challenge) + doubleHashedPassword = m.digest() + return doubleHashedPassword + + +def challenge(): + """ + + @return: Some random data. + """ + crap = bytes(random.randint(65, 90) for x in range(random.randrange(15, 25))) + crap = md5(crap).digest() + return crap + + +class PBClientFactory(protocol.ClientFactory): + """ + Client factory for PB brokers. + + As with all client factories, use with reactor.connectTCP/SSL/etc.. + getPerspective and getRootObject can be called either before or + after the connect. + """ + + protocol = Broker + unsafeTracebacks = False + + def __init__(self, unsafeTracebacks=False, security=globalSecurity): + """ + @param unsafeTracebacks: if set, tracebacks for exceptions will be sent + over the wire. + @type unsafeTracebacks: C{bool} + + @param security: security options used by the broker, default to + C{globalSecurity}. + @type security: L{twisted.spread.jelly.SecurityOptions} + """ + self.unsafeTracebacks = unsafeTracebacks + self.security = security + self._reset() + + def buildProtocol(self, addr): + """ + Build the broker instance, passing the security options to it. + """ + p = self.protocol(isClient=True, security=self.security) + p.factory = self + return p + + def _reset(self): + self.rootObjectRequests = [] # list of deferred + self._broker = None + self._root = None + + def _failAll(self, reason): + deferreds = self.rootObjectRequests + self._reset() + for d in deferreds: + d.errback(reason) + + def clientConnectionFailed(self, connector, reason): + self._failAll(reason) + + def clientConnectionLost(self, connector, reason, reconnecting=0): + """ + Reconnecting subclasses should call with reconnecting=1. + """ + if reconnecting: + # Any pending requests will go to next connection attempt + # so we don't fail them. + self._broker = None + self._root = None + else: + self._failAll(reason) + + def clientConnectionMade(self, broker): + self._broker = broker + self._root = broker.remoteForName("root") + ds = self.rootObjectRequests + self.rootObjectRequests = [] + for d in ds: + d.callback(self._root) + + def getRootObject(self): + """ + Get root object of remote PB server. + + @return: Deferred of the root object. + """ + if self._broker and not self._broker.disconnected: + return defer.succeed(self._root) + d = defer.Deferred() + self.rootObjectRequests.append(d) + return d + + def disconnect(self): + """ + If the factory is connected, close the connection. + + Note that if you set up the factory to reconnect, you will need to + implement extra logic to prevent automatic reconnection after this + is called. + """ + if self._broker: + self._broker.transport.loseConnection() + + def _cbSendUsername(self, root, username, password, client): + return root.callRemote("login", username).addCallback( + self._cbResponse, password, client + ) + + def _cbResponse(self, challenges, password, client): + challenge, challenger = challenges + return challenger.callRemote("respond", respond(challenge, password), client) + + def _cbLoginAnonymous(self, root, client): + """ + Attempt an anonymous login on the given remote root object. + + @type root: L{RemoteReference} + @param root: The object on which to attempt the login, most likely + returned by a call to L{PBClientFactory.getRootObject}. + + @param client: A jellyable object which will be used as the I{mind} + parameter for the login attempt. + + @rtype: L{Deferred} + @return: A L{Deferred} which will be called back with a + L{RemoteReference} to an avatar when anonymous login succeeds, or + which will errback if anonymous login fails. + """ + return root.callRemote("loginAnonymous", client) + + def login(self, credentials, client=None): + """ + Login and get perspective from remote PB server. + + Currently the following credentials are supported:: + + L{twisted.cred.credentials.IUsernamePassword} + L{twisted.cred.credentials.IAnonymous} + + @rtype: L{Deferred} + @return: A L{Deferred} which will be called back with a + L{RemoteReference} for the avatar logged in to, or which will + errback if login fails. + """ + d = self.getRootObject() + + if IAnonymous.providedBy(credentials): + d.addCallback(self._cbLoginAnonymous, client) + else: + d.addCallback( + self._cbSendUsername, credentials.username, credentials.password, client + ) + return d + + +class PBServerFactory(protocol.ServerFactory): + """ + Server factory for perspective broker. + + Login is done using a Portal object, whose realm is expected to return + avatars implementing IPerspective. The credential checkers in the portal + should accept IUsernameHashedPassword or IUsernameMD5Password. + + Alternatively, any object providing or adaptable to L{IPBRoot} can be + used instead of a portal to provide the root object of the PB server. + """ + + unsafeTracebacks = False + + # object broker factory + protocol = Broker + + def __init__(self, root, unsafeTracebacks=False, security=globalSecurity): + """ + @param root: factory providing the root Referenceable used by the broker. + @type root: object providing or adaptable to L{IPBRoot}. + + @param unsafeTracebacks: if set, tracebacks for exceptions will be sent + over the wire. + @type unsafeTracebacks: C{bool} + + @param security: security options used by the broker, default to + C{globalSecurity}. + @type security: L{twisted.spread.jelly.SecurityOptions} + """ + self.root = IPBRoot(root) + self.unsafeTracebacks = unsafeTracebacks + self.security = security + + def buildProtocol(self, addr): + """ + Return a Broker attached to the factory (as the service provider). + """ + proto = self.protocol(isClient=False, security=self.security) + proto.factory = self + proto.setNameForLocal("root", self.root.rootObject(proto)) + return proto + + def clientConnectionMade(self, protocol): + # XXX does this method make any sense? + pass + + +class IUsernameMD5Password(ICredentials): + """ + I encapsulate a username and a hashed password. + + This credential is used for username/password over PB. CredentialCheckers + which check this kind of credential must store the passwords in plaintext + form or as a MD5 digest. + + @type username: C{str} or C{Deferred} + @ivar username: The username associated with these credentials. + """ + + def checkPassword(password): + """ + Validate these credentials against the correct password. + + @type password: C{str} + @param password: The correct, plaintext password against which to + check. + + @rtype: C{bool} or L{Deferred} + @return: C{True} if the credentials represented by this object match the + given password, C{False} if they do not, or a L{Deferred} which will + be called back with one of these values. + """ + + def checkMD5Password(password): + """ + Validate these credentials against the correct MD5 digest of the + password. + + @type password: C{str} + @param password: The correct MD5 digest of a password against which to + check. + + @rtype: C{bool} or L{Deferred} + @return: C{True} if the credentials represented by this object match the + given digest, C{False} if they do not, or a L{Deferred} which will + be called back with one of these values. + """ + + +@implementer(IPBRoot) +class _PortalRoot: + """ + Root object, used to login to portal. + """ + + def __init__(self, portal): + self.portal = portal + + def rootObject(self, broker): + return _PortalWrapper(self.portal, broker) + + +registerAdapter(_PortalRoot, Portal, IPBRoot) + + +class _JellyableAvatarMixin: + """ + Helper class for code which deals with avatars which PB must be capable of + sending to a peer. + """ + + def _cbLogin(self, result): + """ + Ensure that the avatar to be returned to the client is jellyable and + set up disconnection notification to call the realm's logout object. + """ + (interface, avatar, logout) = result + if not IJellyable.providedBy(avatar): + avatar = AsReferenceable(avatar, "perspective") + + puid = avatar.processUniqueID() + + # only call logout once, whether the connection is dropped (disconnect) + # or a logout occurs (cleanup), and be careful to drop the reference to + # it in either case + logout = [logout] + + def maybeLogout(): + if not logout: + return + fn = logout[0] + del logout[0] + fn() + + self.broker._localCleanup[puid] = maybeLogout + self.broker.notifyOnDisconnect(maybeLogout) + + return avatar + + +class _PortalWrapper(Referenceable, _JellyableAvatarMixin): + """ + Root Referenceable object, used to login to portal. + """ + + def __init__(self, portal, broker): + self.portal = portal + self.broker = broker + + def remote_login(self, username): + """ + Start of username/password login. + + @param username: The username. + """ + c = challenge() + return c, _PortalAuthChallenger(self.portal, self.broker, username, c) + + def remote_loginAnonymous(self, mind): + """ + Attempt an anonymous login. + + @param mind: An object to use as the mind parameter to the portal login + call (possibly None). + + @rtype: L{Deferred} + @return: A Deferred which will be called back with an avatar when login + succeeds or which will be errbacked if login fails somehow. + """ + d = self.portal.login(Anonymous(), mind, IPerspective) + d.addCallback(self._cbLogin) + return d + + +@implementer(IUsernameHashedPassword, IUsernameMD5Password) +class _PortalAuthChallenger(Referenceable, _JellyableAvatarMixin): + """ + Called with response to password challenge. + """ + + def __init__(self, portal, broker, username, challenge): + self.portal = portal + self.broker = broker + self.username = username + self.challenge = challenge + + def remote_respond(self, response, mind): + self.response = response + d = self.portal.login(self, mind, IPerspective) + d.addCallback(self._cbLogin) + return d + + def checkPassword(self, password): + """ + L{IUsernameHashedPassword} + + @param password: The password. + @return: L{_PortalAuthChallenger.checkMD5Password} + """ + return self.checkMD5Password(md5(password).digest()) + + def checkMD5Password(self, md5Password): + """ + L{IUsernameMD5Password} + + @param md5Password: + @rtype: L{bool} + @return: L{True} if password matches. + """ + md = md5() + md.update(md5Password) + md.update(self.challenge) + correct = md.digest() + return self.response == correct + + +__all__ = [ + # Everything from flavors is exposed publicly here. + "IPBRoot", + "Serializable", + "Referenceable", + "NoSuchMethod", + "Root", + "ViewPoint", + "Viewable", + "Copyable", + "Jellyable", + "Cacheable", + "RemoteCopy", + "RemoteCache", + "RemoteCacheObserver", + "copyTags", + "setUnjellyableForClass", + "setUnjellyableFactoryForClass", + "setUnjellyableForClassTree", + "setCopierForClass", + "setFactoryForClass", + "setCopierForClassTree", + "MAX_BROKER_REFS", + "portno", + "ProtocolError", + "DeadReferenceError", + "Error", + "PBConnectionLost", + "RemoteMethod", + "IPerspective", + "Avatar", + "AsReferenceable", + "RemoteReference", + "CopyableFailure", + "CopiedFailure", + "failure2Copyable", + "Broker", + "respond", + "challenge", + "PBClientFactory", + "PBServerFactory", + "IUsernameMD5Password", +] diff --git a/contrib/python/Twisted/py3/twisted/spread/publish.py b/contrib/python/Twisted/py3/twisted/spread/publish.py new file mode 100644 index 0000000000..de525083e5 --- /dev/null +++ b/contrib/python/Twisted/py3/twisted/spread/publish.py @@ -0,0 +1,144 @@ +# -*- test-case-name: twisted.spread.test.test_pb -*- +# Copyright (c) Twisted Matrix Laboratories. +# See LICENSE for details. + +""" +Persistently cached objects for PB. + +Maintainer: Glyph Lefkowitz + +Future Plans: None known. +""" + + +import time + +from twisted.internet import defer +from twisted.spread import banana, flavors, jelly + + +class Publishable(flavors.Cacheable): + """An object whose cached state persists across sessions.""" + + def __init__(self, publishedID): + self.republish() + self.publishedID = publishedID + + def republish(self): + """Set the timestamp to current and (TODO) update all observers.""" + self.timestamp = time.time() + + def view_getStateToPublish(self, perspective): + "(internal)" + return self.getStateToPublishFor(perspective) + + def getStateToPublishFor(self, perspective): + """Implement me to special-case your state for a perspective.""" + return self.getStateToPublish() + + def getStateToPublish(self): + """Implement me to return state to copy as part of the publish phase.""" + raise NotImplementedError("%s.getStateToPublishFor" % self.__class__) + + def getStateToCacheAndObserveFor(self, perspective, observer): + """Get all necessary metadata to keep a clientside cache.""" + if perspective: + pname = perspective.perspectiveName + sname = perspective.getService().serviceName + else: + pname = "None" + sname = "None" + + return { + "remote": flavors.ViewPoint(perspective, self), + "publishedID": self.publishedID, + "perspective": pname, + "service": sname, + "timestamp": self.timestamp, + } + + +class RemotePublished(flavors.RemoteCache): + """The local representation of remote Publishable object.""" + + isActivated = 0 + _wasCleanWhenLoaded = 0 + + def getFileName(self, ext="pub"): + return "{}-{}-{}.{}".format( + self.service, + self.perspective, + str(self.publishedID), + ext, + ) + + def setCopyableState(self, state): + self.__dict__.update(state) + self._activationListeners = [] + try: + with open(self.getFileName(), "rb") as dataFile: + data = dataFile.read() + except OSError: + recent = 0 + else: + newself = jelly.unjelly(banana.decode(data)) + recent = newself.timestamp == self.timestamp + if recent: + self._cbGotUpdate(newself.__dict__) + self._wasCleanWhenLoaded = 1 + else: + self.remote.callRemote("getStateToPublish").addCallbacks(self._cbGotUpdate) + + def __getstate__(self): + other = self.__dict__.copy() + # Remove PB-specific attributes + del other["broker"] + del other["remote"] + del other["luid"] + # remove my own runtime-tracking stuff + del other["_activationListeners"] + del other["isActivated"] + return other + + def _cbGotUpdate(self, newState): + self.__dict__.update(newState) + self.isActivated = 1 + # send out notifications + for listener in self._activationListeners: + listener(self) + self._activationListeners = [] + self.activated() + with open(self.getFileName(), "wb") as dataFile: + dataFile.write(banana.encode(jelly.jelly(self))) + + def activated(self): + """Implement this method if you want to be notified when your + publishable subclass is activated. + """ + + def callWhenActivated(self, callback): + """Externally register for notification when this publishable has received all relevant data.""" + if self.isActivated: + callback(self) + else: + self._activationListeners.append(callback) + + +def whenReady(d): + """ + Wrap a deferred returned from a pb method in another deferred that + expects a RemotePublished as a result. This will allow you to wait until + the result is really available. + + Idiomatic usage would look like:: + + publish.whenReady(serverObject.getMeAPublishable()).addCallback(lookAtThePublishable) + """ + d2 = defer.Deferred() + d.addCallbacks(_pubReady, d2.errback, callbackArgs=(d2,)) + return d2 + + +def _pubReady(result, d2): + "(internal)" + result.callWhenActivated(d2.callback) diff --git a/contrib/python/Twisted/py3/twisted/spread/util.py b/contrib/python/Twisted/py3/twisted/spread/util.py new file mode 100644 index 0000000000..5e9c81624f --- /dev/null +++ b/contrib/python/Twisted/py3/twisted/spread/util.py @@ -0,0 +1,217 @@ +# -*- test-case-name: twisted.test.test_pb -*- + +# Copyright (c) Twisted Matrix Laboratories. +# See LICENSE for details. + + +""" +Utility classes for spread. +""" + +from zope.interface import implementer + +from twisted.internet import defer, interfaces +from twisted.protocols import basic +from twisted.python.failure import Failure +from twisted.spread import pb + + +class LocalMethod: + def __init__(self, local, name): + self.local = local + self.name = name + + def __call__(self, *args, **kw): + return self.local.callRemote(self.name, *args, **kw) + + +class LocalAsRemote: + """ + A class useful for emulating the effects of remote behavior locally. + """ + + reportAllTracebacks = 1 + + def callRemote(self, name, *args, **kw): + """ + Call a specially-designated local method. + + self.callRemote('x') will first try to invoke a method named + sync_x and return its result (which should probably be a + Deferred). Second, it will look for a method called async_x, + which will be called and then have its result (or Failure) + automatically wrapped in a Deferred. + """ + if hasattr(self, "sync_" + name): + return getattr(self, "sync_" + name)(*args, **kw) + try: + method = getattr(self, "async_" + name) + return defer.succeed(method(*args, **kw)) + except BaseException: + f = Failure() + if self.reportAllTracebacks: + f.printTraceback() + return defer.fail(f) + + def remoteMethod(self, name): + return LocalMethod(self, name) + + +class LocalAsyncForwarder: + """ + A class useful for forwarding a locally-defined interface. + """ + + def __init__(self, forwarded, interfaceClass, failWhenNotImplemented=0): + assert interfaceClass.providedBy(forwarded) + self.forwarded = forwarded + self.interfaceClass = interfaceClass + self.failWhenNotImplemented = failWhenNotImplemented + + def _callMethod(self, method, *args, **kw): + return getattr(self.forwarded, method)(*args, **kw) + + def callRemote(self, method, *args, **kw): + if self.interfaceClass.queryDescriptionFor(method): + result = defer.maybeDeferred(self._callMethod, method, *args, **kw) + return result + elif self.failWhenNotImplemented: + return defer.fail( + Failure(NotImplementedError, "No Such Method in Interface: %s" % method) + ) + else: + return defer.succeed(None) + + +class Pager: + """ + I am an object which pages out information. + """ + + def __init__(self, collector, callback=None, *args, **kw): + """ + Create a pager with a Reference to a remote collector and + an optional callable to invoke upon completion. + """ + if callable(callback): + self.callback = callback + self.callbackArgs = args + self.callbackKeyword = kw + else: + self.callback = None + self._stillPaging = 1 + self.collector = collector + collector.broker.registerPageProducer(self) + + def stillPaging(self): + """ + (internal) Method called by Broker. + """ + if not self._stillPaging: + self.collector.callRemote("endedPaging", pbanswer=False) + if self.callback is not None: + self.callback(*self.callbackArgs, **self.callbackKeyword) + return self._stillPaging + + def sendNextPage(self): + """ + (internal) Method called by Broker. + """ + self.collector.callRemote("gotPage", self.nextPage(), pbanswer=False) + + def nextPage(self): + """ + Override this to return an object to be sent to my collector. + """ + raise NotImplementedError() + + def stopPaging(self): + """ + Call this when you're done paging. + """ + self._stillPaging = 0 + + +class StringPager(Pager): + """ + A simple pager that splits a string into chunks. + """ + + def __init__(self, collector, st, chunkSize=8192, callback=None, *args, **kw): + self.string = st + self.pointer = 0 + self.chunkSize = chunkSize + Pager.__init__(self, collector, callback, *args, **kw) + + def nextPage(self): + val = self.string[self.pointer : self.pointer + self.chunkSize] + self.pointer += self.chunkSize + if self.pointer >= len(self.string): + self.stopPaging() + return val + + +@implementer(interfaces.IConsumer) +class FilePager(Pager): + """ + Reads a file in chunks and sends the chunks as they come. + """ + + def __init__(self, collector, fd, callback=None, *args, **kw): + self.chunks = [] + Pager.__init__(self, collector, callback, *args, **kw) + self.startProducing(fd) + + def startProducing(self, fd): + self.deferred = basic.FileSender().beginFileTransfer(fd, self) + self.deferred.addBoth(lambda x: self.stopPaging()) + + def registerProducer(self, producer, streaming): + self.producer = producer + if not streaming: + self.producer.resumeProducing() + + def unregisterProducer(self): + self.producer = None + + def write(self, chunk): + self.chunks.append(chunk) + + def sendNextPage(self): + """ + Get the first chunk read and send it to collector. + """ + if not self.chunks: + return + val = self.chunks.pop(0) + self.producer.resumeProducing() + self.collector.callRemote("gotPage", val, pbanswer=False) + + +# Utility paging stuff. +class CallbackPageCollector(pb.Referenceable): + """ + I receive pages from the peer. You may instantiate a Pager with a + remote reference to me. I will call the callback with a list of pages + once they are all received. + """ + + def __init__(self, callback): + self.pages = [] + self.callback = callback + + def remote_gotPage(self, page): + self.pages.append(page) + + def remote_endedPaging(self): + self.callback(self.pages) + + +def getAllPages(referenceable, methodName, *args, **kw): + """ + A utility method that will call a remote method which expects a + PageCollector as the first argument. + """ + d = defer.Deferred() + referenceable.callRemote(methodName, CallbackPageCollector(d.callback), *args, **kw) + return d |