aboutsummaryrefslogtreecommitdiffstats
path: root/contrib/python/Twisted/py3/twisted/trial/_dist/workerreporter.py
blob: 5f2d7a0cab12f852a8bd25c357532ed5c02b8c98 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
# -*- test-case-name: twisted.trial._dist.test.test_workerreporter -*-
#
# Copyright (c) Twisted Matrix Laboratories.
# See LICENSE for details.

"""
Test reporter forwarding test results over trial distributed AMP commands.

@since: 12.3
"""

from types import TracebackType
from typing import Callable, List, Optional, Sequence, Type, TypeVar
from unittest import TestCase as PyUnitTestCase

from attrs import Factory, define
from typing_extensions import Literal

from twisted.internet.defer import Deferred, maybeDeferred
from twisted.protocols.amp import AMP, MAX_VALUE_LENGTH
from twisted.python.failure import Failure
from twisted.python.reflect import qual
from twisted.trial._dist import managercommands
from twisted.trial.reporter import TestResult
from ..reporter import TrialFailure
from .stream import chunk, stream

T = TypeVar("T")


async def addError(
    amp: AMP, testName: str, errorClass: str, error: str, frames: List[str]
) -> None:
    """
    Send an error to the worker manager over an AMP connection.

    First the pieces which can be large are streamed over the connection.
    Then, L{managercommands.AddError} is called with the rest of the
    information and the stream IDs.

    :param amp: The connection to use.
    :param testName: The name (or ID) of the test the error relates to.
    :param errorClass: The fully qualified name of the error type.
    :param error: The string representation of the error.
    :param frames: The lines of the traceback associated with the error.
    """

    errorStreamId = await stream(amp, chunk(error.encode("utf-8"), MAX_VALUE_LENGTH))
    framesStreamId = await stream(amp, (frame.encode("utf-8") for frame in frames))

    await amp.callRemote(
        managercommands.AddError,
        testName=testName,
        errorClass=errorClass,
        errorStreamId=errorStreamId,
        framesStreamId=framesStreamId,
    )


async def addFailure(
    amp: AMP, testName: str, fail: str, failClass: str, frames: List[str]
) -> None:
    """
    Like L{addError} but for failures.

    :param amp: See L{addError}
    :param testName: See L{addError}
    :param failClass: The fully qualified name of the exception associated
        with the failure.
    :param fail: The string representation of the failure.
    :param frames: The lines of the traceback associated with the error.
    """
    failStreamId = await stream(amp, chunk(fail.encode("utf-8"), MAX_VALUE_LENGTH))
    framesStreamId = await stream(amp, (frame.encode("utf-8") for frame in frames))

    await amp.callRemote(
        managercommands.AddFailure,
        testName=testName,
        failClass=failClass,
        failStreamId=failStreamId,
        framesStreamId=framesStreamId,
    )


async def addExpectedFailure(amp: AMP, testName: str, error: str, todo: str) -> None:
    """
    Like L{addError} but for expected failures.

    :param amp: See L{addError}
    :param testName: See L{addError}
    :param error: The string representation of the expected failure.
    :param todo: The string description of the expectation.
    """
    errorStreamId = await stream(amp, chunk(error.encode("utf-8"), MAX_VALUE_LENGTH))

    await amp.callRemote(
        managercommands.AddExpectedFailure,
        testName=testName,
        errorStreamId=errorStreamId,
        todo=todo,
    )


@define
class ReportingResults:
    """
    A mutable container for the result of sending test results back to the
    parent process.

    Since it is possible for these sends to fail asynchronously but the
    L{TestResult} protocol is not well suited for asynchronous result
    reporting, results are collected on an instance of this class and when the
    runner believes the test is otherwise complete, it can collect the results
    and do something with any errors.

    :ivar _reporter: The L{WorkerReporter} this object is associated with.
        This is the object doing the result reporting.

    :ivar _results: A list of L{Deferred} instances representing the results
        of reporting operations.  This is expected to grow over the course of
        the test run and then be inspected by the runner once the test is
        over.  The public interface to this list is via the context manager
        interface.
    """

    _reporter: "WorkerReporter"
    _results: List[Deferred[object]] = Factory(list)

    def __enter__(self) -> Sequence[Deferred[object]]:
        """
        Begin a new reportable context in which results can be collected.

        :return: A sequence which will contain the L{Deferred} instances
            representing the results of all test result reporting that happens
            while the context manager is active.  The sequence is extended as
            the test runs so its value should not be consumed until the test
            is over.
        """
        return self._results

    def __exit__(
        self,
        excType: Type[BaseException],
        excValue: BaseException,
        excTraceback: TracebackType,
    ) -> Literal[False]:
        """
        End the reportable context.
        """
        self._reporter._reporting = None
        return False

    def record(self, result: Deferred[object]) -> None:
        """
        Record a L{Deferred} instance representing one test result reporting
        operation.
        """
        self._results.append(result)


