aboutsummaryrefslogtreecommitdiffstats
path: root/contrib/python/Twisted/py3/twisted/application/internet.py
blob: 9e702f7e8441395aefdf9f7276416bf69e4cb896 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035
1036
1037
1038
1039
1040
1041
1042
1043
1044
1045
1046
1047
1048
1049
1050
1051
1052
1053
1054
1055
1056
1057
1058
1059
1060
1061
1062
1063
1064
1065
1066
1067
1068
1069
1070
1071
1072
1073
1074
1075
1076
1077
1078
1079
1080
1081
1082
1083
1084
1085
1086
1087
1088
1089
1090
1091
1092
1093
1094
1095
1096
1097
1098
1099
1100
1101
1102
1103
1104
1105
1106
1107
1108
1109
1110
1111
1112
1113
1114
1115
1116
1117
1118
1119
1120
1121
1122
1123
1124
1125
1126
1127
1128
1129
1130
1131
1132
1133
1134
1135
1136
1137
1138
1139
1140
1141
1142
1143
1144
1145
1146
1147
1148
1149
1150
1151
1152
1153
1154
1155
1156
1157
1158
1159
1160
1161
1162
1163
1164
1165
1166
1167
1168
1169
1170
1171
1172
1173
1174
1175
1176
1177
1178
1179
1180
1181
1182
1183
1184
1185
1186
1187
1188
1189
1190
1191
1192
1193
1194
1195
1196
1197
1198
1199
1200
1201
1202
1203
1204
1205
# -*- test-case-name: twisted.application.test.test_internet,twisted.test.test_application,twisted.test.test_cooperator -*-
# Copyright (c) Twisted Matrix Laboratories.
# See LICENSE for details.

"""
Reactor-based Services

Here are services to run clients, servers and periodic services using
the reactor.

If you want to run a server service, L{StreamServerEndpointService} defines a
service that can wrap an arbitrary L{IStreamServerEndpoint
<twisted.internet.interfaces.IStreamServerEndpoint>}
as an L{IService}. See also L{twisted.application.strports.service} for
constructing one of these directly from a descriptive string.

Additionally, this module (dynamically) defines various Service subclasses that
let you represent clients and servers in a Service hierarchy.  Endpoints APIs
should be preferred for stream server services, but since those APIs do not yet
exist for clients or datagram services, many of these are still useful.

They are as follows::

  TCPServer, TCPClient,
  UNIXServer, UNIXClient,
  SSLServer, SSLClient,
  UDPServer,
  UNIXDatagramServer, UNIXDatagramClient,
  MulticastServer

These classes take arbitrary arguments in their constructors and pass
them straight on to their respective reactor.listenXXX or
reactor.connectXXX calls.

For example, the following service starts a web server on port 8080:
C{TCPServer(8080, server.Site(r))}.  See the documentation for the
reactor.listen/connect* methods for more information.
"""


from random import random as _goodEnoughRandom
from typing import List

from automat import MethodicalMachine

from twisted.application import service
from twisted.internet import task
from twisted.internet.defer import (
    CancelledError,
    Deferred,
    fail,
    maybeDeferred,
    succeed,
)
from twisted.logger import Logger
from twisted.python import log
from twisted.python.failure import Failure


def _maybeGlobalReactor(maybeReactor):
    """
    @return: the argument, or the global reactor if the argument is L{None}.
    """
    if maybeReactor is None:
        from twisted.internet import reactor

        return reactor
    else:
        return maybeReactor


class _VolatileDataService(service.Service):
    """
    A L{service.Service} whose pickled state leaves out the attributes named
    in C{volatile}.

    @cvar volatile: names of the attributes to strip from the pickled state.
    @type volatile: C{list} of C{str}
    """

    volatile: List[str] = []

    def __getstate__(self):
        """
        Build the state to pickle.

        @return: the state produced by L{service.Service.__getstate__} with
            every key listed in C{volatile} removed (missing keys are
            ignored).
        """
        state = service.Service.__getstate__(self)
        for name in self.volatile:
            state.pop(name, None)
        return state


class _AbstractServer(_VolatileDataService):
    """
    Base class for services that call a C{reactor.listenXXX} method when
    started and stop listening when stopped.

    @cvar volatile: list of attribute to remove from pickling.
    @type volatile: C{list}

    @ivar method: the type of method to call on the reactor, one of B{TCP},
        B{UDP}, B{SSL} or B{UNIX}.
    @type method: C{str}

    @ivar reactor: the current running reactor.
    @type reactor: a provider of C{IReactorTCP}, C{IReactorUDP},
        C{IReactorSSL} or C{IReactorUnix}.

    @ivar _port: instance of port set when the service is started.
    @type _port: a provider of L{twisted.internet.interfaces.IListeningPort}.
    """

    volatile = ["_port"]
    method: str = ""
    reactor = None

    _port = None

    def __init__(self, *args, **kwargs):
        """
        Remember the arguments for the eventual C{listenXXX} call.

        @param args: positional arguments forwarded to the listen method.
        @param kwargs: keyword arguments forwarded to the listen method; a
            C{reactor} key, if present, is removed and stored as
            C{self.reactor} instead of being forwarded.
        """
        if "reactor" in kwargs:
            self.reactor = kwargs.pop("reactor")
        self.args = args
        self.kwargs = kwargs

    def privilegedStartService(self):
        """
        Run the base privileged start-up, then start listening.
        """
        service.Service.privilegedStartService(self)
        self._port = self._getPort()

    def startService(self):
        """
        Start the service, listening now unless a port was already obtained
        (e.g. by L{privilegedStartService}).
        """
        service.Service.startService(self)
        if self._port is not None:
            return
        self._port = self._getPort()

    def stopService(self):
        """
        Stop the service and stop listening on the port, if there is one.

        @return: the result of the port's C{stopListening} call, or L{None}
            when there is no port.
        """
        service.Service.stopService(self)
        # TODO: if startup failed, should shutdown skip stopListening?
        # _port won't exist
        port = self._port
        if port is None:
            return None
        stopped = port.stopListening()
        # Drop the instance attribute so the class-level default (None) is
        # visible again.
        del self._port
        return stopped

    def _getPort(self):
        """
        Wrapper around the appropriate listen method of the reactor.

        @return: the port object returned by the listen method.
        @rtype: an object providing
            L{twisted.internet.interfaces.IListeningPort}.
        """
        reactor = _maybeGlobalReactor(self.reactor)
        listen = getattr(reactor, f"listen{self.method}")
        return listen(*self.args, **self.kwargs)


