diff options
| author | robot-piglet <[email protected]> | 2024-10-05 21:45:15 +0300 |
|---|---|---|
| committer | robot-piglet <[email protected]> | 2024-10-05 21:56:51 +0300 |
| commit | bb4b8d018c07ae166175cf29e5a3594ff942c7ce (patch) | |
| tree | 8c0e9822cb7f6dba611fde09851211b98fa55b40 /contrib/python/anyio | |
| parent | c78357dda6c6b22763df2371c01c5b405f500196 (diff) | |
Intermediate changes
commit_hash:1f9110413b1655b37aa301b93c0f03f212db79be
Diffstat (limited to 'contrib/python/anyio')
| -rw-r--r-- | contrib/python/anyio/.dist-info/METADATA | 9 | ||||
| -rw-r--r-- | contrib/python/anyio/anyio/__init__.py | 12 | ||||
| -rw-r--r-- | contrib/python/anyio/anyio/_backends/_asyncio.py | 240 | ||||
| -rw-r--r-- | contrib/python/anyio/anyio/_backends/_trio.py | 175 | ||||
| -rw-r--r-- | contrib/python/anyio/anyio/_core/_exceptions.py | 16 | ||||
| -rw-r--r-- | contrib/python/anyio/anyio/_core/_fileio.py | 22 | ||||
| -rw-r--r-- | contrib/python/anyio/anyio/_core/_sockets.py | 29 | ||||
| -rw-r--r-- | contrib/python/anyio/anyio/_core/_subprocesses.py | 125 | ||||
| -rw-r--r-- | contrib/python/anyio/anyio/_core/_synchronization.py | 247 | ||||
| -rw-r--r-- | contrib/python/anyio/anyio/abc/_eventloop.py | 68 | ||||
| -rw-r--r-- | contrib/python/anyio/anyio/from_thread.py | 69 | ||||
| -rw-r--r-- | contrib/python/anyio/anyio/pytest_plugin.py | 15 | ||||
| -rw-r--r-- | contrib/python/anyio/anyio/streams/memory.py | 10 | ||||
| -rw-r--r-- | contrib/python/anyio/anyio/to_process.py | 3 | ||||
| -rw-r--r-- | contrib/python/anyio/ya.make | 2 |
15 files changed, 767 insertions, 275 deletions
diff --git a/contrib/python/anyio/.dist-info/METADATA b/contrib/python/anyio/.dist-info/METADATA index be13c8aa0f4..3fee32ffa38 100644 --- a/contrib/python/anyio/.dist-info/METADATA +++ b/contrib/python/anyio/.dist-info/METADATA @@ -1,6 +1,6 @@ Metadata-Version: 2.1 Name: anyio -Version: 4.4.0 +Version: 4.5.0 Summary: High level compatibility layer for multiple asynchronous event loop implementations Author-email: Alex Grönholm <[email protected]> License: MIT @@ -20,6 +20,7 @@ Classifier: Programming Language :: Python :: 3.9 Classifier: Programming Language :: Python :: 3.10 Classifier: Programming Language :: Python :: 3.11 Classifier: Programming Language :: Python :: 3.12 +Classifier: Programming Language :: Python :: 3.13 Requires-Python: >=3.8 Description-Content-Type: text/x-rst License-File: LICENSE @@ -29,7 +30,7 @@ Requires-Dist: exceptiongroup >=1.0.2 ; python_version < "3.11" Requires-Dist: typing-extensions >=4.1 ; python_version < "3.11" Provides-Extra: doc Requires-Dist: packaging ; extra == 'doc' -Requires-Dist: Sphinx >=7 ; extra == 'doc' +Requires-Dist: Sphinx ~=7.4 ; extra == 'doc' Requires-Dist: sphinx-rtd-theme ; extra == 'doc' Requires-Dist: sphinx-autodoc-typehints >=1.2.0 ; extra == 'doc' Provides-Extra: test @@ -41,9 +42,9 @@ Requires-Dist: psutil >=5.9 ; extra == 'test' Requires-Dist: pytest >=7.0 ; extra == 'test' Requires-Dist: pytest-mock >=3.6.1 ; extra == 'test' Requires-Dist: trustme ; extra == 'test' -Requires-Dist: uvloop >=0.17 ; (platform_python_implementation == "CPython" and platform_system != "Windows") and extra == 'test' +Requires-Dist: uvloop >=0.21.0b1 ; (platform_python_implementation == "CPython" and platform_system != "Windows") and extra == 'test' Provides-Extra: trio -Requires-Dist: trio >=0.23 ; extra == 'trio' +Requires-Dist: trio >=0.26.1 ; extra == 'trio' .. image:: https://github.com/agronholm/anyio/actions/workflows/test.yml/badge.svg :target: https://github.com/agronholm/anyio/actions/workflows/test.yml diff --git a/contrib/python/anyio/anyio/__init__.py b/contrib/python/anyio/anyio/__init__.py index 7bfe231645c..fd9fe06bcfc 100644 --- a/contrib/python/anyio/anyio/__init__.py +++ b/contrib/python/anyio/anyio/__init__.py @@ -1,7 +1,5 @@ from __future__ import annotations -from typing import Any - from ._core._eventloop import current_time as current_time from ._core._eventloop import get_all_backends as get_all_backends from ._core._eventloop import get_cancelled_exc_class as get_cancelled_exc_class @@ -69,8 +67,8 @@ from ._core._typedattr import TypedAttributeSet as TypedAttributeSet from ._core._typedattr import typed_attribute as typed_attribute # Re-export imports so they look like they live directly in this package -key: str -value: Any -for key, value in list(locals().items()): - if getattr(value, "__module__", "").startswith("anyio."): - value.__module__ = __name__ +for __value in list(locals().values()): + if getattr(__value, "__module__", "").startswith("anyio."): + __value.__module__ = __name__ + +del __value diff --git a/contrib/python/anyio/anyio/_backends/_asyncio.py b/contrib/python/anyio/anyio/_backends/_asyncio.py index 43b7cb0e0ca..0d4cdf650d2 100644 --- a/contrib/python/anyio/anyio/_backends/_asyncio.py +++ b/contrib/python/anyio/anyio/_backends/_asyncio.py @@ -4,6 +4,7 @@ import array import asyncio import concurrent.futures import math +import os import socket import sys import threading @@ -19,7 +20,7 @@ from asyncio import ( ) from asyncio.base_events import _run_until_complete_cb # type: ignore[attr-defined] from collections import OrderedDict, deque -from collections.abc import AsyncIterator, Generator, Iterable +from collections.abc import AsyncIterator, Iterable from concurrent.futures import Future from contextlib import suppress from contextvars import Context, copy_context @@ -47,7 +48,6 @@ from typing import ( Collection, ContextManager, Coroutine, - Mapping, Optional, Sequence, Tuple, @@ -58,7 +58,13 @@ from weakref import WeakKeyDictionary import sniffio -from .. import CapacityLimiterStatistics, EventStatistics, TaskInfo, abc +from .. import ( + CapacityLimiterStatistics, + EventStatistics, + LockStatistics, + TaskInfo, + abc, +) from .._core._eventloop import claim_worker_thread, threadlocals from .._core._exceptions import ( BrokenResourceError, @@ -66,12 +72,20 @@ from .._core._exceptions import ( ClosedResourceError, EndOfStream, WouldBlock, + iterate_exceptions, ) from .._core._sockets import convert_ipv6_sockaddr from .._core._streams import create_memory_object_stream -from .._core._synchronization import CapacityLimiter as BaseCapacityLimiter +from .._core._synchronization import ( + CapacityLimiter as BaseCapacityLimiter, +) from .._core._synchronization import Event as BaseEvent -from .._core._synchronization import ResourceGuard +from .._core._synchronization import Lock as BaseLock +from .._core._synchronization import ( + ResourceGuard, + SemaphoreStatistics, +) +from .._core._synchronization import Semaphore as BaseSemaphore from .._core._tasks import CancelScope as BaseCancelScope from ..abc import ( AsyncBackend, @@ -80,6 +94,7 @@ from ..abc import ( UDPPacketType, UNIXDatagramPacketType, ) +from ..abc._eventloop import StrOrBytesPath from ..lowlevel import RunVar from ..streams.memory import MemoryObjectReceiveStream, MemoryObjectSendStream @@ -630,16 +645,6 @@ class _AsyncioTaskStatus(abc.TaskStatus): _task_states[task].parent_id = self._parent_id -def iterate_exceptions( - exception: BaseException, -) -> Generator[BaseException, None, None]: - if isinstance(exception, BaseExceptionGroup): - for exc in exception.exceptions: - yield from iterate_exceptions(exc) - else: - yield exception - - class TaskGroup(abc.TaskGroup): def __init__(self) -> None: self.cancel_scope: CancelScope = CancelScope() @@ -925,7 +930,7 @@ class StreamReaderWrapper(abc.ByteReceiveStream): raise EndOfStream async def aclose(self) -> None: - self._stream.feed_eof() + self._stream.set_exception(ClosedResourceError()) await AsyncIOBackend.checkpoint() @@ -1073,7 +1078,8 @@ class StreamProtocol(asyncio.Protocol): self.write_event.set() def data_received(self, data: bytes) -> None: - self.read_queue.append(data) + # ProactorEventloop sometimes sends bytearray instead of bytes + self.read_queue.append(bytes(data)) self.read_event.set() def eof_received(self) -> bool | None: @@ -1665,6 +1671,154 @@ class Event(BaseEvent): return EventStatistics(len(self._event._waiters)) +class Lock(BaseLock): + def __new__(cls, *, fast_acquire: bool = False) -> Lock: + return object.__new__(cls) + + def __init__(self, *, fast_acquire: bool = False) -> None: + self._fast_acquire = fast_acquire + self._owner_task: asyncio.Task | None = None + self._waiters: deque[tuple[asyncio.Task, asyncio.Future]] = deque() + + async def acquire(self) -> None: + if self._owner_task is None and not self._waiters: + await AsyncIOBackend.checkpoint_if_cancelled() + self._owner_task = current_task() + + # Unless on the "fast path", yield control of the event loop so that other + # tasks can run too + if not self._fast_acquire: + try: + await AsyncIOBackend.cancel_shielded_checkpoint() + except CancelledError: + self.release() + raise + + return + + task = cast(asyncio.Task, current_task()) + fut: asyncio.Future[None] = asyncio.Future() + item = task, fut + self._waiters.append(item) + try: + await fut + except CancelledError: + self._waiters.remove(item) + if self._owner_task is task: + self.release() + + raise + + self._waiters.remove(item) + + def acquire_nowait(self) -> None: + if self._owner_task is None and not self._waiters: + self._owner_task = current_task() + return + + raise WouldBlock + + def locked(self) -> bool: + return self._owner_task is not None + + def release(self) -> None: + if self._owner_task != current_task(): + raise RuntimeError("The current task is not holding this lock") + + for task, fut in self._waiters: + if not fut.cancelled(): + self._owner_task = task + fut.set_result(None) + return + + self._owner_task = None + + def statistics(self) -> LockStatistics: + task_info = AsyncIOTaskInfo(self._owner_task) if self._owner_task else None + return LockStatistics(self.locked(), task_info, len(self._waiters)) + + +class Semaphore(BaseSemaphore): + def __new__( + cls, + initial_value: int, + *, + max_value: int | None = None, + fast_acquire: bool = False, + ) -> Semaphore: + return object.__new__(cls) + + def __init__( + self, + initial_value: int, + *, + max_value: int | None = None, + fast_acquire: bool = False, + ): + super().__init__(initial_value, max_value=max_value) + self._value = initial_value + self._max_value = max_value + self._fast_acquire = fast_acquire + self._waiters: deque[asyncio.Future[None]] = deque() + + async def acquire(self) -> None: + if self._value > 0 and not self._waiters: + await AsyncIOBackend.checkpoint_if_cancelled() + self._value -= 1 + + # Unless on the "fast path", yield control of the event loop so that other + # tasks can run too + if not self._fast_acquire: + try: + await AsyncIOBackend.cancel_shielded_checkpoint() + except CancelledError: + self.release() + raise + + return + + fut: asyncio.Future[None] = asyncio.Future() + self._waiters.append(fut) + try: + await fut + except CancelledError: + try: + self._waiters.remove(fut) + except ValueError: + self.release() + + raise + + def acquire_nowait(self) -> None: + if self._value == 0: + raise WouldBlock + + self._value -= 1 + + def release(self) -> None: + if self._max_value is not None and self._value == self._max_value: + raise ValueError("semaphore released too many times") + + for fut in self._waiters: + if not fut.cancelled(): + fut.set_result(None) + self._waiters.remove(fut) + return + + self._value += 1 + + @property + def value(self) -> int: + return self._value + + @property + def max_value(self) -> int | None: + return self._max_value + + def statistics(self) -> SemaphoreStatistics: + return SemaphoreStatistics(len(self._waiters)) + + class CapacityLimiter(BaseCapacityLimiter): _total_tokens: float = 0 @@ -1861,7 +2015,9 @@ class AsyncIOTaskInfo(TaskInfo): if task_state := _task_states.get(task): if cancel_scope := task_state.cancel_scope: - return cancel_scope.cancel_called or cancel_scope._parent_cancelled() + return cancel_scope.cancel_called or ( + not cancel_scope.shield and cancel_scope._parent_cancelled() + ) return False @@ -1926,13 +2082,23 @@ class TestRunner(abc.TestRunner): tuple[Awaitable[T_Retval], asyncio.Future[T_Retval]] ], ) -> None: + from _pytest.outcomes import OutcomeException + with receive_stream, self._send_stream: async for coro, future in receive_stream: try: retval = await coro + except CancelledError as exc: + if not future.cancelled(): + future.cancel(*exc.args) + + raise except BaseException as exc: if not future.cancelled(): future.set_exception(exc) + + if not isinstance(exc, (Exception, OutcomeException)): + raise else: if not future.cancelled(): future.set_result(retval) @@ -2114,6 +2280,20 @@ class AsyncIOBackend(AsyncBackend): return Event() @classmethod + def create_lock(cls, *, fast_acquire: bool) -> abc.Lock: + return Lock(fast_acquire=fast_acquire) + + @classmethod + def create_semaphore( + cls, + initial_value: int, + *, + max_value: int | None = None, + fast_acquire: bool = False, + ) -> abc.Semaphore: + return Semaphore(initial_value, max_value=max_value, fast_acquire=fast_acquire) + + @classmethod def create_capacity_limiter(cls, total_tokens: float) -> abc.CapacityLimiter: return CapacityLimiter(total_tokens) @@ -2245,26 +2425,24 @@ class AsyncIOBackend(AsyncBackend): @classmethod async def open_process( cls, - command: str | bytes | Sequence[str | bytes], + command: StrOrBytesPath | Sequence[StrOrBytesPath], *, - shell: bool, stdin: int | IO[Any] | None, stdout: int | IO[Any] | None, stderr: int | IO[Any] | None, - cwd: str | bytes | PathLike | None = None, - env: Mapping[str, str] | None = None, - start_new_session: bool = False, + **kwargs: Any, ) -> Process: await cls.checkpoint() - if shell: + if isinstance(command, PathLike): + command = os.fspath(command) + + if isinstance(command, (str, bytes)): process = await asyncio.create_subprocess_shell( - cast("str | bytes", command), + command, stdin=stdin, stdout=stdout, stderr=stderr, - cwd=cwd, - env=env, - start_new_session=start_new_session, + **kwargs, ) else: process = await asyncio.create_subprocess_exec( @@ -2272,9 +2450,7 @@ class AsyncIOBackend(AsyncBackend): stdin=stdin, stdout=stdout, stderr=stderr, - cwd=cwd, - env=env, - start_new_session=start_new_session, + **kwargs, ) stdin_stream = StreamWriterWrapper(process.stdin) if process.stdin else None @@ -2289,7 +2465,7 @@ class AsyncIOBackend(AsyncBackend): name="AnyIO process pool shutdown task", ) find_root_task().add_done_callback( - partial(_forcibly_shutdown_process_pool_on_exit, workers) + partial(_forcibly_shutdown_process_pool_on_exit, workers) # type:ignore[arg-type] ) @classmethod diff --git a/contrib/python/anyio/anyio/_backends/_trio.py b/contrib/python/anyio/anyio/_backends/_trio.py index cf6f3db789c..9b8369d4c50 100644 --- a/contrib/python/anyio/anyio/_backends/_trio.py +++ b/contrib/python/anyio/anyio/_backends/_trio.py @@ -2,6 +2,7 @@ from __future__ import annotations import array import math +import os import socket import sys import types @@ -25,7 +26,6 @@ from typing import ( ContextManager, Coroutine, Generic, - Mapping, NoReturn, Sequence, TypeVar, @@ -45,7 +45,14 @@ from trio.lowlevel import ( from trio.socket import SocketType as TrioSocketType from trio.to_thread import run_sync -from .. import CapacityLimiterStatistics, EventStatistics, TaskInfo, abc +from .. import ( + CapacityLimiterStatistics, + EventStatistics, + LockStatistics, + TaskInfo, + WouldBlock, + abc, +) from .._core._eventloop import claim_worker_thread from .._core._exceptions import ( BrokenResourceError, @@ -55,12 +62,19 @@ from .._core._exceptions import ( ) from .._core._sockets import convert_ipv6_sockaddr from .._core._streams import create_memory_object_stream -from .._core._synchronization import CapacityLimiter as BaseCapacityLimiter +from .._core._synchronization import ( + CapacityLimiter as BaseCapacityLimiter, +) from .._core._synchronization import Event as BaseEvent -from .._core._synchronization import ResourceGuard +from .._core._synchronization import Lock as BaseLock +from .._core._synchronization import ( + ResourceGuard, + SemaphoreStatistics, +) +from .._core._synchronization import Semaphore as BaseSemaphore from .._core._tasks import CancelScope as BaseCancelScope from ..abc import IPSockAddrType, UDPPacketType, UNIXDatagramPacketType -from ..abc._eventloop import AsyncBackend +from ..abc._eventloop import AsyncBackend, StrOrBytesPath from ..streams.memory import MemoryObjectSendStream if sys.version_info >= (3, 10): @@ -637,6 +651,100 @@ class Event(BaseEvent): self.__original.set() +class Lock(BaseLock): + def __new__(cls, *, fast_acquire: bool = False) -> Lock: + return object.__new__(cls) + + def __init__(self, *, fast_acquire: bool = False) -> None: + self._fast_acquire = fast_acquire + self.__original = trio.Lock() + + async def acquire(self) -> None: + if not self._fast_acquire: + await self.__original.acquire() + return + + # This is the "fast path" where we don't let other tasks run + await trio.lowlevel.checkpoint_if_cancelled() + try: + self.__original.acquire_nowait() + except trio.WouldBlock: + await self.__original._lot.park() + + def acquire_nowait(self) -> None: + try: + self.__original.acquire_nowait() + except trio.WouldBlock: + raise WouldBlock from None + + def locked(self) -> bool: + return self.__original.locked() + + def release(self) -> None: + self.__original.release() + + def statistics(self) -> LockStatistics: + orig_statistics = self.__original.statistics() + owner = TrioTaskInfo(orig_statistics.owner) if orig_statistics.owner else None + return LockStatistics( + orig_statistics.locked, owner, orig_statistics.tasks_waiting + ) + + +class Semaphore(BaseSemaphore): + def __new__( + cls, + initial_value: int, + *, + max_value: int | None = None, + fast_acquire: bool = False, + ) -> Semaphore: + return object.__new__(cls) + + def __init__( + self, + initial_value: int, + *, + max_value: int | None = None, + fast_acquire: bool = False, + ) -> None: + super().__init__(initial_value, max_value=max_value, fast_acquire=fast_acquire) + self.__original = trio.Semaphore(initial_value, max_value=max_value) + + async def acquire(self) -> None: + if not self._fast_acquire: + await self.__original.acquire() + return + + # This is the "fast path" where we don't let other tasks run + await trio.lowlevel.checkpoint_if_cancelled() + try: + self.__original.acquire_nowait() + except trio.WouldBlock: + await self.__original._lot.park() + + def acquire_nowait(self) -> None: + try: + self.__original.acquire_nowait() + except trio.WouldBlock: + raise WouldBlock from None + + @property + def max_value(self) -> int | None: + return self.__original.max_value + + @property + def value(self) -> int: + return self.__original.value + + def release(self) -> None: + self.__original.release() + + def statistics(self) -> SemaphoreStatistics: + orig_statistics = self.__original.statistics() + return SemaphoreStatistics(orig_statistics.tasks_waiting) + + class CapacityLimiter(BaseCapacityLimiter): def __new__( cls, @@ -916,6 +1024,20 @@ class TrioBackend(AsyncBackend): return Event() @classmethod + def create_lock(cls, *, fast_acquire: bool) -> Lock: + return Lock(fast_acquire=fast_acquire) + + @classmethod + def create_semaphore( + cls, + initial_value: int, + *, + max_value: int | None = None, + fast_acquire: bool = False, + ) -> abc.Semaphore: + return Semaphore(initial_value, max_value=max_value, fast_acquire=fast_acquire) + + @classmethod def create_capacity_limiter(cls, total_tokens: float) -> CapacityLimiter: return CapacityLimiter(total_tokens) @@ -967,26 +1089,39 @@ class TrioBackend(AsyncBackend): @classmethod async def open_process( cls, - command: str | bytes | Sequence[str | bytes], + command: StrOrBytesPath | Sequence[StrOrBytesPath], *, - shell: bool, stdin: int | IO[Any] | None, stdout: int | IO[Any] | None, stderr: int | IO[Any] | None, - cwd: str | bytes | PathLike | None = None, - env: Mapping[str, str] | None = None, - start_new_session: bool = False, + **kwargs: Any, ) -> Process: - process = await trio.lowlevel.open_process( # type: ignore[misc] - command, # type: ignore[arg-type] - stdin=stdin, - stdout=stdout, - stderr=stderr, - shell=shell, - cwd=cwd, - env=env, - start_new_session=start_new_session, - ) + def convert_item(item: StrOrBytesPath) -> str: + str_or_bytes = os.fspath(item) + if isinstance(str_or_bytes, str): + return str_or_bytes + else: + return os.fsdecode(str_or_bytes) + + if isinstance(command, (str, bytes, PathLike)): + process = await trio.lowlevel.open_process( + convert_item(command), + stdin=stdin, + stdout=stdout, + stderr=stderr, + shell=True, + **kwargs, + ) + else: + process = await trio.lowlevel.open_process( + [convert_item(item) for item in command], + stdin=stdin, + stdout=stdout, + stderr=stderr, + shell=False, + **kwargs, + ) + stdin_stream = SendStreamWrapper(process.stdin) if process.stdin else None stdout_stream = ReceiveStreamWrapper(process.stdout) if process.stdout else None stderr_stream = ReceiveStreamWrapper(process.stderr) if process.stderr else None diff --git a/contrib/python/anyio/anyio/_core/_exceptions.py b/contrib/python/anyio/anyio/_core/_exceptions.py index 571c3b85316..6e3f8ccc675 100644 --- a/contrib/python/anyio/anyio/_core/_exceptions.py +++ b/contrib/python/anyio/anyio/_core/_exceptions.py @@ -1,5 +1,11 @@ from __future__ import annotations +import sys +from collections.abc import Generator + +if sys.version_info < (3, 11): + from exceptiongroup import BaseExceptionGroup + class BrokenResourceError(Exception): """ @@ -71,3 +77,13 @@ class TypedAttributeLookupError(LookupError): class WouldBlock(Exception): """Raised by ``X_nowait`` functions if ``X()`` would block.""" + + +def iterate_exceptions( + exception: BaseException, +) -> Generator[BaseException, None, None]: + if isinstance(exception, BaseExceptionGroup): + for exc in exception.exceptions: + yield from iterate_exceptions(exc) + else: + yield exception diff --git a/contrib/python/anyio/anyio/_core/_fileio.py b/contrib/python/anyio/anyio/_core/_fileio.py index df2057fe342..9503d944bb8 100644 --- a/contrib/python/anyio/anyio/_core/_fileio.py +++ b/contrib/python/anyio/anyio/_core/_fileio.py @@ -358,8 +358,26 @@ class Path: def as_uri(self) -> str: return self._path.as_uri() - def match(self, path_pattern: str) -> bool: - return self._path.match(path_pattern) + if sys.version_info >= (3, 13): + parser = pathlib.Path.parser + + @classmethod + def from_uri(cls, uri: str) -> Path: + return Path(pathlib.Path.from_uri(uri)) + + def full_match( + self, path_pattern: str, *, case_sensitive: bool | None = None + ) -> bool: + return self._path.full_match(path_pattern, case_sensitive=case_sensitive) + + def match( + self, path_pattern: str, *, case_sensitive: bool | None = None + ) -> bool: + return self._path.match(path_pattern, case_sensitive=case_sensitive) + else: + + def match(self, path_pattern: str) -> bool: + return self._path.match(path_pattern) def is_relative_to(self, other: str | PathLike[str]) -> bool: try: diff --git a/contrib/python/anyio/anyio/_core/_sockets.py b/contrib/python/anyio/anyio/_core/_sockets.py index 5e09cdbf0f0..6070c647fd9 100644 --- a/contrib/python/anyio/anyio/_core/_sockets.py +++ b/contrib/python/anyio/anyio/_core/_sockets.py @@ -680,19 +680,26 @@ async def setup_unix_local_socket( :param socktype: socket.SOCK_STREAM or socket.SOCK_DGRAM """ - path_str: str | bytes | None + path_str: str | None if path is not None: - path_str = os.fspath(path) + path_str = os.fsdecode(path) - # Copied from pathlib... - try: - stat_result = os.stat(path) - except OSError as e: - if e.errno not in (errno.ENOENT, errno.ENOTDIR, errno.EBADF, errno.ELOOP): - raise - else: - if stat.S_ISSOCK(stat_result.st_mode): - os.unlink(path) + # Linux abstract namespace sockets aren't backed by a concrete file so skip stat call + if not path_str.startswith("\0"): + # Copied from pathlib... + try: + stat_result = os.stat(path) + except OSError as e: + if e.errno not in ( + errno.ENOENT, + errno.ENOTDIR, + errno.EBADF, + errno.ELOOP, + ): + raise + else: + if stat.S_ISSOCK(stat_result.st_mode): + os.unlink(path) else: path_str = None diff --git a/contrib/python/anyio/anyio/_core/_subprocesses.py b/contrib/python/anyio/anyio/_core/_subprocesses.py index 5d5d7b768a1..1ac2d549dff 100644 --- a/contrib/python/anyio/anyio/_core/_subprocesses.py +++ b/contrib/python/anyio/anyio/_core/_subprocesses.py @@ -1,26 +1,41 @@ from __future__ import annotations -from collections.abc import AsyncIterable, Mapping, Sequence +import sys +from collections.abc import AsyncIterable, Iterable, Mapping, Sequence from io import BytesIO from os import PathLike from subprocess import DEVNULL, PIPE, CalledProcessError, CompletedProcess -from typing import IO, Any, cast +from typing import IO, Any, Union, cast from ..abc import Process from ._eventloop import get_async_backend from ._tasks import create_task_group +if sys.version_info >= (3, 10): + from typing import TypeAlias +else: + from typing_extensions import TypeAlias + +StrOrBytesPath: TypeAlias = Union[str, bytes, "PathLike[str]", "PathLike[bytes]"] + async def run_process( - command: str | bytes | Sequence[str | bytes], + command: StrOrBytesPath | Sequence[StrOrBytesPath], *, input: bytes | None = None, stdout: int | IO[Any] | None = PIPE, stderr: int | IO[Any] | None = PIPE, check: bool = True, - cwd: str | bytes | PathLike[str] | None = None, + cwd: StrOrBytesPath | None = None, env: Mapping[str, str] | None = None, + startupinfo: Any = None, + creationflags: int = 0, start_new_session: bool = False, + pass_fds: Sequence[int] = (), + user: str | int | None = None, + group: str | int | None = None, + extra_groups: Iterable[str | int] | None = None, + umask: int = -1, ) -> CompletedProcess[bytes]: """ Run an external command in a subprocess and wait until it completes. @@ -40,8 +55,20 @@ async def run_process( command :param env: if not ``None``, this mapping replaces the inherited environment variables from the parent process + :param startupinfo: an instance of :class:`subprocess.STARTUPINFO` that can be used + to specify process startup parameters (Windows only) + :param creationflags: flags that can be used to control the creation of the + subprocess (see :class:`subprocess.Popen` for the specifics) :param start_new_session: if ``true`` the setsid() system call will be made in the child process prior to the execution of the subprocess. (POSIX only) + :param pass_fds: sequence of file descriptors to keep open between the parent and + child processes. (POSIX only) + :param user: effective user to run the process as (Python >= 3.9, POSIX only) + :param group: effective group to run the process as (Python >= 3.9, POSIX only) + :param extra_groups: supplementary groups to set in the subprocess (Python >= 3.9, + POSIX only) + :param umask: if not negative, this umask is applied in the child process before + running the given command (Python >= 3.9, POSIX only) :return: an object representing the completed process :raises ~subprocess.CalledProcessError: if ``check`` is ``True`` and the process exits with a nonzero return code @@ -62,7 +89,14 @@ async def run_process( stderr=stderr, cwd=cwd, env=env, + startupinfo=startupinfo, + creationflags=creationflags, start_new_session=start_new_session, + pass_fds=pass_fds, + user=user, + group=group, + extra_groups=extra_groups, + umask=umask, ) as process: stream_contents: list[bytes | None] = [None, None] async with create_task_group() as tg: @@ -86,14 +120,21 @@ async def run_process( async def open_process( - command: str | bytes | Sequence[str | bytes], + command: StrOrBytesPath | Sequence[StrOrBytesPath], *, stdin: int | IO[Any] | None = PIPE, stdout: int | IO[Any] | None = PIPE, stderr: int | IO[Any] | None = PIPE, - cwd: str | bytes | PathLike[str] | None = None, + cwd: StrOrBytesPath | None = None, env: Mapping[str, str] | None = None, + startupinfo: Any = None, + creationflags: int = 0, start_new_session: bool = False, + pass_fds: Sequence[int] = (), + user: str | int | None = None, + group: str | int | None = None, + extra_groups: Iterable[str | int] | None = None, + umask: int = -1, ) -> Process: """ Start an external command in a subprocess. @@ -111,30 +152,58 @@ async def open_process( :param cwd: If not ``None``, the working directory is changed before executing :param env: If env is not ``None``, it must be a mapping that defines the environment variables for the new process + :param creationflags: flags that can be used to control the creation of the + subprocess (see :class:`subprocess.Popen` for the specifics) + :param startupinfo: an instance of :class:`subprocess.STARTUPINFO` that can be used + to specify process startup parameters (Windows only) :param start_new_session: if ``true`` the setsid() system call will be made in the child process prior to the execution of the subprocess. (POSIX only) + :param pass_fds: sequence of file descriptors to keep open between the parent and + child processes. (POSIX only) + :param user: effective user to run the process as (Python >= 3.9; POSIX only) + :param group: effective group to run the process as (Python >= 3.9; POSIX only) + :param extra_groups: supplementary groups to set in the subprocess (Python >= 3.9; + POSIX only) + :param umask: if not negative, this umask is applied in the child process before + running the given command (Python >= 3.9; POSIX only) :return: an asynchronous process object """ - if isinstance(command, (str, bytes)): - return await get_async_backend().open_process( - command, - shell=True, - stdin=stdin, - stdout=stdout, - stderr=stderr, - cwd=cwd, - env=env, - start_new_session=start_new_session, - ) - else: - return await get_async_backend().open_process( - command, - shell=False, - stdin=stdin, - stdout=stdout, - stderr=stderr, - cwd=cwd, - env=env, - start_new_session=start_new_session, - ) + kwargs: dict[str, Any] = {} + if user is not None: + if sys.version_info < (3, 9): + raise TypeError("the 'user' argument requires Python 3.9 or later") + + kwargs["user"] = user + + if group is not None: + if sys.version_info < (3, 9): + raise TypeError("the 'group' argument requires Python 3.9 or later") + + kwargs["group"] = group + + if extra_groups is not None: + if sys.version_info < (3, 9): + raise TypeError("the 'extra_groups' argument requires Python 3.9 or later") + + kwargs["extra_groups"] = group + + if umask >= 0: + if sys.version_info < (3, 9): + raise TypeError("the 'umask' argument requires Python 3.9 or later") + + kwargs["umask"] = umask + + return await get_async_backend().open_process( + command, + stdin=stdin, + stdout=stdout, + stderr=stderr, + cwd=cwd, + env=env, + startupinfo=startupinfo, + creationflags=creationflags, + start_new_session=start_new_session, + pass_fds=pass_fds, + **kwargs, + ) diff --git a/contrib/python/anyio/anyio/_core/_synchronization.py b/contrib/python/anyio/anyio/_core/_synchronization.py index b274a31ea2c..023ab73370d 100644 --- a/contrib/python/anyio/anyio/_core/_synchronization.py +++ b/contrib/python/anyio/anyio/_core/_synchronization.py @@ -7,9 +7,9 @@ from types import TracebackType from sniffio import AsyncLibraryNotFoundError -from ..lowlevel import cancel_shielded_checkpoint, checkpoint, checkpoint_if_cancelled +from ..lowlevel import checkpoint from ._eventloop import get_async_backend -from ._exceptions import BusyResourceError, WouldBlock +from ._exceptions import BusyResourceError from ._tasks import CancelScope from ._testing import TaskInfo, get_current_task @@ -137,10 +137,11 @@ class EventAdapter(Event): class Lock: - _owner_task: TaskInfo | None = None - - def __init__(self) -> None: - self._waiters: deque[tuple[TaskInfo, Event]] = deque() + def __new__(cls, *, fast_acquire: bool = False) -> Lock: + try: + return get_async_backend().create_lock(fast_acquire=fast_acquire) + except AsyncLibraryNotFoundError: + return LockAdapter(fast_acquire=fast_acquire) async def __aenter__(self) -> None: await self.acquire() @@ -155,31 +156,7 @@ class Lock: async def acquire(self) -> None: """Acquire the lock.""" - await checkpoint_if_cancelled() - try: - self.acquire_nowait() - except WouldBlock: - task = get_current_task() - event = Event() - token = task, event - self._waiters.append(token) - try: - await event.wait() - except BaseException: - if not event.is_set(): - self._waiters.remove(token) - elif self._owner_task == task: - self.release() - - raise - - assert self._owner_task == task - else: - try: - await cancel_shielded_checkpoint() - except BaseException: - self.release() - raise + raise NotImplementedError def acquire_nowait(self) -> None: """ @@ -188,37 +165,87 @@ class Lock: :raises ~anyio.WouldBlock: if the operation would block """ - task = get_current_task() - if self._owner_task == task: - raise RuntimeError("Attempted to acquire an already held Lock") + raise NotImplementedError - if self._owner_task is not None: - raise WouldBlock + def release(self) -> None: + """Release the lock.""" + raise NotImplementedError - self._owner_task = task + def locked(self) -> bool: + """Return True if the lock is currently held.""" + raise NotImplementedError + + def statistics(self) -> LockStatistics: + """ + Return statistics about the current state of this lock. + + .. versionadded:: 3.0 + """ + raise NotImplementedError + + +class LockAdapter(Lock): + _internal_lock: Lock | None = None + + def __new__(cls, *, fast_acquire: bool = False) -> LockAdapter: + return object.__new__(cls) + + def __init__(self, *, fast_acquire: bool = False): + self._fast_acquire = fast_acquire + + @property + def _lock(self) -> Lock: + if self._internal_lock is None: + self._internal_lock = get_async_backend().create_lock( + fast_acquire=self._fast_acquire + ) + + return self._internal_lock + + async def __aenter__(self) -> None: + await self._lock.acquire() + + async def __aexit__( + self, + exc_type: type[BaseException] | None, + exc_val: BaseException | None, + exc_tb: TracebackType | None, + ) -> None: + if self._internal_lock is not None: + self._internal_lock.release() + + async def acquire(self) -> None: + """Acquire the lock.""" + await self._lock.acquire() + + def acquire_nowait(self) -> None: + """ + Acquire the lock, without blocking. + + :raises ~anyio.WouldBlock: if the operation would block + + """ + self._lock.acquire_nowait() def release(self) -> None: """Release the lock.""" - if self._owner_task != get_current_task(): - raise RuntimeError("The current task is not holding this lock") - - if self._waiters: - self._owner_task, event = self._waiters.popleft() - event.set() - else: - del self._owner_task + self._lock.release() def locked(self) -> bool: """Return True if the lock is currently held.""" - return self._owner_task is not None + return self._lock.locked() def statistics(self) -> LockStatistics: """ Return statistics about the current state of this lock. .. versionadded:: 3.0 + """ - return LockStatistics(self.locked(), self._owner_task, len(self._waiters)) + if self._internal_lock is None: + return LockStatistics(False, None, 0) + + return self._internal_lock.statistics() class Condition: @@ -312,7 +339,27 @@ class Condition: class Semaphore: - def __init__(self, initial_value: int, *, max_value: int | None = None): + def __new__( + cls, + initial_value: int, + *, + max_value: int | None = None, + fast_acquire: bool = False, + ) -> Semaphore: + try: + return get_async_backend().create_semaphore( + initial_value, max_value=max_value, fast_acquire=fast_acquire + ) + except AsyncLibraryNotFoundError: + return SemaphoreAdapter(initial_value, max_value=max_value) + + def __init__( + self, + initial_value: int, + *, + max_value: int | None = None, + fast_acquire: bool = False, + ): if not isinstance(initial_value, int): raise TypeError("initial_value must be an integer") if initial_value < 0: @@ -325,9 +372,7 @@ class Semaphore: "max_value must be equal to or higher than initial_value" ) - self._value = initial_value - self._max_value = max_value - self._waiters: deque[Event] = deque() + self._fast_acquire = fast_acquire async def __aenter__(self) -> Semaphore: await self.acquire() @@ -343,27 +388,7 @@ class Semaphore: async def acquire(self) -> None: """Decrement the semaphore value, blocking if necessary.""" - await checkpoint_if_cancelled() - try: - self.acquire_nowait() - except WouldBlock: - event = Event() - self._waiters.append(event) - try: - await event.wait() - except BaseException: - if not event.is_set(): - self._waiters.remove(event) - else: - self.release() - - raise - else: - try: - await cancel_shielded_checkpoint() - except BaseException: - self.release() - raise + raise NotImplementedError def acquire_nowait(self) -> None: """ @@ -372,30 +397,21 @@ class Semaphore: :raises ~anyio.WouldBlock: if the operation would block """ - if self._value == 0: - raise WouldBlock - - self._value -= 1 + raise NotImplementedError def release(self) -> None: """Increment the semaphore value.""" - if self._max_value is not None and self._value == self._max_value: - raise ValueError("semaphore released too many times") - - if self._waiters: - self._waiters.popleft().set() - else: - self._value += 1 + raise NotImplementedError @property def value(self) -> int: """The current value of the semaphore.""" - return self._value + raise NotImplementedError @property def max_value(self) -> int | None: """The maximum value of the semaphore.""" - return self._max_value + raise NotImplementedError def statistics(self) -> SemaphoreStatistics: """ @@ -403,7 +419,66 @@ class Semaphore: .. versionadded:: 3.0 """ - return SemaphoreStatistics(len(self._waiters)) + raise NotImplementedError + + +class SemaphoreAdapter(Semaphore): + _internal_semaphore: Semaphore | None = None + + def __new__( + cls, + initial_value: int, + *, + max_value: int | None = None, + fast_acquire: bool = False, + ) -> SemaphoreAdapter: + return object.__new__(cls) + + def __init__( + self, + initial_value: int, + *, + max_value: int | None = None, + fast_acquire: bool = False, + ) -> None: + super().__init__(initial_value, max_value=max_value, fast_acquire=fast_acquire) + self._initial_value = initial_value + self._max_value = max_value + + @property + def _semaphore(self) -> Semaphore: + if self._internal_semaphore is None: + self._internal_semaphore = get_async_backend().create_semaphore( + self._initial_value, max_value=self._max_value + ) + + return self._internal_semaphore + + async def acquire(self) -> None: + await self._semaphore.acquire() + + def acquire_nowait(self) -> None: + self._semaphore.acquire_nowait() + + def release(self) -> None: + self._semaphore.release() + + @property + def value(self) -> int: + if self._internal_semaphore is None: + return self._initial_value + + return self._semaphore.value + + @property + def max_value(self) -> int | None: + return self._max_value + + def statistics(self) -> SemaphoreStatistics: + if self._internal_semaphore is None: + return SemaphoreStatistics(tasks_waiting=0) + + return self._semaphore.statistics() class CapacityLimiter: diff --git a/contrib/python/anyio/anyio/abc/_eventloop.py b/contrib/python/anyio/anyio/abc/_eventloop.py index a50afefaa00..2c73bb9ffb8 100644 --- a/contrib/python/anyio/anyio/abc/_eventloop.py +++ b/contrib/python/anyio/anyio/abc/_eventloop.py @@ -3,7 +3,7 @@ from __future__ import annotations import math import sys from abc import ABCMeta, abstractmethod -from collections.abc import AsyncIterator, Awaitable, Mapping +from collections.abc import AsyncIterator, Awaitable from os import PathLike from signal import Signals from socket import AddressFamily, SocketKind, socket @@ -15,6 +15,7 @@ from typing import ( ContextManager, Sequence, TypeVar, + Union, overload, ) @@ -23,10 +24,13 @@ if sys.version_info >= (3, 11): else: from typing_extensions import TypeVarTuple, Unpack -if TYPE_CHECKING: - from typing import Literal +if sys.version_info >= (3, 10): + from typing import TypeAlias +else: + from typing_extensions import TypeAlias - from .._core._synchronization import CapacityLimiter, Event +if TYPE_CHECKING: + from .._core._synchronization import CapacityLimiter, Event, Lock, Semaphore from .._core._tasks import CancelScope from .._core._testing import TaskInfo from ..from_thread import BlockingPortal @@ -46,6 +50,7 @@ if TYPE_CHECKING: T_Retval = TypeVar("T_Retval") PosArgsT = TypeVarTuple("PosArgsT") +StrOrBytesPath: TypeAlias = Union[str, bytes, "PathLike[str]", "PathLike[bytes]"] class AsyncBackend(metaclass=ABCMeta): @@ -169,6 +174,22 @@ class AsyncBackend(metaclass=ABCMeta): @classmethod @abstractmethod + def create_lock(cls, *, fast_acquire: bool) -> Lock: + pass + + @classmethod + @abstractmethod + def create_semaphore( + cls, + initial_value: int, + *, + max_value: int | None = None, + fast_acquire: bool = False, + ) -> Semaphore: + pass + + @classmethod + @abstractmethod def create_capacity_limiter(cls, total_tokens: float) -> CapacityLimiter: pass @@ -214,50 +235,15 @@ class AsyncBackend(metaclass=ABCMeta): pass @classmethod - @overload - async def open_process( - cls, - command: str | bytes, - *, - shell: Literal[True], - stdin: int | IO[Any] | None, - stdout: int | IO[Any] | None, - stderr: int | IO[Any] | None, - cwd: str | bytes | PathLike[str] | None = None, - env: Mapping[str, str] | None = None, - start_new_session: bool = False, - ) -> Process: - pass - - @classmethod - @overload - async def open_process( - cls, - command: Sequence[str | bytes], - *, - shell: Literal[False], - stdin: int | IO[Any] | None, - stdout: int | IO[Any] | None, - stderr: int | IO[Any] | None, - cwd: str | bytes | PathLike[str] | None = None, - env: Mapping[str, str] | None = None, - start_new_session: bool = False, - ) -> Process: - pass - - @classmethod @abstractmethod async def open_process( cls, - command: str | bytes | Sequence[str | bytes], + command: StrOrBytesPath | Sequence[StrOrBytesPath], *, - shell: bool, stdin: int | IO[Any] | None, stdout: int | IO[Any] | None, stderr: int | IO[Any] | None, - cwd: str | bytes | PathLike[str] | None = None, - env: Mapping[str, str] | None = None, - start_new_session: bool = False, + **kwargs: Any, ) -> Process: pass diff --git a/contrib/python/anyio/anyio/from_thread.py b/contrib/python/anyio/anyio/from_thread.py index 88a854bb919..b8785845bad 100644 --- a/contrib/python/anyio/anyio/from_thread.py +++ b/contrib/python/anyio/anyio/from_thread.py @@ -1,19 +1,18 @@ from __future__ import annotations import sys -import threading from collections.abc import Awaitable, Callable, Generator -from concurrent.futures import FIRST_COMPLETED, Future, ThreadPoolExecutor, wait +from concurrent.futures import Future from contextlib import AbstractContextManager, contextmanager from dataclasses import dataclass, field from inspect import isawaitable +from threading import Lock, Thread, get_ident from types import TracebackType from typing import ( Any, AsyncContextManager, ContextManager, Generic, - Iterable, TypeVar, cast, overload, @@ -146,7 +145,7 @@ class BlockingPortal: return get_async_backend().create_blocking_portal() def __init__(self) -> None: - self._event_loop_thread_id: int | None = threading.get_ident() + self._event_loop_thread_id: int | None = get_ident() self._stop_event = Event() self._task_group = create_task_group() self._cancelled_exc_class = get_cancelled_exc_class() @@ -167,7 +166,7 @@ class BlockingPortal: def _check_running(self) -> None: if self._event_loop_thread_id is None: raise RuntimeError("This portal is not running") - if self._event_loop_thread_id == threading.get_ident(): + if self._event_loop_thread_id == get_ident(): raise RuntimeError( "This method cannot be called from the event loop thread" ) @@ -202,7 +201,7 @@ class BlockingPortal: def callback(f: Future[T_Retval]) -> None: if f.cancelled() and self._event_loop_thread_id not in ( None, - threading.get_ident(), + get_ident(), ): self.call(scope.cancel) @@ -411,7 +410,7 @@ class BlockingPortalProvider: backend: str = "asyncio" backend_options: dict[str, Any] | None = None - _lock: threading.Lock = field(init=False, default_factory=threading.Lock) + _lock: Lock = field(init=False, default_factory=Lock) _leases: int = field(init=False, default=0) _portal: BlockingPortal = field(init=False) _portal_cm: AbstractContextManager[BlockingPortal] | None = field( @@ -469,43 +468,37 @@ def start_blocking_portal( async def run_portal() -> None: async with BlockingPortal() as portal_: - if future.set_running_or_notify_cancel(): - future.set_result(portal_) - await portal_.sleep_until_stopped() + future.set_result(portal_) + await portal_.sleep_until_stopped() + + def run_blocking_portal() -> None: + if future.set_running_or_notify_cancel(): + try: + _eventloop.run( + run_portal, backend=backend, backend_options=backend_options + ) + except BaseException as exc: + if not future.done(): + future.set_exception(exc) future: Future[BlockingPortal] = Future() - with ThreadPoolExecutor(1) as executor: - run_future = executor.submit( - _eventloop.run, # type: ignore[arg-type] - run_portal, - backend=backend, - backend_options=backend_options, - ) + thread = Thread(target=run_blocking_portal, daemon=True) + thread.start() + try: + cancel_remaining_tasks = False + portal = future.result() try: - wait( - cast(Iterable[Future], [run_future, future]), - return_when=FIRST_COMPLETED, - ) + yield portal except BaseException: - future.cancel() - run_future.cancel() + cancel_remaining_tasks = True raise - - if future.done(): - portal = future.result() - cancel_remaining_tasks = False + finally: try: - yield portal - except BaseException: - cancel_remaining_tasks = True - raise - finally: - try: - portal.call(portal.stop, cancel_remaining_tasks) - except RuntimeError: - pass - - run_future.result() + portal.call(portal.stop, cancel_remaining_tasks) + except RuntimeError: + pass + finally: + thread.join() def check_cancelled() -> None: diff --git a/contrib/python/anyio/anyio/pytest_plugin.py b/contrib/python/anyio/anyio/pytest_plugin.py index a8dd6f3e3f1..558c72ec287 100644 --- a/contrib/python/anyio/anyio/pytest_plugin.py +++ b/contrib/python/anyio/anyio/pytest_plugin.py @@ -1,5 +1,6 @@ from __future__ import annotations +import sys from collections.abc import Iterator from contextlib import ExitStack, contextmanager from inspect import isasyncgenfunction, iscoroutinefunction @@ -7,10 +8,15 @@ from typing import Any, Dict, Tuple, cast import pytest import sniffio +from _pytest.outcomes import Exit from ._core._eventloop import get_all_backends, get_async_backend +from ._core._exceptions import iterate_exceptions from .abc import TestRunner +if sys.version_info < (3, 11): + from exceptiongroup import ExceptionGroup + _current_runner: TestRunner | None = None _runner_stack: ExitStack | None = None _runner_leases = 0 @@ -121,7 +127,14 @@ def pytest_pyfunc_call(pyfuncitem: Any) -> bool | None: funcargs = pyfuncitem.funcargs testargs = {arg: funcargs[arg] for arg in pyfuncitem._fixtureinfo.argnames} with get_runner(backend_name, backend_options) as runner: - runner.run_test(pyfuncitem.obj, testargs) + try: + runner.run_test(pyfuncitem.obj, testargs) + except ExceptionGroup as excgrp: + for exc in iterate_exceptions(excgrp): + if isinstance(exc, (Exit, KeyboardInterrupt, SystemExit)): + raise exc from excgrp + + raise return True diff --git a/contrib/python/anyio/anyio/streams/memory.py b/contrib/python/anyio/anyio/streams/memory.py index 6840e6242ff..b547aa6a48c 100644 --- a/contrib/python/anyio/anyio/streams/memory.py +++ b/contrib/python/anyio/anyio/streams/memory.py @@ -38,6 +38,12 @@ class MemoryObjectItemReceiver(Generic[T_Item]): task_info: TaskInfo = field(init=False, default_factory=get_current_task) item: T_Item = field(init=False) + def __repr__(self) -> str: + # When item is not defined, we get following error with default __repr__: + # AttributeError: 'MemoryObjectItemReceiver' object has no attribute 'item' + item = getattr(self, "item", None) + return f"{self.__class__.__name__}(task_info={self.task_info}, item={item!r})" + @dataclass(eq=False) class MemoryObjectStreamState(Generic[T_Item]): @@ -175,7 +181,7 @@ class MemoryObjectReceiveStream(Generic[T_co], ObjectReceiveStream[T_co]): def __del__(self) -> None: if not self._closed: warnings.warn( - f"Unclosed <{self.__class__.__name__}>", + f"Unclosed <{self.__class__.__name__} at {id(self):x}>", ResourceWarning, source=self, ) @@ -305,7 +311,7 @@ class MemoryObjectSendStream(Generic[T_contra], ObjectSendStream[T_contra]): def __del__(self) -> None: if not self._closed: warnings.warn( - f"Unclosed <{self.__class__.__name__}>", + f"Unclosed <{self.__class__.__name__} at {id(self):x}>", ResourceWarning, source=self, ) diff --git a/contrib/python/anyio/anyio/to_process.py b/contrib/python/anyio/anyio/to_process.py index 1ff06f0b259..5050dee21e9 100644 --- a/contrib/python/anyio/anyio/to_process.py +++ b/contrib/python/anyio/anyio/to_process.py @@ -223,7 +223,7 @@ def process_worker() -> None: main_module_path: str | None sys.path, main_module_path = args del sys.modules["__main__"] - if main_module_path: + if main_module_path and os.path.isfile(main_module_path): # Load the parent's main module but as __mp_main__ instead of # __main__ (like multiprocessing does) to avoid infinite recursion try: @@ -234,7 +234,6 @@ def process_worker() -> None: sys.modules["__main__"] = main except BaseException as exc: exception = exc - try: if exception is not None: status = b"EXCEPTION" diff --git a/contrib/python/anyio/ya.make b/contrib/python/anyio/ya.make index 9062121337b..c166d2627f0 100644 --- a/contrib/python/anyio/ya.make +++ b/contrib/python/anyio/ya.make @@ -2,7 +2,7 @@ PY3_LIBRARY() -VERSION(4.4.0) +VERSION(4.5.0) LICENSE(MIT) |
