aboutsummaryrefslogtreecommitdiffstats
path: root/contrib/python/Twisted/py3/twisted/protocols/htb.py
blob: 66409d36fe6208cf7305ff5a36eb5c54ee41783d (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
# -*- test-case-name: twisted.test.test_htb -*-
# Copyright (c) Twisted Matrix Laboratories.
# See LICENSE for details.


"""
Hierarchical Token Bucket traffic shaping.

Patterned after U{Martin Devera's Hierarchical Token Bucket traffic
shaper for the Linux kernel<http://luxik.cdi.cz/~devik/qos/htb/>}.

@seealso: U{HTB Linux queuing discipline manual - user guide
  <http://luxik.cdi.cz/~devik/qos/htb/manual/userg.htm>}
@seealso: U{Token Bucket Filter in Linux Advanced Routing & Traffic Control
    HOWTO<http://lartc.org/howto/lartc.qdisc.classless.html#AEN682>}
"""


# TODO: Investigate whether we should be using os.times()[-1] instead of
# time.time.  time.time, it has been pointed out, can go backwards.  Is
# the same true of os.times?
from time import time
from typing import Optional

from zope.interface import Interface, implementer

from twisted.protocols import pcp


class Bucket:
    """
    Implementation of a Token bucket.

    A bucket can hold a certain number of tokens and it drains over time.

    @cvar maxburst: The maximum number of tokens that the bucket can
        hold at any given time. If this is L{None}, the bucket has
        an infinite size.
    @type maxburst: C{int}
    @cvar rate: The rate at which the bucket drains, in number
        of tokens per second. If the rate is L{None}, the bucket
        drains instantaneously.
    @type rate: C{int}
    @ivar content: The number of tokens currently held.  This becomes
        fractional once the bucket has drained by a non-integral amount.
    @ivar parentBucket: The parent L{Bucket}, or L{None}.
    @ivar lastDrip: The C{time()} reading taken when the bucket was created
        or last drained at a finite C{rate}.
    """

    maxburst: Optional[int] = None
    rate: Optional[int] = None

    # Number of users currently holding this bucket.  Within this module it
    # is incremented and decremented by ShapedConsumer and consulted by
    # HierarchicalBucketFilter.sweep before evicting a bucket.
    _refcount = 0

    def __init__(self, parentBucket=None):
        """
        Create a L{Bucket} that may have a parent L{Bucket}.

        @param parentBucket: If a parent Bucket is specified,
            all L{add} and L{drip} operations on this L{Bucket}
            will be applied on the parent L{Bucket} as well.
        @type parentBucket: L{Bucket}
        """
        self.content = 0
        self.parentBucket = parentBucket
        self.lastDrip = time()

    def add(self, amount):
        """
        Adds tokens to the L{Bucket} and its C{parentBucket}.

        This will add as many of the C{amount} tokens as will fit into both
        this L{Bucket} and its C{parentBucket}.

        When C{maxburst} is set, the number of tokens that fit is always a
        whole number: the free room is clamped at zero and any fractional
        part is discarded.

        @param amount: The number of tokens to try to add.
        @type amount: C{int}

        @returns: The number of tokens that actually fit.
        @returntype: C{int}
        """
        self.drip()
        if self.maxburst is None:
            allowable = amount
        else:
            # C{content} is a float after a fractional drip, and it can
            # exceed C{maxburst} if the cap was lowered.  Never report
            # negative room, and never hand back a fractional token count:
            # callers use the result as a slice index.
            room = max(0, self.maxburst - self.content)
            allowable = int(min(amount, room))

        if self.parentBucket is not None:
            # The parent may accept fewer tokens than this bucket could.
            allowable = self.parentBucket.add(allowable)
        self.content += allowable
        return allowable

    def drip(self):
        """
        Let some of the bucket drain.

        The L{Bucket} drains at the rate specified by the class
        variable C{rate}.  The C{parentBucket}, if any, is drained first.

        @returns: C{True} if the bucket is empty after this drip.
        @returntype: C{bool}
        """
        if self.parentBucket is not None:
            self.parentBucket.drip()

        if self.rate is None:
            # No rate limit: everything drains immediately.
            self.content = 0
        else:
            now = time()
            deltaTime = now - self.lastDrip
            deltaTokens = deltaTime * self.rate
            # The bucket cannot hold fewer than zero tokens.
            self.content = max(0, self.content - deltaTokens)
            self.lastDrip = now
        return self.content == 0


class IBucketFilter(Interface):
    """
    Interface for objects that select a L{Bucket} from arbitrary parameters.
    """

    def getBucketFor(*somethings, **some_kw):
        """
        Return a L{Bucket} corresponding to the provided parameters.

        @param somethings: Positional parameters used to choose the bucket.
        @param some_kw: Keyword parameters used to choose the bucket.

        @returntype: L{Bucket}
        """


@implementer(IBucketFilter)
class HierarchicalBucketFilter:
    """
    Filter things into buckets that can be nested.

    Buckets are cached in C{buckets}, keyed by the value returned from
    L{getBucketKey}.  When a C{parentFilter} is set, every newly created
    bucket is given the bucket obtained from that parent filter as its
    parent.

    @cvar bucketFactory: Class of buckets to make.
    @type bucketFactory: L{Bucket}
    @cvar sweepInterval: Seconds between sweeping out the bucket cache.
        If this is L{None}, L{getBucketFor} never sweeps the cache.
    @type sweepInterval: C{int}
    @ivar buckets: The bucket cache, mapping bucket keys to buckets.
    @ivar parentFilter: The filter supplying parent buckets, or L{None}.
    @ivar lastSweep: The C{time()} reading taken at creation or at the end
        of the most recent L{sweep}.
    """

    bucketFactory = Bucket
    sweepInterval: Optional[int] = None

    def __init__(self, parentFilter=None):
        """
        Create a filter with an empty bucket cache.

        @param parentFilter: An optional filter whose buckets become the
            parents of the buckets created by this filter.
        """
        self.buckets = {}
        self.parentFilter = parentFilter
        self.lastSweep = time()

    def getBucketFor(self, *a, **kw):
        """
        Find or create a L{Bucket} corresponding to the provided parameters.

        Any parameters are passed on to L{getBucketKey}, from them it
        decides which bucket you get.

        If a C{sweepInterval} is set and more than that many seconds have
        passed since the last sweep, the cache is swept first.  The parent
        filter, if any, is consulted on every call, with this filter
        prepended to the positional parameters.

        @returntype: L{Bucket}
        """
        if (self.sweepInterval is not None) and (
            (time() - self.lastSweep) > self.sweepInterval
        ):
            self.sweep()

        if self.parentFilter:
            parentBucket = self.parentFilter.getBucketFor(self, *a, **kw)
        else:
            parentBucket = None

        key = self.getBucketKey(*a, **kw)
        bucket = self.buckets.get(key)
        if bucket is None:
            # The parent bucket is only attached when the bucket is created.
            bucket = self.bucketFactory(parentBucket)
            self.buckets[key] = bucket
        return bucket

    def getBucketKey(self, *a, **kw):
        """
        Construct a key based on the input parameters to choose a L{Bucket}.

        The default implementation returns the same key for all
        arguments. Override this method to provide L{Bucket} selection.

        @returns: Something to be used as a key in the bucket cache.
        """
        return None

    def sweep(self):
        """
        Remove empty buckets.

        Every cached bucket is drained; those that end up empty and have a
        C{_refcount} of zero are dropped from the cache.
        """
        # Iterate over a snapshot: deleting from a dict while iterating its
        # live items() view raises RuntimeError.
        for key, bucket in list(self.buckets.items()):
            bucket_is_empty = bucket.drip()
            if (bucket._refcount == 0) and bucket_is_empty:
                del self.buckets[key]

        self.lastSweep = time()


class FilterByHost(HierarchicalBucketFilter):
    """
    A Hierarchical Bucket filter with a L{Bucket} for each host.
    """

    # Sweep the bucket cache every twenty minutes.
    sweepInterval = 60 * 20

    def getBucketKey(self, transport):
        """
        Use the host of the transport's peer address as the bucket key.

        @param transport: An object whose C{getPeer()} returns the peer
            address.

        @returns: The C{host} attribute of the peer address when it has
            one, otherwise the element at index 1 of the address.
        """
        peer = transport.getPeer()
        try:
            # Address objects expose the host as an attribute and need not
            # support indexing.
            return peer.host
        except AttributeError:
            # Tuple-style address: fall back to the original indexing.
            return peer[1]


class FilterByServer(HierarchicalBucketFilter):
    """
    A Hierarchical Bucket filter with a L{Bucket} for each service.
    """

    # Never sweep the bucket cache automatically.
    sweepInterval = None

    def getBucketKey(self, transport):
        """
        Use the port of the transport's local address as the bucket key.

        @param transport: An object whose C{getHost()} returns the local
            address.

        @returns: The C{port} attribute of the local address when it has
            one, otherwise the element at index 2 of the address.
        """
        host = transport.getHost()
        try:
            # Address objects expose the port as an attribute and need not
            # support indexing.
            return host.port
        except AttributeError:
            # Tuple-style address: fall back to the original indexing.
            return host[2]


class ShapedConsumer(pcp.ProducerConsumerProxy):
    """
    Wraps a C{Consumer} and shapes the rate at which it receives data.

    Each write is limited to the number of tokens that the associated
    bucket accepts for it.

    @ivar bucket: The bucket consulted before every write.
    """

    # Providing a Pull interface means I don't have to try to schedule
    # traffic with callLaters.
    iAmStreaming = False

    def __init__(self, consumer, bucket):
        """
        @param consumer: The consumer to wrap; handed to the proxy base
            class.
        @param bucket: The bucket that limits writes.  Its C{_refcount} is
            incremented for as long as this consumer is producing.
        """
        pcp.ProducerConsumerProxy.__init__(self, consumer)
        self.bucket = bucket
        bucket._refcount += 1

    def _writeSomeData(self, data):
        """
        Write only as much of C{data} as the bucket has room for.

        @param data: The data waiting to be written.

        @returns: Whatever the proxy base class returns for the truncated
            write.
        """
        # In practice, this actually results in obscene amounts of
        # overhead, as a result of generating lots and lots of packets
        # with twelve-byte payloads.  We may need to do a version of
        # this with scheduled writes after all.
        permitted = self.bucket.add(len(data))
        shaped = data[:permitted]
        return pcp.ProducerConsumerProxy._writeSomeData(self, shaped)

    def stopProducing(self):
        """
        Stop producing and release this consumer's hold on the bucket.
        """
        pcp.ProducerConsumerProxy.stopProducing(self)
        bucket = self.bucket
        bucket._refcount -= 1


class ShapedTransport(ShapedConsumer):
    """
    Wraps a C{Transport} and shapes the rate at which it receives data.

    This is a L{ShapedConsumer} with a little bit of magic to provide for
    the case where the consumer it wraps is also a C{Transport} and people
    will be attempting to access attributes this does not proxy as a
    C{Consumer} (e.g. C{loseConnection}).
    """

    # Ugh.  We only wanted to filter IConsumer, not ITransport.

    iAmStreaming = False

    def __getattr__(self, name):
        """
        Forward lookups of otherwise missing attributes to the wrapped
        consumer.

        @param name: The name of the attribute that normal lookup did not
            find.

        @raises AttributeError: If C{consumer} itself is not set on this
            instance, or if the wrapped consumer lacks C{name}.
        """
        # Because people will be doing things like .getPeer and
        # .loseConnection on me.
        if name == "consumer":
            # We only get here when C{consumer} is missing from the
            # instance; reading self.consumer below would re-enter this
            # method until the recursion limit is hit.
            raise AttributeError(name)
        return getattr(self.consumer, name)


class ShapedProtocolFactory:
    """
    Dispense C{Protocols} with traffic shaping on their transports.

    Usage::

        myserver = SomeFactory()
        myserver.protocol = ShapedProtocolFactory(myserver.protocol,
                                                  bucketFilter)

    Where C{SomeFactory} is a L{twisted.internet.protocol.Factory}, and
    C{bucketFilter} is an instance of L{HierarchicalBucketFilter}.

    @ivar protocol: The callable used to build the protocols to wrap.
    @ivar bucketFilter: The filter asked for a bucket for each transport.
    """

    def __init__(self, protoClass, bucketFilter):
        """
        Tell me what to wrap and where to get buckets.

        @param protoClass: The class of C{Protocol} this will generate
          wrapped instances of.
        @type protoClass: L{Protocol<twisted.internet.interfaces.IProtocol>}
          class
        @param bucketFilter: The filter which will determine how
          traffic is shaped.
        @type bucketFilter: L{HierarchicalBucketFilter}.
        """
        # More precisely, protoClass can be any callable that will return
        # instances of something that implements IProtocol.
        self.bucketFilter = bucketFilter
        self.protocol = protoClass

    def __call__(self, *a, **kw):
        """
        Make a C{Protocol} instance with a shaped transport.

        Any parameters will be passed on to the protocol's initializer.
        The instance's C{makeConnection} is replaced by a wrapper that
        hands the original C{makeConnection} a L{ShapedTransport} around
        the real transport.

        @returns: A C{Protocol} instance with a L{ShapedTransport}.
        """
        instance = self.protocol(*a, **kw)
        # Grab the bound method before it is shadowed on the instance.
        unshapedMakeConnection = instance.makeConnection

        def makeConnection(transport):
            # The filter is looked up at connection time, not at build time.
            chosenBucket = self.bucketFilter.getBucketFor(transport)
            wrapped = ShapedTransport(transport, chosenBucket)
            return unshapedMakeConnection(wrapped)

        instance.makeConnection = makeConnection
        return instance