diff options
| author | robot-piglet <[email protected]> | 2025-01-20 16:38:03 +0300 |
|---|---|---|
| committer | robot-piglet <[email protected]> | 2025-01-20 16:53:12 +0300 |
| commit | b6a2451a8ed4ad6c210751fcf6ae3d28998235f1 (patch) | |
| tree | f91f69dd400095b92826c42ccbdbe4a97aa3d536 /contrib/python/anyio | |
| parent | 13b3cd284513a5d3d21e207821e73f1dd58988b7 (diff) | |
Intermediate changes
commit_hash:68dae89c5b8414a342b134a250a7365b7b96e259
Diffstat (limited to 'contrib/python/anyio')
| -rw-r--r-- | contrib/python/anyio/.dist-info/METADATA | 5 | ||||
| -rw-r--r-- | contrib/python/anyio/anyio/__init__.py | 1 | ||||
| -rw-r--r-- | contrib/python/anyio/anyio/_backends/_asyncio.py | 120 | ||||
| -rw-r--r-- | contrib/python/anyio/anyio/_backends/_trio.py | 8 | ||||
| -rw-r--r-- | contrib/python/anyio/anyio/_core/_asyncio_selector_thread.py | 17 | ||||
| -rw-r--r-- | contrib/python/anyio/anyio/_core/_exceptions.py | 37 | ||||
| -rw-r--r-- | contrib/python/anyio/anyio/_core/_fileio.py | 57 | ||||
| -rw-r--r-- | contrib/python/anyio/anyio/_core/_synchronization.py | 3 | ||||
| -rw-r--r-- | contrib/python/anyio/anyio/_core/_tasks.py | 2 | ||||
| -rw-r--r-- | contrib/python/anyio/anyio/to_interpreter.py | 218 | ||||
| -rw-r--r-- | contrib/python/anyio/anyio/to_process.py | 2 | ||||
| -rw-r--r-- | contrib/python/anyio/ya.make | 3 |
12 files changed, 378 insertions, 95 deletions
diff --git a/contrib/python/anyio/.dist-info/METADATA b/contrib/python/anyio/.dist-info/METADATA index 5d0f7c91301..7b114cdbe89 100644 --- a/contrib/python/anyio/.dist-info/METADATA +++ b/contrib/python/anyio/.dist-info/METADATA @@ -1,6 +1,6 @@ Metadata-Version: 2.1 Name: anyio -Version: 4.7.0 +Version: 4.8.0 Summary: High level compatibility layer for multiple asynchronous event loop implementations Author-email: Alex Grönholm <[email protected]> License: MIT @@ -36,10 +36,9 @@ Requires-Dist: exceptiongroup>=1.2.0; extra == "test" Requires-Dist: hypothesis>=4.0; extra == "test" Requires-Dist: psutil>=5.9; extra == "test" Requires-Dist: pytest>=7.0; extra == "test" -Requires-Dist: pytest-mock>=3.6.1; extra == "test" Requires-Dist: trustme; extra == "test" Requires-Dist: truststore>=0.9.1; python_version >= "3.10" and extra == "test" -Requires-Dist: uvloop>=0.21; (platform_python_implementation == "CPython" and platform_system != "Windows") and extra == "test" +Requires-Dist: uvloop>=0.21; (platform_python_implementation == "CPython" and platform_system != "Windows" and python_version < "3.14") and extra == "test" Provides-Extra: doc Requires-Dist: packaging; extra == "doc" Requires-Dist: Sphinx~=7.4; extra == "doc" diff --git a/contrib/python/anyio/anyio/__init__.py b/contrib/python/anyio/anyio/__init__.py index 0738e595830..098312599f6 100644 --- a/contrib/python/anyio/anyio/__init__.py +++ b/contrib/python/anyio/anyio/__init__.py @@ -8,6 +8,7 @@ from ._core._eventloop import sleep as sleep from ._core._eventloop import sleep_forever as sleep_forever from ._core._eventloop import sleep_until as sleep_until from ._core._exceptions import BrokenResourceError as BrokenResourceError +from ._core._exceptions import BrokenWorkerIntepreter as BrokenWorkerIntepreter from ._core._exceptions import BrokenWorkerProcess as BrokenWorkerProcess from ._core._exceptions import BusyResourceError as BusyResourceError from ._core._exceptions import ClosedResourceError as ClosedResourceError diff --git a/contrib/python/anyio/anyio/_backends/_asyncio.py b/contrib/python/anyio/anyio/_backends/_asyncio.py index 0b7479d2649..76a400c1cb8 100644 --- a/contrib/python/anyio/anyio/_backends/_asyncio.py +++ b/contrib/python/anyio/anyio/_backends/_asyncio.py @@ -28,8 +28,6 @@ from collections.abc import ( Collection, Coroutine, Iterable, - Iterator, - MutableMapping, Sequence, ) from concurrent.futures import Future @@ -49,7 +47,7 @@ from queue import Queue from signal import Signals from socket import AddressFamily, SocketKind from threading import Thread -from types import TracebackType +from types import CodeType, TracebackType from typing import ( IO, TYPE_CHECKING, @@ -449,7 +447,7 @@ class CancelScope(BaseCancelScope): exc_type: type[BaseException] | None, exc_val: BaseException | None, exc_tb: TracebackType | None, - ) -> bool | None: + ) -> bool: del exc_tb if not self._active: @@ -677,45 +675,7 @@ class TaskState: self.cancel_scope = cancel_scope -class TaskStateStore(MutableMapping["Awaitable[Any] | asyncio.Task", TaskState]): - def __init__(self) -> None: - self._task_states = WeakKeyDictionary[asyncio.Task, TaskState]() - self._preliminary_task_states: dict[Awaitable[Any], TaskState] = {} - - def __getitem__(self, key: Awaitable[Any] | asyncio.Task, /) -> TaskState: - assert isinstance(key, asyncio.Task) - try: - return self._task_states[key] - except KeyError: - if coro := key.get_coro(): - if state := self._preliminary_task_states.get(coro): - return state - - raise KeyError(key) - - def __setitem__( - self, key: asyncio.Task | Awaitable[Any], value: TaskState, / - ) -> None: - if isinstance(key, asyncio.Task): - self._task_states[key] = value - else: - self._preliminary_task_states[key] = value - - def __delitem__(self, key: asyncio.Task | Awaitable[Any], /) -> None: - if isinstance(key, asyncio.Task): - del self._task_states[key] - else: - del self._preliminary_task_states[key] - - def __len__(self) -> int: - return len(self._task_states) + len(self._preliminary_task_states) - - def __iter__(self) -> Iterator[Awaitable[Any] | asyncio.Task]: - yield from self._task_states - yield from self._preliminary_task_states - - -_task_states = TaskStateStore() +_task_states: WeakKeyDictionary[asyncio.Task, TaskState] = WeakKeyDictionary() # @@ -741,24 +701,10 @@ class _AsyncioTaskStatus(abc.TaskStatus): _task_states[task].parent_id = self._parent_id -async def _wait(tasks: Iterable[asyncio.Task[object]]) -> None: - tasks = set(tasks) - waiter = get_running_loop().create_future() - - def on_completion(task: asyncio.Task[object]) -> None: - tasks.discard(task) - if not tasks and not waiter.done(): - waiter.set_result(None) - - for task in tasks: - task.add_done_callback(on_completion) - del task - - try: - await waiter - finally: - while tasks: - tasks.pop().remove_done_callback(on_completion) +if sys.version_info >= (3, 12): + _eager_task_factory_code: CodeType | None = asyncio.eager_task_factory.__code__ +else: + _eager_task_factory_code = None class TaskGroup(abc.TaskGroup): @@ -767,6 +713,7 @@ class TaskGroup(abc.TaskGroup): self._active = False self._exceptions: list[BaseException] = [] self._tasks: set[asyncio.Task] = set() + self._on_completed_fut: asyncio.Future[None] | None = None async def __aenter__(self) -> TaskGroup: self.cancel_scope.__enter__() @@ -785,12 +732,15 @@ class TaskGroup(abc.TaskGroup): if not isinstance(exc_val, CancelledError): self._exceptions.append(exc_val) + loop = get_running_loop() try: if self._tasks: with CancelScope() as wait_scope: while self._tasks: + self._on_completed_fut = loop.create_future() + try: - await _wait(self._tasks) + await self._on_completed_fut except CancelledError as exc: # Shield the scope against further cancellation attempts, # as they're not productive (#695) @@ -805,6 +755,8 @@ class TaskGroup(abc.TaskGroup): and not is_anyio_cancellation(exc) ): exc_val = exc + + self._on_completed_fut = None else: # If there are no child tasks to wait on, run at least one checkpoint # anyway @@ -835,13 +787,19 @@ class TaskGroup(abc.TaskGroup): task_status_future: asyncio.Future | None = None, ) -> asyncio.Task: def task_done(_task: asyncio.Task) -> None: - # task_state = _task_states[_task] + task_state = _task_states[_task] assert task_state.cancel_scope is not None assert _task in task_state.cancel_scope._tasks task_state.cancel_scope._tasks.remove(_task) self._tasks.remove(task) del _task_states[_task] + if self._on_completed_fut is not None and not self._tasks: + try: + self._on_completed_fut.set_result(None) + except asyncio.InvalidStateError: + pass + try: exc = _task.exception() except CancelledError as e: @@ -892,26 +850,25 @@ class TaskGroup(abc.TaskGroup): f"the return value ({coro!r}) is not a coroutine object" ) - # Make the spawned task inherit the task group's cancel scope - _task_states[coro] = task_state = TaskState( - parent_id=parent_id, cancel_scope=self.cancel_scope - ) name = get_callable_name(func) if name is None else str(name) - try: + loop = asyncio.get_running_loop() + if ( + (factory := loop.get_task_factory()) + and getattr(factory, "__code__", None) is _eager_task_factory_code + and (closure := getattr(factory, "__closure__", None)) + ): + custom_task_constructor = closure[0].cell_contents + task = custom_task_constructor(coro, loop=loop, name=name) + else: task = create_task(coro, name=name) - finally: - del _task_states[coro] - _task_states[task] = task_state + # Make the spawned task inherit the task group's cancel scope + _task_states[task] = TaskState( + parent_id=parent_id, cancel_scope=self.cancel_scope + ) self.cancel_scope._tasks.add(task) self._tasks.add(task) - - if task.done(): - # This can happen with eager task factories - task_done(task) - else: - task.add_done_callback(task_done) - + task.add_done_callback(task_done) return task def start_soon( @@ -2114,10 +2071,9 @@ class _SignalReceiver: exc_type: type[BaseException] | None, exc_val: BaseException | None, exc_tb: TracebackType | None, - ) -> bool | None: + ) -> None: for sig in self._handled_signals: self._loop.remove_signal_handler(sig) - return None def __aiter__(self) -> _SignalReceiver: return self @@ -2446,7 +2402,7 @@ class AsyncIOBackend(AsyncBackend): return CapacityLimiter(total_tokens) @classmethod - async def run_sync_in_worker_thread( + async def run_sync_in_worker_thread( # type: ignore[return] cls, func: Callable[[Unpack[PosArgsT]], T_Retval], args: tuple[Unpack[PosArgsT]], @@ -2468,7 +2424,7 @@ class AsyncIOBackend(AsyncBackend): async with limiter or cls.current_default_thread_limiter(): with CancelScope(shield=not abandon_on_cancel) as scope: - future: asyncio.Future = asyncio.Future() + future = asyncio.Future[T_Retval]() root_task = find_root_task() if not idle_workers: worker = WorkerThread(root_task, workers, idle_workers) diff --git a/contrib/python/anyio/anyio/_backends/_trio.py b/contrib/python/anyio/anyio/_backends/_trio.py index 70a0a605781..32ae8ace7b4 100644 --- a/contrib/python/anyio/anyio/_backends/_trio.py +++ b/contrib/python/anyio/anyio/_backends/_trio.py @@ -132,8 +132,7 @@ class CancelScope(BaseCancelScope): exc_type: type[BaseException] | None, exc_val: BaseException | None, exc_tb: TracebackType | None, - ) -> bool | None: - # https://github.com/python-trio/trio-typing/pull/79 + ) -> bool: return self.__original.__exit__(exc_type, exc_val, exc_tb) def cancel(self) -> None: @@ -186,9 +185,10 @@ class TaskGroup(abc.TaskGroup): exc_type: type[BaseException] | None, exc_val: BaseException | None, exc_tb: TracebackType | None, - ) -> bool | None: + ) -> bool: try: - return await self._nursery_manager.__aexit__(exc_type, exc_val, exc_tb) + # trio.Nursery.__exit__ returns bool; .open_nursery has wrong type + return await self._nursery_manager.__aexit__(exc_type, exc_val, exc_tb) # type: ignore[return-value] except BaseExceptionGroup as exc: if not exc.split(trio.Cancelled)[1]: raise trio.Cancelled._create() from exc diff --git a/contrib/python/anyio/anyio/_core/_asyncio_selector_thread.py b/contrib/python/anyio/anyio/_core/_asyncio_selector_thread.py index d98c3040721..f4d18cf0429 100644 --- a/contrib/python/anyio/anyio/_core/_asyncio_selector_thread.py +++ b/contrib/python/anyio/anyio/_core/_asyncio_selector_thread.py @@ -21,6 +21,23 @@ class Selector: self._send, self._receive = socket.socketpair() self._send.setblocking(False) self._receive.setblocking(False) + # This somewhat reduces the amount of memory wasted queueing up data + # for wakeups. With these settings, maximum number of 1-byte sends + # before getting BlockingIOError: + # Linux 4.8: 6 + # macOS (darwin 15.5): 1 + # Windows 10: 525347 + # Windows you're weird. (And on Windows setting SNDBUF to 0 makes send + # blocking, even on non-blocking sockets, so don't do that.) + self._receive.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 1) + self._send.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, 1) + # On Windows this is a TCP socket so this might matter. On other + # platforms this fails b/c AF_UNIX sockets aren't actually TCP. + try: + self._send.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) + except OSError: + pass + self._selector.register(self._receive, EVENT_READ) self._closed = False diff --git a/contrib/python/anyio/anyio/_core/_exceptions.py b/contrib/python/anyio/anyio/_core/_exceptions.py index 97ea3130414..16b94482c06 100644 --- a/contrib/python/anyio/anyio/_core/_exceptions.py +++ b/contrib/python/anyio/anyio/_core/_exceptions.py @@ -2,6 +2,8 @@ from __future__ import annotations import sys from collections.abc import Generator +from textwrap import dedent +from typing import Any if sys.version_info < (3, 11): from exceptiongroup import BaseExceptionGroup @@ -21,6 +23,41 @@ class BrokenWorkerProcess(Exception): """ +class BrokenWorkerIntepreter(Exception): + """ + Raised by :meth:`~anyio.to_interpreter.run_sync` if an unexpected exception is + raised in the subinterpreter. + """ + + def __init__(self, excinfo: Any): + # This was adapted from concurrent.futures.interpreter.ExecutionFailed + msg = excinfo.formatted + if not msg: + if excinfo.type and excinfo.msg: + msg = f"{excinfo.type.__name__}: {excinfo.msg}" + else: + msg = excinfo.type.__name__ or excinfo.msg + + super().__init__(msg) + self.excinfo = excinfo + + def __str__(self) -> str: + try: + formatted = self.excinfo.errdisplay + except Exception: + return super().__str__() + else: + return dedent( + f""" + {super().__str__()} + + Uncaught in the interpreter: + + {formatted} + """.strip() + ) + + class BusyResourceError(Exception): """ Raised when two tasks are trying to read from or write to the same resource diff --git a/contrib/python/anyio/anyio/_core/_fileio.py b/contrib/python/anyio/anyio/_core/_fileio.py index ef2930e480e..4e34f2addc5 100644 --- a/contrib/python/anyio/anyio/_core/_fileio.py +++ b/contrib/python/anyio/anyio/_core/_fileio.py @@ -3,7 +3,13 @@ from __future__ import annotations import os import pathlib import sys -from collections.abc import AsyncIterator, Callable, Iterable, Iterator, Sequence +from collections.abc import ( + AsyncIterator, + Callable, + Iterable, + Iterator, + Sequence, +) from dataclasses import dataclass from functools import partial from os import PathLike @@ -220,11 +226,15 @@ class Path: Some methods may be unavailable or have limited functionality, based on the Python version: + * :meth:`~pathlib.Path.copy` (available on Python 3.14 or later) + * :meth:`~pathlib.Path.copy_into` (available on Python 3.14 or later) * :meth:`~pathlib.Path.from_uri` (available on Python 3.13 or later) * :meth:`~pathlib.Path.full_match` (available on Python 3.13 or later) * :meth:`~pathlib.Path.is_junction` (available on Python 3.12 or later) * :meth:`~pathlib.Path.match` (the ``case_sensitive`` paramater is only available on Python 3.13 or later) + * :meth:`~pathlib.Path.move` (available on Python 3.14 or later) + * :meth:`~pathlib.Path.move_into` (available on Python 3.14 or later) * :meth:`~pathlib.Path.relative_to` (the ``walk_up`` parameter is only available on Python 3.12 or later) * :meth:`~pathlib.Path.walk` (available on Python 3.12 or later) @@ -396,6 +406,51 @@ class Path: def match(self, path_pattern: str) -> bool: return self._path.match(path_pattern) + if sys.version_info >= (3, 14): + + async def copy( + self, + target: str | os.PathLike[str], + *, + follow_symlinks: bool = True, + dirs_exist_ok: bool = False, + preserve_metadata: bool = False, + ) -> Path: + func = partial( + self._path.copy, + follow_symlinks=follow_symlinks, + dirs_exist_ok=dirs_exist_ok, + preserve_metadata=preserve_metadata, + ) + return Path(await to_thread.run_sync(func, target)) + + async def copy_into( + self, + target_dir: str | os.PathLike[str], + *, + follow_symlinks: bool = True, + dirs_exist_ok: bool = False, + preserve_metadata: bool = False, + ) -> Path: + func = partial( + self._path.copy_into, + follow_symlinks=follow_symlinks, + dirs_exist_ok=dirs_exist_ok, + preserve_metadata=preserve_metadata, + ) + return Path(await to_thread.run_sync(func, target_dir)) + + async def move(self, target: str | os.PathLike[str]) -> Path: + # Upstream does not handle anyio.Path properly as a PathLike + target = pathlib.Path(target) + return Path(await to_thread.run_sync(self._path.move, target)) + + async def move_into( + self, + target_dir: str | os.PathLike[str], + ) -> Path: + return Path(await to_thread.run_sync(self._path.move_into, target_dir)) + def is_relative_to(self, other: str | PathLike[str]) -> bool: try: self.relative_to(other) diff --git a/contrib/python/anyio/anyio/_core/_synchronization.py b/contrib/python/anyio/anyio/_core/_synchronization.py index 7878ba66682..a6331328d4f 100644 --- a/contrib/python/anyio/anyio/_core/_synchronization.py +++ b/contrib/python/anyio/anyio/_core/_synchronization.py @@ -728,6 +728,5 @@ class ResourceGuard: exc_type: type[BaseException] | None, exc_val: BaseException | None, exc_tb: TracebackType | None, - ) -> bool | None: + ) -> None: self._guarded = False - return None diff --git a/contrib/python/anyio/anyio/_core/_tasks.py b/contrib/python/anyio/anyio/_core/_tasks.py index 2f21ea20b13..fe49015102b 100644 --- a/contrib/python/anyio/anyio/_core/_tasks.py +++ b/contrib/python/anyio/anyio/_core/_tasks.py @@ -88,7 +88,7 @@ class CancelScope: exc_type: type[BaseException] | None, exc_val: BaseException | None, exc_tb: TracebackType | None, - ) -> bool | None: + ) -> bool: raise NotImplementedError diff --git a/contrib/python/anyio/anyio/to_interpreter.py b/contrib/python/anyio/anyio/to_interpreter.py new file mode 100644 index 00000000000..bcde24d3d1d --- /dev/null +++ b/contrib/python/anyio/anyio/to_interpreter.py @@ -0,0 +1,218 @@ +from __future__ import annotations + +import atexit +import os +import pickle +import sys +from collections import deque +from collections.abc import Callable +from textwrap import dedent +from typing import Any, Final, TypeVar + +from . import current_time, to_thread +from ._core._exceptions import BrokenWorkerIntepreter +from ._core._synchronization import CapacityLimiter +from .lowlevel import RunVar + +if sys.version_info >= (3, 11): + from typing import TypeVarTuple, Unpack +else: + from typing_extensions import TypeVarTuple, Unpack + +UNBOUND: Final = 2 # I have no clue how this works, but it was used in the stdlib +FMT_UNPICKLED: Final = 0 +FMT_PICKLED: Final = 1 +DEFAULT_CPU_COUNT: Final = 8 # this is just an arbitrarily selected value +MAX_WORKER_IDLE_TIME = ( + 30 # seconds a subinterpreter can be idle before becoming eligible for pruning +) + +T_Retval = TypeVar("T_Retval") +PosArgsT = TypeVarTuple("PosArgsT") + +_idle_workers = RunVar[deque["Worker"]]("_available_workers") +_default_interpreter_limiter = RunVar[CapacityLimiter]("_default_interpreter_limiter") + + +class Worker: + _run_func = compile( + dedent(""" + import _interpqueues as queues + import _interpreters as interpreters + from pickle import loads, dumps, HIGHEST_PROTOCOL + + item = queues.get(queue_id)[0] + try: + func, args = loads(item) + retval = func(*args) + except BaseException as exc: + is_exception = True + retval = exc + else: + is_exception = False + + try: + queues.put(queue_id, (retval, is_exception), FMT_UNPICKLED, UNBOUND) + except interpreters.NotShareableError: + retval = dumps(retval, HIGHEST_PROTOCOL) + queues.put(queue_id, (retval, is_exception), FMT_PICKLED, UNBOUND) + """), + "<string>", + "exec", + ) + + last_used: float = 0 + + _initialized: bool = False + _interpreter_id: int + _queue_id: int + + def initialize(self) -> None: + import _interpqueues as queues + import _interpreters as interpreters + + self._interpreter_id = interpreters.create() + self._queue_id = queues.create(2, FMT_UNPICKLED, UNBOUND) # type: ignore[call-arg] + self._initialized = True + interpreters.set___main___attrs( + self._interpreter_id, + { + "queue_id": self._queue_id, + "FMT_PICKLED": FMT_PICKLED, + "FMT_UNPICKLED": FMT_UNPICKLED, + "UNBOUND": UNBOUND, + }, + ) + + def destroy(self) -> None: + import _interpqueues as queues + import _interpreters as interpreters + + if self._initialized: + interpreters.destroy(self._interpreter_id) + queues.destroy(self._queue_id) + + def _call( + self, + func: Callable[..., T_Retval], + args: tuple[Any], + ) -> tuple[Any, bool]: + import _interpqueues as queues + import _interpreters as interpreters + + if not self._initialized: + self.initialize() + + payload = pickle.dumps((func, args), pickle.HIGHEST_PROTOCOL) + queues.put(self._queue_id, payload, FMT_PICKLED, UNBOUND) # type: ignore[call-arg] + + res: Any + is_exception: bool + if exc_info := interpreters.exec(self._interpreter_id, self._run_func): # type: ignore[func-returns-value,arg-type] + raise BrokenWorkerIntepreter(exc_info) + + (res, is_exception), fmt = queues.get(self._queue_id)[:2] + if fmt == FMT_PICKLED: + res = pickle.loads(res) + + return res, is_exception + + async def call( + self, + func: Callable[..., T_Retval], + args: tuple[Any], + limiter: CapacityLimiter, + ) -> T_Retval: + result, is_exception = await to_thread.run_sync( + self._call, + func, + args, + limiter=limiter, + ) + if is_exception: + raise result + + return result + + +def _stop_workers(workers: deque[Worker]) -> None: + for worker in workers: + worker.destroy() + + workers.clear() + + +async def run_sync( + func: Callable[[Unpack[PosArgsT]], T_Retval], + *args: Unpack[PosArgsT], + limiter: CapacityLimiter | None = None, +) -> T_Retval: + """ + Call the given function with the given arguments in a subinterpreter. + + If the ``cancellable`` option is enabled and the task waiting for its completion is + cancelled, the call will still run its course but its return value (or any raised + exception) will be ignored. + + .. warning:: This feature is **experimental**. The upstream interpreter API has not + yet been finalized or thoroughly tested, so don't rely on this for anything + mission critical. + + :param func: a callable + :param args: positional arguments for the callable + :param limiter: capacity limiter to use to limit the total amount of subinterpreters + running (if omitted, the default limiter is used) + :return: the result of the call + :raises BrokenWorkerIntepreter: if there's an internal error in a subinterpreter + + """ + if sys.version_info <= (3, 13): + raise RuntimeError("subinterpreters require at least Python 3.13") + + if limiter is None: + limiter = current_default_interpreter_limiter() + + try: + idle_workers = _idle_workers.get() + except LookupError: + idle_workers = deque() + _idle_workers.set(idle_workers) + atexit.register(_stop_workers, idle_workers) + + async with limiter: + try: + worker = idle_workers.pop() + except IndexError: + worker = Worker() + + try: + return await worker.call(func, args, limiter) + finally: + # Prune workers that have been idle for too long + now = current_time() + while idle_workers: + if now - idle_workers[0].last_used <= MAX_WORKER_IDLE_TIME: + break + + await to_thread.run_sync(idle_workers.popleft().destroy, limiter=limiter) + + worker.last_used = current_time() + idle_workers.append(worker) + + +def current_default_interpreter_limiter() -> CapacityLimiter: + """ + Return the capacity limiter that is used by default to limit the number of + concurrently running subinterpreters. + + Defaults to the number of CPU cores. + + :return: a capacity limiter object + + """ + try: + return _default_interpreter_limiter.get() + except LookupError: + limiter = CapacityLimiter(os.cpu_count() or DEFAULT_CPU_COUNT) + _default_interpreter_limiter.set(limiter) + return limiter diff --git a/contrib/python/anyio/anyio/to_process.py b/contrib/python/anyio/anyio/to_process.py index 5050dee21e9..495de2ae711 100644 --- a/contrib/python/anyio/anyio/to_process.py +++ b/contrib/python/anyio/anyio/to_process.py @@ -35,7 +35,7 @@ _process_pool_idle_workers: RunVar[deque[tuple[Process, float]]] = RunVar( _default_process_limiter: RunVar[CapacityLimiter] = RunVar("_default_process_limiter") -async def run_sync( +async def run_sync( # type: ignore[return] func: Callable[[Unpack[PosArgsT]], T_Retval], *args: Unpack[PosArgsT], cancellable: bool = False, diff --git a/contrib/python/anyio/ya.make b/contrib/python/anyio/ya.make index 956fc4c8415..b676cfdd808 100644 --- a/contrib/python/anyio/ya.make +++ b/contrib/python/anyio/ya.make @@ -2,7 +2,7 @@ PY3_LIBRARY() -VERSION(4.7.0) +VERSION(4.8.0) LICENSE(MIT) @@ -57,6 +57,7 @@ PY_SRCS( anyio/streams/stapled.py anyio/streams/text.py anyio/streams/tls.py + anyio/to_interpreter.py anyio/to_process.py anyio/to_thread.py ) |