class _AbstractClient(_VolatileDataService):
    """
    Base class for services that call a C{reactor.connectXXX} method when
    started and disconnect when stopped.

    @cvar volatile: list of attribute to remove from pickling.
    @type volatile: C{list}

    @ivar method: the type of method to call on the reactor, one of B{TCP},
        B{UDP}, B{SSL} or B{UNIX}.
    @type method: C{str}

    @ivar reactor: the current running reactor.
    @type reactor: a provider of C{IReactorTCP}, C{IReactorUDP},
        C{IReactorSSL} or C{IReactorUnix}.

    @ivar _connection: instance of connection set when the service is started.
    @type _connection: a provider of L{twisted.internet.interfaces.IConnector}.
    """

    volatile = ["_connection"]
    method: str = ""
    reactor = None

    _connection = None

    def __init__(self, *args, **kwargs):
        """
        Remember the arguments for the eventual C{connectXXX} call.

        @param args: positional arguments forwarded to the connect method.
        @param kwargs: keyword arguments forwarded to the connect method; a
            C{reactor} key, if present, is removed and stored as
            C{self.reactor} instead of being forwarded.
        """
        if "reactor" in kwargs:
            self.reactor = kwargs.pop("reactor")
        self.args = args
        self.kwargs = kwargs

    def startService(self):
        """
        Start the service and initiate the connection.
        """
        service.Service.startService(self)
        self._connection = self._getConnection()

    def stopService(self):
        """
        Stop the service and disconnect the connection, if there is one.
        """
        service.Service.stopService(self)
        connector = self._connection
        if connector is None:
            return
        connector.disconnect()
        # Drop the instance attribute so the class-level default (None) is
        # visible again.
        del self._connection

    def _getConnection(self):
        """
        Wrapper around the appropriate connect method of the reactor.

        @return: the connector object returned by the connect method.
        @rtype: an object providing L{twisted.internet.interfaces.IConnector}.
        """
        reactor = _maybeGlobalReactor(self.reactor)
        connect = getattr(reactor, f"connect{self.method}")
        return connect(*self.args, **self.kwargs)


# Docstring templates for the transport-specific service classes; the
# ``{tran}`` placeholder is filled in with the transport name via
# ``str.format``.
_clientDoc = """Connect to {tran}

Call reactor.connect{tran} when the service starts, with the
arguments given to the constructor.
"""

_serverDoc = """Serve {tran} clients

Call reactor.listen{tran} when the service starts, with the
arguments given to the constructor. When the service stops,
stop listening. See twisted.internet.interfaces for documentation
on arguments to the reactor method.
"""


class TCPServer(_AbstractServer):
    # ``method`` selects reactor.listenTCP; the docstring is rendered from the
    # shared server template with the same transport name.
    method = "TCP"
    __doc__ = _serverDoc.format(tran=method)


class TCPClient(_AbstractClient):
    # ``method`` selects reactor.connectTCP; the docstring is rendered from
    # the shared client template with the same transport name.
    method = "TCP"
    __doc__ = _clientDoc.format(tran=method)


class UNIXServer(_AbstractServer):
    # ``method`` selects reactor.listenUNIX; the docstring is rendered from
    # the shared server template with the same transport name.
    method = "UNIX"
    __doc__ = _serverDoc.format(tran=method)


class UNIXClient(_AbstractClient):
    # ``method`` selects reactor.connectUNIX; the docstring is rendered from
    # the shared client template with the same transport name.
    method = "UNIX"
    __doc__ = _clientDoc.format(tran=method)


class SSLServer(_AbstractServer):
    # ``method`` selects reactor.listenSSL; the docstring is rendered from the
    # shared server template with the same transport name.
    method = "SSL"
    __doc__ = _serverDoc.format(tran=method)


class SSLClient(_AbstractClient):
    # ``method`` selects reactor.connectSSL; the docstring is rendered from
    # the shared client template with the same transport name.
    method = "SSL"
    __doc__ = _clientDoc.format(tran=method)


class UDPServer(_AbstractServer):
    # ``method`` selects reactor.listenUDP; the docstring is rendered from the
    # shared server template with the same transport name.
    method = "UDP"
    __doc__ = _serverDoc.format(tran=method)


class UNIXDatagramServer(_AbstractServer):
    # ``method`` selects reactor.listenUNIXDatagram; the docstring is rendered
    # from the shared server template with the same transport name.
    method = "UNIXDatagram"
    __doc__ = _serverDoc.format(tran=method)


