1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
|
# Copyright (c) Twisted Matrix Laboratories.
# See LICENSE for details.
"""
General functional-style helpers for disttrial.
"""
from functools import partial, wraps
from typing import Awaitable, Callable, Iterable, Optional, TypeVar
from twisted.internet.defer import Deferred, succeed
_A = TypeVar("_A")
_B = TypeVar("_B")
_C = TypeVar("_C")
def fromOptional(default: _A, optional: Optional[_A]) -> _A:
    """
    Resolve an optional value to a definite one, using a fallback.

    Only C{None} is treated as "missing"; other falsy values (such as C{0}
    or an empty string) are returned unchanged.

    @param default: The fallback returned when C{optional} is C{None}.

    @param optional: The value returned whenever it is not C{None}.

    @return: C{optional} if it is not C{None}, otherwise C{default}.
    """
    return default if optional is None else optional
def takeWhile(condition: Callable[[_A], bool], xs: Iterable[_A]) -> Iterable[_A]:
    """
    Lazily yield the leading elements of C{xs} that satisfy C{condition}.

    @param condition: The check applied to each element in turn.

    @param xs: The elements to iterate over.

    @return: An iterable over C{xs} that stops at the first element for which
        C{condition} returns a false value.  That element is consumed from
        C{xs} but is not yielded.
    """
    for element in xs:
        if not condition(element):
            # The first rejected element ends the iteration for good.
            return
        yield element
async def sequence(a: Awaitable[_A], b: Awaitable[_B]) -> _B:
    """
    Await two actions in order and produce the result of the second.

    A failure of either action propagates to the caller.  When the first
    action fails, the second one is never awaited.

    @param a: The action to wait on first; its result is discarded.

    @param b: The action to wait on second.

    @return: The result of C{b}.
    """
    # The first result is deliberately thrown away; only completion matters.
    await a
    second = await b
    return second
def flip(f: Callable[[_A, _B], _C]) -> Callable[[_B, _A], _C]:
    """
    Build a function equivalent to C{f} with its two arguments swapped.

    @param f: The two-argument function to adapt.

    @return: A function which, called as C{g(b, a)}, returns C{f(a, b)}.  It
        carries the metadata of C{f} as copied by L{functools.wraps}.
    """

    def g(b, a):
        return f(a, b)

    # Same effect as decorating ``g`` with ``@wraps(f)``.
    return wraps(f)(g)
def compose(fx: Callable[[_B], _C], fy: Callable[[_A], _B]) -> Callable[[_A], _C]:
    """
    Build the composition of two single-argument functions.

    @param fx: The outer function, applied to the result of C{fy}.

    @param fy: The inner function, applied to the original argument.

    @return: A function which, called with C{a}, returns C{fx(fy(a))}.
    """

    def g(a):
        intermediate = fy(a)
        return fx(intermediate)

    # Copy metadata from fy first and fx second, so that fx's attributes win
    # wherever both functions provide one.
    return wraps(fx)(wraps(fy)(g))
# Discard the result of an awaitable and substitute None in its place.
#
# Calling C{discardResult(x)} builds the coroutine C{sequence(x, succeed(None))}
# (C{flip} plus C{partial} put C{succeed(None)} in the second position) and
# hands it to C{Deferred.fromCoroutine}.  C{x} is therefore awaited first and
# its result dropped; the value produced is that of C{succeed(None)}.
#
# Note that C{succeed(None)} is evaluated once, when this statement runs, so
# every call awaits that same already-fired Deferred.
#
# Ignore the `Cannot infer type argument 1 of "compose"`
# https://github.com/python/mypy/issues/6220
discardResult: Callable[[Awaitable[_A]], Deferred[None]] = compose(  # type: ignore[misc]
    Deferred.fromCoroutine,
    partial(flip(sequence), succeed(None)),
)
async def iterateWhile(
    predicate: Callable[[_A], bool],
    action: Callable[[], Awaitable[_A]],
) -> _A:
    """
    Keep calling and awaiting C{action} for as long as its result satisfies
    C{predicate}.

    C{action} is always called at least once.

    @param predicate: The check to apply to each result.

    @param action: The function to call; the awaitable it returns is awaited
        before the next call is made.

    @return: The first result of C{action} which did not satisfy
        C{predicate}.
    """
    result = await action()
    while predicate(result):
        result = await action()
    return result
def countingCalls(f: Callable[[int], _A]) -> Callable[[], _A]:
    """
    Adapt a one-argument function into a zero-argument one that is fed a
    running count of calls.

    @param f: The function to wrap.  It receives the number of calls to the
        wrapper that have already finished: C{0} for the first call, then
        C{1}, and so on.

    @return: A function taking no arguments which returns whatever C{f}
        returns.  The count advances even when C{f} raises.
    """
    calls = 0

    def g() -> _A:
        nonlocal calls
        try:
            return f(calls)
        finally:
            # Runs whether ``f`` returned or raised, so failed calls are
            # counted as well.
            calls += 1

    return g
|