summaryrefslogtreecommitdiffstats
path: root/contrib/python/anyio
diff options
context:
space:
mode:
authorrobot-piglet <[email protected]>2025-01-20 16:38:03 +0300
committerrobot-piglet <[email protected]>2025-01-20 16:53:12 +0300
commitb6a2451a8ed4ad6c210751fcf6ae3d28998235f1 (patch)
treef91f69dd400095b92826c42ccbdbe4a97aa3d536 /contrib/python/anyio
parent13b3cd284513a5d3d21e207821e73f1dd58988b7 (diff)
Intermediate changes
commit_hash:68dae89c5b8414a342b134a250a7365b7b96e259
Diffstat (limited to 'contrib/python/anyio')
-rw-r--r--contrib/python/anyio/.dist-info/METADATA5
-rw-r--r--contrib/python/anyio/anyio/__init__.py1
-rw-r--r--contrib/python/anyio/anyio/_backends/_asyncio.py120
-rw-r--r--contrib/python/anyio/anyio/_backends/_trio.py8
-rw-r--r--contrib/python/anyio/anyio/_core/_asyncio_selector_thread.py17
-rw-r--r--contrib/python/anyio/anyio/_core/_exceptions.py37
-rw-r--r--contrib/python/anyio/anyio/_core/_fileio.py57
-rw-r--r--contrib/python/anyio/anyio/_core/_synchronization.py3
-rw-r--r--contrib/python/anyio/anyio/_core/_tasks.py2
-rw-r--r--contrib/python/anyio/anyio/to_interpreter.py218
-rw-r--r--contrib/python/anyio/anyio/to_process.py2
-rw-r--r--contrib/python/anyio/ya.make3
12 files changed, 378 insertions, 95 deletions
diff --git a/contrib/python/anyio/.dist-info/METADATA b/contrib/python/anyio/.dist-info/METADATA
index 5d0f7c91301..7b114cdbe89 100644
--- a/contrib/python/anyio/.dist-info/METADATA
+++ b/contrib/python/anyio/.dist-info/METADATA
@@ -1,6 +1,6 @@
Metadata-Version: 2.1
Name: anyio
-Version: 4.7.0
+Version: 4.8.0
Summary: High level compatibility layer for multiple asynchronous event loop implementations
Author-email: Alex Grönholm <[email protected]>
License: MIT
@@ -36,10 +36,9 @@ Requires-Dist: exceptiongroup>=1.2.0; extra == "test"
Requires-Dist: hypothesis>=4.0; extra == "test"
Requires-Dist: psutil>=5.9; extra == "test"
Requires-Dist: pytest>=7.0; extra == "test"
-Requires-Dist: pytest-mock>=3.6.1; extra == "test"
Requires-Dist: trustme; extra == "test"
Requires-Dist: truststore>=0.9.1; python_version >= "3.10" and extra == "test"
-Requires-Dist: uvloop>=0.21; (platform_python_implementation == "CPython" and platform_system != "Windows") and extra == "test"
+Requires-Dist: uvloop>=0.21; (platform_python_implementation == "CPython" and platform_system != "Windows" and python_version < "3.14") and extra == "test"
Provides-Extra: doc
Requires-Dist: packaging; extra == "doc"
Requires-Dist: Sphinx~=7.4; extra == "doc"
diff --git a/contrib/python/anyio/anyio/__init__.py b/contrib/python/anyio/anyio/__init__.py
index 0738e595830..098312599f6 100644
--- a/contrib/python/anyio/anyio/__init__.py
+++ b/contrib/python/anyio/anyio/__init__.py
@@ -8,6 +8,7 @@ from ._core._eventloop import sleep as sleep
from ._core._eventloop import sleep_forever as sleep_forever
from ._core._eventloop import sleep_until as sleep_until
from ._core._exceptions import BrokenResourceError as BrokenResourceError
+from ._core._exceptions import BrokenWorkerIntepreter as BrokenWorkerIntepreter
from ._core._exceptions import BrokenWorkerProcess as BrokenWorkerProcess
from ._core._exceptions import BusyResourceError as BusyResourceError
from ._core._exceptions import ClosedResourceError as ClosedResourceError
diff --git a/contrib/python/anyio/anyio/_backends/_asyncio.py b/contrib/python/anyio/anyio/_backends/_asyncio.py
index 0b7479d2649..76a400c1cb8 100644
--- a/contrib/python/anyio/anyio/_backends/_asyncio.py
+++ b/contrib/python/anyio/anyio/_backends/_asyncio.py
@@ -28,8 +28,6 @@ from collections.abc import (
Collection,
Coroutine,
Iterable,
- Iterator,
- MutableMapping,
Sequence,
)
from concurrent.futures import Future
@@ -49,7 +47,7 @@ from queue import Queue
from signal import Signals
from socket import AddressFamily, SocketKind
from threading import Thread
-from types import TracebackType
+from types import CodeType, TracebackType
from typing import (
IO,
TYPE_CHECKING,
@@ -449,7 +447,7 @@ class CancelScope(BaseCancelScope):
exc_type: type[BaseException] | None,
exc_val: BaseException | None,
exc_tb: TracebackType | None,
- ) -> bool | None:
+ ) -> bool:
del exc_tb
if not self._active:
@@ -677,45 +675,7 @@ class TaskState:
self.cancel_scope = cancel_scope
-class TaskStateStore(MutableMapping["Awaitable[Any] | asyncio.Task", TaskState]):
- def __init__(self) -> None:
- self._task_states = WeakKeyDictionary[asyncio.Task, TaskState]()
- self._preliminary_task_states: dict[Awaitable[Any], TaskState] = {}
-
- def __getitem__(self, key: Awaitable[Any] | asyncio.Task, /) -> TaskState:
- assert isinstance(key, asyncio.Task)
- try:
- return self._task_states[key]
- except KeyError:
- if coro := key.get_coro():
- if state := self._preliminary_task_states.get(coro):
- return state
-
- raise KeyError(key)
-
- def __setitem__(
- self, key: asyncio.Task | Awaitable[Any], value: TaskState, /
- ) -> None:
- if isinstance(key, asyncio.Task):
- self._task_states[key] = value
- else:
- self._preliminary_task_states[key] = value
-
- def __delitem__(self, key: asyncio.Task | Awaitable[Any], /) -> None:
- if isinstance(key, asyncio.Task):
- del self._task_states[key]
- else:
- del self._preliminary_task_states[key]
-
- def __len__(self) -> int:
- return len(self._task_states) + len(self._preliminary_task_states)
-
- def __iter__(self) -> Iterator[Awaitable[Any] | asyncio.Task]:
- yield from self._task_states
- yield from self._preliminary_task_states
-
-
-_task_states = TaskStateStore()
+_task_states: WeakKeyDictionary[asyncio.Task, TaskState] = WeakKeyDictionary()
#
@@ -741,24 +701,10 @@ class _AsyncioTaskStatus(abc.TaskStatus):
_task_states[task].parent_id = self._parent_id
-async def _wait(tasks: Iterable[asyncio.Task[object]]) -> None:
- tasks = set(tasks)
- waiter = get_running_loop().create_future()
-
- def on_completion(task: asyncio.Task[object]) -> None:
- tasks.discard(task)
- if not tasks and not waiter.done():
- waiter.set_result(None)
-
- for task in tasks:
- task.add_done_callback(on_completion)
- del task
-
- try:
- await waiter
- finally:
- while tasks:
- tasks.pop().remove_done_callback(on_completion)
+if sys.version_info >= (3, 12):
+ _eager_task_factory_code: CodeType | None = asyncio.eager_task_factory.__code__
+else:
+ _eager_task_factory_code = None
class TaskGroup(abc.TaskGroup):
@@ -767,6 +713,7 @@ class TaskGroup(abc.TaskGroup):
self._active = False
self._exceptions: list[BaseException] = []
self._tasks: set[asyncio.Task] = set()
+ self._on_completed_fut: asyncio.Future[None] | None = None
async def __aenter__(self) -> TaskGroup:
self.cancel_scope.__enter__()
@@ -785,12 +732,15 @@ class TaskGroup(abc.TaskGroup):
if not isinstance(exc_val, CancelledError):
self._exceptions.append(exc_val)
+ loop = get_running_loop()
try:
if self._tasks:
with CancelScope() as wait_scope:
while self._tasks:
+ self._on_completed_fut = loop.create_future()
+
try:
- await _wait(self._tasks)
+ await self._on_completed_fut
except CancelledError as exc:
# Shield the scope against further cancellation attempts,
# as they're not productive (#695)
@@ -805,6 +755,8 @@ class TaskGroup(abc.TaskGroup):
and not is_anyio_cancellation(exc)
):
exc_val = exc
+
+ self._on_completed_fut = None
else:
# If there are no child tasks to wait on, run at least one checkpoint
# anyway
@@ -835,13 +787,19 @@ class TaskGroup(abc.TaskGroup):
task_status_future: asyncio.Future | None = None,
) -> asyncio.Task:
def task_done(_task: asyncio.Task) -> None:
- # task_state = _task_states[_task]
+ task_state = _task_states[_task]
assert task_state.cancel_scope is not None
assert _task in task_state.cancel_scope._tasks
task_state.cancel_scope._tasks.remove(_task)
self._tasks.remove(task)
del _task_states[_task]
+ if self._on_completed_fut is not None and not self._tasks:
+ try:
+ self._on_completed_fut.set_result(None)
+ except asyncio.InvalidStateError:
+ pass
+
try:
exc = _task.exception()
except CancelledError as e:
@@ -892,26 +850,25 @@ class TaskGroup(abc.TaskGroup):
f"the return value ({coro!r}) is not a coroutine object"
)
- # Make the spawned task inherit the task group's cancel scope
- _task_states[coro] = task_state = TaskState(
- parent_id=parent_id, cancel_scope=self.cancel_scope
- )
name = get_callable_name(func) if name is None else str(name)
- try:
+ loop = asyncio.get_running_loop()
+ if (
+ (factory := loop.get_task_factory())
+ and getattr(factory, "__code__", None) is _eager_task_factory_code
+ and (closure := getattr(factory, "__closure__", None))
+ ):
+ custom_task_constructor = closure[0].cell_contents
+ task = custom_task_constructor(coro, loop=loop, name=name)
+ else:
task = create_task(coro, name=name)
- finally:
- del _task_states[coro]
- _task_states[task] = task_state
+ # Make the spawned task inherit the task group's cancel scope
+ _task_states[task] = TaskState(
+ parent_id=parent_id, cancel_scope=self.cancel_scope
+ )
self.cancel_scope._tasks.add(task)
self._tasks.add(task)
-
- if task.done():
- # This can happen with eager task factories
- task_done(task)
- else:
- task.add_done_callback(task_done)
-
+ task.add_done_callback(task_done)
return task
def start_soon(
@@ -2114,10 +2071,9 @@ class _SignalReceiver:
exc_type: type[BaseException] | None,
exc_val: BaseException | None,
exc_tb: TracebackType | None,
- ) -> bool | None:
+ ) -> None:
for sig in self._handled_signals:
self._loop.remove_signal_handler(sig)
- return None
def __aiter__(self) -> _SignalReceiver:
return self
@@ -2446,7 +2402,7 @@ class AsyncIOBackend(AsyncBackend):
return CapacityLimiter(total_tokens)
@classmethod
- async def run_sync_in_worker_thread(
+ async def run_sync_in_worker_thread( # type: ignore[return]
cls,
func: Callable[[Unpack[PosArgsT]], T_Retval],
args: tuple[Unpack[PosArgsT]],
@@ -2468,7 +2424,7 @@ class AsyncIOBackend(AsyncBackend):
async with limiter or cls.current_default_thread_limiter():
with CancelScope(shield=not abandon_on_cancel) as scope:
- future: asyncio.Future = asyncio.Future()
+ future = asyncio.Future[T_Retval]()
root_task = find_root_task()
if not idle_workers:
worker = WorkerThread(root_task, workers, idle_workers)
diff --git a/contrib/python/anyio/anyio/_backends/_trio.py b/contrib/python/anyio/anyio/_backends/_trio.py
index 70a0a605781..32ae8ace7b4 100644
--- a/contrib/python/anyio/anyio/_backends/_trio.py
+++ b/contrib/python/anyio/anyio/_backends/_trio.py
@@ -132,8 +132,7 @@ class CancelScope(BaseCancelScope):
exc_type: type[BaseException] | None,
exc_val: BaseException | None,
exc_tb: TracebackType | None,
- ) -> bool | None:
- # https://github.com/python-trio/trio-typing/pull/79
+ ) -> bool:
return self.__original.__exit__(exc_type, exc_val, exc_tb)
def cancel(self) -> None:
@@ -186,9 +185,10 @@ class TaskGroup(abc.TaskGroup):
exc_type: type[BaseException] | None,
exc_val: BaseException | None,
exc_tb: TracebackType | None,
- ) -> bool | None:
+ ) -> bool:
try:
- return await self._nursery_manager.__aexit__(exc_type, exc_val, exc_tb)
+ # trio.Nursery.__exit__ returns bool; .open_nursery has wrong type
+ return await self._nursery_manager.__aexit__(exc_type, exc_val, exc_tb) # type: ignore[return-value]
except BaseExceptionGroup as exc:
if not exc.split(trio.Cancelled)[1]:
raise trio.Cancelled._create() from exc
diff --git a/contrib/python/anyio/anyio/_core/_asyncio_selector_thread.py b/contrib/python/anyio/anyio/_core/_asyncio_selector_thread.py
index d98c3040721..f4d18cf0429 100644
--- a/contrib/python/anyio/anyio/_core/_asyncio_selector_thread.py
+++ b/contrib/python/anyio/anyio/_core/_asyncio_selector_thread.py
@@ -21,6 +21,23 @@ class Selector:
self._send, self._receive = socket.socketpair()
self._send.setblocking(False)
self._receive.setblocking(False)
+ # This somewhat reduces the amount of memory wasted queueing up data
+ # for wakeups. With these settings, maximum number of 1-byte sends
+ # before getting BlockingIOError:
+ # Linux 4.8: 6
+ # macOS (darwin 15.5): 1
+ # Windows 10: 525347
+ # Windows you're weird. (And on Windows setting SNDBUF to 0 makes send
+ # blocking, even on non-blocking sockets, so don't do that.)
+ self._receive.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 1)
+ self._send.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, 1)
+ # On Windows this is a TCP socket so this might matter. On other
+ # platforms this fails b/c AF_UNIX sockets aren't actually TCP.
+ try:
+ self._send.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
+ except OSError:
+ pass
+
self._selector.register(self._receive, EVENT_READ)
self._closed = False
diff --git a/contrib/python/anyio/anyio/_core/_exceptions.py b/contrib/python/anyio/anyio/_core/_exceptions.py
index 97ea3130414..16b94482c06 100644
--- a/contrib/python/anyio/anyio/_core/_exceptions.py
+++ b/contrib/python/anyio/anyio/_core/_exceptions.py
@@ -2,6 +2,8 @@ from __future__ import annotations
import sys
from collections.abc import Generator
+from textwrap import dedent
+from typing import Any
if sys.version_info < (3, 11):
from exceptiongroup import BaseExceptionGroup
@@ -21,6 +23,41 @@ class BrokenWorkerProcess(Exception):
"""
+class BrokenWorkerIntepreter(Exception):
+ """
+ Raised by :meth:`~anyio.to_interpreter.run_sync` if an unexpected exception is
+ raised in the subinterpreter.
+ """
+
+ def __init__(self, excinfo: Any):
+ # This was adapted from concurrent.futures.interpreter.ExecutionFailed
+ msg = excinfo.formatted
+ if not msg:
+ if excinfo.type and excinfo.msg:
+ msg = f"{excinfo.type.__name__}: {excinfo.msg}"
+ else:
+ msg = excinfo.type.__name__ or excinfo.msg
+
+ super().__init__(msg)
+ self.excinfo = excinfo
+
+ def __str__(self) -> str:
+ try:
+ formatted = self.excinfo.errdisplay
+ except Exception:
+ return super().__str__()
+ else:
+ return dedent(
+ f"""
+ {super().__str__()}
+
+ Uncaught in the interpreter:
+
+ {formatted}
+ """.strip()
+ )
+
+
class BusyResourceError(Exception):
"""
Raised when two tasks are trying to read from or write to the same resource
diff --git a/contrib/python/anyio/anyio/_core/_fileio.py b/contrib/python/anyio/anyio/_core/_fileio.py
index ef2930e480e..4e34f2addc5 100644
--- a/contrib/python/anyio/anyio/_core/_fileio.py
+++ b/contrib/python/anyio/anyio/_core/_fileio.py
@@ -3,7 +3,13 @@ from __future__ import annotations
import os
import pathlib
import sys
-from collections.abc import AsyncIterator, Callable, Iterable, Iterator, Sequence
+from collections.abc import (
+ AsyncIterator,
+ Callable,
+ Iterable,
+ Iterator,
+ Sequence,
+)
from dataclasses import dataclass
from functools import partial
from os import PathLike
@@ -220,11 +226,15 @@ class Path:
Some methods may be unavailable or have limited functionality, based on the Python
version:
+ * :meth:`~pathlib.Path.copy` (available on Python 3.14 or later)
+ * :meth:`~pathlib.Path.copy_into` (available on Python 3.14 or later)
* :meth:`~pathlib.Path.from_uri` (available on Python 3.13 or later)
* :meth:`~pathlib.Path.full_match` (available on Python 3.13 or later)
* :meth:`~pathlib.Path.is_junction` (available on Python 3.12 or later)
* :meth:`~pathlib.Path.match` (the ``case_sensitive`` paramater is only available on
Python 3.13 or later)
+ * :meth:`~pathlib.Path.move` (available on Python 3.14 or later)
+ * :meth:`~pathlib.Path.move_into` (available on Python 3.14 or later)
* :meth:`~pathlib.Path.relative_to` (the ``walk_up`` parameter is only available on
Python 3.12 or later)
* :meth:`~pathlib.Path.walk` (available on Python 3.12 or later)
@@ -396,6 +406,51 @@ class Path:
def match(self, path_pattern: str) -> bool:
return self._path.match(path_pattern)
+ if sys.version_info >= (3, 14):
+
+ async def copy(
+ self,
+ target: str | os.PathLike[str],
+ *,
+ follow_symlinks: bool = True,
+ dirs_exist_ok: bool = False,
+ preserve_metadata: bool = False,
+ ) -> Path:
+ func = partial(
+ self._path.copy,
+ follow_symlinks=follow_symlinks,
+ dirs_exist_ok=dirs_exist_ok,
+ preserve_metadata=preserve_metadata,
+ )
+ return Path(await to_thread.run_sync(func, target))
+
+ async def copy_into(
+ self,
+ target_dir: str | os.PathLike[str],
+ *,
+ follow_symlinks: bool = True,
+ dirs_exist_ok: bool = False,
+ preserve_metadata: bool = False,
+ ) -> Path:
+ func = partial(
+ self._path.copy_into,
+ follow_symlinks=follow_symlinks,
+ dirs_exist_ok=dirs_exist_ok,
+ preserve_metadata=preserve_metadata,
+ )
+ return Path(await to_thread.run_sync(func, target_dir))
+
+ async def move(self, target: str | os.PathLike[str]) -> Path:
+ # Upstream does not handle anyio.Path properly as a PathLike
+ target = pathlib.Path(target)
+ return Path(await to_thread.run_sync(self._path.move, target))
+
+ async def move_into(
+ self,
+ target_dir: str | os.PathLike[str],
+ ) -> Path:
+ return Path(await to_thread.run_sync(self._path.move_into, target_dir))
+
def is_relative_to(self, other: str | PathLike[str]) -> bool:
try:
self.relative_to(other)
diff --git a/contrib/python/anyio/anyio/_core/_synchronization.py b/contrib/python/anyio/anyio/_core/_synchronization.py
index 7878ba66682..a6331328d4f 100644
--- a/contrib/python/anyio/anyio/_core/_synchronization.py
+++ b/contrib/python/anyio/anyio/_core/_synchronization.py
@@ -728,6 +728,5 @@ class ResourceGuard:
exc_type: type[BaseException] | None,
exc_val: BaseException | None,
exc_tb: TracebackType | None,
- ) -> bool | None:
+ ) -> None:
self._guarded = False
- return None
diff --git a/contrib/python/anyio/anyio/_core/_tasks.py b/contrib/python/anyio/anyio/_core/_tasks.py
index 2f21ea20b13..fe49015102b 100644
--- a/contrib/python/anyio/anyio/_core/_tasks.py
+++ b/contrib/python/anyio/anyio/_core/_tasks.py
@@ -88,7 +88,7 @@ class CancelScope:
exc_type: type[BaseException] | None,
exc_val: BaseException | None,
exc_tb: TracebackType | None,
- ) -> bool | None:
+ ) -> bool:
raise NotImplementedError
diff --git a/contrib/python/anyio/anyio/to_interpreter.py b/contrib/python/anyio/anyio/to_interpreter.py
new file mode 100644
index 00000000000..bcde24d3d1d
--- /dev/null
+++ b/contrib/python/anyio/anyio/to_interpreter.py
@@ -0,0 +1,218 @@
+from __future__ import annotations
+
+import atexit
+import os
+import pickle
+import sys
+from collections import deque
+from collections.abc import Callable
+from textwrap import dedent
+from typing import Any, Final, TypeVar
+
+from . import current_time, to_thread
+from ._core._exceptions import BrokenWorkerIntepreter
+from ._core._synchronization import CapacityLimiter
+from .lowlevel import RunVar
+
+if sys.version_info >= (3, 11):
+ from typing import TypeVarTuple, Unpack
+else:
+ from typing_extensions import TypeVarTuple, Unpack
+
+UNBOUND: Final = 2 # I have no clue how this works, but it was used in the stdlib
+FMT_UNPICKLED: Final = 0
+FMT_PICKLED: Final = 1
+DEFAULT_CPU_COUNT: Final = 8 # this is just an arbitrarily selected value
+MAX_WORKER_IDLE_TIME = (
+ 30 # seconds a subinterpreter can be idle before becoming eligible for pruning
+)
+
+T_Retval = TypeVar("T_Retval")
+PosArgsT = TypeVarTuple("PosArgsT")
+
+_idle_workers = RunVar[deque["Worker"]]("_available_workers")
+_default_interpreter_limiter = RunVar[CapacityLimiter]("_default_interpreter_limiter")
+
+
+class Worker:
+ _run_func = compile(
+ dedent("""
+ import _interpqueues as queues
+ import _interpreters as interpreters
+ from pickle import loads, dumps, HIGHEST_PROTOCOL
+
+ item = queues.get(queue_id)[0]
+ try:
+ func, args = loads(item)
+ retval = func(*args)
+ except BaseException as exc:
+ is_exception = True
+ retval = exc
+ else:
+ is_exception = False
+
+ try:
+ queues.put(queue_id, (retval, is_exception), FMT_UNPICKLED, UNBOUND)
+ except interpreters.NotShareableError:
+ retval = dumps(retval, HIGHEST_PROTOCOL)
+ queues.put(queue_id, (retval, is_exception), FMT_PICKLED, UNBOUND)
+ """),
+ "<string>",
+ "exec",
+ )
+
+ last_used: float = 0
+
+ _initialized: bool = False
+ _interpreter_id: int
+ _queue_id: int
+
+ def initialize(self) -> None:
+ import _interpqueues as queues
+ import _interpreters as interpreters
+
+ self._interpreter_id = interpreters.create()
+ self._queue_id = queues.create(2, FMT_UNPICKLED, UNBOUND) # type: ignore[call-arg]
+ self._initialized = True
+ interpreters.set___main___attrs(
+ self._interpreter_id,
+ {
+ "queue_id": self._queue_id,
+ "FMT_PICKLED": FMT_PICKLED,
+ "FMT_UNPICKLED": FMT_UNPICKLED,
+ "UNBOUND": UNBOUND,
+ },
+ )
+
+ def destroy(self) -> None:
+ import _interpqueues as queues
+ import _interpreters as interpreters
+
+ if self._initialized:
+ interpreters.destroy(self._interpreter_id)
+ queues.destroy(self._queue_id)
+
+ def _call(
+ self,
+ func: Callable[..., T_Retval],
+ args: tuple[Any],
+ ) -> tuple[Any, bool]:
+ import _interpqueues as queues
+ import _interpreters as interpreters
+
+ if not self._initialized:
+ self.initialize()
+
+ payload = pickle.dumps((func, args), pickle.HIGHEST_PROTOCOL)
+ queues.put(self._queue_id, payload, FMT_PICKLED, UNBOUND) # type: ignore[call-arg]
+
+ res: Any
+ is_exception: bool
+ if exc_info := interpreters.exec(self._interpreter_id, self._run_func): # type: ignore[func-returns-value,arg-type]
+ raise BrokenWorkerIntepreter(exc_info)
+
+ (res, is_exception), fmt = queues.get(self._queue_id)[:2]
+ if fmt == FMT_PICKLED:
+ res = pickle.loads(res)
+
+ return res, is_exception
+
+ async def call(
+ self,
+ func: Callable[..., T_Retval],
+ args: tuple[Any],
+ limiter: CapacityLimiter,
+ ) -> T_Retval:
+ result, is_exception = await to_thread.run_sync(
+ self._call,
+ func,
+ args,
+ limiter=limiter,
+ )
+ if is_exception:
+ raise result
+
+ return result
+
+
+def _stop_workers(workers: deque[Worker]) -> None:
+ for worker in workers:
+ worker.destroy()
+
+ workers.clear()
+
+
+async def run_sync(
+ func: Callable[[Unpack[PosArgsT]], T_Retval],
+ *args: Unpack[PosArgsT],
+ limiter: CapacityLimiter | None = None,
+) -> T_Retval:
+ """
+ Call the given function with the given arguments in a subinterpreter.
+
+ If the ``cancellable`` option is enabled and the task waiting for its completion is
+ cancelled, the call will still run its course but its return value (or any raised
+ exception) will be ignored.
+
+ .. warning:: This feature is **experimental**. The upstream interpreter API has not
+ yet been finalized or thoroughly tested, so don't rely on this for anything
+ mission critical.
+
+ :param func: a callable
+ :param args: positional arguments for the callable
+ :param limiter: capacity limiter to use to limit the total amount of subinterpreters
+ running (if omitted, the default limiter is used)
+ :return: the result of the call
+ :raises BrokenWorkerIntepreter: if there's an internal error in a subinterpreter
+
+ """
+ if sys.version_info <= (3, 13):
+ raise RuntimeError("subinterpreters require at least Python 3.13")
+
+ if limiter is None:
+ limiter = current_default_interpreter_limiter()
+
+ try:
+ idle_workers = _idle_workers.get()
+ except LookupError:
+ idle_workers = deque()
+ _idle_workers.set(idle_workers)
+ atexit.register(_stop_workers, idle_workers)
+
+ async with limiter:
+ try:
+ worker = idle_workers.pop()
+ except IndexError:
+ worker = Worker()
+
+ try:
+ return await worker.call(func, args, limiter)
+ finally:
+ # Prune workers that have been idle for too long
+ now = current_time()
+ while idle_workers:
+ if now - idle_workers[0].last_used <= MAX_WORKER_IDLE_TIME:
+ break
+
+ await to_thread.run_sync(idle_workers.popleft().destroy, limiter=limiter)
+
+ worker.last_used = current_time()
+ idle_workers.append(worker)
+
+
+def current_default_interpreter_limiter() -> CapacityLimiter:
+ """
+ Return the capacity limiter that is used by default to limit the number of
+ concurrently running subinterpreters.
+
+ Defaults to the number of CPU cores.
+
+ :return: a capacity limiter object
+
+ """
+ try:
+ return _default_interpreter_limiter.get()
+ except LookupError:
+ limiter = CapacityLimiter(os.cpu_count() or DEFAULT_CPU_COUNT)
+ _default_interpreter_limiter.set(limiter)
+ return limiter
diff --git a/contrib/python/anyio/anyio/to_process.py b/contrib/python/anyio/anyio/to_process.py
index 5050dee21e9..495de2ae711 100644
--- a/contrib/python/anyio/anyio/to_process.py
+++ b/contrib/python/anyio/anyio/to_process.py
@@ -35,7 +35,7 @@ _process_pool_idle_workers: RunVar[deque[tuple[Process, float]]] = RunVar(
_default_process_limiter: RunVar[CapacityLimiter] = RunVar("_default_process_limiter")
-async def run_sync(
+async def run_sync( # type: ignore[return]
func: Callable[[Unpack[PosArgsT]], T_Retval],
*args: Unpack[PosArgsT],
cancellable: bool = False,
diff --git a/contrib/python/anyio/ya.make b/contrib/python/anyio/ya.make
index 956fc4c8415..b676cfdd808 100644
--- a/contrib/python/anyio/ya.make
+++ b/contrib/python/anyio/ya.make
@@ -2,7 +2,7 @@
PY3_LIBRARY()
-VERSION(4.7.0)
+VERSION(4.8.0)
LICENSE(MIT)
@@ -57,6 +57,7 @@ PY_SRCS(
anyio/streams/stapled.py
anyio/streams/text.py
anyio/streams/tls.py
+ anyio/to_interpreter.py
anyio/to_process.py
anyio/to_thread.py
)