class UNIXDatagramClient(_AbstractClient):
    # ``method`` selects reactor.connectUNIXDatagram; the docstring is
    # rendered from the shared client template with the same transport name.
    method = "UNIXDatagram"
    __doc__ = _clientDoc.format(tran=method)


class MulticastServer(_AbstractServer):
    # ``method`` selects reactor.listenMulticast; the docstring is rendered
    # from the shared server template with the same transport name.
    method = "Multicast"
    __doc__ = _serverDoc.format(tran=method)


class TimerService(_VolatileDataService):
    """
    Service to periodically call a function

    Every C{step} seconds call the given function with the given arguments.
    The service starts the calls when it starts, and cancels them
    when it stops.

    @ivar clock: Source of time. This defaults to L{None}, which causes
        L{twisted.internet.reactor} to be used.
        Feel free to set this to something else, but it probably ought to be
        set *before* calling L{startService}.
    @type clock: L{IReactorTime<twisted.internet.interfaces.IReactorTime>}

    @ivar call: Function and arguments to call periodically.
    @type call: L{tuple} of C{(callable, args, kwargs)}
    """

    volatile = ["_loop", "_loopFinished"]

    def __init__(self, step, callable, *args, **kwargs):
        """
        @param step: The number of seconds between calls.
        @type step: L{float}

        @param callable: Function to call
        @type callable: L{callable}

        @param args: Positional arguments to pass to function
        @param kwargs: Keyword arguments to pass to function
        """
        self.clock = None
        self.step = step
        self.call = (callable, args, kwargs)

    def startService(self):
        """
        Start the service and begin calling the function every C{step}
        seconds, with the first call made immediately.
        """
        service.Service.startService(self)
        function, positional, keywords = self.call
        # we have to make a new LoopingCall each time we're started, because
        # an active LoopingCall remains active when serialized. If
        # LoopingCall were a _VolatileDataService, we wouldn't need to do
        # this.
        self._loop = loop = task.LoopingCall(function, *positional, **keywords)
        loop.clock = _maybeGlobalReactor(self.clock)
        self._loopFinished = finished = loop.start(self.step, now=True)
        finished.addErrback(self._failed)

    def _failed(self, why):
        """
        Errback for the loop's L{Deferred}: mark the loop as stopped and log
        the failure.

        @param why: the failure that ended the loop.
        """
        # make a note that the LoopingCall is no longer looping, so we don't
        # try to shut it down a second time in stopService. I think this
        # should be in LoopingCall. -warner
        self._loop.running = False
        log.err(why)

    def stopService(self):
        """
        Stop the service.

        @rtype: L{Deferred<defer.Deferred>}
        @return: a L{Deferred<defer.Deferred>} which is fired when the
            currently running call (if any) is finished.
        """
        loop = self._loop
        if loop.running:
            loop.stop()
        finished = self._loopFinished
        # The base stopService only runs once the loop's Deferred has fired.
        finished.addCallback(lambda _: service.Service.stopService(self))
        return finished


class CooperatorService(service.Service):
    """
    Simple L{service.IService} which starts and stops a L{twisted.internet.task.Cooperator}.

    @ivar coop: the wrapped cooperator, created in the not-started state.
    """

    def __init__(self):
        self.coop = task.Cooperator(started=False)

    def coiterate(self, iterator):
        """
        Hand C{iterator} to the wrapped cooperator.

        @param iterator: the iterator to pass to C{coop.coiterate}.

        @return: whatever the cooperator's C{coiterate} returns.
        """
        cooperator = self.coop
        return cooperator.coiterate(iterator)

    def startService(self):
        """
        Start the wrapped cooperator.
        """
        # Only the cooperator is started; the base class startService is not
        # invoked here.
        self.coop.start()

    def stopService(self):
        """
        Stop the wrapped cooperator.
        """
        # Only the cooperator is stopped; the base class stopService is not
        # invoked here.
        self.coop.stop()