class WorkerReporter(TestResult):
    """
    Reporter for trial's distributed workers. We send things not through a
    stream, but through an C{AMP} protocol's C{callRemote} method.

    @ivar _DEFAULT_TODO: Default message for expected failures and
        unexpected successes, used only if a C{Todo} is not provided.

    @ivar _reporting: When a "result reporting" context is active, the
        corresponding context manager.  Otherwise, L{None}.
    """

    _DEFAULT_TODO = "Test expected to fail"

    ampProtocol: AMP
    _reporting: Optional[ReportingResults] = None

    def __init__(self, ampProtocol):
        """
        @param ampProtocol: The communication channel with the trial
            distributed manager which collects all test results.
        """
        super().__init__()
        self.ampProtocol = ampProtocol

    def gatherReportingResults(self) -> ReportingResults:
        """
        Get a "result reporting" context manager.

        In a "result reporting" context, asynchronous test result reporting
        methods may be used safely.  Their results (in particular, failures)
        are available from the context manager.
        """
        self._reporting = ReportingResults(self)
        return self._reporting

    def _getFailure(self, error: TrialFailure) -> Failure:
        """
        Convert a C{sys.exc_info()}-style tuple to a L{Failure}, if necessary.
        """
        if isinstance(error, tuple):
            return Failure(error[1], error[0], error[2])
        return error

    def _getFrames(self, failure: Failure) -> List[str]:
        """
        Extract frames from a C{Failure} instance.
        """
        frames: List[str] = []
        for frame in failure.frames:
            # The code object's name, the code object's filename, and the line
            # number.
            frames.extend([frame[0], frame[1], str(frame[2])])
        return frames

    def _call(self, f: Callable[[], T]) -> None:
        """
        Call L{f} if and only if a "result reporting" context is active.

        @param f: A function to call.  Its result is accumulated into the
            result reporting context.  It may return a L{Deferred} or a
            coroutine or synchronously raise an exception or return a result
            value.

        @raise ValueError: If no result reporting context is active.
        """
        if self._reporting is not None:
            self._reporting.record(maybeDeferred(f))
        else:
            raise ValueError(
                "Cannot call command outside of reporting context manager."
            )

    def addSuccess(self, test: PyUnitTestCase) -> None:
        """
        Send a success to the parent process.

        This must be called in context managed by L{gatherReportingResults}.
        """
        super().addSuccess(test)
        testName = test.id()
        self._call(
            lambda: self.ampProtocol.callRemote(
                managercommands.AddSuccess, testName=testName
            )
        )

    async def addErrorFallible(self, testName: str, errorObj: TrialFailure) -> None:
        """
        Attempt to report an error to the parent process.

        Unlike L{addError} this can fail asynchronously.  This version is for
        infrastructure code that can apply its own failure handling.

        @return: A L{Deferred} that fires with the result of the attempt.
        """
        failure = self._getFailure(errorObj)
        errorStr = failure.getErrorMessage()
        errorClass = qual(failure.type)
        frames = self._getFrames(failure)
        await addError(
            self.ampProtocol,
            testName,
            errorClass,
            errorStr,
            frames,
        )

    def addError(self, test: PyUnitTestCase, error: TrialFailure) -> None:
        """
        Send an error to the parent process.
        """
        super().addError(test, error)
        testName = test.id()
        self._call(lambda: self.addErrorFallible(testName, error))

    def addFailure(self, test: PyUnitTestCase, fail: TrialFailure) -> None:
        """
        Send a Failure over.
        """
        super().addFailure(test, fail)
        testName = test.id()
        failure = self._getFailure(fail)
        failureMessage = failure.getErrorMessage()
        failClass = qual(failure.type)
        frames = self._getFrames(failure)
        self._call(
            lambda: addFailure(
                self.ampProtocol,
                testName,
                failureMessage,
                failClass,
                frames,
            ),
        )

    def addSkip(self, test, reason):
        """
        Send a skip over.
        """
        super().addSkip(test, reason)
        reason = str(reason)
        testName = test.id()
        self._call(
            lambda: self.ampProtocol.callRemote(
                managercommands.AddSkip, testName=testName, reason=reason
            )
        )

    def _getTodoReason(self, todo):
        """
        Get the reason for a C{Todo}.

        If C{todo} is L{None}, return a sensible default.
        """
        if todo is None:
            return self._DEFAULT_TODO
        else:
            return todo.reason

    def addExpectedFailure(self, test, error, todo=None):
        """
        Send an expected failure over.
        """
        super().addExpectedFailure(test, error, todo)
        errorMessage = error.getErrorMessage()
        testName = test.id()
        self._call(
            lambda: addExpectedFailure(
                self.ampProtocol,
                testName=testName,
                error=errorMessage,
                todo=self._getTodoReason(todo),
            )
        )

    def addUnexpectedSuccess(self, test, todo=None):
        """
        Send an unexpected success over.
        """
        super().addUnexpectedSuccess(test, todo)
        testName = test.id()
        self._call(
            lambda: self.ampProtocol.callRemote(
                managercommands.AddUnexpectedSuccess,
                testName=testName,
                todo=self._getTodoReason(todo),
            )
        )

    def printSummary(self):
        """
        I{Don't} print a summary
        """