aboutsummaryrefslogtreecommitdiffstats
path: root/contrib/python/Twisted/py3/twisted/trial/_dist/functional.py
blob: 8bb5ecf7a4febc48626fd887380c03753a444bc4 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
# Copyright (c) Twisted Matrix Laboratories.
# See LICENSE for details.

"""
General functional-style helpers for disttrial.
"""

from functools import partial, wraps
from typing import Awaitable, Callable, Iterable, Optional, TypeVar

from twisted.internet.defer import Deferred, succeed

_A = TypeVar("_A")
_B = TypeVar("_B")
_C = TypeVar("_C")


def fromOptional(default: _A, optional: Optional[_A]) -> _A:
    """
    Resolve an optional value to a definite one, falling back to a default.

    Only C{None} is treated as "missing"; any other value, including falsy
    ones such as C{0} or an empty string, is returned unchanged.

    @param default: The value to return if the optional value is missing.

    @param optional: The optional value to return if it exists.

    @return: C{optional} unless it is C{None}, in which case C{default}.
    """
    return default if optional is None else optional


def takeWhile(condition: Callable[[_A], bool], xs: Iterable[_A]) -> Iterable[_A]:
    """
    Lazily yield the leading elements of C{xs} that satisfy C{condition}.

    @param condition: The check applied to each element, in order.

    @param xs: The elements to iterate over.

    @return: An iterable over C{xs} that stops at the first element for
        which C{condition} returns a false value.  That element is taken
        from C{xs} but is not yielded.
    """
    for element in xs:
        if not condition(element):
            # The first rejection ends the iteration for good.
            return
        yield element


async def sequence(a: Awaitable[_A], b: Awaitable[_B]) -> _B:
    """
    Wait for two actions, one after the other, and give back the result of
    the second.

    A failure of either action propagates to the caller.  Because C{a} is
    awaited first, C{b} is never waited on when C{a} fails.

    @param a: The action to wait for first; its result is dropped.

    @param b: The action to wait for second.

    @return: The result of C{b}.
    """
    await a
    second = await b
    return second


def flip(f: Callable[[_A, _B], _C]) -> Callable[[_B, _A], _C]:
    """
    Build a two-argument function that calls C{f} with its arguments in the
    opposite order.

    @param f: The function to adapt.

    @return: A function C{g} such that C{g(b, a)} returns C{f(a, b)}.  It
        is passed through L{functools.wraps} so that it carries the
        metadata of C{f}.
    """

    def g(b, a):
        # Hand the arguments over in swapped order.
        return f(a, b)

    # Same effect as decorating ``g`` with ``@wraps(f)``.
    return wraps(f)(g)


def compose(fx: Callable[[_B], _C], fy: Callable[[_A], _B]) -> Callable[[_A], _C]:
    """
    Build the composition of two single-argument functions.

    @param fx: The outer function, applied to the result of C{fy}.

    @param fy: The inner function, applied to the argument first.

    @return: A function C{g} such that C{g(a)} returns C{fx(fy(a))}.
    """

    def g(a):
        return fx(fy(a))

    # Apply the wrappers in the same order stacked decorators would: fy's
    # metadata is copied first, then fx's is copied over it.
    g = wraps(fy)(g)
    return wraps(fx)(g)


# Discard the result of an awaitable and substitute None in its place.
#
# Reading the composition inside-out, for an awaitable ``a``:
#   1. ``partial(flip(sequence), succeed(None))(a)`` is
#      ``flip(sequence)(succeed(None), a)``, i.e. ``sequence(a, succeed(None))``:
#      wait for ``a``, then wait for ``succeed(None)`` and return its result.
#   2. ``Deferred.fromCoroutine`` is then applied to that coroutine, giving
#      the ``Deferred[None]`` named in the annotation.
# Note: ``succeed(None)`` is evaluated once, when this statement runs, so the
# same object is bound into the partial and reused by every call.
discardResult: Callable[[Awaitable[_A]], Deferred[None]] = compose(
    Deferred.fromCoroutine,
    partial(flip(sequence), succeed(None)),
)


async def iterateWhile(
    predicate: Callable[[_A], bool],
    action: Callable[[], Awaitable[_A]],
) -> _A:
    """
    Keep calling and awaiting C{action} for as long as its result satisfies
    C{predicate}.

    C{action} is always called at least once.

    @param predicate: The check to apply.

    @param action: The function to call.

    @return: The result of C{action} which did not satisfy C{predicate}.
    """
    outcome = await action()
    while predicate(outcome):
        outcome = await action()
    return outcome


def countingCalls(f: Callable[[int], _A]) -> Callable[[], _A]:
    """
    Adapt a one-argument function into a zero-argument one that is fed a
    running call count.

    The first call passes C{0} to C{f}, the next passes C{1}, and so on.
    The count advances after every call, including calls in which C{f}
    raises.

    @param f: The function to call with the current count.

    @return: A function taking no arguments which returns whatever C{f}
        returns for the current count.
    """
    calls = 0

    def g() -> _A:
        nonlocal calls
        try:
            return f(calls)
        finally:
            # Runs whether ``f`` returned or raised, so a failed call still
            # consumes its number.
            calls += 1

    return g