class StreamServerEndpointService(service.Service):
    """
    A L{StreamServerEndpointService} is an L{IService} which runs a server on a
    listening port described by an L{IStreamServerEndpoint
    <twisted.internet.interfaces.IStreamServerEndpoint>}.

    @ivar factory: A server factory which will be used to listen on the
        endpoint.

    @ivar endpoint: An L{IStreamServerEndpoint
        <twisted.internet.interfaces.IStreamServerEndpoint>} provider
        which will be used to listen when the service starts.

    @ivar _waitingForPort: a Deferred, if C{listen} has been invoked on the
        endpoint, otherwise None.

    @ivar _raiseSynchronously: Defines error-handling behavior for the case
        where C{listen(...)} raises an exception before C{startService} or
        C{privilegedStartService} have completed.

    @type _raiseSynchronously: C{bool}

    @since: 10.2
    """

    _raiseSynchronously = False

    def __init__(self, endpoint, factory):
        """
        @param endpoint: the endpoint whose C{listen} method is called when
            the service starts.

        @param factory: the factory passed to C{endpoint.listen}.
        """
        self.endpoint = endpoint
        self.factory = factory
        self._waitingForPort = None

    def privilegedStartService(self):
        """
        Start listening on the endpoint.

        A failure of the listen attempt is logged, unless it is a
        L{CancelledError}.  If C{_raiseSynchronously} is set and the failure
        is already available by the time this method attaches its errback,
        the exception is re-raised from this method instead.
        """
        service.Service.privilegedStartService(self)
        self._waitingForPort = self.endpoint.listen(self.factory)
        # Collects a failure that handleIt observes while
        # C{_raiseSynchronously} is set.
        raisedNow = []

        def handleIt(err):
            if self._raiseSynchronously:
                raisedNow.append(err)
            elif not err.check(CancelledError):
                log.err(err)

        self._waitingForPort.addErrback(handleIt)
        # raisedNow is only populated here if handleIt already ran during the
        # addErrback call above, i.e. the listen attempt had already failed.
        if raisedNow:
            raisedNow[0].raiseException()
        # From now on any failure is reported through the logging branch of
        # handleIt.
        self._raiseSynchronously = False

    def startService(self):
        """
        Start listening on the endpoint, unless L{privilegedStartService} got
        around to it already.
        """
        service.Service.startService(self)
        if self._waitingForPort is None:
            self.privilegedStartService()

    def stopService(self):
        """
        Stop listening on the port if it is already listening, otherwise,
        cancel the attempt to listen.

        @return: a L{Deferred<twisted.internet.defer.Deferred>} which fires
            with L{None} when the port has stopped listening.
        """
        # NOTE(review): C{_waitingForPort} is None until
        # privilegedStartService has run, so calling stopService on a service
        # that was never started fails on this line.
        self._waitingForPort.cancel()

        def stopIt(port):
            # The result is None when the listen attempt did not produce a
            # port (its failure was consumed by handleIt).
            if port is not None:
                return port.stopListening()

        d = self._waitingForPort.addCallback(stopIt)

        def stop(passthrough):
            # Mark the service as stopped whether or not stopping the port
            # succeeded, and pass the result along unchanged.
            self.running = False
            return passthrough

        d.addBoth(stop)
        return d


class _ReconnectingProtocolProxy:
    """
    A proxy for a Protocol to provide connectionLost notification to a client
    connection service, in support of reconnecting when connections are lost.
    """

    def __init__(self, protocol, lostNotification):
        """
        Create a L{_ReconnectingProtocolProxy}.

        @param protocol: the application-provided L{interfaces.IProtocol}
            provider.
        @type protocol: provider of L{interfaces.IProtocol} which may
            additionally provide L{interfaces.IHalfCloseableProtocol} and
            L{interfaces.IFileDescriptorReceiver}.

        @param lostNotification: a 1-argument callable to invoke with the
            C{reason} when the connection is lost.
        """
        self._protocol = protocol
        self._lostNotification = lostNotification

    def connectionLost(self, reason):
        """
        The connection was lost.  Relay this information.

        @param reason: The reason the connection was lost.

        @return: the underlying protocol's result
        """
        try:
            return self._protocol.connectionLost(reason)
        finally:
            self._lostNotification(reason)

    def __getattr__(self, item):
        return getattr(self._protocol, item)

    def __repr__(self) -> str:
        return f"<{self.__class__.__name__} wrapping {self._protocol!r}>"


class _DisconnectFactory:
    """
    A L{_DisconnectFactory} is a proxy for L{IProtocolFactory} that catches
    C{connectionLost} notifications and relays them.
    """

    def __init__(self, protocolFactory, protocolDisconnected):
        self._protocolFactory = protocolFactory
        self._protocolDisconnected = protocolDisconnected

    def buildProtocol(self, addr):
        """
        Create a L{_ReconnectingProtocolProxy} with the disconnect-notification
        callback we were called with.

        @param addr: The address the connection is coming from.

        @return: a L{_ReconnectingProtocolProxy} for a protocol produced by
            C{self._protocolFactory}
        """
        return _ReconnectingProtocolProxy(
            self._protocolFactory.buildProtocol(addr), self._protocolDisconnected
        )

    def __getattr__(self, item):
        return getattr(self._protocolFactory, item)

    def __repr__(self) -> str:
        return "<{} wrapping {!r}>".format(
            self.__class__.__name__, self._protocolFactory
        )


def backoffPolicy(
    initialDelay=1.0, maxDelay=60.0, factor=1.5, jitter=_goodEnoughRandom
):
    """
    A timeout policy for L{ClientService} which computes an exponential backoff
    interval with configurable parameters.

    @since: 16.1.0

    @param initialDelay: Delay for the first reconnection attempt (default
        1.0s).
    @type initialDelay: L{float}

    @param maxDelay: Maximum number of seconds between connection attempts
        (default 60 seconds, or one minute).  Note that this value is before
        jitter is applied, so the actual maximum possible delay is this value
        plus the maximum possible result of C{jitter()}.
    @type maxDelay: L{float}

    @param factor: A multiplicative factor by which the delay grows on each
        failed reattempt.  Default: 1.5.
    @type factor: L{float}

    @param jitter: A 0-argument callable that introduces noise into the delay.
        By default, C{random.random}, i.e. a pseudorandom floating-point value
        between zero and one.
    @type jitter: 0-argument callable returning L{float}

    @return: a 1-argument callable that, given an attempt count, returns a
        floating point number; the number of seconds to delay.
    @rtype: see L{ClientService.__init__}'s C{retryPolicy} argument.
    """

    def policy(attempt):
        # The exponent is capped at 100 regardless of the attempt count.
        exponent = min(100, attempt)
        try:
            uncapped = initialDelay * (factor ** exponent)
            delay = min(uncapped, maxDelay)
        except OverflowError:
            # The power is too large to represent: use the ceiling.
            delay = maxDelay
        return delay + jitter()

    return policy


