path: root/contrib/tools/python3/src/Lib/asyncio/unix_events.py
author:    orivej <orivej@yandex-team.ru>    2022-02-10 16:44:49 +0300
committer: Daniil Cherednik <dcherednik@yandex-team.ru>    2022-02-10 16:44:49 +0300
commit:    718c552901d703c502ccbefdfc3c9028d608b947 (patch)
tree:      46534a98bbefcd7b1f3faa5b52c138ab27db75b7 /contrib/tools/python3/src/Lib/asyncio/unix_events.py
parent:    e9656aae26e0358d5378e5b63dcac5c8dbe0e4d0 (diff)
download:  ydb-718c552901d703c502ccbefdfc3c9028d608b947.tar.gz
Restoring authorship annotation for <orivej@yandex-team.ru>. Commit 1 of 2.
Diffstat (limited to 'contrib/tools/python3/src/Lib/asyncio/unix_events.py')
-rw-r--r--  contrib/tools/python3/src/Lib/asyncio/unix_events.py  2162
1 file changed, 1081 insertions, 1081 deletions
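
Orientation (editor's note, not part of the change): unix_events.py implements asyncio's Unix SelectorEventLoop, including POSIX signal handling, UNIX domain socket support, pipe and subprocess transports, and the child watchers. The commit below only restores authorship annotations, so every removed line is re-added unchanged. A minimal sketch of the signal-handling API defined in this file, assuming a POSIX platform; the SIGTERM/SIGINT callbacks are illustrative only:

    import asyncio
    import signal

    async def main():
        loop = asyncio.get_running_loop()
        stop = asyncio.Event()

        # add_signal_handler() is implemented by _UnixSelectorEventLoop below;
        # it accepts plain callables only and rejects coroutines.
        loop.add_signal_handler(signal.SIGTERM, stop.set)
        loop.add_signal_handler(signal.SIGINT, stop.set)
        try:
            await stop.wait()  # run until SIGTERM or SIGINT arrives
        finally:
            loop.remove_signal_handler(signal.SIGTERM)
            loop.remove_signal_handler(signal.SIGINT)

    asyncio.run(main())
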
diff --git a/contrib/tools/python3/src/Lib/asyncio/unix_events.py b/contrib/tools/python3/src/Lib/asyncio/unix_events.py
index eecbc101ee1..9f3794b4201 100644
--- a/contrib/tools/python3/src/Lib/asyncio/unix_events.py
+++ b/contrib/tools/python3/src/Lib/asyncio/unix_events.py
@@ -1,191 +1,191 @@
-"""Selector event loop for Unix with signal handling."""
-
-import errno
-import io
+"""Selector event loop for Unix with signal handling."""
+
+import errno
+import io
import itertools
-import os
-import selectors
-import signal
-import socket
-import stat
-import subprocess
-import sys
-import threading
-import warnings
-
-from . import base_events
-from . import base_subprocess
-from . import constants
-from . import coroutines
-from . import events
+import os
+import selectors
+import signal
+import socket
+import stat
+import subprocess
+import sys
+import threading
+import warnings
+
+from . import base_events
+from . import base_subprocess
+from . import constants
+from . import coroutines
+from . import events
from . import exceptions
-from . import futures
-from . import selector_events
-from . import tasks
-from . import transports
-from .log import logger
-
-
-__all__ = (
- 'SelectorEventLoop',
- 'AbstractChildWatcher', 'SafeChildWatcher',
+from . import futures
+from . import selector_events
+from . import tasks
+from . import transports
+from .log import logger
+
+
+__all__ = (
+ 'SelectorEventLoop',
+ 'AbstractChildWatcher', 'SafeChildWatcher',
'FastChildWatcher', 'PidfdChildWatcher',
'MultiLoopChildWatcher', 'ThreadedChildWatcher',
'DefaultEventLoopPolicy',
-)
-
-
-if sys.platform == 'win32': # pragma: no cover
- raise ImportError('Signals are not really supported on Windows')
-
-
-def _sighandler_noop(signum, frame):
- """Dummy signal handler."""
- pass
-
-
-class _UnixSelectorEventLoop(selector_events.BaseSelectorEventLoop):
- """Unix event loop.
-
- Adds signal handling and UNIX Domain Socket support to SelectorEventLoop.
- """
-
- def __init__(self, selector=None):
- super().__init__(selector)
- self._signal_handlers = {}
-
- def close(self):
- super().close()
- if not sys.is_finalizing():
- for sig in list(self._signal_handlers):
- self.remove_signal_handler(sig)
- else:
- if self._signal_handlers:
- warnings.warn(f"Closing the loop {self!r} "
- f"on interpreter shutdown "
- f"stage, skipping signal handlers removal",
- ResourceWarning,
- source=self)
- self._signal_handlers.clear()
-
- def _process_self_data(self, data):
- for signum in data:
- if not signum:
- # ignore null bytes written by _write_to_self()
- continue
- self._handle_signal(signum)
-
- def add_signal_handler(self, sig, callback, *args):
- """Add a handler for a signal. UNIX only.
-
- Raise ValueError if the signal number is invalid or uncatchable.
- Raise RuntimeError if there is a problem setting up the handler.
- """
- if (coroutines.iscoroutine(callback) or
- coroutines.iscoroutinefunction(callback)):
- raise TypeError("coroutines cannot be used "
- "with add_signal_handler()")
- self._check_signal(sig)
- self._check_closed()
- try:
- # set_wakeup_fd() raises ValueError if this is not the
- # main thread. By calling it early we ensure that an
- # event loop running in another thread cannot add a signal
- # handler.
- signal.set_wakeup_fd(self._csock.fileno())
- except (ValueError, OSError) as exc:
- raise RuntimeError(str(exc))
-
- handle = events.Handle(callback, args, self, None)
- self._signal_handlers[sig] = handle
-
- try:
- # Register a dummy signal handler to ask Python to write the signal
+)
+
+
+if sys.platform == 'win32': # pragma: no cover
+ raise ImportError('Signals are not really supported on Windows')
+
+
+def _sighandler_noop(signum, frame):
+ """Dummy signal handler."""
+ pass
+
+
+class _UnixSelectorEventLoop(selector_events.BaseSelectorEventLoop):
+ """Unix event loop.
+
+ Adds signal handling and UNIX Domain Socket support to SelectorEventLoop.
+ """
+
+ def __init__(self, selector=None):
+ super().__init__(selector)
+ self._signal_handlers = {}
+
+ def close(self):
+ super().close()
+ if not sys.is_finalizing():
+ for sig in list(self._signal_handlers):
+ self.remove_signal_handler(sig)
+ else:
+ if self._signal_handlers:
+ warnings.warn(f"Closing the loop {self!r} "
+ f"on interpreter shutdown "
+ f"stage, skipping signal handlers removal",
+ ResourceWarning,
+ source=self)
+ self._signal_handlers.clear()
+
+ def _process_self_data(self, data):
+ for signum in data:
+ if not signum:
+ # ignore null bytes written by _write_to_self()
+ continue
+ self._handle_signal(signum)
+
+ def add_signal_handler(self, sig, callback, *args):
+ """Add a handler for a signal. UNIX only.
+
+ Raise ValueError if the signal number is invalid or uncatchable.
+ Raise RuntimeError if there is a problem setting up the handler.
+ """
+ if (coroutines.iscoroutine(callback) or
+ coroutines.iscoroutinefunction(callback)):
+ raise TypeError("coroutines cannot be used "
+ "with add_signal_handler()")
+ self._check_signal(sig)
+ self._check_closed()
+ try:
+ # set_wakeup_fd() raises ValueError if this is not the
+ # main thread. By calling it early we ensure that an
+ # event loop running in another thread cannot add a signal
+ # handler.
+ signal.set_wakeup_fd(self._csock.fileno())
+ except (ValueError, OSError) as exc:
+ raise RuntimeError(str(exc))
+
+ handle = events.Handle(callback, args, self, None)
+ self._signal_handlers[sig] = handle
+
+ try:
+ # Register a dummy signal handler to ask Python to write the signal
# number in the wakeup file descriptor. _process_self_data() will
- # read signal numbers from this file descriptor to handle signals.
- signal.signal(sig, _sighandler_noop)
-
- # Set SA_RESTART to limit EINTR occurrences.
- signal.siginterrupt(sig, False)
- except OSError as exc:
- del self._signal_handlers[sig]
- if not self._signal_handlers:
- try:
- signal.set_wakeup_fd(-1)
- except (ValueError, OSError) as nexc:
- logger.info('set_wakeup_fd(-1) failed: %s', nexc)
-
- if exc.errno == errno.EINVAL:
- raise RuntimeError(f'sig {sig} cannot be caught')
- else:
- raise
-
- def _handle_signal(self, sig):
- """Internal helper that is the actual signal handler."""
- handle = self._signal_handlers.get(sig)
- if handle is None:
- return # Assume it's some race condition.
- if handle._cancelled:
- self.remove_signal_handler(sig) # Remove it properly.
- else:
- self._add_callback_signalsafe(handle)
-
- def remove_signal_handler(self, sig):
- """Remove a handler for a signal. UNIX only.
-
- Return True if a signal handler was removed, False if not.
- """
- self._check_signal(sig)
- try:
- del self._signal_handlers[sig]
- except KeyError:
- return False
-
- if sig == signal.SIGINT:
- handler = signal.default_int_handler
- else:
- handler = signal.SIG_DFL
-
- try:
- signal.signal(sig, handler)
- except OSError as exc:
- if exc.errno == errno.EINVAL:
- raise RuntimeError(f'sig {sig} cannot be caught')
- else:
- raise
-
- if not self._signal_handlers:
- try:
- signal.set_wakeup_fd(-1)
- except (ValueError, OSError) as exc:
- logger.info('set_wakeup_fd(-1) failed: %s', exc)
-
- return True
-
- def _check_signal(self, sig):
- """Internal helper to validate a signal.
-
- Raise ValueError if the signal number is invalid or uncatchable.
- Raise RuntimeError if there is a problem setting up the handler.
- """
- if not isinstance(sig, int):
- raise TypeError(f'sig must be an int, not {sig!r}')
-
+ # read signal numbers from this file descriptor to handle signals.
+ signal.signal(sig, _sighandler_noop)
+
+ # Set SA_RESTART to limit EINTR occurrences.
+ signal.siginterrupt(sig, False)
+ except OSError as exc:
+ del self._signal_handlers[sig]
+ if not self._signal_handlers:
+ try:
+ signal.set_wakeup_fd(-1)
+ except (ValueError, OSError) as nexc:
+ logger.info('set_wakeup_fd(-1) failed: %s', nexc)
+
+ if exc.errno == errno.EINVAL:
+ raise RuntimeError(f'sig {sig} cannot be caught')
+ else:
+ raise
+
+ def _handle_signal(self, sig):
+ """Internal helper that is the actual signal handler."""
+ handle = self._signal_handlers.get(sig)
+ if handle is None:
+ return # Assume it's some race condition.
+ if handle._cancelled:
+ self.remove_signal_handler(sig) # Remove it properly.
+ else:
+ self._add_callback_signalsafe(handle)
+
+ def remove_signal_handler(self, sig):
+ """Remove a handler for a signal. UNIX only.
+
+ Return True if a signal handler was removed, False if not.
+ """
+ self._check_signal(sig)
+ try:
+ del self._signal_handlers[sig]
+ except KeyError:
+ return False
+
+ if sig == signal.SIGINT:
+ handler = signal.default_int_handler
+ else:
+ handler = signal.SIG_DFL
+
+ try:
+ signal.signal(sig, handler)
+ except OSError as exc:
+ if exc.errno == errno.EINVAL:
+ raise RuntimeError(f'sig {sig} cannot be caught')
+ else:
+ raise
+
+ if not self._signal_handlers:
+ try:
+ signal.set_wakeup_fd(-1)
+ except (ValueError, OSError) as exc:
+ logger.info('set_wakeup_fd(-1) failed: %s', exc)
+
+ return True
+
+ def _check_signal(self, sig):
+ """Internal helper to validate a signal.
+
+ Raise ValueError if the signal number is invalid or uncatchable.
+ Raise RuntimeError if there is a problem setting up the handler.
+ """
+ if not isinstance(sig, int):
+ raise TypeError(f'sig must be an int, not {sig!r}')
+
if sig not in signal.valid_signals():
raise ValueError(f'invalid signal number {sig}')
-
- def _make_read_pipe_transport(self, pipe, protocol, waiter=None,
- extra=None):
- return _UnixReadPipeTransport(self, pipe, protocol, waiter, extra)
-
- def _make_write_pipe_transport(self, pipe, protocol, waiter=None,
- extra=None):
- return _UnixWritePipeTransport(self, pipe, protocol, waiter, extra)
-
- async def _make_subprocess_transport(self, protocol, args, shell,
- stdin, stdout, stderr, bufsize,
- extra=None, **kwargs):
- with events.get_child_watcher() as watcher:
+
+ def _make_read_pipe_transport(self, pipe, protocol, waiter=None,
+ extra=None):
+ return _UnixReadPipeTransport(self, pipe, protocol, waiter, extra)
+
+ def _make_write_pipe_transport(self, pipe, protocol, waiter=None,
+ extra=None):
+ return _UnixWritePipeTransport(self, pipe, protocol, waiter, extra)
+
+ async def _make_subprocess_transport(self, protocol, args, shell,
+ stdin, stdout, stderr, bufsize,
+ extra=None, **kwargs):
+ with events.get_child_watcher() as watcher:
if not watcher.is_active():
# Check early.
# Raising exception before process creation
@@ -193,598 +193,598 @@ class _UnixSelectorEventLoop(selector_events.BaseSelectorEventLoop):
# is not ready to handle it.
raise RuntimeError("asyncio.get_child_watcher() is not activated, "
"subprocess support is not installed.")
- waiter = self.create_future()
- transp = _UnixSubprocessTransport(self, protocol, args, shell,
- stdin, stdout, stderr, bufsize,
- waiter=waiter, extra=extra,
- **kwargs)
-
- watcher.add_child_handler(transp.get_pid(),
- self._child_watcher_callback, transp)
- try:
- await waiter
+ waiter = self.create_future()
+ transp = _UnixSubprocessTransport(self, protocol, args, shell,
+ stdin, stdout, stderr, bufsize,
+ waiter=waiter, extra=extra,
+ **kwargs)
+
+ watcher.add_child_handler(transp.get_pid(),
+ self._child_watcher_callback, transp)
+ try:
+ await waiter
except (SystemExit, KeyboardInterrupt):
raise
except BaseException:
- transp.close()
- await transp._wait()
- raise
-
- return transp
-
- def _child_watcher_callback(self, pid, returncode, transp):
- self.call_soon_threadsafe(transp._process_exited, returncode)
-
- async def create_unix_connection(
- self, protocol_factory, path=None, *,
- ssl=None, sock=None,
- server_hostname=None,
- ssl_handshake_timeout=None):
- assert server_hostname is None or isinstance(server_hostname, str)
- if ssl:
- if server_hostname is None:
- raise ValueError(
- 'you have to pass server_hostname when using ssl')
- else:
- if server_hostname is not None:
- raise ValueError('server_hostname is only meaningful with ssl')
- if ssl_handshake_timeout is not None:
- raise ValueError(
- 'ssl_handshake_timeout is only meaningful with ssl')
-
- if path is not None:
- if sock is not None:
- raise ValueError(
- 'path and sock can not be specified at the same time')
-
- path = os.fspath(path)
- sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM, 0)
- try:
- sock.setblocking(False)
- await self.sock_connect(sock, path)
- except:
- sock.close()
- raise
-
- else:
- if sock is None:
- raise ValueError('no path and sock were specified')
- if (sock.family != socket.AF_UNIX or
- sock.type != socket.SOCK_STREAM):
- raise ValueError(
- f'A UNIX Domain Stream Socket was expected, got {sock!r}')
- sock.setblocking(False)
-
- transport, protocol = await self._create_connection_transport(
- sock, protocol_factory, ssl, server_hostname,
- ssl_handshake_timeout=ssl_handshake_timeout)
- return transport, protocol
-
- async def create_unix_server(
- self, protocol_factory, path=None, *,
- sock=None, backlog=100, ssl=None,
- ssl_handshake_timeout=None,
- start_serving=True):
- if isinstance(ssl, bool):
- raise TypeError('ssl argument must be an SSLContext or None')
-
- if ssl_handshake_timeout is not None and not ssl:
- raise ValueError(
- 'ssl_handshake_timeout is only meaningful with ssl')
-
- if path is not None:
- if sock is not None:
- raise ValueError(
- 'path and sock can not be specified at the same time')
-
- path = os.fspath(path)
- sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
-
- # Check for abstract socket. `str` and `bytes` paths are supported.
- if path[0] not in (0, '\x00'):
- try:
- if stat.S_ISSOCK(os.stat(path).st_mode):
- os.remove(path)
- except FileNotFoundError:
- pass
- except OSError as err:
- # Directory may have permissions only to create socket.
- logger.error('Unable to check or remove stale UNIX socket '
- '%r: %r', path, err)
-
- try:
- sock.bind(path)
- except OSError as exc:
- sock.close()
- if exc.errno == errno.EADDRINUSE:
- # Let's improve the error message by adding
- # with what exact address it occurs.
- msg = f'Address {path!r} is already in use'
- raise OSError(errno.EADDRINUSE, msg) from None
- else:
- raise
- except:
- sock.close()
- raise
- else:
- if sock is None:
- raise ValueError(
- 'path was not specified, and no sock specified')
-
- if (sock.family != socket.AF_UNIX or
- sock.type != socket.SOCK_STREAM):
- raise ValueError(
- f'A UNIX Domain Stream Socket was expected, got {sock!r}')
-
- sock.setblocking(False)
- server = base_events.Server(self, [sock], protocol_factory,
- ssl, backlog, ssl_handshake_timeout)
- if start_serving:
- server._start_serving()
- # Skip one loop iteration so that all 'loop.add_reader'
- # go through.
+ transp.close()
+ await transp._wait()
+ raise
+
+ return transp
+
+ def _child_watcher_callback(self, pid, returncode, transp):
+ self.call_soon_threadsafe(transp._process_exited, returncode)
+
+ async def create_unix_connection(
+ self, protocol_factory, path=None, *,
+ ssl=None, sock=None,
+ server_hostname=None,
+ ssl_handshake_timeout=None):
+ assert server_hostname is None or isinstance(server_hostname, str)
+ if ssl:
+ if server_hostname is None:
+ raise ValueError(
+ 'you have to pass server_hostname when using ssl')
+ else:
+ if server_hostname is not None:
+ raise ValueError('server_hostname is only meaningful with ssl')
+ if ssl_handshake_timeout is not None:
+ raise ValueError(
+ 'ssl_handshake_timeout is only meaningful with ssl')
+
+ if path is not None:
+ if sock is not None:
+ raise ValueError(
+ 'path and sock can not be specified at the same time')
+
+ path = os.fspath(path)
+ sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM, 0)
+ try:
+ sock.setblocking(False)
+ await self.sock_connect(sock, path)
+ except:
+ sock.close()
+ raise
+
+ else:
+ if sock is None:
+ raise ValueError('no path and sock were specified')
+ if (sock.family != socket.AF_UNIX or
+ sock.type != socket.SOCK_STREAM):
+ raise ValueError(
+ f'A UNIX Domain Stream Socket was expected, got {sock!r}')
+ sock.setblocking(False)
+
+ transport, protocol = await self._create_connection_transport(
+ sock, protocol_factory, ssl, server_hostname,
+ ssl_handshake_timeout=ssl_handshake_timeout)
+ return transport, protocol
+
+ async def create_unix_server(
+ self, protocol_factory, path=None, *,
+ sock=None, backlog=100, ssl=None,
+ ssl_handshake_timeout=None,
+ start_serving=True):
+ if isinstance(ssl, bool):
+ raise TypeError('ssl argument must be an SSLContext or None')
+
+ if ssl_handshake_timeout is not None and not ssl:
+ raise ValueError(
+ 'ssl_handshake_timeout is only meaningful with ssl')
+
+ if path is not None:
+ if sock is not None:
+ raise ValueError(
+ 'path and sock can not be specified at the same time')
+
+ path = os.fspath(path)
+ sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
+
+ # Check for abstract socket. `str` and `bytes` paths are supported.
+ if path[0] not in (0, '\x00'):
+ try:
+ if stat.S_ISSOCK(os.stat(path).st_mode):
+ os.remove(path)
+ except FileNotFoundError:
+ pass
+ except OSError as err:
+ # Directory may have permissions only to create socket.
+ logger.error('Unable to check or remove stale UNIX socket '
+ '%r: %r', path, err)
+
+ try:
+ sock.bind(path)
+ except OSError as exc:
+ sock.close()
+ if exc.errno == errno.EADDRINUSE:
+ # Let's improve the error message by adding
+ # with what exact address it occurs.
+ msg = f'Address {path!r} is already in use'
+ raise OSError(errno.EADDRINUSE, msg) from None
+ else:
+ raise
+ except:
+ sock.close()
+ raise
+ else:
+ if sock is None:
+ raise ValueError(
+ 'path was not specified, and no sock specified')
+
+ if (sock.family != socket.AF_UNIX or
+ sock.type != socket.SOCK_STREAM):
+ raise ValueError(
+ f'A UNIX Domain Stream Socket was expected, got {sock!r}')
+
+ sock.setblocking(False)
+ server = base_events.Server(self, [sock], protocol_factory,
+ ssl, backlog, ssl_handshake_timeout)
+ if start_serving:
+ server._start_serving()
+ # Skip one loop iteration so that all 'loop.add_reader'
+ # go through.
await tasks.sleep(0)
-
- return server
-
- async def _sock_sendfile_native(self, sock, file, offset, count):
- try:
- os.sendfile
+
+ return server
+
+ async def _sock_sendfile_native(self, sock, file, offset, count):
+ try:
+ os.sendfile
except AttributeError:
raise exceptions.SendfileNotAvailableError(
- "os.sendfile() is not available")
- try:
- fileno = file.fileno()
- except (AttributeError, io.UnsupportedOperation) as err:
+ "os.sendfile() is not available")
+ try:
+ fileno = file.fileno()
+ except (AttributeError, io.UnsupportedOperation) as err:
raise exceptions.SendfileNotAvailableError("not a regular file")
- try:
- fsize = os.fstat(fileno).st_size
+ try:
+ fsize = os.fstat(fileno).st_size
except OSError:
raise exceptions.SendfileNotAvailableError("not a regular file")
- blocksize = count if count else fsize
- if not blocksize:
- return 0 # empty file
-
- fut = self.create_future()
- self._sock_sendfile_native_impl(fut, None, sock, fileno,
- offset, count, blocksize, 0)
- return await fut
-
- def _sock_sendfile_native_impl(self, fut, registered_fd, sock, fileno,
- offset, count, blocksize, total_sent):
- fd = sock.fileno()
- if registered_fd is not None:
- # Remove the callback early. It should be rare that the
- # selector says the fd is ready but the call still returns
- # EAGAIN, and I am willing to take a hit in that case in
- # order to simplify the common case.
- self.remove_writer(registered_fd)
- if fut.cancelled():
- self._sock_sendfile_update_filepos(fileno, offset, total_sent)
- return
- if count:
- blocksize = count - total_sent
- if blocksize <= 0:
- self._sock_sendfile_update_filepos(fileno, offset, total_sent)
- fut.set_result(total_sent)
- return
-
- try:
- sent = os.sendfile(fd, fileno, offset, blocksize)
- except (BlockingIOError, InterruptedError):
- if registered_fd is None:
- self._sock_add_cancellation_callback(fut, sock)
- self.add_writer(fd, self._sock_sendfile_native_impl, fut,
- fd, sock, fileno,
- offset, count, blocksize, total_sent)
- except OSError as exc:
- if (registered_fd is not None and
- exc.errno == errno.ENOTCONN and
- type(exc) is not ConnectionError):
- # If we have an ENOTCONN and this isn't a first call to
- # sendfile(), i.e. the connection was closed in the middle
- # of the operation, normalize the error to ConnectionError
- # to make it consistent across all Posix systems.
- new_exc = ConnectionError(
- "socket is not connected", errno.ENOTCONN)
- new_exc.__cause__ = exc
- exc = new_exc
- if total_sent == 0:
- # We can get here for different reasons, the main
- # one being 'file' is not a regular mmap(2)-like
- # file, in which case we'll fall back on using
- # plain send().
+ blocksize = count if count else fsize
+ if not blocksize:
+ return 0 # empty file
+
+ fut = self.create_future()
+ self._sock_sendfile_native_impl(fut, None, sock, fileno,
+ offset, count, blocksize, 0)
+ return await fut
+
+ def _sock_sendfile_native_impl(self, fut, registered_fd, sock, fileno,
+ offset, count, blocksize, total_sent):
+ fd = sock.fileno()
+ if registered_fd is not None:
+ # Remove the callback early. It should be rare that the
+ # selector says the fd is ready but the call still returns
+ # EAGAIN, and I am willing to take a hit in that case in
+ # order to simplify the common case.
+ self.remove_writer(registered_fd)
+ if fut.cancelled():
+ self._sock_sendfile_update_filepos(fileno, offset, total_sent)
+ return
+ if count:
+ blocksize = count - total_sent
+ if blocksize <= 0:
+ self._sock_sendfile_update_filepos(fileno, offset, total_sent)
+ fut.set_result(total_sent)
+ return
+
+ try:
+ sent = os.sendfile(fd, fileno, offset, blocksize)
+ except (BlockingIOError, InterruptedError):
+ if registered_fd is None:
+ self._sock_add_cancellation_callback(fut, sock)
+ self.add_writer(fd, self._sock_sendfile_native_impl, fut,
+ fd, sock, fileno,
+ offset, count, blocksize, total_sent)
+ except OSError as exc:
+ if (registered_fd is not None and
+ exc.errno == errno.ENOTCONN and
+ type(exc) is not ConnectionError):
+ # If we have an ENOTCONN and this isn't a first call to
+ # sendfile(), i.e. the connection was closed in the middle
+ # of the operation, normalize the error to ConnectionError
+ # to make it consistent across all Posix systems.
+ new_exc = ConnectionError(
+ "socket is not connected", errno.ENOTCONN)
+ new_exc.__cause__ = exc
+ exc = new_exc
+ if total_sent == 0:
+ # We can get here for different reasons, the main
+ # one being 'file' is not a regular mmap(2)-like
+ # file, in which case we'll fall back on using
+ # plain send().
err = exceptions.SendfileNotAvailableError(
- "os.sendfile call failed")
- self._sock_sendfile_update_filepos(fileno, offset, total_sent)
- fut.set_exception(err)
- else:
- self._sock_sendfile_update_filepos(fileno, offset, total_sent)
- fut.set_exception(exc)
+ "os.sendfile call failed")
+ self._sock_sendfile_update_filepos(fileno, offset, total_sent)
+ fut.set_exception(err)
+ else:
+ self._sock_sendfile_update_filepos(fileno, offset, total_sent)
+ fut.set_exception(exc)
except (SystemExit, KeyboardInterrupt):
raise
except BaseException as exc:
- self._sock_sendfile_update_filepos(fileno, offset, total_sent)
- fut.set_exception(exc)
- else:
- if sent == 0:
- # EOF
- self._sock_sendfile_update_filepos(fileno, offset, total_sent)
- fut.set_result(total_sent)
- else:
- offset += sent
- total_sent += sent
- if registered_fd is None:
- self._sock_add_cancellation_callback(fut, sock)
- self.add_writer(fd, self._sock_sendfile_native_impl, fut,
- fd, sock, fileno,
- offset, count, blocksize, total_sent)
-
- def _sock_sendfile_update_filepos(self, fileno, offset, total_sent):
- if total_sent > 0:
- os.lseek(fileno, offset, os.SEEK_SET)
-
- def _sock_add_cancellation_callback(self, fut, sock):
- def cb(fut):
- if fut.cancelled():
- fd = sock.fileno()
- if fd != -1:
- self.remove_writer(fd)
- fut.add_done_callback(cb)
-
-
-class _UnixReadPipeTransport(transports.ReadTransport):
-
- max_size = 256 * 1024 # max bytes we read in one event loop iteration
-
- def __init__(self, loop, pipe, protocol, waiter=None, extra=None):
- super().__init__(extra)
- self._extra['pipe'] = pipe
- self._loop = loop
- self._pipe = pipe
- self._fileno = pipe.fileno()
- self._protocol = protocol
- self._closing = False
+ self._sock_sendfile_update_filepos(fileno, offset, total_sent)
+ fut.set_exception(exc)
+ else:
+ if sent == 0:
+ # EOF
+ self._sock_sendfile_update_filepos(fileno, offset, total_sent)
+ fut.set_result(total_sent)
+ else:
+ offset += sent
+ total_sent += sent
+ if registered_fd is None:
+ self._sock_add_cancellation_callback(fut, sock)
+ self.add_writer(fd, self._sock_sendfile_native_impl, fut,
+ fd, sock, fileno,
+ offset, count, blocksize, total_sent)
+
+ def _sock_sendfile_update_filepos(self, fileno, offset, total_sent):
+ if total_sent > 0:
+ os.lseek(fileno, offset, os.SEEK_SET)
+
+ def _sock_add_cancellation_callback(self, fut, sock):
+ def cb(fut):
+ if fut.cancelled():
+ fd = sock.fileno()
+ if fd != -1:
+ self.remove_writer(fd)
+ fut.add_done_callback(cb)
+
+
+class _UnixReadPipeTransport(transports.ReadTransport):
+
+ max_size = 256 * 1024 # max bytes we read in one event loop iteration
+
+ def __init__(self, loop, pipe, protocol, waiter=None, extra=None):
+ super().__init__(extra)
+ self._extra['pipe'] = pipe
+ self._loop = loop
+ self._pipe = pipe
+ self._fileno = pipe.fileno()
+ self._protocol = protocol
+ self._closing = False
self._paused = False
-
- mode = os.fstat(self._fileno).st_mode
- if not (stat.S_ISFIFO(mode) or
- stat.S_ISSOCK(mode) or
- stat.S_ISCHR(mode)):
- self._pipe = None
- self._fileno = None
- self._protocol = None
- raise ValueError("Pipe transport is for pipes/sockets only.")
-
- os.set_blocking(self._fileno, False)
-
- self._loop.call_soon(self._protocol.connection_made, self)
- # only start reading when connection_made() has been called
- self._loop.call_soon(self._loop._add_reader,
- self._fileno, self._read_ready)
- if waiter is not None:
- # only wake up the waiter when connection_made() has been called
- self._loop.call_soon(futures._set_result_unless_cancelled,
- waiter, None)
-
- def __repr__(self):
- info = [self.__class__.__name__]
- if self._pipe is None:
- info.append('closed')
- elif self._closing:
- info.append('closing')
- info.append(f'fd={self._fileno}')
- selector = getattr(self._loop, '_selector', None)
- if self._pipe is not None and selector is not None:
- polling = selector_events._test_selector_event(
- selector, self._fileno, selectors.EVENT_READ)
- if polling:
- info.append('polling')
- else:
- info.append('idle')
- elif self._pipe is not None:
- info.append('open')
- else:
- info.append('closed')
- return '<{}>'.format(' '.join(info))
-
- def _read_ready(self):
- try:
- data = os.read(self._fileno, self.max_size)
- except (BlockingIOError, InterruptedError):
- pass
- except OSError as exc:
- self._fatal_error(exc, 'Fatal read error on pipe transport')
- else:
- if data:
- self._protocol.data_received(data)
- else:
- if self._loop.get_debug():
- logger.info("%r was closed by peer", self)
- self._closing = True
- self._loop._remove_reader(self._fileno)
- self._loop.call_soon(self._protocol.eof_received)
- self._loop.call_soon(self._call_connection_lost, None)
-
- def pause_reading(self):
+
+ mode = os.fstat(self._fileno).st_mode
+ if not (stat.S_ISFIFO(mode) or
+ stat.S_ISSOCK(mode) or
+ stat.S_ISCHR(mode)):
+ self._pipe = None
+ self._fileno = None
+ self._protocol = None
+ raise ValueError("Pipe transport is for pipes/sockets only.")
+
+ os.set_blocking(self._fileno, False)
+
+ self._loop.call_soon(self._protocol.connection_made, self)
+ # only start reading when connection_made() has been called
+ self._loop.call_soon(self._loop._add_reader,
+ self._fileno, self._read_ready)
+ if waiter is not None:
+ # only wake up the waiter when connection_made() has been called
+ self._loop.call_soon(futures._set_result_unless_cancelled,
+ waiter, None)
+
+ def __repr__(self):
+ info = [self.__class__.__name__]
+ if self._pipe is None:
+ info.append('closed')
+ elif self._closing:
+ info.append('closing')
+ info.append(f'fd={self._fileno}')
+ selector = getattr(self._loop, '_selector', None)
+ if self._pipe is not None and selector is not None:
+ polling = selector_events._test_selector_event(
+ selector, self._fileno, selectors.EVENT_READ)
+ if polling:
+ info.append('polling')
+ else:
+ info.append('idle')
+ elif self._pipe is not None:
+ info.append('open')
+ else:
+ info.append('closed')
+ return '<{}>'.format(' '.join(info))
+
+ def _read_ready(self):
+ try:
+ data = os.read(self._fileno, self.max_size)
+ except (BlockingIOError, InterruptedError):
+ pass
+ except OSError as exc:
+ self._fatal_error(exc, 'Fatal read error on pipe transport')
+ else:
+ if data:
+ self._protocol.data_received(data)
+ else:
+ if self._loop.get_debug():
+ logger.info("%r was closed by peer", self)
+ self._closing = True
+ self._loop._remove_reader(self._fileno)
+ self._loop.call_soon(self._protocol.eof_received)
+ self._loop.call_soon(self._call_connection_lost, None)
+
+ def pause_reading(self):
if self._closing or self._paused:
return
self._paused = True
- self._loop._remove_reader(self._fileno)
+ self._loop._remove_reader(self._fileno)
if self._loop.get_debug():
logger.debug("%r pauses reading", self)
-
- def resume_reading(self):
+
+ def resume_reading(self):
if self._closing or not self._paused:
return
self._paused = False
- self._loop._add_reader(self._fileno, self._read_ready)
+ self._loop._add_reader(self._fileno, self._read_ready)
if self._loop.get_debug():
logger.debug("%r resumes reading", self)
-
- def set_protocol(self, protocol):
- self._protocol = protocol
-
- def get_protocol(self):
- return self._protocol
-
- def is_closing(self):
- return self._closing
-
- def close(self):
- if not self._closing:
- self._close(None)
-
+
+ def set_protocol(self, protocol):
+ self._protocol = protocol
+
+ def get_protocol(self):
+ return self._protocol
+
+ def is_closing(self):
+ return self._closing
+
+ def close(self):
+ if not self._closing:
+ self._close(None)
+
def __del__(self, _warn=warnings.warn):
- if self._pipe is not None:
+ if self._pipe is not None:
_warn(f"unclosed transport {self!r}", ResourceWarning, source=self)
- self._pipe.close()
-
- def _fatal_error(self, exc, message='Fatal error on pipe transport'):
- # should be called by exception handler only
- if (isinstance(exc, OSError) and exc.errno == errno.EIO):
- if self._loop.get_debug():
- logger.debug("%r: %s", self, message, exc_info=True)
- else:
- self._loop.call_exception_handler({
- 'message': message,
- 'exception': exc,
- 'transport': self,
- 'protocol': self._protocol,
- })
- self._close(exc)
-
- def _close(self, exc):
- self._closing = True
- self._loop._remove_reader(self._fileno)
- self._loop.call_soon(self._call_connection_lost, exc)
-
- def _call_connection_lost(self, exc):
- try:
- self._protocol.connection_lost(exc)
- finally:
- self._pipe.close()
- self._pipe = None
- self._protocol = None
- self._loop = None
-
-
-class _UnixWritePipeTransport(transports._FlowControlMixin,
- transports.WriteTransport):
-
- def __init__(self, loop, pipe, protocol, waiter=None, extra=None):
- super().__init__(extra, loop)
- self._extra['pipe'] = pipe
- self._pipe = pipe
- self._fileno = pipe.fileno()
- self._protocol = protocol
- self._buffer = bytearray()
- self._conn_lost = 0
- self._closing = False # Set when close() or write_eof() called.
-
- mode = os.fstat(self._fileno).st_mode
- is_char = stat.S_ISCHR(mode)
- is_fifo = stat.S_ISFIFO(mode)
- is_socket = stat.S_ISSOCK(mode)
- if not (is_char or is_fifo or is_socket):
- self._pipe = None
- self._fileno = None
- self._protocol = None
- raise ValueError("Pipe transport is only for "
- "pipes, sockets and character devices")
-
- os.set_blocking(self._fileno, False)
- self._loop.call_soon(self._protocol.connection_made, self)
-
- # On AIX, the reader trick (to be notified when the read end of the
- # socket is closed) only works for sockets. On other platforms it
- # works for pipes and sockets. (Exception: OS X 10.4? Issue #19294.)
- if is_socket or (is_fifo and not sys.platform.startswith("aix")):
- # only start reading when connection_made() has been called
- self._loop.call_soon(self._loop._add_reader,
- self._fileno, self._read_ready)
-
- if waiter is not None:
- # only wake up the waiter when connection_made() has been called
- self._loop.call_soon(futures._set_result_unless_cancelled,
- waiter, None)
-
- def __repr__(self):
- info = [self.__class__.__name__]
- if self._pipe is None:
- info.append('closed')
- elif self._closing:
- info.append('closing')
- info.append(f'fd={self._fileno}')
- selector = getattr(self._loop, '_selector', None)
- if self._pipe is not None and selector is not None:
- polling = selector_events._test_selector_event(
- selector, self._fileno, selectors.EVENT_WRITE)
- if polling:
- info.append('polling')
- else:
- info.append('idle')
-
- bufsize = self.get_write_buffer_size()
- info.append(f'bufsize={bufsize}')
- elif self._pipe is not None:
- info.append('open')
- else:
- info.append('closed')
- return '<{}>'.format(' '.join(info))
-
- def get_write_buffer_size(self):
- return len(self._buffer)
-
- def _read_ready(self):
- # Pipe was closed by peer.
- if self._loop.get_debug():
- logger.info("%r was closed by peer", self)
- if self._buffer:
- self._close(BrokenPipeError())
- else:
- self._close()
-
- def write(self, data):
- assert isinstance(data, (bytes, bytearray, memoryview)), repr(data)
- if isinstance(data, bytearray):
- data = memoryview(data)
- if not data:
- return
-
- if self._conn_lost or self._closing:
- if self._conn_lost >= constants.LOG_THRESHOLD_FOR_CONNLOST_WRITES:
- logger.warning('pipe closed by peer or '
- 'os.write(pipe, data) raised exception.')
- self._conn_lost += 1
- return
-
- if not self._buffer:
- # Attempt to send it right away first.
- try:
- n = os.write(self._fileno, data)
- except (BlockingIOError, InterruptedError):
- n = 0
+ self._pipe.close()
+
+ def _fatal_error(self, exc, message='Fatal error on pipe transport'):
+ # should be called by exception handler only
+ if (isinstance(exc, OSError) and exc.errno == errno.EIO):
+ if self._loop.get_debug():
+ logger.debug("%r: %s", self, message, exc_info=True)
+ else:
+ self._loop.call_exception_handler({
+ 'message': message,
+ 'exception': exc,
+ 'transport': self,
+ 'protocol': self._protocol,
+ })
+ self._close(exc)
+
+ def _close(self, exc):
+ self._closing = True
+ self._loop._remove_reader(self._fileno)
+ self._loop.call_soon(self._call_connection_lost, exc)
+
+ def _call_connection_lost(self, exc):
+ try:
+ self._protocol.connection_lost(exc)
+ finally:
+ self._pipe.close()
+ self._pipe = None
+ self._protocol = None
+ self._loop = None
+
+
+class _UnixWritePipeTransport(transports._FlowControlMixin,
+ transports.WriteTransport):
+
+ def __init__(self, loop, pipe, protocol, waiter=None, extra=None):
+ super().__init__(extra, loop)
+ self._extra['pipe'] = pipe
+ self._pipe = pipe
+ self._fileno = pipe.fileno()
+ self._protocol = protocol
+ self._buffer = bytearray()
+ self._conn_lost = 0
+ self._closing = False # Set when close() or write_eof() called.
+
+ mode = os.fstat(self._fileno).st_mode
+ is_char = stat.S_ISCHR(mode)
+ is_fifo = stat.S_ISFIFO(mode)
+ is_socket = stat.S_ISSOCK(mode)
+ if not (is_char or is_fifo or is_socket):
+ self._pipe = None
+ self._fileno = None
+ self._protocol = None
+ raise ValueError("Pipe transport is only for "
+ "pipes, sockets and character devices")
+
+ os.set_blocking(self._fileno, False)
+ self._loop.call_soon(self._protocol.connection_made, self)
+
+ # On AIX, the reader trick (to be notified when the read end of the
+ # socket is closed) only works for sockets. On other platforms it
+ # works for pipes and sockets. (Exception: OS X 10.4? Issue #19294.)
+ if is_socket or (is_fifo and not sys.platform.startswith("aix")):
+ # only start reading when connection_made() has been called
+ self._loop.call_soon(self._loop._add_reader,
+ self._fileno, self._read_ready)
+
+ if waiter is not None:
+ # only wake up the waiter when connection_made() has been called
+ self._loop.call_soon(futures._set_result_unless_cancelled,
+ waiter, None)
+
+ def __repr__(self):
+ info = [self.__class__.__name__]
+ if self._pipe is None:
+ info.append('closed')
+ elif self._closing:
+ info.append('closing')
+ info.append(f'fd={self._fileno}')
+ selector = getattr(self._loop, '_selector', None)
+ if self._pipe is not None and selector is not None:
+ polling = selector_events._test_selector_event(
+ selector, self._fileno, selectors.EVENT_WRITE)
+ if polling:
+ info.append('polling')
+ else:
+ info.append('idle')
+
+ bufsize = self.get_write_buffer_size()
+ info.append(f'bufsize={bufsize}')
+ elif self._pipe is not None:
+ info.append('open')
+ else:
+ info.append('closed')
+ return '<{}>'.format(' '.join(info))
+
+ def get_write_buffer_size(self):
+ return len(self._buffer)
+
+ def _read_ready(self):
+ # Pipe was closed by peer.
+ if self._loop.get_debug():
+ logger.info("%r was closed by peer", self)
+ if self._buffer:
+ self._close(BrokenPipeError())
+ else:
+ self._close()
+
+ def write(self, data):
+ assert isinstance(data, (bytes, bytearray, memoryview)), repr(data)
+ if isinstance(data, bytearray):
+ data = memoryview(data)
+ if not data:
+ return
+
+ if self._conn_lost or self._closing:
+ if self._conn_lost >= constants.LOG_THRESHOLD_FOR_CONNLOST_WRITES:
+ logger.warning('pipe closed by peer or '
+ 'os.write(pipe, data) raised exception.')
+ self._conn_lost += 1
+ return
+
+ if not self._buffer:
+ # Attempt to send it right away first.
+ try:
+ n = os.write(self._fileno, data)
+ except (BlockingIOError, InterruptedError):
+ n = 0
except (SystemExit, KeyboardInterrupt):
raise
except BaseException as exc:
- self._conn_lost += 1
- self._fatal_error(exc, 'Fatal write error on pipe transport')
- return
- if n == len(data):
- return
- elif n > 0:
- data = memoryview(data)[n:]
- self._loop._add_writer(self._fileno, self._write_ready)
-
- self._buffer += data
- self._maybe_pause_protocol()
-
- def _write_ready(self):
- assert self._buffer, 'Data should not be empty'
-
- try:
- n = os.write(self._fileno, self._buffer)
- except (BlockingIOError, InterruptedError):
- pass
+ self._conn_lost += 1
+ self._fatal_error(exc, 'Fatal write error on pipe transport')
+ return
+ if n == len(data):
+ return
+ elif n > 0:
+ data = memoryview(data)[n:]
+ self._loop._add_writer(self._fileno, self._write_ready)
+
+ self._buffer += data
+ self._maybe_pause_protocol()
+
+ def _write_ready(self):
+ assert self._buffer, 'Data should not be empty'
+
+ try:
+ n = os.write(self._fileno, self._buffer)
+ except (BlockingIOError, InterruptedError):
+ pass
except (SystemExit, KeyboardInterrupt):
raise
except BaseException as exc:
- self._buffer.clear()
- self._conn_lost += 1
- # Remove writer here, _fatal_error() doesn't it
- # because _buffer is empty.
- self._loop._remove_writer(self._fileno)
- self._fatal_error(exc, 'Fatal write error on pipe transport')
- else:
- if n == len(self._buffer):
- self._buffer.clear()
- self._loop._remove_writer(self._fileno)
- self._maybe_resume_protocol() # May append to buffer.
- if self._closing:
- self._loop._remove_reader(self._fileno)
- self._call_connection_lost(None)
- return
- elif n > 0:
- del self._buffer[:n]
-
- def can_write_eof(self):
- return True
-
- def write_eof(self):
- if self._closing:
- return
- assert self._pipe
- self._closing = True
- if not self._buffer:
- self._loop._remove_reader(self._fileno)
- self._loop.call_soon(self._call_connection_lost, None)
-
- def set_protocol(self, protocol):
- self._protocol = protocol
-
- def get_protocol(self):
- return self._protocol
-
- def is_closing(self):
- return self._closing
-
- def close(self):
- if self._pipe is not None and not self._closing:
- # write_eof is all what we needed to close the write pipe
- self.write_eof()
-
+ self._buffer.clear()
+ self._conn_lost += 1
+ # Remove writer here, _fatal_error() doesn't it
+ # because _buffer is empty.
+ self._loop._remove_writer(self._fileno)
+ self._fatal_error(exc, 'Fatal write error on pipe transport')
+ else:
+ if n == len(self._buffer):
+ self._buffer.clear()
+ self._loop._remove_writer(self._fileno)
+ self._maybe_resume_protocol() # May append to buffer.
+ if self._closing:
+ self._loop._remove_reader(self._fileno)
+ self._call_connection_lost(None)
+ return
+ elif n > 0:
+ del self._buffer[:n]
+
+ def can_write_eof(self):
+ return True
+
+ def write_eof(self):
+ if self._closing:
+ return
+ assert self._pipe
+ self._closing = True
+ if not self._buffer:
+ self._loop._remove_reader(self._fileno)
+ self._loop.call_soon(self._call_connection_lost, None)
+
+ def set_protocol(self, protocol):
+ self._protocol = protocol
+
+ def get_protocol(self):
+ return self._protocol
+
+ def is_closing(self):
+ return self._closing
+
+ def close(self):
+ if self._pipe is not None and not self._closing:
+ # write_eof is all what we needed to close the write pipe
+ self.write_eof()
+
def __del__(self, _warn=warnings.warn):
- if self._pipe is not None:
+ if self._pipe is not None:
_warn(f"unclosed transport {self!r}", ResourceWarning, source=self)
- self._pipe.close()
-
- def abort(self):
- self._close(None)
-
- def _fatal_error(self, exc, message='Fatal error on pipe transport'):
- # should be called by exception handler only
+ self._pipe.close()
+
+ def abort(self):
+ self._close(None)
+
+ def _fatal_error(self, exc, message='Fatal error on pipe transport'):
+ # should be called by exception handler only
if isinstance(exc, OSError):
- if self._loop.get_debug():
- logger.debug("%r: %s", self, message, exc_info=True)
- else:
- self._loop.call_exception_handler({
- 'message': message,
- 'exception': exc,
- 'transport': self,
- 'protocol': self._protocol,
- })
- self._close(exc)
-
- def _close(self, exc=None):
- self._closing = True
- if self._buffer:
- self._loop._remove_writer(self._fileno)
- self._buffer.clear()
- self._loop._remove_reader(self._fileno)
- self._loop.call_soon(self._call_connection_lost, exc)
-
- def _call_connection_lost(self, exc):
- try:
- self._protocol.connection_lost(exc)
- finally:
- self._pipe.close()
- self._pipe = None
- self._protocol = None
- self._loop = None
-
-
-class _UnixSubprocessTransport(base_subprocess.BaseSubprocessTransport):
-
- def _start(self, args, shell, stdin, stdout, stderr, bufsize, **kwargs):
- stdin_w = None
- if stdin == subprocess.PIPE:
- # Use a socket pair for stdin, since not all platforms
- # support selecting read events on the write end of a
- # socket (which we use in order to detect closing of the
- # other end). Notably this is needed on AIX, and works
- # just fine on other platforms.
- stdin, stdin_w = socket.socketpair()
+ if self._loop.get_debug():
+ logger.debug("%r: %s", self, message, exc_info=True)
+ else:
+ self._loop.call_exception_handler({
+ 'message': message,
+ 'exception': exc,
+ 'transport': self,
+ 'protocol': self._protocol,
+ })
+ self._close(exc)
+
+ def _close(self, exc=None):
+ self._closing = True
+ if self._buffer:
+ self._loop._remove_writer(self._fileno)
+ self._buffer.clear()
+ self._loop._remove_reader(self._fileno)
+ self._loop.call_soon(self._call_connection_lost, exc)
+
+ def _call_connection_lost(self, exc):
+ try:
+ self._protocol.connection_lost(exc)
+ finally:
+ self._pipe.close()
+ self._pipe = None
+ self._protocol = None
+ self._loop = None
+
+
+class _UnixSubprocessTransport(base_subprocess.BaseSubprocessTransport):
+
+ def _start(self, args, shell, stdin, stdout, stderr, bufsize, **kwargs):
+ stdin_w = None
+ if stdin == subprocess.PIPE:
+ # Use a socket pair for stdin, since not all platforms
+ # support selecting read events on the write end of a
+ # socket (which we use in order to detect closing of the
+ # other end). Notably this is needed on AIX, and works
+ # just fine on other platforms.
+ stdin, stdin_w = socket.socketpair()
try:
self._proc = subprocess.Popen(
args, shell=shell, stdin=stdin, stdout=stdout, stderr=stderr,
@@ -797,67 +797,67 @@ class _UnixSubprocessTransport(base_subprocess.BaseSubprocessTransport):
if stdin_w is not None:
stdin.close()
stdin_w.close()
-
-
-class AbstractChildWatcher:
- """Abstract base class for monitoring child processes.
-
- Objects derived from this class monitor a collection of subprocesses and
- report their termination or interruption by a signal.
-
- New callbacks are registered with .add_child_handler(). Starting a new
- process must be done within a 'with' block to allow the watcher to suspend
- its activity until the new process if fully registered (this is needed to
- prevent a race condition in some implementations).
-
- Example:
- with watcher:
- proc = subprocess.Popen("sleep 1")
- watcher.add_child_handler(proc.pid, callback)
-
- Notes:
- Implementations of this class must be thread-safe.
-
- Since child watcher objects may catch the SIGCHLD signal and call
- waitpid(-1), there should be only one active object per process.
- """
-
- def add_child_handler(self, pid, callback, *args):
- """Register a new child handler.
-
- Arrange for callback(pid, returncode, *args) to be called when
- process 'pid' terminates. Specifying another callback for the same
- process replaces the previous handler.
-
- Note: callback() must be thread-safe.
- """
- raise NotImplementedError()
-
- def remove_child_handler(self, pid):
- """Removes the handler for process 'pid'.
-
- The function returns True if the handler was successfully removed,
- False if there was nothing to remove."""
-
- raise NotImplementedError()
-
- def attach_loop(self, loop):
- """Attach the watcher to an event loop.
-
- If the watcher was previously attached to an event loop, then it is
- first detached before attaching to the new loop.
-
- Note: loop may be None.
- """
- raise NotImplementedError()
-
- def close(self):
- """Close the watcher.
-
- This must be called to make sure that any underlying resource is freed.
- """
- raise NotImplementedError()
-
+
+
+class AbstractChildWatcher:
+ """Abstract base class for monitoring child processes.
+
+ Objects derived from this class monitor a collection of subprocesses and
+ report their termination or interruption by a signal.
+
+ New callbacks are registered with .add_child_handler(). Starting a new
+ process must be done within a 'with' block to allow the watcher to suspend
+ its activity until the new process if fully registered (this is needed to
+ prevent a race condition in some implementations).
+
+ Example:
+ with watcher:
+ proc = subprocess.Popen("sleep 1")
+ watcher.add_child_handler(proc.pid, callback)
+
+ Notes:
+ Implementations of this class must be thread-safe.
+
+ Since child watcher objects may catch the SIGCHLD signal and call
+ waitpid(-1), there should be only one active object per process.
+ """
+
+ def add_child_handler(self, pid, callback, *args):
+ """Register a new child handler.
+
+ Arrange for callback(pid, returncode, *args) to be called when
+ process 'pid' terminates. Specifying another callback for the same
+ process replaces the previous handler.
+
+ Note: callback() must be thread-safe.
+ """
+ raise NotImplementedError()
+
+ def remove_child_handler(self, pid):
+ """Removes the handler for process 'pid'.
+
+ The function returns True if the handler was successfully removed,
+ False if there was nothing to remove."""
+
+ raise NotImplementedError()
+
+ def attach_loop(self, loop):
+ """Attach the watcher to an event loop.
+
+ If the watcher was previously attached to an event loop, then it is
+ first detached before attaching to the new loop.
+
+ Note: loop may be None.
+ """
+ raise NotImplementedError()
+
+ def close(self):
+ """Close the watcher.
+
+ This must be called to make sure that any underlying resource is freed.
+ """
+ raise NotImplementedError()
+
def is_active(self):
"""Return ``True`` if the watcher is active and is used by the event loop.
@@ -867,17 +867,17 @@ class AbstractChildWatcher:
"""
raise NotImplementedError()
- def __enter__(self):
- """Enter the watcher's context and allow starting new processes
-
- This function must return self"""
- raise NotImplementedError()
-
- def __exit__(self, a, b, c):
- """Exit the watcher's context"""
- raise NotImplementedError()
-
-
+ def __enter__(self):
+ """Enter the watcher's context and allow starting new processes
+
+ This function must return self"""
+ raise NotImplementedError()
+
+ def __exit__(self, a, b, c):
+ """Exit the watcher's context"""
+ raise NotImplementedError()
+
+
class PidfdChildWatcher(AbstractChildWatcher):
"""Child watcher implementation using Linux's pid file descriptors.
@@ -970,238 +970,238 @@ def _compute_returncode(status):
return status
-class BaseChildWatcher(AbstractChildWatcher):
-
- def __init__(self):
- self._loop = None
- self._callbacks = {}
-
- def close(self):
- self.attach_loop(None)
-
+class BaseChildWatcher(AbstractChildWatcher):
+
+ def __init__(self):
+ self._loop = None
+ self._callbacks = {}
+
+ def close(self):
+ self.attach_loop(None)
+
def is_active(self):
return self._loop is not None and self._loop.is_running()
- def _do_waitpid(self, expected_pid):
- raise NotImplementedError()
-
- def _do_waitpid_all(self):
- raise NotImplementedError()
-
- def attach_loop(self, loop):
- assert loop is None or isinstance(loop, events.AbstractEventLoop)
-
- if self._loop is not None and loop is None and self._callbacks:
- warnings.warn(
- 'A loop is being detached '
- 'from a child watcher with pending handlers',
- RuntimeWarning)
-
- if self._loop is not None:
- self._loop.remove_signal_handler(signal.SIGCHLD)
-
- self._loop = loop
- if loop is not None:
- loop.add_signal_handler(signal.SIGCHLD, self._sig_chld)
-
- # Prevent a race condition in case a child terminated
- # during the switch.
- self._do_waitpid_all()
-
- def _sig_chld(self):
- try:
- self._do_waitpid_all()
+ def _do_waitpid(self, expected_pid):
+ raise NotImplementedError()
+
+ def _do_waitpid_all(self):
+ raise NotImplementedError()
+
+ def attach_loop(self, loop):
+ assert loop is None or isinstance(loop, events.AbstractEventLoop)
+
+ if self._loop is not None and loop is None and self._callbacks:
+ warnings.warn(
+ 'A loop is being detached '
+ 'from a child watcher with pending handlers',
+ RuntimeWarning)
+
+ if self._loop is not None:
+ self._loop.remove_signal_handler(signal.SIGCHLD)
+
+ self._loop = loop
+ if loop is not None:
+ loop.add_signal_handler(signal.SIGCHLD, self._sig_chld)
+
+ # Prevent a race condition in case a child terminated
+ # during the switch.
+ self._do_waitpid_all()
+
+ def _sig_chld(self):
+ try:
+ self._do_waitpid_all()
except (SystemExit, KeyboardInterrupt):
raise
except BaseException as exc:
- # self._loop should always be available here
- # as '_sig_chld' is added as a signal handler
- # in 'attach_loop'
- self._loop.call_exception_handler({
- 'message': 'Unknown exception in SIGCHLD handler',
- 'exception': exc,
- })
-
-
-class SafeChildWatcher(BaseChildWatcher):
- """'Safe' child watcher implementation.
-
- This implementation avoids disrupting other code spawning processes by
- polling explicitly each process in the SIGCHLD handler instead of calling
- os.waitpid(-1).
-
- This is a safe solution but it has a significant overhead when handling a
- big number of children (O(n) each time SIGCHLD is raised)
- """
-
- def close(self):
- self._callbacks.clear()
- super().close()
-
- def __enter__(self):
- return self
-
- def __exit__(self, a, b, c):
- pass
-
- def add_child_handler(self, pid, callback, *args):
- self._callbacks[pid] = (callback, args)
-
- # Prevent a race condition in case the child is already terminated.
- self._do_waitpid(pid)
-
- def remove_child_handler(self, pid):
- try:
- del self._callbacks[pid]
- return True
- except KeyError:
- return False
-
- def _do_waitpid_all(self):
-
- for pid in list(self._callbacks):
- self._do_waitpid(pid)
-
- def _do_waitpid(self, expected_pid):
- assert expected_pid > 0
-
- try:
- pid, status = os.waitpid(expected_pid, os.WNOHANG)
- except ChildProcessError:
- # The child process is already reaped
- # (may happen if waitpid() is called elsewhere).
- pid = expected_pid
- returncode = 255
- logger.warning(
- "Unknown child process pid %d, will report returncode 255",
- pid)
- else:
- if pid == 0:
- # The child process is still alive.
- return
-
+ # self._loop should always be available here
+ # as '_sig_chld' is added as a signal handler
+ # in 'attach_loop'
+ self._loop.call_exception_handler({
+ 'message': 'Unknown exception in SIGCHLD handler',
+ 'exception': exc,
+ })
+
+
+class SafeChildWatcher(BaseChildWatcher):
+ """'Safe' child watcher implementation.
+
+ This implementation avoids disrupting other code spawning processes by
+ polling explicitly each process in the SIGCHLD handler instead of calling
+ os.waitpid(-1).
+
+ This is a safe solution but it has a significant overhead when handling a
+ big number of children (O(n) each time SIGCHLD is raised)
+ """
+
+ def close(self):
+ self._callbacks.clear()
+ super().close()
+
+ def __enter__(self):
+ return self
+
+ def __exit__(self, a, b, c):
+ pass
+
+ def add_child_handler(self, pid, callback, *args):
+ self._callbacks[pid] = (callback, args)
+
+ # Prevent a race condition in case the child is already terminated.
+ self._do_waitpid(pid)
+
+ def remove_child_handler(self, pid):
+ try:
+ del self._callbacks[pid]
+ return True
+ except KeyError:
+ return False
+
+ def _do_waitpid_all(self):
+
+ for pid in list(self._callbacks):
+ self._do_waitpid(pid)
+
+ def _do_waitpid(self, expected_pid):
+ assert expected_pid > 0
+
+ try:
+ pid, status = os.waitpid(expected_pid, os.WNOHANG)
+ except ChildProcessError:
+ # The child process is already reaped
+ # (may happen if waitpid() is called elsewhere).
+ pid = expected_pid
+ returncode = 255
+ logger.warning(
+ "Unknown child process pid %d, will report returncode 255",
+ pid)
+ else:
+ if pid == 0:
+ # The child process is still alive.
+ return
+
returncode = _compute_returncode(status)
- if self._loop.get_debug():
- logger.debug('process %s exited with returncode %s',
- expected_pid, returncode)
-
- try:
- callback, args = self._callbacks.pop(pid)
- except KeyError: # pragma: no cover
- # May happen if .remove_child_handler() is called
- # after os.waitpid() returns.
- if self._loop.get_debug():
- logger.warning("Child watcher got an unexpected pid: %r",
- pid, exc_info=True)
- else:
- callback(pid, returncode, *args)
-
-
-class FastChildWatcher(BaseChildWatcher):
- """'Fast' child watcher implementation.
-
- This implementation reaps every terminated processes by calling
- os.waitpid(-1) directly, possibly breaking other code spawning processes
- and waiting for their termination.
-
- There is no noticeable overhead when handling a big number of children
- (O(1) each time a child terminates).
- """
- def __init__(self):
- super().__init__()
- self._lock = threading.Lock()
- self._zombies = {}
- self._forks = 0
-
- def close(self):
- self._callbacks.clear()
- self._zombies.clear()
- super().close()
-
- def __enter__(self):
- with self._lock:
- self._forks += 1
-
- return self
-
- def __exit__(self, a, b, c):
- with self._lock:
- self._forks -= 1
-
- if self._forks or not self._zombies:
- return
-
- collateral_victims = str(self._zombies)
- self._zombies.clear()
-
- logger.warning(
- "Caught subprocesses termination from unknown pids: %s",
- collateral_victims)
-
- def add_child_handler(self, pid, callback, *args):
- assert self._forks, "Must use the context manager"
-
- with self._lock:
- try:
- returncode = self._zombies.pop(pid)
- except KeyError:
- # The child is running.
- self._callbacks[pid] = callback, args
- return
-
- # The child is dead already. We can fire the callback.
- callback(pid, returncode, *args)
-
- def remove_child_handler(self, pid):
- try:
- del self._callbacks[pid]
- return True
- except KeyError:
- return False
-
- def _do_waitpid_all(self):
- # Because of signal coalescing, we must keep calling waitpid() as
- # long as we're able to reap a child.
- while True:
- try:
- pid, status = os.waitpid(-1, os.WNOHANG)
- except ChildProcessError:
- # No more child processes exist.
- return
- else:
- if pid == 0:
- # A child process is still alive.
- return
-
+ if self._loop.get_debug():
+ logger.debug('process %s exited with returncode %s',
+ expected_pid, returncode)
+
+ try:
+ callback, args = self._callbacks.pop(pid)
+ except KeyError: # pragma: no cover
+ # May happen if .remove_child_handler() is called
+ # after os.waitpid() returns.
+ if self._loop.get_debug():
+ logger.warning("Child watcher got an unexpected pid: %r",
+ pid, exc_info=True)
+ else:
+ callback(pid, returncode, *args)
+
+
+class FastChildWatcher(BaseChildWatcher):
+ """'Fast' child watcher implementation.
+
+ This implementation reaps every terminated processes by calling
+ os.waitpid(-1) directly, possibly breaking other code spawning processes
+ and waiting for their termination.
+
+ There is no noticeable overhead when handling a big number of children
+ (O(1) each time a child terminates).
+ """
+ def __init__(self):
+ super().__init__()
+ self._lock = threading.Lock()
+ self._zombies = {}
+ self._forks = 0
+
+ def close(self):
+ self._callbacks.clear()
+ self._zombies.clear()
+ super().close()
+
+ def __enter__(self):
+ with self._lock:
+ self._forks += 1
+
+ return self
+
+ def __exit__(self, a, b, c):
+ with self._lock:
+ self._forks -= 1
+
+ if self._forks or not self._zombies:
+ return
+
+ collateral_victims = str(self._zombies)
+ self._zombies.clear()
+
+ logger.warning(
+ "Caught subprocesses termination from unknown pids: %s",
+ collateral_victims)
+
+ def add_child_handler(self, pid, callback, *args):
+ assert self._forks, "Must use the context manager"
+
+ with self._lock:
+ try:
+ returncode = self._zombies.pop(pid)
+ except KeyError:
+ # The child is running.
+ self._callbacks[pid] = callback, args
+ return
+
+ # The child is dead already. We can fire the callback.
+ callback(pid, returncode, *args)
+
+ def remove_child_handler(self, pid):
+ try:
+ del self._callbacks[pid]
+ return True
+ except KeyError:
+ return False
+
+ def _do_waitpid_all(self):
+ # Because of signal coalescing, we must keep calling waitpid() as
+ # long as we're able to reap a child.
+ while True:
+ try:
+ pid, status = os.waitpid(-1, os.WNOHANG)
+ except ChildProcessError:
+ # No more child processes exist.
+ return
+ else:
+ if pid == 0:
+ # A child process is still alive.
+ return
+
returncode = _compute_returncode(status)
-
- with self._lock:
- try:
- callback, args = self._callbacks.pop(pid)
- except KeyError:
- # unknown child
- if self._forks:
- # It may not be registered yet.
- self._zombies[pid] = returncode
- if self._loop.get_debug():
- logger.debug('unknown process %s exited '
- 'with returncode %s',
- pid, returncode)
- continue
- callback = None
- else:
- if self._loop.get_debug():
- logger.debug('process %s exited with returncode %s',
- pid, returncode)
-
- if callback is None:
- logger.warning(
- "Caught subprocess termination from unknown pid: "
- "%d -> %d", pid, returncode)
- else:
- callback(pid, returncode, *args)
-
-
+
+ with self._lock:
+ try:
+ callback, args = self._callbacks.pop(pid)
+ except KeyError:
+ # unknown child
+ if self._forks:
+ # It may not be registered yet.
+ self._zombies[pid] = returncode
+ if self._loop.get_debug():
+ logger.debug('unknown process %s exited '
+ 'with returncode %s',
+ pid, returncode)
+ continue
+ callback = None
+ else:
+ if self._loop.get_debug():
+ logger.debug('process %s exited with returncode %s',
+ pid, returncode)
+
+ if callback is None:
+ logger.warning(
+ "Caught subprocess termination from unknown pid: "
+ "%d -> %d", pid, returncode)
+ else:
+ callback(pid, returncode, *args)
+
+
class MultiLoopChildWatcher(AbstractChildWatcher):
"""A watcher that doesn't require running loop in the main thread.
@@ -1416,55 +1416,55 @@ class ThreadedChildWatcher(AbstractChildWatcher):
self._threads.pop(expected_pid)
-class _UnixDefaultEventLoopPolicy(events.BaseDefaultEventLoopPolicy):
- """UNIX event loop policy with a watcher for child processes."""
- _loop_factory = _UnixSelectorEventLoop
-
- def __init__(self):
- super().__init__()
- self._watcher = None
-
- def _init_watcher(self):
- with events._lock:
- if self._watcher is None: # pragma: no branch
+class _UnixDefaultEventLoopPolicy(events.BaseDefaultEventLoopPolicy):
+ """UNIX event loop policy with a watcher for child processes."""
+ _loop_factory = _UnixSelectorEventLoop
+
+ def __init__(self):
+ super().__init__()
+ self._watcher = None
+
+ def _init_watcher(self):
+ with events._lock:
+ if self._watcher is None: # pragma: no branch
self._watcher = ThreadedChildWatcher()
if threading.current_thread() is threading.main_thread():
- self._watcher.attach_loop(self._local._loop)
-
- def set_event_loop(self, loop):
- """Set the event loop.
-
- As a side effect, if a child watcher was set before, then calling
- .set_event_loop() from the main thread will call .attach_loop(loop) on
- the child watcher.
- """
-
- super().set_event_loop(loop)
-
- if (self._watcher is not None and
+ self._watcher.attach_loop(self._local._loop)
+
+ def set_event_loop(self, loop):
+ """Set the event loop.
+
+ As a side effect, if a child watcher was set before, then calling
+ .set_event_loop() from the main thread will call .attach_loop(loop) on
+ the child watcher.
+ """
+
+ super().set_event_loop(loop)
+
+ if (self._watcher is not None and
threading.current_thread() is threading.main_thread()):
- self._watcher.attach_loop(loop)
-
- def get_child_watcher(self):
- """Get the watcher for child processes.
-
+ self._watcher.attach_loop(loop)
+
+ def get_child_watcher(self):
+ """Get the watcher for child processes.
+
If not yet set, a ThreadedChildWatcher object is automatically created.
- """
- if self._watcher is None:
- self._init_watcher()
-
- return self._watcher
-
- def set_child_watcher(self, watcher):
- """Set the watcher for child processes."""
-
- assert watcher is None or isinstance(watcher, AbstractChildWatcher)
-
- if self._watcher is not None:
- self._watcher.close()
-
- self._watcher = watcher
-
-
-SelectorEventLoop = _UnixSelectorEventLoop
-DefaultEventLoopPolicy = _UnixDefaultEventLoopPolicy
+ """
+ if self._watcher is None:
+ self._init_watcher()
+
+ return self._watcher
+
+ def set_child_watcher(self, watcher):
+ """Set the watcher for child processes."""
+
+ assert watcher is None or isinstance(watcher, AbstractChildWatcher)
+
+ if self._watcher is not None:
+ self._watcher.close()
+
+ self._watcher = watcher
+
+
+SelectorEventLoop = _UnixSelectorEventLoop
+DefaultEventLoopPolicy = _UnixDefaultEventLoopPolicy
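
Editor's note (not part of the diff): the child-watcher classes and _UnixDefaultEventLoopPolicy re-annotated above are normally reached through asyncio's top-level helpers. A minimal sketch against the Python 3.8-era API in this file, explicitly opting into SafeChildWatcher; the watcher helpers are deprecated on newer Pythons, and the "true" command is just a placeholder child process:

    import asyncio

    async def main():
        loop = asyncio.get_running_loop()

        # Choose SafeChildWatcher instead of the default ThreadedChildWatcher;
        # attach_loop() installs the SIGCHLD handler on this loop.
        watcher = asyncio.SafeChildWatcher()
        watcher.attach_loop(loop)
        asyncio.set_child_watcher(watcher)

        proc = await asyncio.create_subprocess_exec("true")
        await proc.wait()  # exit status is reaped via the SIGCHLD handler
        print("returncode:", proc.returncode)

    asyncio.run(main())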