diff options
author | robot-piglet <robot-piglet@yandex-team.com> | 2024-11-12 07:54:50 +0300 |
---|---|---|
committer | robot-piglet <robot-piglet@yandex-team.com> | 2024-11-12 08:05:59 +0300 |
commit | 55cec9f6b0618fb3570fc8ef66aad151f4932591 (patch) | |
tree | 9198c2ca0b0305269062c3674ce79f19c4990e65 /contrib/python/Twisted/py3/twisted/application/internet.py | |
parent | b77b1fbf262ea4f40e33a60ce32c4db4e5e49015 (diff) | |
download | ydb-55cec9f6b0618fb3570fc8ef66aad151f4932591.tar.gz |
Intermediate changes
commit_hash:c229701a8b4f4d9ee57ce1ed763099d862d53fa6
Diffstat (limited to 'contrib/python/Twisted/py3/twisted/application/internet.py')
-rw-r--r-- | contrib/python/Twisted/py3/twisted/application/internet.py | 786 |
1 files changed, 4 insertions, 782 deletions
diff --git a/contrib/python/Twisted/py3/twisted/application/internet.py b/contrib/python/Twisted/py3/twisted/application/internet.py index 9e702f7e844..8bcc9722a0a 100644 --- a/contrib/python/Twisted/py3/twisted/application/internet.py +++ b/contrib/python/Twisted/py3/twisted/application/internet.py @@ -38,35 +38,13 @@ reactor.listen/connect* methods for more information. """ -from random import random as _goodEnoughRandom from typing import List -from automat import MethodicalMachine - from twisted.application import service from twisted.internet import task -from twisted.internet.defer import ( - CancelledError, - Deferred, - fail, - maybeDeferred, - succeed, -) -from twisted.logger import Logger +from twisted.internet.defer import CancelledError from twisted.python import log -from twisted.python.failure import Failure - - -def _maybeGlobalReactor(maybeReactor): - """ - @return: the argument, or the global reactor if the argument is L{None}. - """ - if maybeReactor is None: - from twisted.internet import reactor - - return reactor - else: - return maybeReactor +from ._client_service import ClientService, _maybeGlobalReactor, backoffPolicy class _VolatileDataService(service.Service): @@ -429,764 +407,6 @@ class StreamServerEndpointService(service.Service): return d -class _ReconnectingProtocolProxy: - """ - A proxy for a Protocol to provide connectionLost notification to a client - connection service, in support of reconnecting when connections are lost. - """ - - def __init__(self, protocol, lostNotification): - """ - Create a L{_ReconnectingProtocolProxy}. - - @param protocol: the application-provided L{interfaces.IProtocol} - provider. - @type protocol: provider of L{interfaces.IProtocol} which may - additionally provide L{interfaces.IHalfCloseableProtocol} and - L{interfaces.IFileDescriptorReceiver}. - - @param lostNotification: a 1-argument callable to invoke with the - C{reason} when the connection is lost. - """ - self._protocol = protocol - self._lostNotification = lostNotification - - def connectionLost(self, reason): - """ - The connection was lost. Relay this information. - - @param reason: The reason the connection was lost. - - @return: the underlying protocol's result - """ - try: - return self._protocol.connectionLost(reason) - finally: - self._lostNotification(reason) - - def __getattr__(self, item): - return getattr(self._protocol, item) - - def __repr__(self) -> str: - return f"<{self.__class__.__name__} wrapping {self._protocol!r}>" - - -class _DisconnectFactory: - """ - A L{_DisconnectFactory} is a proxy for L{IProtocolFactory} that catches - C{connectionLost} notifications and relays them. - """ - - def __init__(self, protocolFactory, protocolDisconnected): - self._protocolFactory = protocolFactory - self._protocolDisconnected = protocolDisconnected - - def buildProtocol(self, addr): - """ - Create a L{_ReconnectingProtocolProxy} with the disconnect-notification - callback we were called with. - - @param addr: The address the connection is coming from. - - @return: a L{_ReconnectingProtocolProxy} for a protocol produced by - C{self._protocolFactory} - """ - return _ReconnectingProtocolProxy( - self._protocolFactory.buildProtocol(addr), self._protocolDisconnected - ) - - def __getattr__(self, item): - return getattr(self._protocolFactory, item) - - def __repr__(self) -> str: - return "<{} wrapping {!r}>".format( - self.__class__.__name__, self._protocolFactory - ) - - -def backoffPolicy( - initialDelay=1.0, maxDelay=60.0, factor=1.5, jitter=_goodEnoughRandom -): - """ - A timeout policy for L{ClientService} which computes an exponential backoff - interval with configurable parameters. - - @since: 16.1.0 - - @param initialDelay: Delay for the first reconnection attempt (default - 1.0s). - @type initialDelay: L{float} - - @param maxDelay: Maximum number of seconds between connection attempts - (default 60 seconds, or one minute). Note that this value is before - jitter is applied, so the actual maximum possible delay is this value - plus the maximum possible result of C{jitter()}. - @type maxDelay: L{float} - - @param factor: A multiplicative factor by which the delay grows on each - failed reattempt. Default: 1.5. - @type factor: L{float} - - @param jitter: A 0-argument callable that introduces noise into the delay. - By default, C{random.random}, i.e. a pseudorandom floating-point value - between zero and one. - @type jitter: 0-argument callable returning L{float} - - @return: a 1-argument callable that, given an attempt count, returns a - floating point number; the number of seconds to delay. - @rtype: see L{ClientService.__init__}'s C{retryPolicy} argument. - """ - - def policy(attempt): - try: - delay = min(initialDelay * (factor ** min(100, attempt)), maxDelay) - except OverflowError: - delay = maxDelay - return delay + jitter() - - return policy - - -_defaultPolicy = backoffPolicy() - - -def _firstResult(gen): - """ - Return the first element of a generator and exhaust it. - - C{MethodicalMachine.upon}'s C{collector} argument takes a generator of - output results. If the generator is exhausted, the later outputs aren't - actually run. - - @param gen: Generator to extract values from - - @return: The first element of the generator. - """ - return list(gen)[0] - - -class _ClientMachine: - """ - State machine for maintaining a single outgoing connection to an endpoint. - - @ivar _awaitingConnected: notifications to make when connection - succeeds, fails, or is cancelled - @type _awaitingConnected: list of (Deferred, count) tuples - - @see: L{ClientService} - """ - - _machine = MethodicalMachine() - - def __init__(self, endpoint, factory, retryPolicy, clock, prepareConnection, log): - """ - @see: L{ClientService.__init__} - - @param log: The logger for the L{ClientService} instance this state - machine is associated to. - @type log: L{Logger} - """ - self._endpoint = endpoint - self._failedAttempts = 0 - self._stopped = False - self._factory = factory - self._timeoutForAttempt = retryPolicy - self._clock = clock - self._prepareConnection = prepareConnection - self._connectionInProgress = succeed(None) - - self._awaitingConnected = [] - - self._stopWaiters = [] - self._log = log - - @_machine.state(initial=True) - def _init(self): - """ - The service has not been started. - """ - - @_machine.state() - def _connecting(self): - """ - The service has started connecting. - """ - - @_machine.state() - def _waiting(self): - """ - The service is waiting for the reconnection period - before reconnecting. - """ - - @_machine.state() - def _connected(self): - """ - The service is connected. - """ - - @_machine.state() - def _disconnecting(self): - """ - The service is disconnecting after being asked to shutdown. - """ - - @_machine.state() - def _restarting(self): - """ - The service is disconnecting and has been asked to restart. - """ - - @_machine.state() - def _stopped(self): - """ - The service has been stopped and is disconnected. - """ - - @_machine.input() - def start(self): - """ - Start this L{ClientService}, initiating the connection retry loop. - """ - - @_machine.output() - def _connect(self): - """ - Start a connection attempt. - """ - factoryProxy = _DisconnectFactory( - self._factory, lambda _: self._clientDisconnected() - ) - - self._connectionInProgress = ( - self._endpoint.connect(factoryProxy) - .addCallback(self._runPrepareConnection) - .addCallback(self._connectionMade) - .addErrback(self._connectionFailed) - ) - - def _runPrepareConnection(self, protocol): - """ - Run any C{prepareConnection} callback with the connected protocol, - ignoring its return value but propagating any failure. - - @param protocol: The protocol of the connection. - @type protocol: L{IProtocol} - - @return: Either: - - - A L{Deferred} that succeeds with the protocol when the - C{prepareConnection} callback has executed successfully. - - - A L{Deferred} that fails when the C{prepareConnection} callback - throws or returns a failed L{Deferred}. - - - The protocol, when no C{prepareConnection} callback is defined. - """ - if self._prepareConnection: - return maybeDeferred(self._prepareConnection, protocol).addCallback( - lambda _: protocol - ) - return protocol - - @_machine.output() - def _resetFailedAttempts(self): - """ - Reset the number of failed attempts. - """ - self._failedAttempts = 0 - - @_machine.input() - def stop(self): - """ - Stop trying to connect and disconnect any current connection. - - @return: a L{Deferred} that fires when all outstanding connections are - closed and all in-progress connection attempts halted. - """ - - @_machine.output() - def _waitForStop(self): - """ - Return a deferred that will fire when the service has finished - disconnecting. - - @return: L{Deferred} that fires when the service has finished - disconnecting. - """ - self._stopWaiters.append(Deferred()) - return self._stopWaiters[-1] - - @_machine.output() - def _stopConnecting(self): - """ - Stop pending connection attempt. - """ - self._connectionInProgress.cancel() - - @_machine.output() - def _stopRetrying(self): - """ - Stop pending attempt to reconnect. - """ - self._retryCall.cancel() - del self._retryCall - - @_machine.output() - def _disconnect(self): - """ - Disconnect the current connection. - """ - self._currentConnection.transport.loseConnection() - - @_machine.input() - def _connectionMade(self, protocol): - """ - A connection has been made. - - @param protocol: The protocol of the connection. - @type protocol: L{IProtocol} - """ - - @_machine.output() - def _notifyWaiters(self, protocol): - """ - Notify all pending requests for a connection that a connection has been - made. - - @param protocol: The protocol of the connection. - @type protocol: L{IProtocol} - """ - # This should be in _resetFailedAttempts but the signature doesn't - # match. - self._failedAttempts = 0 - - self._currentConnection = protocol._protocol - self._unawait(self._currentConnection) - - @_machine.input() - def _connectionFailed(self, f): - """ - The current connection attempt failed. - """ - - @_machine.output() - def _wait(self): - """ - Schedule a retry attempt. - """ - self._doWait() - - @_machine.output() - def _ignoreAndWait(self, f): - """ - Schedule a retry attempt, and ignore the Failure passed in. - """ - return self._doWait() - - def _doWait(self): - self._failedAttempts += 1 - delay = self._timeoutForAttempt(self._failedAttempts) - self._log.info( - "Scheduling retry {attempt} to connect {endpoint} " "in {delay} seconds.", - attempt=self._failedAttempts, - endpoint=self._endpoint, - delay=delay, - ) - self._retryCall = self._clock.callLater(delay, self._reconnect) - - @_machine.input() - def _reconnect(self): - """ - The wait between connection attempts is done. - """ - - @_machine.input() - def _clientDisconnected(self): - """ - The current connection has been disconnected. - """ - - @_machine.output() - def _forgetConnection(self): - """ - Forget the current connection. - """ - del self._currentConnection - - @_machine.output() - def _cancelConnectWaiters(self): - """ - Notify all pending requests for a connection that no more connections - are expected. - """ - self._unawait(Failure(CancelledError())) - - @_machine.output() - def _ignoreAndCancelConnectWaiters(self, f): - """ - Notify all pending requests for a connection that no more connections - are expected, after ignoring the Failure passed in. - """ - self._unawait(Failure(CancelledError())) - - @_machine.output() - def _finishStopping(self): - """ - Notify all deferreds waiting on the service stopping. - """ - self._doFinishStopping() - - @_machine.output() - def _ignoreAndFinishStopping(self, f): - """ - Notify all deferreds waiting on the service stopping, and ignore the - Failure passed in. - """ - self._doFinishStopping() - - def _doFinishStopping(self): - self._stopWaiters, waiting = [], self._stopWaiters - for w in waiting: - w.callback(None) - - @_machine.input() - def whenConnected(self, failAfterFailures=None): - """ - Retrieve the currently-connected L{Protocol}, or the next one to - connect. - - @param failAfterFailures: number of connection failures after which - the Deferred will deliver a Failure (None means the Deferred will - only fail if/when the service is stopped). Set this to 1 to make - the very first connection failure signal an error. Use 2 to - allow one failure but signal an error if the subsequent retry - then fails. - @type failAfterFailures: L{int} or None - - @return: a Deferred that fires with a protocol produced by the - factory passed to C{__init__} - @rtype: L{Deferred} that may: - - - fire with L{IProtocol} - - - fail with L{CancelledError} when the service is stopped - - - fail with e.g. - L{DNSLookupError<twisted.internet.error.DNSLookupError>} or - L{ConnectionRefusedError<twisted.internet.error.ConnectionRefusedError>} - when the number of consecutive failed connection attempts - equals the value of "failAfterFailures" - """ - - @_machine.output() - def _currentConnection(self, failAfterFailures=None): - """ - Return the currently connected protocol. - - @return: L{Deferred} that is fired with currently connected protocol. - """ - return succeed(self._currentConnection) - - @_machine.output() - def _noConnection(self, failAfterFailures=None): - """ - Notify the caller that no connection is expected. - - @return: L{Deferred} that is fired with L{CancelledError}. - """ - return fail(CancelledError()) - - @_machine.output() - def _awaitingConnection(self, failAfterFailures=None): - """ - Return a deferred that will fire with the next connected protocol. - - @return: L{Deferred} that will fire with the next connected protocol. - """ - result = Deferred() - self._awaitingConnected.append((result, failAfterFailures)) - return result - - @_machine.output() - def _deferredSucceededWithNone(self): - """ - Return a deferred that has already fired with L{None}. - - @return: A L{Deferred} that has already fired with L{None}. - """ - return succeed(None) - - def _unawait(self, value): - """ - Fire all outstanding L{ClientService.whenConnected} L{Deferred}s. - - @param value: the value to fire the L{Deferred}s with. - """ - self._awaitingConnected, waiting = [], self._awaitingConnected - for w, remaining in waiting: - w.callback(value) - - @_machine.output() - def _deliverConnectionFailure(self, f): - """ - Deliver connection failures to any L{ClientService.whenConnected} - L{Deferred}s that have met their failAfterFailures threshold. - - @param f: the Failure to fire the L{Deferred}s with. - """ - ready = [] - notReady = [] - for w, remaining in self._awaitingConnected: - if remaining is None: - notReady.append((w, remaining)) - elif remaining <= 1: - ready.append(w) - else: - notReady.append((w, remaining - 1)) - self._awaitingConnected = notReady - for w in ready: - w.callback(f) - - # State Transitions - - _init.upon(start, enter=_connecting, outputs=[_connect]) - _init.upon( - stop, - enter=_stopped, - outputs=[_deferredSucceededWithNone], - collector=_firstResult, - ) - - _connecting.upon(start, enter=_connecting, outputs=[]) - # Note that this synchonously triggers _connectionFailed in the - # _disconnecting state. - _connecting.upon( - stop, - enter=_disconnecting, - outputs=[_waitForStop, _stopConnecting], - collector=_firstResult, - ) - _connecting.upon(_connectionMade, enter=_connected, outputs=[_notifyWaiters]) - _connecting.upon( - _connectionFailed, - enter=_waiting, - outputs=[_ignoreAndWait, _deliverConnectionFailure], - ) - - _waiting.upon(start, enter=_waiting, outputs=[]) - _waiting.upon( - stop, - enter=_stopped, - outputs=[_waitForStop, _cancelConnectWaiters, _stopRetrying, _finishStopping], - collector=_firstResult, - ) - _waiting.upon(_reconnect, enter=_connecting, outputs=[_connect]) - - _connected.upon(start, enter=_connected, outputs=[]) - _connected.upon( - stop, - enter=_disconnecting, - outputs=[_waitForStop, _disconnect], - collector=_firstResult, - ) - _connected.upon( - _clientDisconnected, enter=_waiting, outputs=[_forgetConnection, _wait] - ) - - _disconnecting.upon(start, enter=_restarting, outputs=[_resetFailedAttempts]) - _disconnecting.upon( - stop, enter=_disconnecting, outputs=[_waitForStop], collector=_firstResult - ) - _disconnecting.upon( - _clientDisconnected, - enter=_stopped, - outputs=[_cancelConnectWaiters, _finishStopping, _forgetConnection], - ) - # Note that this is triggered synchonously with the transition from - # _connecting - _disconnecting.upon( - _connectionFailed, - enter=_stopped, - outputs=[_ignoreAndCancelConnectWaiters, _ignoreAndFinishStopping], - ) - - _restarting.upon(start, enter=_restarting, outputs=[]) - _restarting.upon( - stop, enter=_disconnecting, outputs=[_waitForStop], collector=_firstResult - ) - _restarting.upon( - _clientDisconnected, enter=_connecting, outputs=[_finishStopping, _connect] - ) - - _stopped.upon(start, enter=_connecting, outputs=[_connect]) - _stopped.upon( - stop, - enter=_stopped, - outputs=[_deferredSucceededWithNone], - collector=_firstResult, - ) - - _init.upon( - whenConnected, - enter=_init, - outputs=[_awaitingConnection], - collector=_firstResult, - ) - _connecting.upon( - whenConnected, - enter=_connecting, - outputs=[_awaitingConnection], - collector=_firstResult, - ) - _waiting.upon( - whenConnected, - enter=_waiting, - outputs=[_awaitingConnection], - collector=_firstResult, - ) - _connected.upon( - whenConnected, - enter=_connected, - outputs=[_currentConnection], - collector=_firstResult, - ) - _disconnecting.upon( - whenConnected, - enter=_disconnecting, - outputs=[_awaitingConnection], - collector=_firstResult, - ) - _restarting.upon( - whenConnected, - enter=_restarting, - outputs=[_awaitingConnection], - collector=_firstResult, - ) - _stopped.upon( - whenConnected, enter=_stopped, outputs=[_noConnection], collector=_firstResult - ) - - -class ClientService(service.Service): - """ - A L{ClientService} maintains a single outgoing connection to a client - endpoint, reconnecting after a configurable timeout when a connection - fails, either before or after connecting. - - @since: 16.1.0 - """ - - _log = Logger() - - def __init__( - self, endpoint, factory, retryPolicy=None, clock=None, prepareConnection=None - ): - """ - @param endpoint: A L{stream client endpoint - <interfaces.IStreamClientEndpoint>} provider which will be used to - connect when the service starts. - - @param factory: A L{protocol factory <interfaces.IProtocolFactory>} - which will be used to create clients for the endpoint. - - @param retryPolicy: A policy configuring how long L{ClientService} will - wait between attempts to connect to C{endpoint}. - @type retryPolicy: callable taking (the number of failed connection - attempts made in a row (L{int})) and returning the number of - seconds to wait before making another attempt. - - @param clock: The clock used to schedule reconnection. It's mainly - useful to be parametrized in tests. If the factory is serialized, - this attribute will not be serialized, and the default value (the - reactor) will be restored when deserialized. - @type clock: L{IReactorTime} - - @param prepareConnection: A single argument L{callable} that may return - a L{Deferred}. It will be called once with the L{protocol - <interfaces.IProtocol>} each time a new connection is made. It may - call methods on the protocol to prepare it for use (e.g. - authenticate) or validate it (check its health). - - The C{prepareConnection} callable may raise an exception or return - a L{Deferred} which fails to reject the connection. A rejected - connection is not used to fire an L{Deferred} returned by - L{whenConnected}. Instead, L{ClientService} handles the failure - and continues as if the connection attempt were a failure - (incrementing the counter passed to C{retryPolicy}). - - L{Deferred}s returned by L{whenConnected} will not fire until - any L{Deferred} returned by the C{prepareConnection} callable - fire. Otherwise its successful return value is consumed, but - ignored. - - Present Since Twisted 18.7.0 - - @type prepareConnection: L{callable} - - """ - clock = _maybeGlobalReactor(clock) - retryPolicy = _defaultPolicy if retryPolicy is None else retryPolicy - - self._machine = _ClientMachine( - endpoint, - factory, - retryPolicy, - clock, - prepareConnection=prepareConnection, - log=self._log, - ) - - def whenConnected(self, failAfterFailures=None): - """ - Retrieve the currently-connected L{Protocol}, or the next one to - connect. - - @param failAfterFailures: number of connection failures after which - the Deferred will deliver a Failure (None means the Deferred will - only fail if/when the service is stopped). Set this to 1 to make - the very first connection failure signal an error. Use 2 to - allow one failure but signal an error if the subsequent retry - then fails. - @type failAfterFailures: L{int} or None - - @return: a Deferred that fires with a protocol produced by the - factory passed to C{__init__} - @rtype: L{Deferred} that may: - - - fire with L{IProtocol} - - - fail with L{CancelledError} when the service is stopped - - - fail with e.g. - L{DNSLookupError<twisted.internet.error.DNSLookupError>} or - L{ConnectionRefusedError<twisted.internet.error.ConnectionRefusedError>} - when the number of consecutive failed connection attempts - equals the value of "failAfterFailures" - """ - return self._machine.whenConnected(failAfterFailures) - - def startService(self): - """ - Start this L{ClientService}, initiating the connection retry loop. - """ - if self.running: - self._log.warn("Duplicate ClientService.startService {log_source}") - return - super().startService() - self._machine.start() - - def stopService(self): - """ - Stop attempting to reconnect and close any existing connections. - - @return: a L{Deferred} that fires when all outstanding connections are - closed and all in-progress connection attempts halted. - """ - super().stopService() - return self._machine.stop() - - __all__ = [ "TimerService", "CooperatorService", @@ -1202,4 +422,6 @@ __all__ = [ "SSLClient", "UNIXDatagramServer", "UNIXDatagramClient", + "ClientService", + "backoffPolicy", ] |