# Retry policy built from backoffPolicy's default parameters.
# NOTE(review): presumably the default ``retryPolicy`` of ClientService --
# confirm against its constructor.
_defaultPolicy = backoffPolicy()


def _firstResult(gen):
    """
    Return the first element of a generator and exhaust it.

    C{MethodicalMachine.upon}'s C{collector} argument takes a generator of
    output results. If the generator is exhausted, the later outputs aren't
    actually run.

    @param gen: Generator to extract values from

    @return: The first element of the generator.
    """
    return list(gen)[0]


class _ClientMachine:
    """
    State machine for maintaining a single outgoing connection to an endpoint.

    @ivar _awaitingConnected: notifications to make when connection
        succeeds, fails, or is cancelled
    @type _awaitingConnected: list of (Deferred, count) tuples

    @see: L{ClientService}
    """

    _machine = MethodicalMachine()

    def __init__(self, endpoint, factory, retryPolicy, clock, prepareConnection, log):
        """
        Store the collaborators and reset the per-instance bookkeeping.

        @see: L{ClientService.__init__}

        @param log: The logger for the L{ClientService} instance this state
            machine is associated to.
        @type log: L{Logger}
        """
        # Collaborators supplied by the caller.
        self._endpoint = endpoint
        self._factory = factory
        self._clock = clock
        self._log = log
        self._prepareConnection = prepareConnection
        self._timeoutForAttempt = retryPolicy

        # Connection-attempt bookkeeping; the in-progress attempt starts out
        # as an already-fired Deferred.
        self._failedAttempts = 0
        self._stopped = False
        self._connectionInProgress = succeed(None)

        # Deferreds handed out to callers waiting for a connection, and to
        # callers waiting for the service to finish stopping.
        self._awaitingConnected = []
        self._stopWaiters = []

    @_machine.state(initial=True)
    def _init(self):
        """
        The service has not been started.
        """

    @_machine.state()
    def _connecting(self):
        """
        The service has started connecting.
        """

    @_machine.state()
    def _waiting(self):
        """
        The service is waiting for the reconnection period
        before reconnecting.
        """

    @_machine.state()
    def _connected(self):
        """
        The service is connected.
        """

    @_machine.state()
    def _disconnecting(self):
        """
        The service is disconnecting after being asked to shutdown.
        """

    @_machine.state()
    def _restarting(self):
        """
        The service is disconnecting and has been asked to restart.
        """

    @_machine.state()
    def _stopped(self):
        """
        The service has been stopped and is disconnected.
        """

    @_machine.input()
    def start(self):
        """
        Start this L{ClientService}, initiating the connection retry loop.
        """

    @_machine.output()
    def _connect(self):
        """
        Start a connection attempt.

        The factory is wrapped so that a lost connection feeds the
        C{_clientDisconnected} input, and the outcome of the attempt feeds
        C{_connectionMade} or C{_connectionFailed}.
        """
        factoryProxy = _DisconnectFactory(
            self._factory, lambda _: self._clientDisconnected()
        )

        attempt = self._endpoint.connect(factoryProxy)
        attempt.addCallback(self._runPrepareConnection)
        attempt.addCallback(self._connectionMade)
        attempt.addErrback(self._connectionFailed)
        # Recorded only after the whole chain is attached, as before.
        self._connectionInProgress = attempt

    def _runPrepareConnection(self, protocol):
        """
        Run any C{prepareConnection} callback with the connected protocol,
        ignoring its return value but propagating any failure.

        @param protocol: The protocol of the connection.
        @type protocol: L{IProtocol}

        @return: Either:

            - The protocol, when no C{prepareConnection} callback is defined.

            - A L{Deferred} that succeeds with the protocol when the
              C{prepareConnection} callback has executed successfully.

            - A L{Deferred} that fails when the C{prepareConnection} callback
              throws or returns a failed L{Deferred}.
        """
        if not self._prepareConnection:
            return protocol

        prepared = maybeDeferred(self._prepareConnection, protocol)
        # Discard the callback's own result in favour of the protocol.
        prepared.addCallback(lambda _: protocol)
        return prepared

    @_machine.output()
    def _resetFailedAttempts(self):
        """
        Reset the number of failed attempts.
        """
        self._failedAttempts = 0

    @_machine.input()
    def stop(self):
        """
        Stop trying to connect and disconnect any current connection.

        @return: a L{Deferred} that fires when all outstanding connections are
            closed and all in-progress connection attempts halted.
        """

    @_machine.output()
    def _waitForStop(self):
        """
        Register and return a new deferred that will fire when the service
        has finished disconnecting.

        @return: L{Deferred} that fires when the service has finished
            disconnecting.
        """
        waiter = Deferred()
        self._stopWaiters.append(waiter)
        return waiter

    @_machine.output()
    def _stopConnecting(self):
        """
        Stop pending connection attempt.
        """
        self._connectionInProgress.cancel()

    @_machine.output()
    def _stopRetrying(self):
        """
        Stop pending attempt to reconnect.
        """
        self._retryCall.cancel()
        del self._retryCall

    @_machine.output()
    def _disconnect(self):
        """
        Disconnect the current connection.
        """
        self._currentConnection.transport.loseConnection()

    @_machine.input()
    def _connectionMade(self, protocol):
        """
        A connection has been made.

        @param protocol: The protocol of the connection.
        @type protocol: L{IProtocol}
        """

    @_machine.output()
    def _notifyWaiters(self, protocol):
        """
        Notify all pending requests for a connection that a connection has been
        made.

        @param protocol: The protocol of the connection.
        @type protocol: L{IProtocol}
        """
        # This should be in _resetFailedAttempts but the signature doesn't
        # match.
        self._failedAttempts = 0

        self._currentConnection = protocol._protocol
        self._unawait(self._currentConnection)

    @_machine.input()
    def _connectionFailed(self, f):
        """
        The current connection attempt failed.
        """

    @_machine.output()
    def _wait(self):
        """
        Schedule a retry attempt.
        """
        self._doWait()

    @_machine.output()
    def _ignoreAndWait(self, f):
        """
        Schedule a retry attempt, and ignore the Failure passed in.
        """
        return self._doWait()

    def _doWait(self):
        """
        Count one more failed attempt, ask the retry policy for the delay,
        log it, and schedule the C{_reconnect} input on the clock.
        """
        self._failedAttempts += 1
        attempt = self._failedAttempts
        delay = self._timeoutForAttempt(attempt)
        self._log.info(
            "Scheduling retry {attempt} to connect {endpoint} in {delay} seconds.",
            attempt=attempt,
            endpoint=self._endpoint,
            delay=delay,
        )
        # Kept so that the pending retry can be cancelled later.
        self._retryCall = self._clock.callLater(delay, self._reconnect)

    @_machine.input()
    def _reconnect(self):
        """
        The wait between connection attempts is done.
        """

    @_machine.input()
    def _clientDisconnected(self):
        """
        The current connection has been disconnected.
        """

    @_machine.output()
    def _forgetConnection(self):
        """
        Forget the current connection.
        """
        del self._currentConnection

    @_machine.output()
    def _cancelConnectWaiters(self):
        """
        Notify all pending requests for a connection that no more connections
        are expected.
        """
        self._unawait(Failure(CancelledError()))

    @_machine.output()
    def _ignoreAndCancelConnectWaiters(self, f):
        """
        Notify all pending requests for a connection that no more connections
        are expected, after ignoring the Failure passed in.
        """
        self._unawait(Failure(CancelledError()))

    @_machine.output()
    def _finishStopping(self):
        """
        Notify all deferreds waiting on the service stopping.
        """
        self._doFinishStopping()

    @_machine.output()
    def _ignoreAndFinishStopping(self, f):
        """
        Notify all deferreds waiting on the service stopping, and ignore the
        Failure passed in.
        """
        self._doFinishStopping()

    def _doFinishStopping(self):
        """
        Fire every deferred waiting on the service stopping with L{None}.
        """
        # Swap in a fresh list before firing anything.
        waiting = self._stopWaiters
        self._stopWaiters = []
        for waiter in waiting:
            waiter.callback(None)

    @_machine.input()
    def whenConnected(self, failAfterFailures=None):
        """
        Retrieve the currently-connected L{Protocol}, or the next one to
        connect.

        @param failAfterFailures: number of connection failures after which
            the Deferred will deliver a Failure (None means the Deferred will
            only fail if/when the service is stopped).  Set this to 1 to make
            the very first connection failure signal an error.  Use 2 to
            allow one failure but signal an error if the subsequent retry
            then fails.
        @type failAfterFailures: L{int} or None

        @return: a Deferred that fires with a protocol produced by the
            factory passed to C{__init__}
        @rtype: L{Deferred} that may:

            - fire with L{IProtocol}

            - fail with L{CancelledError} when the service is stopped

            - fail with e.g.
              L{DNSLookupError<twisted.internet.error.DNSLookupError>} or
              L{ConnectionRefusedError<twisted.internet.error.ConnectionRefusedError>}
              when the number of consecutive failed connection attempts
              equals the value of "failAfterFailures"
        """

    @_machine.output()
    def _currentConnection(self, failAfterFailures=None):
        """
        Return the currently connected protocol.

        @return: L{Deferred} that is fired with currently connected protocol.
        """
        return succeed(self._currentConnection)

    @_machine.output()
    def _noConnection(self, failAfterFailures=None):
        """
        Notify the caller that no connection is expected.

        @return: L{Deferred} that is fired with L{CancelledError}.
        """
        return fail(CancelledError())

    @_machine.output()
    def _awaitingConnection(self, failAfterFailures=None):
        """
        Register a new waiter for the next connected protocol.

        A fresh L{Deferred} is paired with the caller's C{failAfterFailures}
        value and appended to C{self._awaitingConnected}.

        @param failAfterFailures: the failure threshold stored alongside the
            new L{Deferred}; L{None} is stored as-is.

        @return: the newly created, not yet fired L{Deferred}.
        """
        waiter = Deferred()
        entry = (waiter, failAfterFailures)
        self._awaitingConnected.append(entry)
        return waiter

    @_machine.output()
    def _deferredSucceededWithNone(self):
        """
        Produce a L{Deferred} whose result is already available and is
        L{None}.

        @return: a L{Deferred} that has already fired with L{None}.
        """
        alreadyFired = succeed(None)
        return alreadyFired

    def _unawait(self, value):
        """
        Fire all outstanding L{ClientService.whenConnected} L{Deferred}s.

        C{self._awaitingConnected} is reset to an empty list before any
        waiter is fired; the failure threshold stored with each waiter is
        ignored here.

        @param value: the value to fire the L{Deferred}s with.
        """
        pending = self._awaitingConnected
        self._awaitingConnected = []
        for waiter, _threshold in pending:
            waiter.callback(value)

    @_machine.output()
    def _deliverConnectionFailure(self, f):
        """
        Deliver connection failures to any L{ClientService.whenConnected}
        L{Deferred}s that have met their failAfterFailures threshold.

        Each entry of C{self._awaitingConnected} is a C{(deferred, remaining)}
        pair and is handled as follows:

            - C{remaining} is L{None}: the entry is kept unchanged.

            - C{remaining} is 1 or less: the entry is dropped and its
              deferred is fired with C{f}.

            - otherwise: the entry is kept with C{remaining} reduced by one.

        The waiter list is replaced before any deferred is fired.

        @param f: the Failure to fire the L{Deferred}s with.
        """
        exhausted = []
        stillWaiting = []
        for waiter, budget in self._awaitingConnected:
            if budget is not None and budget <= 1:
                # Threshold reached: this waiter gets the failure below.
                exhausted.append(waiter)
                continue
            nextBudget = budget if budget is None else budget - 1
            stillWaiting.append((waiter, nextBudget))
        self._awaitingConnected = stillWaiting
        for waiter in exhausted:
            # NOTE(review): fired through callback(), exactly as before; this
            # presumably relies on Deferred treating a Failure result as an
            # error -- confirm against the Deferred documentation.
            waiter.callback(f)

    # State Transitions
    #
    # Each ``upon`` call below declares, for one state and one input, the
    # state to enter and the outputs to run.  Every ``stop`` transition also
    # passes ``collector=_firstResult``.

    # From _init: start begins connecting; stop goes straight to _stopped and
    # is answered with an already-fired Deferred.
    _init.upon(start, enter=_connecting, outputs=[_connect])
    _init.upon(
        stop,
        enter=_stopped,
        outputs=[_deferredSucceededWithNone],
        collector=_firstResult,
    )

    # From _connecting: a repeated start changes nothing (same state, no
    # outputs).
    _connecting.upon(start, enter=_connecting, outputs=[])
    # Note that this synchronously triggers _connectionFailed in the
    # _disconnecting state.
    _connecting.upon(
        stop,
        enter=_disconnecting,
        outputs=[_waitForStop, _stopConnecting],
        collector=_firstResult,
    )
    _connecting.upon(_connectionMade, enter=_connected, outputs=[_notifyWaiters])
    # A failed attempt moves to _waiting and hands the failure to
    # _deliverConnectionFailure.
    _connecting.upon(
        _connectionFailed,
        enter=_waiting,
        outputs=[_ignoreAndWait, _deliverConnectionFailure],
    )

    # From _waiting: a repeated start changes nothing; stop enters _stopped
    # directly; _reconnect goes back to _connecting.
    _waiting.upon(start, enter=_waiting, outputs=[])
    _waiting.upon(
        stop,
        enter=_stopped,
        outputs=[_waitForStop, _cancelConnectWaiters, _stopRetrying, _finishStopping],
        collector=_firstResult,
    )
    _waiting.upon(_reconnect, enter=_connecting, outputs=[_connect])

    # From _connected: a repeated start changes nothing; stop enters
    # _disconnecting; a client disconnect moves to _waiting.
    _connected.upon(start, enter=_connected, outputs=[])
    _connected.upon(
        stop,
        enter=_disconnecting,
        outputs=[_waitForStop, _disconnect],
        collector=_firstResult,
    )
    _connected.upon(
        _clientDisconnected, enter=_waiting, outputs=[_forgetConnection, _wait]
    )

    # From _disconnecting: a start moves to _restarting; a further stop stays
    # in _disconnecting; a client disconnect or a connection failure enters
    # _stopped.
    _disconnecting.upon(start, enter=_restarting, outputs=[_resetFailedAttempts])
    _disconnecting.upon(
        stop, enter=_disconnecting, outputs=[_waitForStop], collector=_firstResult
    )
    _disconnecting.upon(
        _clientDisconnected,
        enter=_stopped,
        outputs=[_cancelConnectWaiters, _finishStopping, _forgetConnection],
    )
    # Note that this is triggered synchronously with the transition from
    # _connecting
    _disconnecting.upon(
        _connectionFailed,
        enter=_stopped,
        outputs=[_ignoreAndCancelConnectWaiters, _ignoreAndFinishStopping],
    )

    # From _restarting: a repeated start changes nothing; stop goes back to
    # _disconnecting; a client disconnect runs _finishStopping and then
    # _connect while entering _connecting.
    _restarting.upon(start, enter=_restarting, outputs=[])
    _restarting.upon(
        stop, enter=_disconnecting, outputs=[_waitForStop], collector=_firstResult
    )
    _restarting.upon(
        _clientDisconnected, enter=_connecting, outputs=[_finishStopping, _connect]
    )

    # From _stopped: start begins connecting again; stop is answered with an
    # already-fired Deferred.
    _stopped.upon(start, enter=_connecting, outputs=[_connect])
    _stopped.upon(
        stop,
        enter=_stopped,
        outputs=[_deferredSucceededWithNone],
        collector=_firstResult,
    )

    # whenConnected transitions.  The input never changes state (every
    # ``enter`` equals the state it is declared on); only the output differs:
    # _connected answers with _currentConnection, _stopped answers with
    # _noConnection, and every other state queues the caller through
    # _awaitingConnection.  All of them pass ``collector=_firstResult``.
    _init.upon(
        whenConnected,
        enter=_init,
        outputs=[_awaitingConnection],
        collector=_firstResult,
    )
    _connecting.upon(
        whenConnected,
        enter=_connecting,
        outputs=[_awaitingConnection],
        collector=_firstResult,
    )
    _waiting.upon(
        whenConnected,
        enter=_waiting,
        outputs=[_awaitingConnection],
        collector=_firstResult,
    )
    _connected.upon(
        whenConnected,
        enter=_connected,
        outputs=[_currentConnection],
        collector=_firstResult,
    )
    _disconnecting.upon(
        whenConnected,
        enter=_disconnecting,
        outputs=[_awaitingConnection],
        collector=_firstResult,
    )
    _restarting.upon(
        whenConnected,
        enter=_restarting,
        outputs=[_awaitingConnection],
        collector=_firstResult,
    )
    _stopped.upon(
        whenConnected, enter=_stopped, outputs=[_noConnection], collector=_firstResult
    )


