1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
|
"""
Posix asyncio event loop.
"""
from __future__ import unicode_literals
from ..terminal.vt100_input import InputStream
from .asyncio_base import AsyncioTimeout
from .base import EventLoop, INPUT_TIMEOUT
from .callbacks import EventLoopCallbacks
from .posix_utils import PosixStdinReader
import asyncio
import signal
__all__ = (
'PosixAsyncioEventLoop',
)
class PosixAsyncioEventLoop(EventLoop):
def __init__(self, loop=None):
self.loop = loop or asyncio.get_event_loop()
self.closed = False
self._stopped_f = asyncio.Future(loop=self.loop)
@asyncio.coroutine
def run_as_coroutine(self, stdin, callbacks):
"""
The input 'event loop'.
"""
assert isinstance(callbacks, EventLoopCallbacks)
# Create reader class.
stdin_reader = PosixStdinReader(stdin.fileno())
if self.closed:
raise Exception('Event loop already closed.')
inputstream = InputStream(callbacks.feed_key)
try:
# Create a new Future every time.
self._stopped_f = asyncio.Future(loop=self.loop)
# Handle input timouts
def timeout_handler():
"""
When no input has been received for INPUT_TIMEOUT seconds,
flush the input stream and fire the timeout event.
"""
inputstream.flush()
callbacks.input_timeout()
timeout = AsyncioTimeout(INPUT_TIMEOUT, timeout_handler, self.loop)
# Catch sigwinch
def received_winch():
self.call_from_executor(callbacks.terminal_size_changed)
self.loop.add_signal_handler(signal.SIGWINCH, received_winch)
# Read input data.
def stdin_ready():
data = stdin_reader.read()
inputstream.feed(data)
timeout.reset()
# Quit when the input stream was closed.
if stdin_reader.closed:
self.stop()
self.loop.add_reader(stdin.fileno(), stdin_ready)
# Block this coroutine until stop() has been called.
for f in self._stopped_f:
yield f
finally:
# Clean up.
self.loop.remove_reader(stdin.fileno())
self.loop.remove_signal_handler(signal.SIGWINCH)
# Don't trigger any timeout events anymore.
timeout.stop()
def stop(self):
# Trigger the 'Stop' future.
self._stopped_f.set_result(True)
def close(self):
# Note: we should not close the asyncio loop itself, because that one
# was not created here.
self.closed = True
def run_in_executor(self, callback):
self.loop.run_in_executor(None, callback)
def call_from_executor(self, callback, _max_postpone_until=None):
"""
Call this function in the main event loop.
Similar to Twisted's ``callFromThread``.
"""
self.loop.call_soon_threadsafe(callback)
def add_reader(self, fd, callback):
" Start watching the file descriptor for read availability. "
self.loop.add_reader(fd, callback)
def remove_reader(self, fd):
" Stop watching the file descriptor for read availability. "
self.loop.remove_reader(fd)
|