diff options
author | shmel1k <shmel1k@ydb.tech> | 2023-11-26 18:16:14 +0300 |
---|---|---|
committer | shmel1k <shmel1k@ydb.tech> | 2023-11-26 18:43:30 +0300 |
commit | b8cf9e88f4c5c64d9406af533d8948deb050d695 (patch) | |
tree | 218eb61fb3c3b96ec08b4d8cdfef383104a87d63 /contrib/python/Twisted/py2/twisted/application/internet.py | |
parent | 523f645a83a0ec97a0332dbc3863bb354c92a328 (diff) | |
download | ydb-b8cf9e88f4c5c64d9406af533d8948deb050d695.tar.gz |
add kikimr_configure
Diffstat (limited to 'contrib/python/Twisted/py2/twisted/application/internet.py')
-rw-r--r-- | contrib/python/Twisted/py2/twisted/application/internet.py | 1157 |
1 files changed, 1157 insertions, 0 deletions
diff --git a/contrib/python/Twisted/py2/twisted/application/internet.py b/contrib/python/Twisted/py2/twisted/application/internet.py new file mode 100644 index 0000000000..a8582f4ebe --- /dev/null +++ b/contrib/python/Twisted/py2/twisted/application/internet.py @@ -0,0 +1,1157 @@ +# -*- test-case-name: twisted.application.test.test_internet,twisted.test.test_application,twisted.test.test_cooperator -*- +# Copyright (c) Twisted Matrix Laboratories. +# See LICENSE for details. + +""" +Reactor-based Services + +Here are services to run clients, servers and periodic services using +the reactor. + +If you want to run a server service, L{StreamServerEndpointService} defines a +service that can wrap an arbitrary L{IStreamServerEndpoint +<twisted.internet.interfaces.IStreamServerEndpoint>} +as an L{IService}. See also L{twisted.application.strports.service} for +constructing one of these directly from a descriptive string. + +Additionally, this module (dynamically) defines various Service subclasses that +let you represent clients and servers in a Service hierarchy. Endpoints APIs +should be preferred for stream server services, but since those APIs do not yet +exist for clients or datagram services, many of these are still useful. + +They are as follows:: + + TCPServer, TCPClient, + UNIXServer, UNIXClient, + SSLServer, SSLClient, + UDPServer, + UNIXDatagramServer, UNIXDatagramClient, + MulticastServer + +These classes take arbitrary arguments in their constructors and pass +them straight on to their respective reactor.listenXXX or +reactor.connectXXX calls. + +For example, the following service starts a web server on port 8080: +C{TCPServer(8080, server.Site(r))}. See the documentation for the +reactor.listen/connect* methods for more information. +""" + +from __future__ import absolute_import, division + +from random import random as _goodEnoughRandom + +from twisted.python import log +from twisted.logger import Logger + +from twisted.application import service +from twisted.internet import task +from twisted.python.failure import Failure +from twisted.internet.defer import ( + CancelledError, Deferred, succeed, fail, maybeDeferred +) + +from automat import MethodicalMachine + + +def _maybeGlobalReactor(maybeReactor): + """ + @return: the argument, or the global reactor if the argument is L{None}. + """ + if maybeReactor is None: + from twisted.internet import reactor + return reactor + else: + return maybeReactor + + + +class _VolatileDataService(service.Service): + + volatile = [] + + def __getstate__(self): + d = service.Service.__getstate__(self) + for attr in self.volatile: + if attr in d: + del d[attr] + return d + + + +class _AbstractServer(_VolatileDataService): + """ + @cvar volatile: list of attribute to remove from pickling. + @type volatile: C{list} + + @ivar method: the type of method to call on the reactor, one of B{TCP}, + B{UDP}, B{SSL} or B{UNIX}. + @type method: C{str} + + @ivar reactor: the current running reactor. + @type reactor: a provider of C{IReactorTCP}, C{IReactorUDP}, + C{IReactorSSL} or C{IReactorUnix}. + + @ivar _port: instance of port set when the service is started. + @type _port: a provider of L{twisted.internet.interfaces.IListeningPort}. + """ + + volatile = ['_port'] + method = None + reactor = None + + _port = None + + def __init__(self, *args, **kwargs): + self.args = args + if 'reactor' in kwargs: + self.reactor = kwargs.pop("reactor") + self.kwargs = kwargs + + + def privilegedStartService(self): + service.Service.privilegedStartService(self) + self._port = self._getPort() + + + def startService(self): + service.Service.startService(self) + if self._port is None: + self._port = self._getPort() + + + def stopService(self): + service.Service.stopService(self) + # TODO: if startup failed, should shutdown skip stopListening? + # _port won't exist + if self._port is not None: + d = self._port.stopListening() + del self._port + return d + + + def _getPort(self): + """ + Wrapper around the appropriate listen method of the reactor. + + @return: the port object returned by the listen method. + @rtype: an object providing + L{twisted.internet.interfaces.IListeningPort}. + """ + return getattr(_maybeGlobalReactor(self.reactor), + 'listen%s' % (self.method,))(*self.args, **self.kwargs) + + + +class _AbstractClient(_VolatileDataService): + """ + @cvar volatile: list of attribute to remove from pickling. + @type volatile: C{list} + + @ivar method: the type of method to call on the reactor, one of B{TCP}, + B{UDP}, B{SSL} or B{UNIX}. + @type method: C{str} + + @ivar reactor: the current running reactor. + @type reactor: a provider of C{IReactorTCP}, C{IReactorUDP}, + C{IReactorSSL} or C{IReactorUnix}. + + @ivar _connection: instance of connection set when the service is started. + @type _connection: a provider of L{twisted.internet.interfaces.IConnector}. + """ + + volatile = ['_connection'] + method = None + reactor = None + + _connection = None + + def __init__(self, *args, **kwargs): + self.args = args + if 'reactor' in kwargs: + self.reactor = kwargs.pop("reactor") + self.kwargs = kwargs + + + def startService(self): + service.Service.startService(self) + self._connection = self._getConnection() + + + def stopService(self): + service.Service.stopService(self) + if self._connection is not None: + self._connection.disconnect() + del self._connection + + + def _getConnection(self): + """ + Wrapper around the appropriate connect method of the reactor. + + @return: the port object returned by the connect method. + @rtype: an object providing L{twisted.internet.interfaces.IConnector}. + """ + return getattr(_maybeGlobalReactor(self.reactor), + 'connect%s' % (self.method,))(*self.args, **self.kwargs) + + + +_doc={ +'Client': +"""Connect to %(tran)s + +Call reactor.connect%(tran)s when the service starts, with the +arguments given to the constructor. +""", +'Server': +"""Serve %(tran)s clients + +Call reactor.listen%(tran)s when the service starts, with the +arguments given to the constructor. When the service stops, +stop listening. See twisted.internet.interfaces for documentation +on arguments to the reactor method. +""", +} + +for tran in 'TCP UNIX SSL UDP UNIXDatagram Multicast'.split(): + for side in 'Server Client'.split(): + if tran == "Multicast" and side == "Client": + continue + if tran == "UDP" and side == "Client": + continue + base = globals()['_Abstract'+side] + doc = _doc[side] % vars() + + klass = type(tran+side, (base,), {'method': tran, '__doc__': doc}) + globals()[tran+side] = klass + + + +class TimerService(_VolatileDataService): + """ + Service to periodically call a function + + Every C{step} seconds call the given function with the given arguments. + The service starts the calls when it starts, and cancels them + when it stops. + + @ivar clock: Source of time. This defaults to L{None} which is + causes L{twisted.internet.reactor} to be used. + Feel free to set this to something else, but it probably ought to be + set *before* calling L{startService}. + @type clock: L{IReactorTime<twisted.internet.interfaces.IReactorTime>} + + @ivar call: Function and arguments to call periodically. + @type call: L{tuple} of C{(callable, args, kwargs)} + """ + + volatile = ['_loop', '_loopFinished'] + + def __init__(self, step, callable, *args, **kwargs): + """ + @param step: The number of seconds between calls. + @type step: L{float} + + @param callable: Function to call + @type callable: L{callable} + + @param args: Positional arguments to pass to function + @param kwargs: Keyword arguments to pass to function + """ + self.step = step + self.call = (callable, args, kwargs) + self.clock = None + + def startService(self): + service.Service.startService(self) + callable, args, kwargs = self.call + # we have to make a new LoopingCall each time we're started, because + # an active LoopingCall remains active when serialized. If + # LoopingCall were a _VolatileDataService, we wouldn't need to do + # this. + self._loop = task.LoopingCall(callable, *args, **kwargs) + self._loop.clock = _maybeGlobalReactor(self.clock) + self._loopFinished = self._loop.start(self.step, now=True) + self._loopFinished.addErrback(self._failed) + + def _failed(self, why): + # make a note that the LoopingCall is no longer looping, so we don't + # try to shut it down a second time in stopService. I think this + # should be in LoopingCall. -warner + self._loop.running = False + log.err(why) + + def stopService(self): + """ + Stop the service. + + @rtype: L{Deferred<defer.Deferred>} + @return: a L{Deferred<defer.Deferred>} which is fired when the + currently running call (if any) is finished. + """ + if self._loop.running: + self._loop.stop() + self._loopFinished.addCallback(lambda _: + service.Service.stopService(self)) + return self._loopFinished + + + +class CooperatorService(service.Service): + """ + Simple L{service.IService} which starts and stops a L{twisted.internet.task.Cooperator}. + """ + def __init__(self): + self.coop = task.Cooperator(started=False) + + + def coiterate(self, iterator): + return self.coop.coiterate(iterator) + + + def startService(self): + self.coop.start() + + + def stopService(self): + self.coop.stop() + + + +class StreamServerEndpointService(service.Service, object): + """ + A L{StreamServerEndpointService} is an L{IService} which runs a server on a + listening port described by an L{IStreamServerEndpoint + <twisted.internet.interfaces.IStreamServerEndpoint>}. + + @ivar factory: A server factory which will be used to listen on the + endpoint. + + @ivar endpoint: An L{IStreamServerEndpoint + <twisted.internet.interfaces.IStreamServerEndpoint>} provider + which will be used to listen when the service starts. + + @ivar _waitingForPort: a Deferred, if C{listen} has yet been invoked on the + endpoint, otherwise None. + + @ivar _raiseSynchronously: Defines error-handling behavior for the case + where C{listen(...)} raises an exception before C{startService} or + C{privilegedStartService} have completed. + + @type _raiseSynchronously: C{bool} + + @since: 10.2 + """ + + _raiseSynchronously = False + + def __init__(self, endpoint, factory): + self.endpoint = endpoint + self.factory = factory + self._waitingForPort = None + + + def privilegedStartService(self): + """ + Start listening on the endpoint. + """ + service.Service.privilegedStartService(self) + self._waitingForPort = self.endpoint.listen(self.factory) + raisedNow = [] + def handleIt(err): + if self._raiseSynchronously: + raisedNow.append(err) + elif not err.check(CancelledError): + log.err(err) + self._waitingForPort.addErrback(handleIt) + if raisedNow: + raisedNow[0].raiseException() + self._raiseSynchronously = False + + + def startService(self): + """ + Start listening on the endpoint, unless L{privilegedStartService} got + around to it already. + """ + service.Service.startService(self) + if self._waitingForPort is None: + self.privilegedStartService() + + + def stopService(self): + """ + Stop listening on the port if it is already listening, otherwise, + cancel the attempt to listen. + + @return: a L{Deferred<twisted.internet.defer.Deferred>} which fires + with L{None} when the port has stopped listening. + """ + self._waitingForPort.cancel() + def stopIt(port): + if port is not None: + return port.stopListening() + d = self._waitingForPort.addCallback(stopIt) + def stop(passthrough): + self.running = False + return passthrough + d.addBoth(stop) + return d + + + +class _ReconnectingProtocolProxy(object): + """ + A proxy for a Protocol to provide connectionLost notification to a client + connection service, in support of reconnecting when connections are lost. + """ + + def __init__(self, protocol, lostNotification): + """ + Create a L{_ReconnectingProtocolProxy}. + + @param protocol: the application-provided L{interfaces.IProtocol} + provider. + @type protocol: provider of L{interfaces.IProtocol} which may + additionally provide L{interfaces.IHalfCloseableProtocol} and + L{interfaces.IFileDescriptorReceiver}. + + @param lostNotification: a 1-argument callable to invoke with the + C{reason} when the connection is lost. + """ + self._protocol = protocol + self._lostNotification = lostNotification + + + def connectionLost(self, reason): + """ + The connection was lost. Relay this information. + + @param reason: The reason the connection was lost. + + @return: the underlying protocol's result + """ + try: + return self._protocol.connectionLost(reason) + finally: + self._lostNotification(reason) + + + def __getattr__(self, item): + return getattr(self._protocol, item) + + + def __repr__(self): + return '<%s wrapping %r>' % ( + self.__class__.__name__, self._protocol) + + + +class _DisconnectFactory(object): + """ + A L{_DisconnectFactory} is a proxy for L{IProtocolFactory} that catches + C{connectionLost} notifications and relays them. + """ + + def __init__(self, protocolFactory, protocolDisconnected): + self._protocolFactory = protocolFactory + self._protocolDisconnected = protocolDisconnected + + + def buildProtocol(self, addr): + """ + Create a L{_ReconnectingProtocolProxy} with the disconnect-notification + callback we were called with. + + @param addr: The address the connection is coming from. + + @return: a L{_ReconnectingProtocolProxy} for a protocol produced by + C{self._protocolFactory} + """ + return _ReconnectingProtocolProxy( + self._protocolFactory.buildProtocol(addr), + self._protocolDisconnected + ) + + + def __getattr__(self, item): + return getattr(self._protocolFactory, item) + + + def __repr__(self): + return '<%s wrapping %r>' % ( + self.__class__.__name__, self._protocolFactory) + + + +def backoffPolicy(initialDelay=1.0, maxDelay=60.0, factor=1.5, + jitter=_goodEnoughRandom): + """ + A timeout policy for L{ClientService} which computes an exponential backoff + interval with configurable parameters. + + @since: 16.1.0 + + @param initialDelay: Delay for the first reconnection attempt (default + 1.0s). + @type initialDelay: L{float} + + @param maxDelay: Maximum number of seconds between connection attempts + (default 60 seconds, or one minute). Note that this value is before + jitter is applied, so the actual maximum possible delay is this value + plus the maximum possible result of C{jitter()}. + @type maxDelay: L{float} + + @param factor: A multiplicative factor by which the delay grows on each + failed reattempt. Default: 1.5. + @type factor: L{float} + + @param jitter: A 0-argument callable that introduces noise into the delay. + By default, C{random.random}, i.e. a pseudorandom floating-point value + between zero and one. + @type jitter: 0-argument callable returning L{float} + + @return: a 1-argument callable that, given an attempt count, returns a + floating point number; the number of seconds to delay. + @rtype: see L{ClientService.__init__}'s C{retryPolicy} argument. + """ + def policy(attempt): + try: + delay = min(initialDelay * (factor ** min(100, attempt)), maxDelay) + except OverflowError: + delay = maxDelay + return delay + jitter() + return policy + + + +_defaultPolicy = backoffPolicy() + + +def _firstResult(gen): + """ + Return the first element of a generator and exhaust it. + + C{MethodicalMachine.upon}'s C{collector} argument takes a generator of + output results. If the generator is exhausted, the later outputs aren't + actually run. + + @param gen: Generator to extract values from + + @return: The first element of the generator. + """ + return list(gen)[0] + + + +class _ClientMachine(object): + """ + State machine for maintaining a single outgoing connection to an endpoint. + + @see: L{ClientService} + """ + + _machine = MethodicalMachine() + + def __init__(self, endpoint, factory, retryPolicy, clock, + prepareConnection, log): + """ + @see: L{ClientService.__init__} + + @param log: The logger for the L{ClientService} instance this state + machine is associated to. + @type log: L{Logger} + + @ivar _awaitingConnected: notifications to make when connection + succeeds, fails, or is cancelled + @type _awaitingConnected: list of (Deferred, count) tuples + """ + self._endpoint = endpoint + self._failedAttempts = 0 + self._stopped = False + self._factory = factory + self._timeoutForAttempt = retryPolicy + self._clock = clock + self._prepareConnection = prepareConnection + self._connectionInProgress = succeed(None) + + self._awaitingConnected = [] + + self._stopWaiters = [] + self._log = log + + + @_machine.state(initial=True) + def _init(self): + """ + The service has not been started. + """ + + @_machine.state() + def _connecting(self): + """ + The service has started connecting. + """ + + @_machine.state() + def _waiting(self): + """ + The service is waiting for the reconnection period + before reconnecting. + """ + + @_machine.state() + def _connected(self): + """ + The service is connected. + """ + + @_machine.state() + def _disconnecting(self): + """ + The service is disconnecting after being asked to shutdown. + """ + + @_machine.state() + def _restarting(self): + """ + The service is disconnecting and has been asked to restart. + """ + + @_machine.state() + def _stopped(self): + """ + The service has been stopped and is disconnected. + """ + + @_machine.input() + def start(self): + """ + Start this L{ClientService}, initiating the connection retry loop. + """ + + @_machine.output() + def _connect(self): + """ + Start a connection attempt. + """ + factoryProxy = _DisconnectFactory(self._factory, + lambda _: self._clientDisconnected()) + + self._connectionInProgress = ( + self._endpoint.connect(factoryProxy) + .addCallback(self._runPrepareConnection) + .addCallback(self._connectionMade) + .addErrback(self._connectionFailed)) + + + def _runPrepareConnection(self, protocol): + """ + Run any C{prepareConnection} callback with the connected protocol, + ignoring its return value but propagating any failure. + + @param protocol: The protocol of the connection. + @type protocol: L{IProtocol} + + @return: Either: + + - A L{Deferred} that succeeds with the protocol when the + C{prepareConnection} callback has executed successfully. + + - A L{Deferred} that fails when the C{prepareConnection} callback + throws or returns a failed L{Deferred}. + + - The protocol, when no C{prepareConnection} callback is defined. + """ + if self._prepareConnection: + return (maybeDeferred(self._prepareConnection, protocol) + .addCallback(lambda _: protocol)) + return protocol + + + @_machine.output() + def _resetFailedAttempts(self): + """ + Reset the number of failed attempts. + """ + self._failedAttempts = 0 + + + @_machine.input() + def stop(self): + """ + Stop trying to connect and disconnect any current connection. + + @return: a L{Deferred} that fires when all outstanding connections are + closed and all in-progress connection attempts halted. + """ + + @_machine.output() + def _waitForStop(self): + """ + Return a deferred that will fire when the service has finished + disconnecting. + + @return: L{Deferred} that fires when the service has finished + disconnecting. + """ + self._stopWaiters.append(Deferred()) + return self._stopWaiters[-1] + + + @_machine.output() + def _stopConnecting(self): + """ + Stop pending connection attempt. + """ + self._connectionInProgress.cancel() + + + @_machine.output() + def _stopRetrying(self): + """ + Stop pending attempt to reconnect. + """ + self._retryCall.cancel() + del self._retryCall + + + @_machine.output() + def _disconnect(self): + """ + Disconnect the current connection. + """ + self._currentConnection.transport.loseConnection() + + + @_machine.input() + def _connectionMade(self, protocol): + """ + A connection has been made. + + @param protocol: The protocol of the connection. + @type protocol: L{IProtocol} + """ + + @_machine.output() + def _notifyWaiters(self, protocol): + """ + Notify all pending requests for a connection that a connection has been + made. + + @param protocol: The protocol of the connection. + @type protocol: L{IProtocol} + """ + # This should be in _resetFailedAttempts but the signature doesn't + # match. + self._failedAttempts = 0 + + self._currentConnection = protocol._protocol + self._unawait(self._currentConnection) + + + @_machine.input() + def _connectionFailed(self, f): + """ + The current connection attempt failed. + """ + + + @_machine.output() + def _wait(self): + """ + Schedule a retry attempt. + """ + self._doWait() + + @_machine.output() + def _ignoreAndWait(self, f): + """ + Schedule a retry attempt, and ignore the Failure passed in. + """ + return self._doWait() + + def _doWait(self): + self._failedAttempts += 1 + delay = self._timeoutForAttempt(self._failedAttempts) + self._log.info("Scheduling retry {attempt} to connect {endpoint} " + "in {delay} seconds.", attempt=self._failedAttempts, + endpoint=self._endpoint, delay=delay) + self._retryCall = self._clock.callLater(delay, self._reconnect) + + + @_machine.input() + def _reconnect(self): + """ + The wait between connection attempts is done. + """ + + @_machine.input() + def _clientDisconnected(self): + """ + The current connection has been disconnected. + """ + + @_machine.output() + def _forgetConnection(self): + """ + Forget the current connection. + """ + del self._currentConnection + + + @_machine.output() + def _cancelConnectWaiters(self): + """ + Notify all pending requests for a connection that no more connections + are expected. + """ + self._unawait(Failure(CancelledError())) + + @_machine.output() + def _ignoreAndCancelConnectWaiters(self, f): + """ + Notify all pending requests for a connection that no more connections + are expected, after ignoring the Failure passed in. + """ + self._unawait(Failure(CancelledError())) + + + @_machine.output() + def _finishStopping(self): + """ + Notify all deferreds waiting on the service stopping. + """ + self._doFinishStopping() + + @_machine.output() + def _ignoreAndFinishStopping(self, f): + """ + Notify all deferreds waiting on the service stopping, and ignore the + Failure passed in. + """ + self._doFinishStopping() + + def _doFinishStopping(self): + self._stopWaiters, waiting = [], self._stopWaiters + for w in waiting: + w.callback(None) + + + @_machine.input() + def whenConnected(self, failAfterFailures=None): + """ + Retrieve the currently-connected L{Protocol}, or the next one to + connect. + + @param failAfterFailures: number of connection failures after which + the Deferred will deliver a Failure (None means the Deferred will + only fail if/when the service is stopped). Set this to 1 to make + the very first connection failure signal an error. Use 2 to + allow one failure but signal an error if the subsequent retry + then fails. + @type failAfterFailures: L{int} or None + + @return: a Deferred that fires with a protocol produced by the + factory passed to C{__init__} + @rtype: L{Deferred} that may: + + - fire with L{IProtocol} + + - fail with L{CancelledError} when the service is stopped + + - fail with e.g. + L{DNSLookupError<twisted.internet.error.DNSLookupError>} or + L{ConnectionRefusedError<twisted.internet.error.ConnectionRefusedError>} + when the number of consecutive failed connection attempts + equals the value of "failAfterFailures" + """ + + @_machine.output() + def _currentConnection(self, failAfterFailures=None): + """ + Return the currently connected protocol. + + @return: L{Deferred} that is fired with currently connected protocol. + """ + return succeed(self._currentConnection) + + + @_machine.output() + def _noConnection(self, failAfterFailures=None): + """ + Notify the caller that no connection is expected. + + @return: L{Deferred} that is fired with L{CancelledError}. + """ + return fail(CancelledError()) + + + @_machine.output() + def _awaitingConnection(self, failAfterFailures=None): + """ + Return a deferred that will fire with the next connected protocol. + + @return: L{Deferred} that will fire with the next connected protocol. + """ + result = Deferred() + self._awaitingConnected.append((result, failAfterFailures)) + return result + + + @_machine.output() + def _deferredSucceededWithNone(self): + """ + Return a deferred that has already fired with L{None}. + + @return: A L{Deferred} that has already fired with L{None}. + """ + return succeed(None) + + + def _unawait(self, value): + """ + Fire all outstanding L{ClientService.whenConnected} L{Deferred}s. + + @param value: the value to fire the L{Deferred}s with. + """ + self._awaitingConnected, waiting = [], self._awaitingConnected + for (w, remaining) in waiting: + w.callback(value) + + @_machine.output() + def _deliverConnectionFailure(self, f): + """ + Deliver connection failures to any L{ClientService.whenConnected} + L{Deferred}s that have met their failAfterFailures threshold. + + @param f: the Failure to fire the L{Deferred}s with. + """ + ready = [] + notReady = [] + for (w, remaining) in self._awaitingConnected: + if remaining is None: + notReady.append((w, remaining)) + elif remaining <= 1: + ready.append(w) + else: + notReady.append((w, remaining-1)) + self._awaitingConnected = notReady + for w in ready: + w.callback(f) + + # State Transitions + + _init.upon(start, enter=_connecting, + outputs=[_connect]) + _init.upon(stop, enter=_stopped, + outputs=[_deferredSucceededWithNone], + collector=_firstResult) + + _connecting.upon(start, enter=_connecting, outputs=[]) + # Note that this synchonously triggers _connectionFailed in the + # _disconnecting state. + _connecting.upon(stop, enter=_disconnecting, + outputs=[_waitForStop, _stopConnecting], + collector=_firstResult) + _connecting.upon(_connectionMade, enter=_connected, + outputs=[_notifyWaiters]) + _connecting.upon(_connectionFailed, enter=_waiting, + outputs=[_ignoreAndWait, _deliverConnectionFailure]) + + _waiting.upon(start, enter=_waiting, + outputs=[]) + _waiting.upon(stop, enter=_stopped, + outputs=[_waitForStop, + _cancelConnectWaiters, + _stopRetrying, + _finishStopping], + collector=_firstResult) + _waiting.upon(_reconnect, enter=_connecting, + outputs=[_connect]) + + _connected.upon(start, enter=_connected, + outputs=[]) + _connected.upon(stop, enter=_disconnecting, + outputs=[_waitForStop, _disconnect], + collector=_firstResult) + _connected.upon(_clientDisconnected, enter=_waiting, + outputs=[_forgetConnection, _wait]) + + _disconnecting.upon(start, enter=_restarting, + outputs=[_resetFailedAttempts]) + _disconnecting.upon(stop, enter=_disconnecting, + outputs=[_waitForStop], + collector=_firstResult) + _disconnecting.upon(_clientDisconnected, enter=_stopped, + outputs=[_cancelConnectWaiters, + _finishStopping, + _forgetConnection]) + # Note that this is triggered synchonously with the transition from + # _connecting + _disconnecting.upon(_connectionFailed, enter=_stopped, + outputs=[_ignoreAndCancelConnectWaiters, + _ignoreAndFinishStopping]) + + _restarting.upon(start, enter=_restarting, + outputs=[]) + _restarting.upon(stop, enter=_disconnecting, + outputs=[_waitForStop], + collector=_firstResult) + _restarting.upon(_clientDisconnected, enter=_connecting, + outputs=[_finishStopping, _connect]) + + _stopped.upon(start, enter=_connecting, + outputs=[_connect]) + _stopped.upon(stop, enter=_stopped, + outputs=[_deferredSucceededWithNone], + collector=_firstResult) + + _init.upon(whenConnected, enter=_init, + outputs=[_awaitingConnection], + collector=_firstResult) + _connecting.upon(whenConnected, enter=_connecting, + outputs=[_awaitingConnection], + collector=_firstResult) + _waiting.upon(whenConnected, enter=_waiting, + outputs=[_awaitingConnection], + collector=_firstResult) + _connected.upon(whenConnected, enter=_connected, + outputs=[_currentConnection], + collector=_firstResult) + _disconnecting.upon(whenConnected, enter=_disconnecting, + outputs=[_awaitingConnection], + collector=_firstResult) + _restarting.upon(whenConnected, enter=_restarting, + outputs=[_awaitingConnection], + collector=_firstResult) + _stopped.upon(whenConnected, enter=_stopped, + outputs=[_noConnection], + collector=_firstResult) + + + +class ClientService(service.Service, object): + """ + A L{ClientService} maintains a single outgoing connection to a client + endpoint, reconnecting after a configurable timeout when a connection + fails, either before or after connecting. + + @since: 16.1.0 + """ + + _log = Logger() + + def __init__(self, endpoint, factory, retryPolicy=None, clock=None, + prepareConnection=None): + """ + @param endpoint: A L{stream client endpoint + <interfaces.IStreamClientEndpoint>} provider which will be used to + connect when the service starts. + + @param factory: A L{protocol factory <interfaces.IProtocolFactory>} + which will be used to create clients for the endpoint. + + @param retryPolicy: A policy configuring how long L{ClientService} will + wait between attempts to connect to C{endpoint}. + @type retryPolicy: callable taking (the number of failed connection + attempts made in a row (L{int})) and returning the number of + seconds to wait before making another attempt. + + @param clock: The clock used to schedule reconnection. It's mainly + useful to be parametrized in tests. If the factory is serialized, + this attribute will not be serialized, and the default value (the + reactor) will be restored when deserialized. + @type clock: L{IReactorTime} + + @param prepareConnection: A single argument L{callable} that may return + a L{Deferred}. It will be called once with the L{protocol + <interfaces.IProtocol>} each time a new connection is made. It may + call methods on the protocol to prepare it for use (e.g. + authenticate) or validate it (check its health). + + The C{prepareConnection} callable may raise an exception or return + a L{Deferred} which fails to reject the connection. A rejected + connection is not used to fire an L{Deferred} returned by + L{whenConnected}. Instead, L{ClientService} handles the failure + and continues as if the connection attempt were a failure + (incrementing the counter passed to C{retryPolicy}). + + L{Deferred}s returned by L{whenConnected} will not fire until + any L{Deferred} returned by the C{prepareConnection} callable + fire. Otherwise its successful return value is consumed, but + ignored. + + Present Since Twisted 18.7.0 + + @type prepareConnection: L{callable} + + """ + clock = _maybeGlobalReactor(clock) + retryPolicy = _defaultPolicy if retryPolicy is None else retryPolicy + + self._machine = _ClientMachine( + endpoint, factory, retryPolicy, clock, + prepareConnection=prepareConnection, log=self._log, + ) + + + def whenConnected(self, failAfterFailures=None): + """ + Retrieve the currently-connected L{Protocol}, or the next one to + connect. + + @param failAfterFailures: number of connection failures after which + the Deferred will deliver a Failure (None means the Deferred will + only fail if/when the service is stopped). Set this to 1 to make + the very first connection failure signal an error. Use 2 to + allow one failure but signal an error if the subsequent retry + then fails. + @type failAfterFailures: L{int} or None + + @return: a Deferred that fires with a protocol produced by the + factory passed to C{__init__} + @rtype: L{Deferred} that may: + + - fire with L{IProtocol} + + - fail with L{CancelledError} when the service is stopped + + - fail with e.g. + L{DNSLookupError<twisted.internet.error.DNSLookupError>} or + L{ConnectionRefusedError<twisted.internet.error.ConnectionRefusedError>} + when the number of consecutive failed connection attempts + equals the value of "failAfterFailures" + """ + return self._machine.whenConnected(failAfterFailures) + + + def startService(self): + """ + Start this L{ClientService}, initiating the connection retry loop. + """ + if self.running: + self._log.warn("Duplicate ClientService.startService {log_source}") + return + super(ClientService, self).startService() + self._machine.start() + + + def stopService(self): + """ + Stop attempting to reconnect and close any existing connections. + + @return: a L{Deferred} that fires when all outstanding connections are + closed and all in-progress connection attempts halted. + """ + super(ClientService, self).stopService() + return self._machine.stop() + + +__all__ = (['TimerService', 'CooperatorService', 'MulticastServer', + 'StreamServerEndpointService', 'UDPServer', + 'ClientService'] + + [tran + side + for tran in 'TCP UNIX SSL UNIXDatagram'.split() + for side in 'Server Client'.split()]) |