class ClientService(service.Service):
    """
    A service that keeps a single outgoing connection to a client endpoint.

    When a connection fails -- either while it is being established or after
    it has been made -- L{ClientService} reconnects after a configurable
    timeout.

    @ivar _machine: the L{_ClientMachine} that every public method of this
        class delegates to.

    @since: 16.1.0
    """

    _log = Logger()

    def __init__(
        self, endpoint, factory, retryPolicy=None, clock=None, prepareConnection=None
    ):
        """
        @param endpoint: The L{stream client endpoint
            <interfaces.IStreamClientEndpoint>} provider used to connect when
            the service starts.

        @param factory: The L{protocol factory <interfaces.IProtocolFactory>}
            used to create clients for the endpoint.

        @param retryPolicy: Decides how long L{ClientService} waits between
            attempts to connect to C{endpoint}.  When omitted, the module's
            default policy is used.
        @type retryPolicy: callable taking (the number of failed connection
            attempts made in a row (L{int})) and returning the number of
            seconds to wait before making another attempt.

        @param clock: The clock used to schedule reconnection; being able to
            pass one in is mainly useful in tests.  If the factory is
            serialized, this attribute will not be serialized, and the default
            value (the reactor) will be restored when deserialized.
        @type clock: L{IReactorTime}

        @param prepareConnection: A single argument L{callable} that may
            return a L{Deferred}.  It is called once with the L{protocol
            <interfaces.IProtocol>} each time a new connection is made, and
            may call methods on the protocol to prepare it for use (e.g.
            authenticate) or to validate it (check its health).

            To reject the connection, C{prepareConnection} may raise an
            exception or return a L{Deferred} which fails.  A rejected
            connection is never used to fire a L{Deferred} returned by
            L{whenConnected}; L{ClientService} instead handles the failure
            and carries on as if the connection attempt had failed
            (incrementing the counter passed to C{retryPolicy}).

            L{Deferred}s returned by L{whenConnected} do not fire until any
            L{Deferred} returned by C{prepareConnection} has fired.  Apart
            from that, a successful return value is consumed but ignored.

            Present Since Twisted 18.7.0

        @type prepareConnection: L{callable}

        """
        if retryPolicy is None:
            retryPolicy = _defaultPolicy

        self._machine = _ClientMachine(
            endpoint,
            factory,
            retryPolicy,
            _maybeGlobalReactor(clock),
            prepareConnection=prepareConnection,
            log=self._log,
        )

    def whenConnected(self, failAfterFailures=None):
        """
        Get the protocol that is connected right now, or else the next one
        that connects.

        @param failAfterFailures: how many connection failures are tolerated
            before the Deferred delivers a Failure.  L{None} means the
            Deferred only fails if/when the service is stopped.  Pass 1 to
            have the very first connection failure signal an error; pass 2 to
            allow one failure but signal an error if the retry that follows
            fails as well.
        @type failAfterFailures: L{int} or None

        @return: a Deferred that fires with a protocol produced by the
            factory passed to C{__init__}
        @rtype: L{Deferred} that may:

            - fire with L{IProtocol}

            - fail with L{CancelledError} when the service is stopped

            - fail with e.g.
              L{DNSLookupError<twisted.internet.error.DNSLookupError>} or
              L{ConnectionRefusedError<twisted.internet.error.ConnectionRefusedError>}
              when the number of consecutive failed connection attempts
              equals the value of "failAfterFailures"
        """
        machine = self._machine
        return machine.whenConnected(failAfterFailures)

    def startService(self):
        """
        Start this L{ClientService}, which kicks off the connection retry
        loop.

        If the service is already running, a warning is logged and nothing
        else happens.
        """
        if not self.running:
            super().startService()
            self._machine.start()
            return
        self._log.warn("Duplicate ClientService.startService {log_source}")

    def stopService(self):
        """
        Stop trying to reconnect and close any existing connections.

        @return: a L{Deferred} that fires when all outstanding connections are
            closed and all in-progress connection attempts halted.
        """
        super().stopService()
        stopped = self._machine.stop()
        return stopped


# Public names of this module: the list that ``from <module> import *`` exposes.
__all__ = [
    "TimerService",
    "CooperatorService",
    "MulticastServer",
    "StreamServerEndpointService",
    "UDPServer",
    "ClientService",
    "TCPServer",
    "TCPClient",
    "UNIXServer",
    "UNIXClient",
    "SSLServer",
    "SSLClient",
    "UNIXDatagramServer",
    "UNIXDatagramClient",
]