aboutsummaryrefslogtreecommitdiffstats
path: root/contrib/python/Twisted/py2/twisted/internet/_producer_helpers.py
blob: 9b51ffaf302db431adda04a0eea152d383032c72 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
# -*- test-case-name: twisted.test.test_producer_helpers -*-
# Copyright (c) Twisted Matrix Laboratories.
# See LICENSE for details.

"""
Helpers for working with producers.
"""
from __future__ import division, absolute_import

from zope.interface import implementer

from twisted.internet.interfaces import IPushProducer
from twisted.internet.task import cooperate
from twisted.python import log
from twisted.python.reflect import safe_str


# This module exports nothing public, it's for internal Twisted use only.
__all__ = []


@implementer(IPushProducer)
class _PullToPush(object):
    """
    An adapter that converts a non-streaming to a streaming producer.

    Because of limitations of the producer API, this adapter requires the
    cooperation of the consumer. When the consumer's C{registerProducer} is
    called with a non-streaming producer, it must wrap it with L{_PullToPush}
    and then call C{startStreaming} on the resulting object. When the
    consumer's C{unregisterProducer} is called, it must call
    C{stopStreaming} on the L{_PullToPush} instance.

    If the underlying producer throws an exception from C{resumeProducing},
    the producer will be unregistered from the consumer.

    @ivar _producer: the underling non-streaming producer.

    @ivar _consumer: the consumer with which the underlying producer was
                     registered.

    @ivar _finished: C{bool} indicating whether the producer has finished.

    @ivar _coopTask: the result of calling L{cooperate}, the task driving the
                     streaming producer.
    """

    _finished = False


    def __init__(self, pullProducer, consumer):
        self._producer = pullProducer
        self._consumer = consumer


    def _pull(self):
        """
        A generator that calls C{resumeProducing} on the underlying producer
        forever.

        If C{resumeProducing} throws an exception, the producer is
        unregistered, which should result in streaming stopping.
        """
        while True:
            try:
                self._producer.resumeProducing()
            except:
                log.err(None, "%s failed, producing will be stopped:" %
                        (safe_str(self._producer),))
                try:
                    self._consumer.unregisterProducer()
                    # The consumer should now call stopStreaming() on us,
                    # thus stopping the streaming.
                except:
                    # Since the consumer blew up, we may not have had
                    # stopStreaming() called, so we just stop on our own:
                    log.err(None, "%s failed to unregister producer:" %
                            (safe_str(self._consumer),))
                    self._finished = True
                    return
            yield None


    def startStreaming(self):
        """
        This should be called by the consumer when the producer is registered.

        Start streaming data to the consumer.
        """
        self._coopTask = cooperate(self._pull())


    def stopStreaming(self):
        """
        This should be called by the consumer when the producer is
        unregistered.

        Stop streaming data to the consumer.
        """
        if self._finished:
            return
        self._finished = True
        self._coopTask.stop()


    def pauseProducing(self):
        """
        @see: C{IPushProducer.pauseProducing}
        """
        self._coopTask.pause()


    def resumeProducing(self):
        """
        @see: C{IPushProducer.resumeProducing}
        """
        self._coopTask.resume()


    def stopProducing(self):
        """
        @see: C{IPushProducer.stopProducing}
        """
        self.stopStreaming()
        self._producer.stopProducing()