summaryrefslogtreecommitdiffstats
path: root/contrib/python/anyio
diff options
context:
space:
mode:
authorrobot-piglet <[email protected]>2024-10-05 21:45:15 +0300
committerrobot-piglet <[email protected]>2024-10-05 21:56:51 +0300
commitbb4b8d018c07ae166175cf29e5a3594ff942c7ce (patch)
tree8c0e9822cb7f6dba611fde09851211b98fa55b40 /contrib/python/anyio
parentc78357dda6c6b22763df2371c01c5b405f500196 (diff)
Intermediate changes
commit_hash:1f9110413b1655b37aa301b93c0f03f212db79be
Diffstat (limited to 'contrib/python/anyio')
-rw-r--r--contrib/python/anyio/.dist-info/METADATA9
-rw-r--r--contrib/python/anyio/anyio/__init__.py12
-rw-r--r--contrib/python/anyio/anyio/_backends/_asyncio.py240
-rw-r--r--contrib/python/anyio/anyio/_backends/_trio.py175
-rw-r--r--contrib/python/anyio/anyio/_core/_exceptions.py16
-rw-r--r--contrib/python/anyio/anyio/_core/_fileio.py22
-rw-r--r--contrib/python/anyio/anyio/_core/_sockets.py29
-rw-r--r--contrib/python/anyio/anyio/_core/_subprocesses.py125
-rw-r--r--contrib/python/anyio/anyio/_core/_synchronization.py247
-rw-r--r--contrib/python/anyio/anyio/abc/_eventloop.py68
-rw-r--r--contrib/python/anyio/anyio/from_thread.py69
-rw-r--r--contrib/python/anyio/anyio/pytest_plugin.py15
-rw-r--r--contrib/python/anyio/anyio/streams/memory.py10
-rw-r--r--contrib/python/anyio/anyio/to_process.py3
-rw-r--r--contrib/python/anyio/ya.make2
15 files changed, 767 insertions, 275 deletions
diff --git a/contrib/python/anyio/.dist-info/METADATA b/contrib/python/anyio/.dist-info/METADATA
index be13c8aa0f4..3fee32ffa38 100644
--- a/contrib/python/anyio/.dist-info/METADATA
+++ b/contrib/python/anyio/.dist-info/METADATA
@@ -1,6 +1,6 @@
Metadata-Version: 2.1
Name: anyio
-Version: 4.4.0
+Version: 4.5.0
Summary: High level compatibility layer for multiple asynchronous event loop implementations
Author-email: Alex Grönholm <[email protected]>
License: MIT
@@ -20,6 +20,7 @@ Classifier: Programming Language :: Python :: 3.9
Classifier: Programming Language :: Python :: 3.10
Classifier: Programming Language :: Python :: 3.11
Classifier: Programming Language :: Python :: 3.12
+Classifier: Programming Language :: Python :: 3.13
Requires-Python: >=3.8
Description-Content-Type: text/x-rst
License-File: LICENSE
@@ -29,7 +30,7 @@ Requires-Dist: exceptiongroup >=1.0.2 ; python_version < "3.11"
Requires-Dist: typing-extensions >=4.1 ; python_version < "3.11"
Provides-Extra: doc
Requires-Dist: packaging ; extra == 'doc'
-Requires-Dist: Sphinx >=7 ; extra == 'doc'
+Requires-Dist: Sphinx ~=7.4 ; extra == 'doc'
Requires-Dist: sphinx-rtd-theme ; extra == 'doc'
Requires-Dist: sphinx-autodoc-typehints >=1.2.0 ; extra == 'doc'
Provides-Extra: test
@@ -41,9 +42,9 @@ Requires-Dist: psutil >=5.9 ; extra == 'test'
Requires-Dist: pytest >=7.0 ; extra == 'test'
Requires-Dist: pytest-mock >=3.6.1 ; extra == 'test'
Requires-Dist: trustme ; extra == 'test'
-Requires-Dist: uvloop >=0.17 ; (platform_python_implementation == "CPython" and platform_system != "Windows") and extra == 'test'
+Requires-Dist: uvloop >=0.21.0b1 ; (platform_python_implementation == "CPython" and platform_system != "Windows") and extra == 'test'
Provides-Extra: trio
-Requires-Dist: trio >=0.23 ; extra == 'trio'
+Requires-Dist: trio >=0.26.1 ; extra == 'trio'
.. image:: https://github.com/agronholm/anyio/actions/workflows/test.yml/badge.svg
:target: https://github.com/agronholm/anyio/actions/workflows/test.yml
diff --git a/contrib/python/anyio/anyio/__init__.py b/contrib/python/anyio/anyio/__init__.py
index 7bfe231645c..fd9fe06bcfc 100644
--- a/contrib/python/anyio/anyio/__init__.py
+++ b/contrib/python/anyio/anyio/__init__.py
@@ -1,7 +1,5 @@
from __future__ import annotations
-from typing import Any
-
from ._core._eventloop import current_time as current_time
from ._core._eventloop import get_all_backends as get_all_backends
from ._core._eventloop import get_cancelled_exc_class as get_cancelled_exc_class
@@ -69,8 +67,8 @@ from ._core._typedattr import TypedAttributeSet as TypedAttributeSet
from ._core._typedattr import typed_attribute as typed_attribute
# Re-export imports so they look like they live directly in this package
-key: str
-value: Any
-for key, value in list(locals().items()):
- if getattr(value, "__module__", "").startswith("anyio."):
- value.__module__ = __name__
+for __value in list(locals().values()):
+ if getattr(__value, "__module__", "").startswith("anyio."):
+ __value.__module__ = __name__
+
+del __value
diff --git a/contrib/python/anyio/anyio/_backends/_asyncio.py b/contrib/python/anyio/anyio/_backends/_asyncio.py
index 43b7cb0e0ca..0d4cdf650d2 100644
--- a/contrib/python/anyio/anyio/_backends/_asyncio.py
+++ b/contrib/python/anyio/anyio/_backends/_asyncio.py
@@ -4,6 +4,7 @@ import array
import asyncio
import concurrent.futures
import math
+import os
import socket
import sys
import threading
@@ -19,7 +20,7 @@ from asyncio import (
)
from asyncio.base_events import _run_until_complete_cb # type: ignore[attr-defined]
from collections import OrderedDict, deque
-from collections.abc import AsyncIterator, Generator, Iterable
+from collections.abc import AsyncIterator, Iterable
from concurrent.futures import Future
from contextlib import suppress
from contextvars import Context, copy_context
@@ -47,7 +48,6 @@ from typing import (
Collection,
ContextManager,
Coroutine,
- Mapping,
Optional,
Sequence,
Tuple,
@@ -58,7 +58,13 @@ from weakref import WeakKeyDictionary
import sniffio
-from .. import CapacityLimiterStatistics, EventStatistics, TaskInfo, abc
+from .. import (
+ CapacityLimiterStatistics,
+ EventStatistics,
+ LockStatistics,
+ TaskInfo,
+ abc,
+)
from .._core._eventloop import claim_worker_thread, threadlocals
from .._core._exceptions import (
BrokenResourceError,
@@ -66,12 +72,20 @@ from .._core._exceptions import (
ClosedResourceError,
EndOfStream,
WouldBlock,
+ iterate_exceptions,
)
from .._core._sockets import convert_ipv6_sockaddr
from .._core._streams import create_memory_object_stream
-from .._core._synchronization import CapacityLimiter as BaseCapacityLimiter
+from .._core._synchronization import (
+ CapacityLimiter as BaseCapacityLimiter,
+)
from .._core._synchronization import Event as BaseEvent
-from .._core._synchronization import ResourceGuard
+from .._core._synchronization import Lock as BaseLock
+from .._core._synchronization import (
+ ResourceGuard,
+ SemaphoreStatistics,
+)
+from .._core._synchronization import Semaphore as BaseSemaphore
from .._core._tasks import CancelScope as BaseCancelScope
from ..abc import (
AsyncBackend,
@@ -80,6 +94,7 @@ from ..abc import (
UDPPacketType,
UNIXDatagramPacketType,
)
+from ..abc._eventloop import StrOrBytesPath
from ..lowlevel import RunVar
from ..streams.memory import MemoryObjectReceiveStream, MemoryObjectSendStream
@@ -630,16 +645,6 @@ class _AsyncioTaskStatus(abc.TaskStatus):
_task_states[task].parent_id = self._parent_id
-def iterate_exceptions(
- exception: BaseException,
-) -> Generator[BaseException, None, None]:
- if isinstance(exception, BaseExceptionGroup):
- for exc in exception.exceptions:
- yield from iterate_exceptions(exc)
- else:
- yield exception
-
-
class TaskGroup(abc.TaskGroup):
def __init__(self) -> None:
self.cancel_scope: CancelScope = CancelScope()
@@ -925,7 +930,7 @@ class StreamReaderWrapper(abc.ByteReceiveStream):
raise EndOfStream
async def aclose(self) -> None:
- self._stream.feed_eof()
+ self._stream.set_exception(ClosedResourceError())
await AsyncIOBackend.checkpoint()
@@ -1073,7 +1078,8 @@ class StreamProtocol(asyncio.Protocol):
self.write_event.set()
def data_received(self, data: bytes) -> None:
- self.read_queue.append(data)
+ # ProactorEventloop sometimes sends bytearray instead of bytes
+ self.read_queue.append(bytes(data))
self.read_event.set()
def eof_received(self) -> bool | None:
@@ -1665,6 +1671,154 @@ class Event(BaseEvent):
return EventStatistics(len(self._event._waiters))
+class Lock(BaseLock):
+ def __new__(cls, *, fast_acquire: bool = False) -> Lock:
+ return object.__new__(cls)
+
+ def __init__(self, *, fast_acquire: bool = False) -> None:
+ self._fast_acquire = fast_acquire
+ self._owner_task: asyncio.Task | None = None
+ self._waiters: deque[tuple[asyncio.Task, asyncio.Future]] = deque()
+
+ async def acquire(self) -> None:
+ if self._owner_task is None and not self._waiters:
+ await AsyncIOBackend.checkpoint_if_cancelled()
+ self._owner_task = current_task()
+
+ # Unless on the "fast path", yield control of the event loop so that other
+ # tasks can run too
+ if not self._fast_acquire:
+ try:
+ await AsyncIOBackend.cancel_shielded_checkpoint()
+ except CancelledError:
+ self.release()
+ raise
+
+ return
+
+ task = cast(asyncio.Task, current_task())
+ fut: asyncio.Future[None] = asyncio.Future()
+ item = task, fut
+ self._waiters.append(item)
+ try:
+ await fut
+ except CancelledError:
+ self._waiters.remove(item)
+ if self._owner_task is task:
+ self.release()
+
+ raise
+
+ self._waiters.remove(item)
+
+ def acquire_nowait(self) -> None:
+ if self._owner_task is None and not self._waiters:
+ self._owner_task = current_task()
+ return
+
+ raise WouldBlock
+
+ def locked(self) -> bool:
+ return self._owner_task is not None
+
+ def release(self) -> None:
+ if self._owner_task != current_task():
+ raise RuntimeError("The current task is not holding this lock")
+
+ for task, fut in self._waiters:
+ if not fut.cancelled():
+ self._owner_task = task
+ fut.set_result(None)
+ return
+
+ self._owner_task = None
+
+ def statistics(self) -> LockStatistics:
+ task_info = AsyncIOTaskInfo(self._owner_task) if self._owner_task else None
+ return LockStatistics(self.locked(), task_info, len(self._waiters))
+
+
+class Semaphore(BaseSemaphore):
+ def __new__(
+ cls,
+ initial_value: int,
+ *,
+ max_value: int | None = None,
+ fast_acquire: bool = False,
+ ) -> Semaphore:
+ return object.__new__(cls)
+
+ def __init__(
+ self,
+ initial_value: int,
+ *,
+ max_value: int | None = None,
+ fast_acquire: bool = False,
+ ):
+ super().__init__(initial_value, max_value=max_value)
+ self._value = initial_value
+ self._max_value = max_value
+ self._fast_acquire = fast_acquire
+ self._waiters: deque[asyncio.Future[None]] = deque()
+
+ async def acquire(self) -> None:
+ if self._value > 0 and not self._waiters:
+ await AsyncIOBackend.checkpoint_if_cancelled()
+ self._value -= 1
+
+ # Unless on the "fast path", yield control of the event loop so that other
+ # tasks can run too
+ if not self._fast_acquire:
+ try:
+ await AsyncIOBackend.cancel_shielded_checkpoint()
+ except CancelledError:
+ self.release()
+ raise
+
+ return
+
+ fut: asyncio.Future[None] = asyncio.Future()
+ self._waiters.append(fut)
+ try:
+ await fut
+ except CancelledError:
+ try:
+ self._waiters.remove(fut)
+ except ValueError:
+ self.release()
+
+ raise
+
+ def acquire_nowait(self) -> None:
+ if self._value == 0:
+ raise WouldBlock
+
+ self._value -= 1
+
+ def release(self) -> None:
+ if self._max_value is not None and self._value == self._max_value:
+ raise ValueError("semaphore released too many times")
+
+ for fut in self._waiters:
+ if not fut.cancelled():
+ fut.set_result(None)
+ self._waiters.remove(fut)
+ return
+
+ self._value += 1
+
+ @property
+ def value(self) -> int:
+ return self._value
+
+ @property
+ def max_value(self) -> int | None:
+ return self._max_value
+
+ def statistics(self) -> SemaphoreStatistics:
+ return SemaphoreStatistics(len(self._waiters))
+
+
class CapacityLimiter(BaseCapacityLimiter):
_total_tokens: float = 0
@@ -1861,7 +2015,9 @@ class AsyncIOTaskInfo(TaskInfo):
if task_state := _task_states.get(task):
if cancel_scope := task_state.cancel_scope:
- return cancel_scope.cancel_called or cancel_scope._parent_cancelled()
+ return cancel_scope.cancel_called or (
+ not cancel_scope.shield and cancel_scope._parent_cancelled()
+ )
return False
@@ -1926,13 +2082,23 @@ class TestRunner(abc.TestRunner):
tuple[Awaitable[T_Retval], asyncio.Future[T_Retval]]
],
) -> None:
+ from _pytest.outcomes import OutcomeException
+
with receive_stream, self._send_stream:
async for coro, future in receive_stream:
try:
retval = await coro
+ except CancelledError as exc:
+ if not future.cancelled():
+ future.cancel(*exc.args)
+
+ raise
except BaseException as exc:
if not future.cancelled():
future.set_exception(exc)
+
+ if not isinstance(exc, (Exception, OutcomeException)):
+ raise
else:
if not future.cancelled():
future.set_result(retval)
@@ -2114,6 +2280,20 @@ class AsyncIOBackend(AsyncBackend):
return Event()
@classmethod
+ def create_lock(cls, *, fast_acquire: bool) -> abc.Lock:
+ return Lock(fast_acquire=fast_acquire)
+
+ @classmethod
+ def create_semaphore(
+ cls,
+ initial_value: int,
+ *,
+ max_value: int | None = None,
+ fast_acquire: bool = False,
+ ) -> abc.Semaphore:
+ return Semaphore(initial_value, max_value=max_value, fast_acquire=fast_acquire)
+
+ @classmethod
def create_capacity_limiter(cls, total_tokens: float) -> abc.CapacityLimiter:
return CapacityLimiter(total_tokens)
@@ -2245,26 +2425,24 @@ class AsyncIOBackend(AsyncBackend):
@classmethod
async def open_process(
cls,
- command: str | bytes | Sequence[str | bytes],
+ command: StrOrBytesPath | Sequence[StrOrBytesPath],
*,
- shell: bool,
stdin: int | IO[Any] | None,
stdout: int | IO[Any] | None,
stderr: int | IO[Any] | None,
- cwd: str | bytes | PathLike | None = None,
- env: Mapping[str, str] | None = None,
- start_new_session: bool = False,
+ **kwargs: Any,
) -> Process:
await cls.checkpoint()
- if shell:
+ if isinstance(command, PathLike):
+ command = os.fspath(command)
+
+ if isinstance(command, (str, bytes)):
process = await asyncio.create_subprocess_shell(
- cast("str | bytes", command),
+ command,
stdin=stdin,
stdout=stdout,
stderr=stderr,
- cwd=cwd,
- env=env,
- start_new_session=start_new_session,
+ **kwargs,
)
else:
process = await asyncio.create_subprocess_exec(
@@ -2272,9 +2450,7 @@ class AsyncIOBackend(AsyncBackend):
stdin=stdin,
stdout=stdout,
stderr=stderr,
- cwd=cwd,
- env=env,
- start_new_session=start_new_session,
+ **kwargs,
)
stdin_stream = StreamWriterWrapper(process.stdin) if process.stdin else None
@@ -2289,7 +2465,7 @@ class AsyncIOBackend(AsyncBackend):
name="AnyIO process pool shutdown task",
)
find_root_task().add_done_callback(
- partial(_forcibly_shutdown_process_pool_on_exit, workers)
+ partial(_forcibly_shutdown_process_pool_on_exit, workers) # type:ignore[arg-type]
)
@classmethod
diff --git a/contrib/python/anyio/anyio/_backends/_trio.py b/contrib/python/anyio/anyio/_backends/_trio.py
index cf6f3db789c..9b8369d4c50 100644
--- a/contrib/python/anyio/anyio/_backends/_trio.py
+++ b/contrib/python/anyio/anyio/_backends/_trio.py
@@ -2,6 +2,7 @@ from __future__ import annotations
import array
import math
+import os
import socket
import sys
import types
@@ -25,7 +26,6 @@ from typing import (
ContextManager,
Coroutine,
Generic,
- Mapping,
NoReturn,
Sequence,
TypeVar,
@@ -45,7 +45,14 @@ from trio.lowlevel import (
from trio.socket import SocketType as TrioSocketType
from trio.to_thread import run_sync
-from .. import CapacityLimiterStatistics, EventStatistics, TaskInfo, abc
+from .. import (
+ CapacityLimiterStatistics,
+ EventStatistics,
+ LockStatistics,
+ TaskInfo,
+ WouldBlock,
+ abc,
+)
from .._core._eventloop import claim_worker_thread
from .._core._exceptions import (
BrokenResourceError,
@@ -55,12 +62,19 @@ from .._core._exceptions import (
)
from .._core._sockets import convert_ipv6_sockaddr
from .._core._streams import create_memory_object_stream
-from .._core._synchronization import CapacityLimiter as BaseCapacityLimiter
+from .._core._synchronization import (
+ CapacityLimiter as BaseCapacityLimiter,
+)
from .._core._synchronization import Event as BaseEvent
-from .._core._synchronization import ResourceGuard
+from .._core._synchronization import Lock as BaseLock
+from .._core._synchronization import (
+ ResourceGuard,
+ SemaphoreStatistics,
+)
+from .._core._synchronization import Semaphore as BaseSemaphore
from .._core._tasks import CancelScope as BaseCancelScope
from ..abc import IPSockAddrType, UDPPacketType, UNIXDatagramPacketType
-from ..abc._eventloop import AsyncBackend
+from ..abc._eventloop import AsyncBackend, StrOrBytesPath
from ..streams.memory import MemoryObjectSendStream
if sys.version_info >= (3, 10):
@@ -637,6 +651,100 @@ class Event(BaseEvent):
self.__original.set()
+class Lock(BaseLock):
+ def __new__(cls, *, fast_acquire: bool = False) -> Lock:
+ return object.__new__(cls)
+
+ def __init__(self, *, fast_acquire: bool = False) -> None:
+ self._fast_acquire = fast_acquire
+ self.__original = trio.Lock()
+
+ async def acquire(self) -> None:
+ if not self._fast_acquire:
+ await self.__original.acquire()
+ return
+
+ # This is the "fast path" where we don't let other tasks run
+ await trio.lowlevel.checkpoint_if_cancelled()
+ try:
+ self.__original.acquire_nowait()
+ except trio.WouldBlock:
+ await self.__original._lot.park()
+
+ def acquire_nowait(self) -> None:
+ try:
+ self.__original.acquire_nowait()
+ except trio.WouldBlock:
+ raise WouldBlock from None
+
+ def locked(self) -> bool:
+ return self.__original.locked()
+
+ def release(self) -> None:
+ self.__original.release()
+
+ def statistics(self) -> LockStatistics:
+ orig_statistics = self.__original.statistics()
+ owner = TrioTaskInfo(orig_statistics.owner) if orig_statistics.owner else None
+ return LockStatistics(
+ orig_statistics.locked, owner, orig_statistics.tasks_waiting
+ )
+
+
+class Semaphore(BaseSemaphore):
+ def __new__(
+ cls,
+ initial_value: int,
+ *,
+ max_value: int | None = None,
+ fast_acquire: bool = False,
+ ) -> Semaphore:
+ return object.__new__(cls)
+
+ def __init__(
+ self,
+ initial_value: int,
+ *,
+ max_value: int | None = None,
+ fast_acquire: bool = False,
+ ) -> None:
+ super().__init__(initial_value, max_value=max_value, fast_acquire=fast_acquire)
+ self.__original = trio.Semaphore(initial_value, max_value=max_value)
+
+ async def acquire(self) -> None:
+ if not self._fast_acquire:
+ await self.__original.acquire()
+ return
+
+ # This is the "fast path" where we don't let other tasks run
+ await trio.lowlevel.checkpoint_if_cancelled()
+ try:
+ self.__original.acquire_nowait()
+ except trio.WouldBlock:
+ await self.__original._lot.park()
+
+ def acquire_nowait(self) -> None:
+ try:
+ self.__original.acquire_nowait()
+ except trio.WouldBlock:
+ raise WouldBlock from None
+
+ @property
+ def max_value(self) -> int | None:
+ return self.__original.max_value
+
+ @property
+ def value(self) -> int:
+ return self.__original.value
+
+ def release(self) -> None:
+ self.__original.release()
+
+ def statistics(self) -> SemaphoreStatistics:
+ orig_statistics = self.__original.statistics()
+ return SemaphoreStatistics(orig_statistics.tasks_waiting)
+
+
class CapacityLimiter(BaseCapacityLimiter):
def __new__(
cls,
@@ -916,6 +1024,20 @@ class TrioBackend(AsyncBackend):
return Event()
@classmethod
+ def create_lock(cls, *, fast_acquire: bool) -> Lock:
+ return Lock(fast_acquire=fast_acquire)
+
+ @classmethod
+ def create_semaphore(
+ cls,
+ initial_value: int,
+ *,
+ max_value: int | None = None,
+ fast_acquire: bool = False,
+ ) -> abc.Semaphore:
+ return Semaphore(initial_value, max_value=max_value, fast_acquire=fast_acquire)
+
+ @classmethod
def create_capacity_limiter(cls, total_tokens: float) -> CapacityLimiter:
return CapacityLimiter(total_tokens)
@@ -967,26 +1089,39 @@ class TrioBackend(AsyncBackend):
@classmethod
async def open_process(
cls,
- command: str | bytes | Sequence[str | bytes],
+ command: StrOrBytesPath | Sequence[StrOrBytesPath],
*,
- shell: bool,
stdin: int | IO[Any] | None,
stdout: int | IO[Any] | None,
stderr: int | IO[Any] | None,
- cwd: str | bytes | PathLike | None = None,
- env: Mapping[str, str] | None = None,
- start_new_session: bool = False,
+ **kwargs: Any,
) -> Process:
- process = await trio.lowlevel.open_process( # type: ignore[misc]
- command, # type: ignore[arg-type]
- stdin=stdin,
- stdout=stdout,
- stderr=stderr,
- shell=shell,
- cwd=cwd,
- env=env,
- start_new_session=start_new_session,
- )
+ def convert_item(item: StrOrBytesPath) -> str:
+ str_or_bytes = os.fspath(item)
+ if isinstance(str_or_bytes, str):
+ return str_or_bytes
+ else:
+ return os.fsdecode(str_or_bytes)
+
+ if isinstance(command, (str, bytes, PathLike)):
+ process = await trio.lowlevel.open_process(
+ convert_item(command),
+ stdin=stdin,
+ stdout=stdout,
+ stderr=stderr,
+ shell=True,
+ **kwargs,
+ )
+ else:
+ process = await trio.lowlevel.open_process(
+ [convert_item(item) for item in command],
+ stdin=stdin,
+ stdout=stdout,
+ stderr=stderr,
+ shell=False,
+ **kwargs,
+ )
+
stdin_stream = SendStreamWrapper(process.stdin) if process.stdin else None
stdout_stream = ReceiveStreamWrapper(process.stdout) if process.stdout else None
stderr_stream = ReceiveStreamWrapper(process.stderr) if process.stderr else None
diff --git a/contrib/python/anyio/anyio/_core/_exceptions.py b/contrib/python/anyio/anyio/_core/_exceptions.py
index 571c3b85316..6e3f8ccc675 100644
--- a/contrib/python/anyio/anyio/_core/_exceptions.py
+++ b/contrib/python/anyio/anyio/_core/_exceptions.py
@@ -1,5 +1,11 @@
from __future__ import annotations
+import sys
+from collections.abc import Generator
+
+if sys.version_info < (3, 11):
+ from exceptiongroup import BaseExceptionGroup
+
class BrokenResourceError(Exception):
"""
@@ -71,3 +77,13 @@ class TypedAttributeLookupError(LookupError):
class WouldBlock(Exception):
"""Raised by ``X_nowait`` functions if ``X()`` would block."""
+
+
+def iterate_exceptions(
+ exception: BaseException,
+) -> Generator[BaseException, None, None]:
+ if isinstance(exception, BaseExceptionGroup):
+ for exc in exception.exceptions:
+ yield from iterate_exceptions(exc)
+ else:
+ yield exception
diff --git a/contrib/python/anyio/anyio/_core/_fileio.py b/contrib/python/anyio/anyio/_core/_fileio.py
index df2057fe342..9503d944bb8 100644
--- a/contrib/python/anyio/anyio/_core/_fileio.py
+++ b/contrib/python/anyio/anyio/_core/_fileio.py
@@ -358,8 +358,26 @@ class Path:
def as_uri(self) -> str:
return self._path.as_uri()
- def match(self, path_pattern: str) -> bool:
- return self._path.match(path_pattern)
+ if sys.version_info >= (3, 13):
+ parser = pathlib.Path.parser
+
+ @classmethod
+ def from_uri(cls, uri: str) -> Path:
+ return Path(pathlib.Path.from_uri(uri))
+
+ def full_match(
+ self, path_pattern: str, *, case_sensitive: bool | None = None
+ ) -> bool:
+ return self._path.full_match(path_pattern, case_sensitive=case_sensitive)
+
+ def match(
+ self, path_pattern: str, *, case_sensitive: bool | None = None
+ ) -> bool:
+ return self._path.match(path_pattern, case_sensitive=case_sensitive)
+ else:
+
+ def match(self, path_pattern: str) -> bool:
+ return self._path.match(path_pattern)
def is_relative_to(self, other: str | PathLike[str]) -> bool:
try:
diff --git a/contrib/python/anyio/anyio/_core/_sockets.py b/contrib/python/anyio/anyio/_core/_sockets.py
index 5e09cdbf0f0..6070c647fd9 100644
--- a/contrib/python/anyio/anyio/_core/_sockets.py
+++ b/contrib/python/anyio/anyio/_core/_sockets.py
@@ -680,19 +680,26 @@ async def setup_unix_local_socket(
:param socktype: socket.SOCK_STREAM or socket.SOCK_DGRAM
"""
- path_str: str | bytes | None
+ path_str: str | None
if path is not None:
- path_str = os.fspath(path)
+ path_str = os.fsdecode(path)
- # Copied from pathlib...
- try:
- stat_result = os.stat(path)
- except OSError as e:
- if e.errno not in (errno.ENOENT, errno.ENOTDIR, errno.EBADF, errno.ELOOP):
- raise
- else:
- if stat.S_ISSOCK(stat_result.st_mode):
- os.unlink(path)
+ # Linux abstract namespace sockets aren't backed by a concrete file so skip stat call
+ if not path_str.startswith("\0"):
+ # Copied from pathlib...
+ try:
+ stat_result = os.stat(path)
+ except OSError as e:
+ if e.errno not in (
+ errno.ENOENT,
+ errno.ENOTDIR,
+ errno.EBADF,
+ errno.ELOOP,
+ ):
+ raise
+ else:
+ if stat.S_ISSOCK(stat_result.st_mode):
+ os.unlink(path)
else:
path_str = None
diff --git a/contrib/python/anyio/anyio/_core/_subprocesses.py b/contrib/python/anyio/anyio/_core/_subprocesses.py
index 5d5d7b768a1..1ac2d549dff 100644
--- a/contrib/python/anyio/anyio/_core/_subprocesses.py
+++ b/contrib/python/anyio/anyio/_core/_subprocesses.py
@@ -1,26 +1,41 @@
from __future__ import annotations
-from collections.abc import AsyncIterable, Mapping, Sequence
+import sys
+from collections.abc import AsyncIterable, Iterable, Mapping, Sequence
from io import BytesIO
from os import PathLike
from subprocess import DEVNULL, PIPE, CalledProcessError, CompletedProcess
-from typing import IO, Any, cast
+from typing import IO, Any, Union, cast
from ..abc import Process
from ._eventloop import get_async_backend
from ._tasks import create_task_group
+if sys.version_info >= (3, 10):
+ from typing import TypeAlias
+else:
+ from typing_extensions import TypeAlias
+
+StrOrBytesPath: TypeAlias = Union[str, bytes, "PathLike[str]", "PathLike[bytes]"]
+
async def run_process(
- command: str | bytes | Sequence[str | bytes],
+ command: StrOrBytesPath | Sequence[StrOrBytesPath],
*,
input: bytes | None = None,
stdout: int | IO[Any] | None = PIPE,
stderr: int | IO[Any] | None = PIPE,
check: bool = True,
- cwd: str | bytes | PathLike[str] | None = None,
+ cwd: StrOrBytesPath | None = None,
env: Mapping[str, str] | None = None,
+ startupinfo: Any = None,
+ creationflags: int = 0,
start_new_session: bool = False,
+ pass_fds: Sequence[int] = (),
+ user: str | int | None = None,
+ group: str | int | None = None,
+ extra_groups: Iterable[str | int] | None = None,
+ umask: int = -1,
) -> CompletedProcess[bytes]:
"""
Run an external command in a subprocess and wait until it completes.
@@ -40,8 +55,20 @@ async def run_process(
command
:param env: if not ``None``, this mapping replaces the inherited environment
variables from the parent process
+ :param startupinfo: an instance of :class:`subprocess.STARTUPINFO` that can be used
+ to specify process startup parameters (Windows only)
+ :param creationflags: flags that can be used to control the creation of the
+ subprocess (see :class:`subprocess.Popen` for the specifics)
:param start_new_session: if ``true`` the setsid() system call will be made in the
child process prior to the execution of the subprocess. (POSIX only)
+ :param pass_fds: sequence of file descriptors to keep open between the parent and
+ child processes. (POSIX only)
+ :param user: effective user to run the process as (Python >= 3.9, POSIX only)
+ :param group: effective group to run the process as (Python >= 3.9, POSIX only)
+ :param extra_groups: supplementary groups to set in the subprocess (Python >= 3.9,
+ POSIX only)
+ :param umask: if not negative, this umask is applied in the child process before
+ running the given command (Python >= 3.9, POSIX only)
:return: an object representing the completed process
:raises ~subprocess.CalledProcessError: if ``check`` is ``True`` and the process
exits with a nonzero return code
@@ -62,7 +89,14 @@ async def run_process(
stderr=stderr,
cwd=cwd,
env=env,
+ startupinfo=startupinfo,
+ creationflags=creationflags,
start_new_session=start_new_session,
+ pass_fds=pass_fds,
+ user=user,
+ group=group,
+ extra_groups=extra_groups,
+ umask=umask,
) as process:
stream_contents: list[bytes | None] = [None, None]
async with create_task_group() as tg:
@@ -86,14 +120,21 @@ async def run_process(
async def open_process(
- command: str | bytes | Sequence[str | bytes],
+ command: StrOrBytesPath | Sequence[StrOrBytesPath],
*,
stdin: int | IO[Any] | None = PIPE,
stdout: int | IO[Any] | None = PIPE,
stderr: int | IO[Any] | None = PIPE,
- cwd: str | bytes | PathLike[str] | None = None,
+ cwd: StrOrBytesPath | None = None,
env: Mapping[str, str] | None = None,
+ startupinfo: Any = None,
+ creationflags: int = 0,
start_new_session: bool = False,
+ pass_fds: Sequence[int] = (),
+ user: str | int | None = None,
+ group: str | int | None = None,
+ extra_groups: Iterable[str | int] | None = None,
+ umask: int = -1,
) -> Process:
"""
Start an external command in a subprocess.
@@ -111,30 +152,58 @@ async def open_process(
:param cwd: If not ``None``, the working directory is changed before executing
:param env: If env is not ``None``, it must be a mapping that defines the
environment variables for the new process
+ :param creationflags: flags that can be used to control the creation of the
+ subprocess (see :class:`subprocess.Popen` for the specifics)
+ :param startupinfo: an instance of :class:`subprocess.STARTUPINFO` that can be used
+ to specify process startup parameters (Windows only)
:param start_new_session: if ``true`` the setsid() system call will be made in the
child process prior to the execution of the subprocess. (POSIX only)
+ :param pass_fds: sequence of file descriptors to keep open between the parent and
+ child processes. (POSIX only)
+ :param user: effective user to run the process as (Python >= 3.9; POSIX only)
+ :param group: effective group to run the process as (Python >= 3.9; POSIX only)
+ :param extra_groups: supplementary groups to set in the subprocess (Python >= 3.9;
+ POSIX only)
+ :param umask: if not negative, this umask is applied in the child process before
+ running the given command (Python >= 3.9; POSIX only)
:return: an asynchronous process object
"""
- if isinstance(command, (str, bytes)):
- return await get_async_backend().open_process(
- command,
- shell=True,
- stdin=stdin,
- stdout=stdout,
- stderr=stderr,
- cwd=cwd,
- env=env,
- start_new_session=start_new_session,
- )
- else:
- return await get_async_backend().open_process(
- command,
- shell=False,
- stdin=stdin,
- stdout=stdout,
- stderr=stderr,
- cwd=cwd,
- env=env,
- start_new_session=start_new_session,
- )
+ kwargs: dict[str, Any] = {}
+ if user is not None:
+ if sys.version_info < (3, 9):
+ raise TypeError("the 'user' argument requires Python 3.9 or later")
+
+ kwargs["user"] = user
+
+ if group is not None:
+ if sys.version_info < (3, 9):
+ raise TypeError("the 'group' argument requires Python 3.9 or later")
+
+ kwargs["group"] = group
+
+ if extra_groups is not None:
+ if sys.version_info < (3, 9):
+ raise TypeError("the 'extra_groups' argument requires Python 3.9 or later")
+
+ kwargs["extra_groups"] = group
+
+ if umask >= 0:
+ if sys.version_info < (3, 9):
+ raise TypeError("the 'umask' argument requires Python 3.9 or later")
+
+ kwargs["umask"] = umask
+
+ return await get_async_backend().open_process(
+ command,
+ stdin=stdin,
+ stdout=stdout,
+ stderr=stderr,
+ cwd=cwd,
+ env=env,
+ startupinfo=startupinfo,
+ creationflags=creationflags,
+ start_new_session=start_new_session,
+ pass_fds=pass_fds,
+ **kwargs,
+ )
diff --git a/contrib/python/anyio/anyio/_core/_synchronization.py b/contrib/python/anyio/anyio/_core/_synchronization.py
index b274a31ea2c..023ab73370d 100644
--- a/contrib/python/anyio/anyio/_core/_synchronization.py
+++ b/contrib/python/anyio/anyio/_core/_synchronization.py
@@ -7,9 +7,9 @@ from types import TracebackType
from sniffio import AsyncLibraryNotFoundError
-from ..lowlevel import cancel_shielded_checkpoint, checkpoint, checkpoint_if_cancelled
+from ..lowlevel import checkpoint
from ._eventloop import get_async_backend
-from ._exceptions import BusyResourceError, WouldBlock
+from ._exceptions import BusyResourceError
from ._tasks import CancelScope
from ._testing import TaskInfo, get_current_task
@@ -137,10 +137,11 @@ class EventAdapter(Event):
class Lock:
- _owner_task: TaskInfo | None = None
-
- def __init__(self) -> None:
- self._waiters: deque[tuple[TaskInfo, Event]] = deque()
+ def __new__(cls, *, fast_acquire: bool = False) -> Lock:
+ try:
+ return get_async_backend().create_lock(fast_acquire=fast_acquire)
+ except AsyncLibraryNotFoundError:
+ return LockAdapter(fast_acquire=fast_acquire)
async def __aenter__(self) -> None:
await self.acquire()
@@ -155,31 +156,7 @@ class Lock:
async def acquire(self) -> None:
"""Acquire the lock."""
- await checkpoint_if_cancelled()
- try:
- self.acquire_nowait()
- except WouldBlock:
- task = get_current_task()
- event = Event()
- token = task, event
- self._waiters.append(token)
- try:
- await event.wait()
- except BaseException:
- if not event.is_set():
- self._waiters.remove(token)
- elif self._owner_task == task:
- self.release()
-
- raise
-
- assert self._owner_task == task
- else:
- try:
- await cancel_shielded_checkpoint()
- except BaseException:
- self.release()
- raise
+ raise NotImplementedError
def acquire_nowait(self) -> None:
"""
@@ -188,37 +165,87 @@ class Lock:
:raises ~anyio.WouldBlock: if the operation would block
"""
- task = get_current_task()
- if self._owner_task == task:
- raise RuntimeError("Attempted to acquire an already held Lock")
+ raise NotImplementedError
- if self._owner_task is not None:
- raise WouldBlock
+ def release(self) -> None:
+ """Release the lock."""
+ raise NotImplementedError
- self._owner_task = task
+ def locked(self) -> bool:
+ """Return True if the lock is currently held."""
+ raise NotImplementedError
+
+ def statistics(self) -> LockStatistics:
+ """
+ Return statistics about the current state of this lock.
+
+ .. versionadded:: 3.0
+ """
+ raise NotImplementedError
+
+
+class LockAdapter(Lock):
+ _internal_lock: Lock | None = None
+
+ def __new__(cls, *, fast_acquire: bool = False) -> LockAdapter:
+ return object.__new__(cls)
+
+ def __init__(self, *, fast_acquire: bool = False):
+ self._fast_acquire = fast_acquire
+
+ @property
+ def _lock(self) -> Lock:
+ if self._internal_lock is None:
+ self._internal_lock = get_async_backend().create_lock(
+ fast_acquire=self._fast_acquire
+ )
+
+ return self._internal_lock
+
+ async def __aenter__(self) -> None:
+ await self._lock.acquire()
+
+ async def __aexit__(
+ self,
+ exc_type: type[BaseException] | None,
+ exc_val: BaseException | None,
+ exc_tb: TracebackType | None,
+ ) -> None:
+ if self._internal_lock is not None:
+ self._internal_lock.release()
+
+ async def acquire(self) -> None:
+ """Acquire the lock."""
+ await self._lock.acquire()
+
+ def acquire_nowait(self) -> None:
+ """
+ Acquire the lock, without blocking.
+
+ :raises ~anyio.WouldBlock: if the operation would block
+
+ """
+ self._lock.acquire_nowait()
def release(self) -> None:
"""Release the lock."""
- if self._owner_task != get_current_task():
- raise RuntimeError("The current task is not holding this lock")
-
- if self._waiters:
- self._owner_task, event = self._waiters.popleft()
- event.set()
- else:
- del self._owner_task
+ self._lock.release()
def locked(self) -> bool:
"""Return True if the lock is currently held."""
- return self._owner_task is not None
+ return self._lock.locked()
def statistics(self) -> LockStatistics:
"""
Return statistics about the current state of this lock.
.. versionadded:: 3.0
+
"""
- return LockStatistics(self.locked(), self._owner_task, len(self._waiters))
+ if self._internal_lock is None:
+ return LockStatistics(False, None, 0)
+
+ return self._internal_lock.statistics()
class Condition:
@@ -312,7 +339,27 @@ class Condition:
class Semaphore:
- def __init__(self, initial_value: int, *, max_value: int | None = None):
+ def __new__(
+ cls,
+ initial_value: int,
+ *,
+ max_value: int | None = None,
+ fast_acquire: bool = False,
+ ) -> Semaphore:
+ try:
+ return get_async_backend().create_semaphore(
+ initial_value, max_value=max_value, fast_acquire=fast_acquire
+ )
+ except AsyncLibraryNotFoundError:
+ return SemaphoreAdapter(initial_value, max_value=max_value)
+
+ def __init__(
+ self,
+ initial_value: int,
+ *,
+ max_value: int | None = None,
+ fast_acquire: bool = False,
+ ):
if not isinstance(initial_value, int):
raise TypeError("initial_value must be an integer")
if initial_value < 0:
@@ -325,9 +372,7 @@ class Semaphore:
"max_value must be equal to or higher than initial_value"
)
- self._value = initial_value
- self._max_value = max_value
- self._waiters: deque[Event] = deque()
+ self._fast_acquire = fast_acquire
async def __aenter__(self) -> Semaphore:
await self.acquire()
@@ -343,27 +388,7 @@ class Semaphore:
async def acquire(self) -> None:
"""Decrement the semaphore value, blocking if necessary."""
- await checkpoint_if_cancelled()
- try:
- self.acquire_nowait()
- except WouldBlock:
- event = Event()
- self._waiters.append(event)
- try:
- await event.wait()
- except BaseException:
- if not event.is_set():
- self._waiters.remove(event)
- else:
- self.release()
-
- raise
- else:
- try:
- await cancel_shielded_checkpoint()
- except BaseException:
- self.release()
- raise
+ raise NotImplementedError
def acquire_nowait(self) -> None:
"""
@@ -372,30 +397,21 @@ class Semaphore:
:raises ~anyio.WouldBlock: if the operation would block
"""
- if self._value == 0:
- raise WouldBlock
-
- self._value -= 1
+ raise NotImplementedError
def release(self) -> None:
"""Increment the semaphore value."""
- if self._max_value is not None and self._value == self._max_value:
- raise ValueError("semaphore released too many times")
-
- if self._waiters:
- self._waiters.popleft().set()
- else:
- self._value += 1
+ raise NotImplementedError
@property
def value(self) -> int:
"""The current value of the semaphore."""
- return self._value
+ raise NotImplementedError
@property
def max_value(self) -> int | None:
"""The maximum value of the semaphore."""
- return self._max_value
+ raise NotImplementedError
def statistics(self) -> SemaphoreStatistics:
"""
@@ -403,7 +419,66 @@ class Semaphore:
.. versionadded:: 3.0
"""
- return SemaphoreStatistics(len(self._waiters))
+ raise NotImplementedError
+
+
+class SemaphoreAdapter(Semaphore):
+ _internal_semaphore: Semaphore | None = None
+
+ def __new__(
+ cls,
+ initial_value: int,
+ *,
+ max_value: int | None = None,
+ fast_acquire: bool = False,
+ ) -> SemaphoreAdapter:
+ return object.__new__(cls)
+
+ def __init__(
+ self,
+ initial_value: int,
+ *,
+ max_value: int | None = None,
+ fast_acquire: bool = False,
+ ) -> None:
+ super().__init__(initial_value, max_value=max_value, fast_acquire=fast_acquire)
+ self._initial_value = initial_value
+ self._max_value = max_value
+
+ @property
+ def _semaphore(self) -> Semaphore:
+ if self._internal_semaphore is None:
+ self._internal_semaphore = get_async_backend().create_semaphore(
+ self._initial_value, max_value=self._max_value
+ )
+
+ return self._internal_semaphore
+
+ async def acquire(self) -> None:
+ await self._semaphore.acquire()
+
+ def acquire_nowait(self) -> None:
+ self._semaphore.acquire_nowait()
+
+ def release(self) -> None:
+ self._semaphore.release()
+
+ @property
+ def value(self) -> int:
+ if self._internal_semaphore is None:
+ return self._initial_value
+
+ return self._semaphore.value
+
+ @property
+ def max_value(self) -> int | None:
+ return self._max_value
+
+ def statistics(self) -> SemaphoreStatistics:
+ if self._internal_semaphore is None:
+ return SemaphoreStatistics(tasks_waiting=0)
+
+ return self._semaphore.statistics()
class CapacityLimiter:
diff --git a/contrib/python/anyio/anyio/abc/_eventloop.py b/contrib/python/anyio/anyio/abc/_eventloop.py
index a50afefaa00..2c73bb9ffb8 100644
--- a/contrib/python/anyio/anyio/abc/_eventloop.py
+++ b/contrib/python/anyio/anyio/abc/_eventloop.py
@@ -3,7 +3,7 @@ from __future__ import annotations
import math
import sys
from abc import ABCMeta, abstractmethod
-from collections.abc import AsyncIterator, Awaitable, Mapping
+from collections.abc import AsyncIterator, Awaitable
from os import PathLike
from signal import Signals
from socket import AddressFamily, SocketKind, socket
@@ -15,6 +15,7 @@ from typing import (
ContextManager,
Sequence,
TypeVar,
+ Union,
overload,
)
@@ -23,10 +24,13 @@ if sys.version_info >= (3, 11):
else:
from typing_extensions import TypeVarTuple, Unpack
-if TYPE_CHECKING:
- from typing import Literal
+if sys.version_info >= (3, 10):
+ from typing import TypeAlias
+else:
+ from typing_extensions import TypeAlias
- from .._core._synchronization import CapacityLimiter, Event
+if TYPE_CHECKING:
+ from .._core._synchronization import CapacityLimiter, Event, Lock, Semaphore
from .._core._tasks import CancelScope
from .._core._testing import TaskInfo
from ..from_thread import BlockingPortal
@@ -46,6 +50,7 @@ if TYPE_CHECKING:
T_Retval = TypeVar("T_Retval")
PosArgsT = TypeVarTuple("PosArgsT")
+StrOrBytesPath: TypeAlias = Union[str, bytes, "PathLike[str]", "PathLike[bytes]"]
class AsyncBackend(metaclass=ABCMeta):
@@ -169,6 +174,22 @@ class AsyncBackend(metaclass=ABCMeta):
@classmethod
@abstractmethod
+ def create_lock(cls, *, fast_acquire: bool) -> Lock:
+ pass
+
+ @classmethod
+ @abstractmethod
+ def create_semaphore(
+ cls,
+ initial_value: int,
+ *,
+ max_value: int | None = None,
+ fast_acquire: bool = False,
+ ) -> Semaphore:
+ pass
+
+ @classmethod
+ @abstractmethod
def create_capacity_limiter(cls, total_tokens: float) -> CapacityLimiter:
pass
@@ -214,50 +235,15 @@ class AsyncBackend(metaclass=ABCMeta):
pass
@classmethod
- @overload
- async def open_process(
- cls,
- command: str | bytes,
- *,
- shell: Literal[True],
- stdin: int | IO[Any] | None,
- stdout: int | IO[Any] | None,
- stderr: int | IO[Any] | None,
- cwd: str | bytes | PathLike[str] | None = None,
- env: Mapping[str, str] | None = None,
- start_new_session: bool = False,
- ) -> Process:
- pass
-
- @classmethod
- @overload
- async def open_process(
- cls,
- command: Sequence[str | bytes],
- *,
- shell: Literal[False],
- stdin: int | IO[Any] | None,
- stdout: int | IO[Any] | None,
- stderr: int | IO[Any] | None,
- cwd: str | bytes | PathLike[str] | None = None,
- env: Mapping[str, str] | None = None,
- start_new_session: bool = False,
- ) -> Process:
- pass
-
- @classmethod
@abstractmethod
async def open_process(
cls,
- command: str | bytes | Sequence[str | bytes],
+ command: StrOrBytesPath | Sequence[StrOrBytesPath],
*,
- shell: bool,
stdin: int | IO[Any] | None,
stdout: int | IO[Any] | None,
stderr: int | IO[Any] | None,
- cwd: str | bytes | PathLike[str] | None = None,
- env: Mapping[str, str] | None = None,
- start_new_session: bool = False,
+ **kwargs: Any,
) -> Process:
pass
diff --git a/contrib/python/anyio/anyio/from_thread.py b/contrib/python/anyio/anyio/from_thread.py
index 88a854bb919..b8785845bad 100644
--- a/contrib/python/anyio/anyio/from_thread.py
+++ b/contrib/python/anyio/anyio/from_thread.py
@@ -1,19 +1,18 @@
from __future__ import annotations
import sys
-import threading
from collections.abc import Awaitable, Callable, Generator
-from concurrent.futures import FIRST_COMPLETED, Future, ThreadPoolExecutor, wait
+from concurrent.futures import Future
from contextlib import AbstractContextManager, contextmanager
from dataclasses import dataclass, field
from inspect import isawaitable
+from threading import Lock, Thread, get_ident
from types import TracebackType
from typing import (
Any,
AsyncContextManager,
ContextManager,
Generic,
- Iterable,
TypeVar,
cast,
overload,
@@ -146,7 +145,7 @@ class BlockingPortal:
return get_async_backend().create_blocking_portal()
def __init__(self) -> None:
- self._event_loop_thread_id: int | None = threading.get_ident()
+ self._event_loop_thread_id: int | None = get_ident()
self._stop_event = Event()
self._task_group = create_task_group()
self._cancelled_exc_class = get_cancelled_exc_class()
@@ -167,7 +166,7 @@ class BlockingPortal:
def _check_running(self) -> None:
if self._event_loop_thread_id is None:
raise RuntimeError("This portal is not running")
- if self._event_loop_thread_id == threading.get_ident():
+ if self._event_loop_thread_id == get_ident():
raise RuntimeError(
"This method cannot be called from the event loop thread"
)
@@ -202,7 +201,7 @@ class BlockingPortal:
def callback(f: Future[T_Retval]) -> None:
if f.cancelled() and self._event_loop_thread_id not in (
None,
- threading.get_ident(),
+ get_ident(),
):
self.call(scope.cancel)
@@ -411,7 +410,7 @@ class BlockingPortalProvider:
backend: str = "asyncio"
backend_options: dict[str, Any] | None = None
- _lock: threading.Lock = field(init=False, default_factory=threading.Lock)
+ _lock: Lock = field(init=False, default_factory=Lock)
_leases: int = field(init=False, default=0)
_portal: BlockingPortal = field(init=False)
_portal_cm: AbstractContextManager[BlockingPortal] | None = field(
@@ -469,43 +468,37 @@ def start_blocking_portal(
async def run_portal() -> None:
async with BlockingPortal() as portal_:
- if future.set_running_or_notify_cancel():
- future.set_result(portal_)
- await portal_.sleep_until_stopped()
+ future.set_result(portal_)
+ await portal_.sleep_until_stopped()
+
+ def run_blocking_portal() -> None:
+ if future.set_running_or_notify_cancel():
+ try:
+ _eventloop.run(
+ run_portal, backend=backend, backend_options=backend_options
+ )
+ except BaseException as exc:
+ if not future.done():
+ future.set_exception(exc)
future: Future[BlockingPortal] = Future()
- with ThreadPoolExecutor(1) as executor:
- run_future = executor.submit(
- _eventloop.run, # type: ignore[arg-type]
- run_portal,
- backend=backend,
- backend_options=backend_options,
- )
+ thread = Thread(target=run_blocking_portal, daemon=True)
+ thread.start()
+ try:
+ cancel_remaining_tasks = False
+ portal = future.result()
try:
- wait(
- cast(Iterable[Future], [run_future, future]),
- return_when=FIRST_COMPLETED,
- )
+ yield portal
except BaseException:
- future.cancel()
- run_future.cancel()
+ cancel_remaining_tasks = True
raise
-
- if future.done():
- portal = future.result()
- cancel_remaining_tasks = False
+ finally:
try:
- yield portal
- except BaseException:
- cancel_remaining_tasks = True
- raise
- finally:
- try:
- portal.call(portal.stop, cancel_remaining_tasks)
- except RuntimeError:
- pass
-
- run_future.result()
+ portal.call(portal.stop, cancel_remaining_tasks)
+ except RuntimeError:
+ pass
+ finally:
+ thread.join()
def check_cancelled() -> None:
diff --git a/contrib/python/anyio/anyio/pytest_plugin.py b/contrib/python/anyio/anyio/pytest_plugin.py
index a8dd6f3e3f1..558c72ec287 100644
--- a/contrib/python/anyio/anyio/pytest_plugin.py
+++ b/contrib/python/anyio/anyio/pytest_plugin.py
@@ -1,5 +1,6 @@
from __future__ import annotations
+import sys
from collections.abc import Iterator
from contextlib import ExitStack, contextmanager
from inspect import isasyncgenfunction, iscoroutinefunction
@@ -7,10 +8,15 @@ from typing import Any, Dict, Tuple, cast
import pytest
import sniffio
+from _pytest.outcomes import Exit
from ._core._eventloop import get_all_backends, get_async_backend
+from ._core._exceptions import iterate_exceptions
from .abc import TestRunner
+if sys.version_info < (3, 11):
+ from exceptiongroup import ExceptionGroup
+
_current_runner: TestRunner | None = None
_runner_stack: ExitStack | None = None
_runner_leases = 0
@@ -121,7 +127,14 @@ def pytest_pyfunc_call(pyfuncitem: Any) -> bool | None:
funcargs = pyfuncitem.funcargs
testargs = {arg: funcargs[arg] for arg in pyfuncitem._fixtureinfo.argnames}
with get_runner(backend_name, backend_options) as runner:
- runner.run_test(pyfuncitem.obj, testargs)
+ try:
+ runner.run_test(pyfuncitem.obj, testargs)
+ except ExceptionGroup as excgrp:
+ for exc in iterate_exceptions(excgrp):
+ if isinstance(exc, (Exit, KeyboardInterrupt, SystemExit)):
+ raise exc from excgrp
+
+ raise
return True
diff --git a/contrib/python/anyio/anyio/streams/memory.py b/contrib/python/anyio/anyio/streams/memory.py
index 6840e6242ff..b547aa6a48c 100644
--- a/contrib/python/anyio/anyio/streams/memory.py
+++ b/contrib/python/anyio/anyio/streams/memory.py
@@ -38,6 +38,12 @@ class MemoryObjectItemReceiver(Generic[T_Item]):
task_info: TaskInfo = field(init=False, default_factory=get_current_task)
item: T_Item = field(init=False)
+ def __repr__(self) -> str:
+ # When item is not defined, we get following error with default __repr__:
+ # AttributeError: 'MemoryObjectItemReceiver' object has no attribute 'item'
+ item = getattr(self, "item", None)
+ return f"{self.__class__.__name__}(task_info={self.task_info}, item={item!r})"
+
@dataclass(eq=False)
class MemoryObjectStreamState(Generic[T_Item]):
@@ -175,7 +181,7 @@ class MemoryObjectReceiveStream(Generic[T_co], ObjectReceiveStream[T_co]):
def __del__(self) -> None:
if not self._closed:
warnings.warn(
- f"Unclosed <{self.__class__.__name__}>",
+ f"Unclosed <{self.__class__.__name__} at {id(self):x}>",
ResourceWarning,
source=self,
)
@@ -305,7 +311,7 @@ class MemoryObjectSendStream(Generic[T_contra], ObjectSendStream[T_contra]):
def __del__(self) -> None:
if not self._closed:
warnings.warn(
- f"Unclosed <{self.__class__.__name__}>",
+ f"Unclosed <{self.__class__.__name__} at {id(self):x}>",
ResourceWarning,
source=self,
)
diff --git a/contrib/python/anyio/anyio/to_process.py b/contrib/python/anyio/anyio/to_process.py
index 1ff06f0b259..5050dee21e9 100644
--- a/contrib/python/anyio/anyio/to_process.py
+++ b/contrib/python/anyio/anyio/to_process.py
@@ -223,7 +223,7 @@ def process_worker() -> None:
main_module_path: str | None
sys.path, main_module_path = args
del sys.modules["__main__"]
- if main_module_path:
+ if main_module_path and os.path.isfile(main_module_path):
# Load the parent's main module but as __mp_main__ instead of
# __main__ (like multiprocessing does) to avoid infinite recursion
try:
@@ -234,7 +234,6 @@ def process_worker() -> None:
sys.modules["__main__"] = main
except BaseException as exc:
exception = exc
-
try:
if exception is not None:
status = b"EXCEPTION"
diff --git a/contrib/python/anyio/ya.make b/contrib/python/anyio/ya.make
index 9062121337b..c166d2627f0 100644
--- a/contrib/python/anyio/ya.make
+++ b/contrib/python/anyio/ya.make
@@ -2,7 +2,7 @@
PY3_LIBRARY()
-VERSION(4.4.0)
+VERSION(4.5.0)
LICENSE(MIT)