summaryrefslogtreecommitdiffstats
path: root/contrib/python/grpcio/py3/grpc/_cython
diff options
context:
space:
mode:
authorrobot-piglet <[email protected]>2025-08-29 14:19:24 +0300
committerrobot-piglet <[email protected]>2025-08-29 14:40:38 +0300
commit5715939b5b1a1812ed85171fb519f9c1c3c326e8 (patch)
tree5d981253427e490749bbb50d3616507fa0d6d1bc /contrib/python/grpcio/py3/grpc/_cython
parentc390a008ee5d15e1d8f49326671908f375e0851b (diff)
Intermediate changes
commit_hash:88dd3a7e237f5ebeb9b302a0e6866042635fda83
Diffstat (limited to 'contrib/python/grpcio/py3/grpc/_cython')
-rw-r--r--contrib/python/grpcio/py3/grpc/_cython/_cygrpc/_hooks.pyx.pxi6
-rw-r--r--contrib/python/grpcio/py3/grpc/_cython/_cygrpc/aio/call.pyx.pxi33
-rw-r--r--contrib/python/grpcio/py3/grpc/_cython/_cygrpc/aio/callback_common.pxd.pxi2
-rw-r--r--contrib/python/grpcio/py3/grpc/_cython/_cygrpc/aio/callback_common.pyx.pxi2
-rw-r--r--contrib/python/grpcio/py3/grpc/_cython/_cygrpc/aio/common.pyx.pxi12
-rw-r--r--contrib/python/grpcio/py3/grpc/_cython/_cygrpc/aio/server.pxd.pxi4
-rw-r--r--contrib/python/grpcio/py3/grpc/_cython/_cygrpc/aio/server.pyx.pxi65
-rw-r--r--contrib/python/grpcio/py3/grpc/_cython/_cygrpc/channel.pxd.pxi12
-rw-r--r--contrib/python/grpcio/py3/grpc/_cython/_cygrpc/channel.pyx.pxi116
-rw-r--r--contrib/python/grpcio/py3/grpc/_cython/_cygrpc/credentials.pyx.pxi17
-rw-r--r--contrib/python/grpcio/py3/grpc/_cython/_cygrpc/fork_posix.pxd.pxi14
-rw-r--r--contrib/python/grpcio/py3/grpc/_cython/_cygrpc/fork_posix.pyx.pxi11
-rw-r--r--contrib/python/grpcio/py3/grpc/_cython/_cygrpc/grpc.pxi30
-rw-r--r--contrib/python/grpcio/py3/grpc/_cython/_cygrpc/observability.pyx.pxi61
-rw-r--r--contrib/python/grpcio/py3/grpc/_cython/_cygrpc/vtable.pyx.pxi6
-rw-r--r--contrib/python/grpcio/py3/grpc/_cython/cygrpc.pyx9
16 files changed, 311 insertions, 89 deletions
diff --git a/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/_hooks.pyx.pxi b/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/_hooks.pyx.pxi
index de4d71b8196..c2b37594f95 100644
--- a/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/_hooks.pyx.pxi
+++ b/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/_hooks.pyx.pxi
@@ -17,6 +17,9 @@ cdef object _custom_op_on_c_call(int op, grpc_call *call):
raise NotImplementedError("No custom hooks are implemented")
def install_context_from_request_call_event(RequestCallEvent event):
+ maybe_save_server_trace_context(event)
+
+def install_context_from_request_call_event_aio(GrpcCallWrapper event):
pass
def uninstall_context():
@@ -31,5 +34,8 @@ cdef class CensusContext:
def set_census_context_on_call(_CallState call_state, CensusContext census_ctx):
pass
+def set_instrumentation_context_on_call_aio(GrpcCallWrapper call_state, CensusContext census_ctx):
+ pass
+
def get_deadline_from_context():
return None
diff --git a/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/aio/call.pyx.pxi b/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/aio/call.pyx.pxi
index 7bce1850dc8..00c0a29c2ab 100644
--- a/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/aio/call.pyx.pxi
+++ b/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/aio/call.pyx.pxi
@@ -285,14 +285,24 @@ cdef class _AioCall(GrpcCallWrapper):
return False
+ def set_internal_error(self, str error_str):
+ self._set_status(AioRpcStatus(
+ StatusCode.internal,
+ 'Internal error from Core',
+ (),
+ error_str,
+ ))
+
async def unary_unary(self,
bytes request,
- tuple outbound_initial_metadata):
+ tuple outbound_initial_metadata,
+ object context = None):
"""Performs a unary unary RPC.
Args:
request: the serialized requests in bytes.
outbound_initial_metadata: optional outbound metadata.
+ context: instrumentation context.
"""
cdef tuple ops
@@ -305,6 +315,8 @@ cdef class _AioCall(GrpcCallWrapper):
cdef ReceiveMessageOperation receive_message_op = ReceiveMessageOperation(_EMPTY_FLAGS)
cdef ReceiveStatusOnClientOperation receive_status_on_client_op = ReceiveStatusOnClientOperation(_EMPTY_FLAGS)
+ if context is not None:
+ set_instrumentation_context_on_call_aio(self, context)
ops = (initial_metadata_op, send_message_op, send_close_op,
receive_initial_metadata_op, receive_message_op,
receive_status_on_client_op)
@@ -382,7 +394,8 @@ cdef class _AioCall(GrpcCallWrapper):
async def initiate_unary_stream(self,
bytes request,
- tuple outbound_initial_metadata):
+ tuple outbound_initial_metadata,
+ object context = None):
"""Implementation of the start of a unary-stream call."""
# Peer may prematurely end this RPC at any point. We need a corutine
# that watches if the server sends the final status.
@@ -398,6 +411,8 @@ cdef class _AioCall(GrpcCallWrapper):
cdef Operation send_close_op = SendCloseFromClientOperation(
_EMPTY_FLAGS)
+ if context is not None:
+ set_instrumentation_context_on_call_aio(self, context)
outbound_ops = (
initial_metadata_op,
send_message_op,
@@ -421,7 +436,8 @@ cdef class _AioCall(GrpcCallWrapper):
async def stream_unary(self,
tuple outbound_initial_metadata,
- object metadata_sent_observer):
+ object metadata_sent_observer,
+ object context = None):
"""Actual implementation of the complete unary-stream call.
Needs to pay extra attention to the raise mechanism. If we want to
@@ -452,6 +468,9 @@ cdef class _AioCall(GrpcCallWrapper):
cdef tuple inbound_ops
cdef ReceiveMessageOperation receive_message_op = ReceiveMessageOperation(_EMPTY_FLAGS)
cdef ReceiveStatusOnClientOperation receive_status_on_client_op = ReceiveStatusOnClientOperation(_EMPTY_FLAGS)
+
+ if context is not None:
+ set_instrumentation_context_on_call_aio(self, context)
inbound_ops = (receive_message_op, receive_status_on_client_op)
# Executes all operations in one batch.
@@ -476,7 +495,8 @@ cdef class _AioCall(GrpcCallWrapper):
async def initiate_stream_stream(self,
tuple outbound_initial_metadata,
- object metadata_sent_observer):
+ object metadata_sent_observer,
+ object context = None):
"""Actual implementation of the complete stream-stream call.
Needs to pay extra attention to the raise mechanism. If we want to
@@ -487,6 +507,9 @@ cdef class _AioCall(GrpcCallWrapper):
# that watches if the server sends the final status.
status_task = self._loop.create_task(self._handle_status_once_received())
+ if context is not None:
+ set_instrumentation_context_on_call_aio(self, context)
+
try:
# Sends out initial_metadata ASAP.
await _send_initial_metadata(self,
@@ -505,4 +528,4 @@ cdef class _AioCall(GrpcCallWrapper):
await status_task
# Allow upper layer to proceed only if the status is set
- metadata_sent_observer()
+ metadata_sent_observer() \ No newline at end of file
diff --git a/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/aio/callback_common.pxd.pxi b/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/aio/callback_common.pxd.pxi
index e54e5107547..26edbdb917b 100644
--- a/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/aio/callback_common.pxd.pxi
+++ b/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/aio/callback_common.pxd.pxi
@@ -48,7 +48,7 @@ cdef class CallbackWrapper:
@staticmethod
cdef void functor_run(
grpc_completion_queue_functor* functor,
- int succeed)
+ int succeed) noexcept
cdef grpc_completion_queue_functor *c_functor(self)
diff --git a/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/aio/callback_common.pyx.pxi b/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/aio/callback_common.pyx.pxi
index 14a0098fc20..2b0df0e5ce7 100644
--- a/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/aio/callback_common.pyx.pxi
+++ b/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/aio/callback_common.pyx.pxi
@@ -50,7 +50,7 @@ cdef class CallbackWrapper:
@staticmethod
cdef void functor_run(
grpc_completion_queue_functor* functor,
- int success):
+ int success) noexcept:
cdef CallbackContext *context = <CallbackContext *>functor
cdef object waiter = <object>context.waiter
if not waiter.cancelled():
diff --git a/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/aio/common.pyx.pxi b/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/aio/common.pyx.pxi
index f698390cd5c..0e3e8de00bf 100644
--- a/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/aio/common.pyx.pxi
+++ b/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/aio/common.pyx.pxi
@@ -12,6 +12,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.
+import warnings
+
from cpython.version cimport PY_MAJOR_VERSION, PY_MINOR_VERSION
TYPE_METADATA_STRING = "Tuple[Tuple[str, Union[str, bytes]]...]"
@@ -181,7 +183,15 @@ if PY_MAJOR_VERSION >= 3 and PY_MINOR_VERSION >= 7:
try:
return asyncio.get_running_loop()
except RuntimeError:
- return asyncio.get_event_loop_policy().get_event_loop()
+ with warnings.catch_warnings():
+ # Convert DeprecationWarning to errors so we can capture them with except
+ warnings.simplefilter("error", DeprecationWarning)
+ try:
+ return asyncio.get_event_loop_policy().get_event_loop()
+ # Since version 3.12, DeprecationWarning is emitted if there is no
+ # current event loop.
+ except DeprecationWarning:
+ return asyncio.get_event_loop_policy().new_event_loop()
else:
def get_working_loop():
"""Returns a running event loop."""
diff --git a/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/aio/server.pxd.pxi b/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/aio/server.pxd.pxi
index fe10c3883c3..15050e6b21e 100644
--- a/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/aio/server.pxd.pxi
+++ b/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/aio/server.pxd.pxi
@@ -29,6 +29,7 @@ cdef class RPCState(GrpcCallWrapper):
cdef bint metadata_sent
cdef bint status_sent
cdef grpc_status_code status_code
+ cdef object py_status_code
cdef str status_details
cdef tuple trailing_metadata
cdef object compression_algorithm
@@ -71,8 +72,7 @@ cdef enum AioServerStatus:
cdef class _ConcurrentRpcLimiter:
cdef int _maximum_concurrent_rpcs
cdef int _active_rpcs
- cdef object _active_rpcs_condition # asyncio.Condition
- cdef object _loop # asyncio.EventLoop
+ cdef bint limiter_concurrency_exceeded
cdef class AioServer:
diff --git a/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/aio/server.pyx.pxi b/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/aio/server.pyx.pxi
index e85efdd0b9f..d166bd9fabf 100644
--- a/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/aio/server.pyx.pxi
+++ b/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/aio/server.pyx.pxi
@@ -55,6 +55,7 @@ cdef class RPCState:
self.metadata_sent = False
self.status_sent = False
self.status_code = StatusCode.ok
+ self.py_status_code = None
self.status_details = ''
self.trailing_metadata = _IMMUTABLE_EMPTY_METADATA
self.compression_algorithm = None
@@ -184,6 +185,7 @@ cdef class _ServicerContext:
self._rpc_state.status_details = details
actual_code = get_status_code(code)
+ self._rpc_state.py_status_code = code
self._rpc_state.status_code = actual_code
self._rpc_state.status_sent = True
@@ -213,9 +215,10 @@ cdef class _ServicerContext:
def set_code(self, object code):
self._rpc_state.status_code = get_status_code(code)
+ self._rpc_state.py_status_code = code
def code(self):
- return self._rpc_state.status_code
+ return self._rpc_state.py_status_code
def set_details(self, str details):
self._rpc_state.status_details = details
@@ -398,6 +401,7 @@ async def _finish_handler_with_unary_response(RPCState rpc_state,
# Executes application logic
cdef object response_message
cdef _SyncServicerContext sync_servicer_context
+ install_context_from_request_call_event_aio(rpc_state)
if _is_async_handler(unary_handler):
# Run async method handlers in this coroutine
@@ -450,6 +454,7 @@ async def _finish_handler_with_unary_response(RPCState rpc_state,
rpc_state.metadata_sent = True
rpc_state.status_sent = True
await execute_batch(rpc_state, finish_ops, loop)
+ uninstall_context()
async def _finish_handler_with_stream_responses(RPCState rpc_state,
@@ -465,6 +470,7 @@ async def _finish_handler_with_stream_responses(RPCState rpc_state,
"""
cdef object async_response_generator
cdef object response_message
+ install_context_from_request_call_event_aio(rpc_state)
if inspect.iscoroutinefunction(stream_handler):
# Case 1: Coroutine async handler - using reader-writer API
@@ -518,6 +524,7 @@ async def _finish_handler_with_stream_responses(RPCState rpc_state,
rpc_state.metadata_sent = True
rpc_state.status_sent = True
await execute_batch(rpc_state, finish_ops, loop)
+ uninstall_context()
async def _handle_unary_unary_rpc(object method_handler,
@@ -702,7 +709,9 @@ async def _handle_exceptions(RPCState rpc_state, object rpc_coro, object loop):
if rpc_state.client_closed:
return
else:
- raise
+ _LOGGER.exception('ExecuteBatchError raised in core by servicer method [%s]' % (
+ _decode(rpc_state.method())))
+ return
except Exception as e:
_LOGGER.exception('Unexpected [%s] raised by servicer method [%s]' % (
type(e).__name__,
@@ -774,7 +783,7 @@ async def _schedule_rpc_coro(object rpc_coro,
async def _handle_rpc(list generic_handlers, tuple interceptors,
- RPCState rpc_state, object loop):
+ RPCState rpc_state, object loop, bint concurrency_exceeded):
cdef object method_handler
# Finds the method handler (application logic)
method_handler = await _find_method_handler(
@@ -795,6 +804,18 @@ async def _handle_rpc(list generic_handlers, tuple interceptors,
)
return
+ if concurrency_exceeded:
+ rpc_state.status_sent = True
+ await _send_error_status_from_server(
+ rpc_state,
+ StatusCode.resource_exhausted,
+ 'Concurrent RPC limit exceeded!',
+ _IMMUTABLE_EMPTY_METADATA,
+ rpc_state.create_send_initial_metadata_op_if_not_sent(),
+ loop
+ )
+ return
+
# Handles unary-unary case
if not method_handler.request_streaming and not method_handler.response_streaming:
await _handle_unary_unary_rpc(method_handler,
@@ -838,33 +859,23 @@ cdef CallbackFailureHandler SERVER_SHUTDOWN_FAILURE_HANDLER = CallbackFailureHan
cdef class _ConcurrentRpcLimiter:
- def __cinit__(self, int maximum_concurrent_rpcs, object loop):
+ def __cinit__(self, int maximum_concurrent_rpcs):
if maximum_concurrent_rpcs <= 0:
raise ValueError("maximum_concurrent_rpcs should be a postive integer")
self._maximum_concurrent_rpcs = maximum_concurrent_rpcs
self._active_rpcs = 0
- self._active_rpcs_condition = asyncio.Condition()
- self._loop = loop
+ self.limiter_concurrency_exceeded = False
- async def check_before_request_call(self):
- await self._active_rpcs_condition.acquire()
- try:
- predicate = lambda: self._active_rpcs < self._maximum_concurrent_rpcs
- await self._active_rpcs_condition.wait_for(predicate)
+ def check_before_request_call(self):
+ if self._active_rpcs >= self._maximum_concurrent_rpcs:
+ self.limiter_concurrency_exceeded = True
+ else:
self._active_rpcs += 1
- finally:
- self._active_rpcs_condition.release()
-
- async def _decrease_active_rpcs_count_with_lock(self):
- await self._active_rpcs_condition.acquire()
- try:
- self._active_rpcs -= 1
- self._active_rpcs_condition.notify()
- finally:
- self._active_rpcs_condition.release()
def _decrease_active_rpcs_count(self, unused_future):
- self._loop.create_task(self._decrease_active_rpcs_count_with_lock())
+ self._active_rpcs -= 1
+ if self._active_rpcs < self._maximum_concurrent_rpcs:
+ self.limiter_concurrency_exceeded = False
def decrease_once_finished(self, object rpc_task):
rpc_task.add_done_callback(self._decrease_active_rpcs_count)
@@ -906,8 +917,7 @@ cdef class AioServer:
self._thread_pool = thread_pool
if maximum_concurrent_rpcs is not None:
- self._limiter = _ConcurrentRpcLimiter(maximum_concurrent_rpcs,
- loop)
+ self._limiter = _ConcurrentRpcLimiter(maximum_concurrent_rpcs)
def add_generic_rpc_handlers(self, object generic_rpc_handlers):
self._generic_handlers.extend(generic_rpc_handlers)
@@ -950,8 +960,10 @@ cdef class AioServer:
if self._status != AIO_SERVER_STATUS_RUNNING:
break
+ concurrency_exceeded = False
if self._limiter is not None:
- await self._limiter.check_before_request_call()
+ self._limiter.check_before_request_call()
+ concurrency_exceeded = self._limiter.limiter_concurrency_exceeded
# Accepts new request from Core
rpc_state = await self._request_call()
@@ -964,7 +976,8 @@ cdef class AioServer:
rpc_coro = _handle_rpc(self._generic_handlers,
self._interceptors,
rpc_state,
- self._loop)
+ self._loop,
+ concurrency_exceeded)
# Fires off a task that listens on the cancellation from client.
rpc_task = self._loop.create_task(
diff --git a/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/channel.pxd.pxi b/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/channel.pxd.pxi
index eb27f2df7ad..96d03e181b9 100644
--- a/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/channel.pxd.pxi
+++ b/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/channel.pxd.pxi
@@ -26,10 +26,15 @@ cdef class _CallState:
cdef grpc_call *c_call
cdef set due
+ # call_tracer_capsule should have type of grpc._observability.ClientCallTracerCapsule
+ cdef object call_tracer_capsule
+ cdef void maybe_set_client_call_tracer_on_call(self, bytes method_name, bytes target) except *
+ cdef void maybe_delete_call_tracer(self) except *
cdef class _ChannelState:
+ cdef bytes target
cdef object condition
cdef grpc_channel *c_channel
# A boolean field indicating that the channel is open (if True) or is being
@@ -69,6 +74,13 @@ cdef class SegregatedCall:
cdef class Channel:
cdef _ChannelState _state
+ cdef dict _registered_call_handles
# TODO(https://github.com/grpc/grpc/issues/15662): Eliminate this.
cdef tuple _arguments
+
+
+cdef class CallHandle:
+
+ cdef void *c_call_handle
+ cdef object method
diff --git a/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/channel.pyx.pxi b/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/channel.pyx.pxi
index d49a4210f7c..dde3b166789 100644
--- a/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/channel.pyx.pxi
+++ b/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/channel.pyx.pxi
@@ -12,6 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
+from grpc import _observability
_INTERNAL_CALL_ERROR_MESSAGE_FORMAT = (
'Internal gRPC call error %d. ' +
@@ -71,10 +72,28 @@ cdef class _CallState:
def __cinit__(self):
self.due = set()
+ cdef void maybe_delete_call_tracer(self) except *:
+ if not self.call_tracer_capsule:
+ return
+ _observability.delete_call_tracer(self.call_tracer_capsule)
+
+ cdef void maybe_set_client_call_tracer_on_call(self, bytes method_name, bytes target) except *:
+ # TODO(xuanwn): use channel args to exclude those metrics.
+ for exclude_prefix in _observability._SERVICES_TO_EXCLUDE:
+ if exclude_prefix in method_name:
+ return
+ with _observability.get_plugin() as plugin:
+ if not (plugin and plugin.observability_enabled):
+ return
+ capsule = plugin.create_client_call_tracer(method_name, target)
+ capsule_ptr = cpython.PyCapsule_GetPointer(capsule, CLIENT_CALL_TRACER)
+ _set_call_tracer(self.c_call, capsule_ptr)
+ self.call_tracer_capsule = capsule
cdef class _ChannelState:
- def __cinit__(self):
+ def __cinit__(self, target):
+ self.target = target
self.condition = threading.Condition()
self.open = True
self.integrated_call_states = {}
@@ -82,6 +101,25 @@ cdef class _ChannelState:
self.connectivity_due = set()
self.closed_reason = None
+cdef class CallHandle:
+
+ def __cinit__(self, _ChannelState channel_state, object method):
+ self.method = method
+ cpython.Py_INCREF(method)
+ # Note that since we always pass None for host, we set the
+ # second-to-last parameter of grpc_channel_register_call to a fixed
+ # NULL value.
+ self.c_call_handle = grpc_channel_register_call(
+ channel_state.c_channel, <const char *>method, NULL, NULL)
+
+ def __dealloc__(self):
+ cpython.Py_DECREF(self.method)
+
+ @property
+ def call_handle(self):
+ return cpython.PyLong_FromVoidPtr(self.c_call_handle)
+
+
cdef tuple _operate(grpc_call *c_call, object operations, object user_tag):
cdef grpc_call_error c_call_error
@@ -180,7 +218,7 @@ cdef void _call(
grpc_completion_queue *c_completion_queue, on_success, int flags, method,
host, object deadline, CallCredentials credentials,
object operationses_and_user_tags, object metadata,
- object context) except *:
+ object context, object registered_call_handle) except *:
"""Invokes an RPC.
Args:
@@ -207,6 +245,8 @@ cdef void _call(
must be present in the first element of this value.
metadata: The metadata for this call.
context: Context object for distributed tracing.
+ registered_call_handle: An int representing the call handle of the method, or
+ None if the method is not registered.
"""
cdef grpc_slice method_slice
cdef grpc_slice host_slice
@@ -223,13 +263,20 @@ cdef void _call(
else:
host_slice = _slice_from_bytes(host)
host_slice_ptr = &host_slice
- call_state.c_call = grpc_channel_create_call(
- channel_state.c_channel, NULL, flags,
- c_completion_queue, method_slice, host_slice_ptr,
- _timespec_from_time(deadline), NULL)
+ if registered_call_handle:
+ call_state.c_call = grpc_channel_create_registered_call(
+ channel_state.c_channel, NULL, flags,
+ c_completion_queue, cpython.PyLong_AsVoidPtr(registered_call_handle),
+ _timespec_from_time(deadline), NULL)
+ else:
+ call_state.c_call = grpc_channel_create_call(
+ channel_state.c_channel, NULL, flags,
+ c_completion_queue, method_slice, host_slice_ptr,
+ _timespec_from_time(deadline), NULL)
grpc_slice_unref(method_slice)
if host_slice_ptr:
grpc_slice_unref(host_slice)
+ call_state.maybe_set_client_call_tracer_on_call(method, channel_state.target)
if context is not None:
set_census_context_on_call(call_state, context)
if credentials is not None:
@@ -238,7 +285,9 @@ cdef void _call(
call_state.c_call, c_call_credentials)
grpc_call_credentials_release(c_call_credentials)
if c_call_error != GRPC_CALL_OK:
- grpc_call_unref(call_state.c_call)
+ #TODO(xuanwn): Expand the scope of nogil
+ with nogil:
+ grpc_call_unref(call_state.c_call)
call_state.c_call = NULL
_raise_call_error_no_metadata(c_call_error)
started_tags = set()
@@ -248,7 +297,9 @@ cdef void _call(
started_tags.add(tag)
else:
grpc_call_cancel(call_state.c_call, NULL)
- grpc_call_unref(call_state.c_call)
+ #TODO(xuanwn): Expand the scope of nogil
+ with nogil:
+ grpc_call_unref(call_state.c_call)
call_state.c_call = NULL
_raise_call_error(c_call_error, metadata)
else:
@@ -263,9 +314,10 @@ cdef void _process_integrated_call_tag(
cdef _CallState call_state = state.integrated_call_states.pop(tag)
call_state.due.remove(tag)
if not call_state.due:
- grpc_call_unref(call_state.c_call)
+ with nogil:
+ grpc_call_unref(call_state.c_call)
call_state.c_call = NULL
-
+ call_state.maybe_delete_call_tracer()
cdef class IntegratedCall:
@@ -284,7 +336,7 @@ cdef class IntegratedCall:
cdef IntegratedCall _integrated_call(
_ChannelState state, int flags, method, host, object deadline,
object metadata, CallCredentials credentials, operationses_and_user_tags,
- object context):
+ object context, object registered_call_handle):
call_state = _CallState()
def on_success(started_tags):
@@ -293,7 +345,8 @@ cdef IntegratedCall _integrated_call(
_call(
state, call_state, state.c_call_completion_queue, on_success, flags,
- method, host, deadline, credentials, operationses_and_user_tags, metadata, context)
+ method, host, deadline, credentials, operationses_and_user_tags,
+ metadata, context, registered_call_handle)
return IntegratedCall(state, call_state)
@@ -303,8 +356,11 @@ cdef object _process_segregated_call_tag(
grpc_completion_queue *c_completion_queue, _BatchOperationTag tag):
call_state.due.remove(tag)
if not call_state.due:
- grpc_call_unref(call_state.c_call)
+ #TODO(xuanwn): Expand the scope of nogil
+ with nogil:
+ grpc_call_unref(call_state.c_call)
call_state.c_call = NULL
+ call_state.maybe_delete_call_tracer()
state.segregated_call_states.remove(call_state)
_destroy_c_completion_queue(c_completion_queue)
return True
@@ -331,7 +387,8 @@ cdef class SegregatedCall:
self._channel_state, self._call_state, self._c_completion_queue, tag)
def on_failure():
self._call_state.due.clear()
- grpc_call_unref(self._call_state.c_call)
+ with nogil:
+ grpc_call_unref(self._call_state.c_call)
self._call_state.c_call = NULL
self._channel_state.segregated_call_states.remove(self._call_state)
_destroy_c_completion_queue(self._c_completion_queue)
@@ -342,7 +399,7 @@ cdef class SegregatedCall:
cdef SegregatedCall _segregated_call(
_ChannelState state, int flags, method, host, object deadline,
object metadata, CallCredentials credentials, operationses_and_user_tags,
- object context):
+ object context, object registered_call_handle):
cdef _CallState call_state = _CallState()
cdef SegregatedCall segregated_call
cdef grpc_completion_queue *c_completion_queue
@@ -360,7 +417,7 @@ cdef SegregatedCall _segregated_call(
_call(
state, call_state, c_completion_queue, on_success, flags, method, host,
deadline, credentials, operationses_and_user_tags, metadata,
- context)
+ context, registered_call_handle)
except:
_destroy_c_completion_queue(c_completion_queue)
raise
@@ -445,7 +502,7 @@ cdef class Channel:
ChannelCredentials channel_credentials):
arguments = () if arguments is None else tuple(arguments)
fork_handlers_and_grpc_init()
- self._state = _ChannelState()
+ self._state = _ChannelState(target)
self._state.c_call_completion_queue = (
grpc_completion_queue_create_for_next(NULL))
self._state.c_connectivity_completion_queue = (
@@ -457,6 +514,7 @@ cdef class Channel:
else grpc_insecure_credentials_create())
self._state.c_channel = grpc_channel_create(
<char *>target, c_channel_credentials, channel_args.c_args())
+ self._registered_call_handles = {}
grpc_channel_credentials_release(c_channel_credentials)
def target(self):
@@ -470,10 +528,10 @@ cdef class Channel:
def integrated_call(
self, int flags, method, host, object deadline, object metadata,
CallCredentials credentials, operationses_and_tags,
- object context = None):
+ object context = None, object registered_call_handle = None):
return _integrated_call(
self._state, flags, method, host, deadline, metadata, credentials,
- operationses_and_tags, context)
+ operationses_and_tags, context, registered_call_handle)
def next_call_event(self):
def on_success(tag):
@@ -492,10 +550,10 @@ cdef class Channel:
def segregated_call(
self, int flags, method, host, object deadline, object metadata,
CallCredentials credentials, operationses_and_tags,
- object context = None):
+ object context = None, object registered_call_handle = None):
return _segregated_call(
self._state, flags, method, host, deadline, metadata, credentials,
- operationses_and_tags, context)
+ operationses_and_tags, context, registered_call_handle)
def check_connectivity_state(self, bint try_to_connect):
with self._state.condition:
@@ -514,3 +572,19 @@ cdef class Channel:
def close_on_fork(self, code, details):
_close(self, code, details, True)
+
+ def get_registered_call_handle(self, method):
+ """
+ Get or registers a call handler for a method.
+
+ This method is not thread-safe.
+
+ Args:
+ method: Required, the method name for the RPC.
+
+ Returns:
+ The registered call handle pointer in the form of a Python Long.
+ """
+ if method not in self._registered_call_handles.keys():
+ self._registered_call_handles[method] = CallHandle(self._state, method)
+ return self._registered_call_handles[method].call_handle
diff --git a/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/credentials.pyx.pxi b/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/credentials.pyx.pxi
index 27b56aa378b..181704cb85a 100644
--- a/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/credentials.pyx.pxi
+++ b/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/credentials.pyx.pxi
@@ -153,8 +153,9 @@ cdef class SSLChannelCredentials(ChannelCredentials):
else:
c_pem_root_certificates = self._pem_root_certificates
if self._private_key is None and self._certificate_chain is None:
- return grpc_ssl_credentials_create(
- c_pem_root_certificates, NULL, NULL, NULL)
+ with nogil:
+ return grpc_ssl_credentials_create(
+ c_pem_root_certificates, NULL, NULL, NULL)
else:
if self._private_key:
c_pem_key_certificate_pair.private_key = self._private_key
@@ -164,8 +165,9 @@ cdef class SSLChannelCredentials(ChannelCredentials):
c_pem_key_certificate_pair.certificate_chain = self._certificate_chain
else:
c_pem_key_certificate_pair.certificate_chain = NULL
- return grpc_ssl_credentials_create(
- c_pem_root_certificates, &c_pem_key_certificate_pair, NULL, NULL)
+ with nogil:
+ return grpc_ssl_credentials_create(
+ c_pem_root_certificates, &c_pem_key_certificate_pair, NULL, NULL)
cdef class CompositeChannelCredentials(ChannelCredentials):
@@ -314,7 +316,7 @@ def server_credentials_ssl_dynamic_cert_config(initial_cert_config,
return credentials
cdef grpc_ssl_certificate_config_reload_status _server_cert_config_fetcher_wrapper(
- void* user_data, grpc_ssl_server_certificate_config **config) with gil:
+ void* user_data, grpc_ssl_server_certificate_config **config) noexcept with gil:
# This is a credentials.ServerCertificateConfig
cdef ServerCertificateConfig cert_config = None
if not user_data:
@@ -435,8 +437,9 @@ cdef class ComputeEngineChannelCredentials(ChannelCredentials):
raise ValueError("Call credentials may not be NULL.")
cdef grpc_channel_credentials *c(self) except *:
- self._c_creds = grpc_google_default_credentials_create(self._call_creds)
- return self._c_creds
+ with nogil:
+ self._c_creds = grpc_google_default_credentials_create(self._call_creds)
+ return self._c_creds
def channel_credentials_compute_engine(call_creds):
diff --git a/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/fork_posix.pxd.pxi b/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/fork_posix.pxd.pxi
index a925bdd2e69..b300883abae 100644
--- a/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/fork_posix.pxd.pxi
+++ b/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/fork_posix.pxd.pxi
@@ -12,18 +12,10 @@
# See the License for the specific language governing permissions and
# limitations under the License.
+cdef void __prefork() noexcept nogil
-cdef extern from "pthread.h" nogil:
- int pthread_atfork(
- void (*prepare)() nogil,
- void (*parent)() nogil,
- void (*child)() nogil)
+cdef void __postfork_parent() noexcept nogil
-cdef void __prefork() nogil
-
-cdef void __postfork_parent() nogil
-
-
-cdef void __postfork_child() nogil \ No newline at end of file
+cdef void __postfork_child() noexcept nogil
diff --git a/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/fork_posix.pyx.pxi b/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/fork_posix.pyx.pxi
index 53657e8b1a9..d901cfddf43 100644
--- a/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/fork_posix.pyx.pxi
+++ b/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/fork_posix.pyx.pxi
@@ -12,6 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
+import os
_AWAIT_THREADS_TIMEOUT_SECONDS = 5
@@ -34,7 +35,7 @@ _GRPC_ENABLE_FORK_SUPPORT = (
_fork_handler_failed = False
-cdef void __prefork() nogil:
+cdef void __prefork() noexcept nogil:
with gil:
global _fork_handler_failed
_fork_handler_failed = False
@@ -48,14 +49,14 @@ cdef void __prefork() nogil:
_fork_handler_failed = True
-cdef void __postfork_parent() nogil:
+cdef void __postfork_parent() noexcept nogil:
with gil:
with _fork_state.fork_in_progress_condition:
_fork_state.fork_in_progress = False
_fork_state.fork_in_progress_condition.notify_all()
-cdef void __postfork_child() nogil:
+cdef void __postfork_child() noexcept nogil:
with gil:
try:
if _fork_handler_failed:
@@ -90,7 +91,9 @@ def fork_handlers_and_grpc_init():
if _GRPC_ENABLE_FORK_SUPPORT:
with _fork_state.fork_handler_registered_lock:
if not _fork_state.fork_handler_registered:
- pthread_atfork(&__prefork, &__postfork_parent, &__postfork_child)
+ os.register_at_fork(before=__prefork,
+ after_in_parent=__postfork_parent,
+ after_in_child=__postfork_child)
_fork_state.fork_handler_registered = True
diff --git a/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/grpc.pxi b/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/grpc.pxi
index 6e04e0cbfd4..43e751ff89e 100644
--- a/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/grpc.pxi
+++ b/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/grpc.pxi
@@ -12,6 +12,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.
+from libcpp.string cimport string
+
cimport libc.time
ctypedef ssize_t intptr_t
@@ -57,6 +59,28 @@ cdef extern from "<condition_variable>" namespace "std" nogil:
# gRPC Core Declarations
+cdef extern from "src/core/lib/channel/call_tracer.h" namespace "grpc_core":
+ cdef cppclass ClientCallTracer:
+ pass
+
+ cdef cppclass ServerCallTracer:
+ string TraceId() nogil
+ string SpanId() nogil
+ bint IsSampled() nogil
+
+ cdef cppclass ServerCallTracerFactory:
+ @staticmethod
+ void RegisterGlobal(ServerCallTracerFactory* factory) nogil
+
+cdef extern from "src/core/lib/channel/context.h":
+ ctypedef enum grpc_context_index:
+ GRPC_CONTEXT_CALL_TRACER_ANNOTATION_INTERFACE
+
+cdef extern from "src/core/lib/surface/call.h":
+ void grpc_call_context_set(grpc_call* call, grpc_context_index elem,
+ void* value, void (*destroy)(void* value)) nogil
+ void *grpc_call_context_get(grpc_call* call, grpc_context_index elem) nogil
+
cdef extern from "grpc/support/alloc.h":
void *gpr_malloc(size_t size) nogil
@@ -410,6 +434,12 @@ cdef extern from "grpc/grpc.h":
grpc_channel *channel, grpc_call *parent_call, uint32_t propagation_mask,
grpc_completion_queue *completion_queue, grpc_slice method,
const grpc_slice *host, gpr_timespec deadline, void *reserved) nogil
+ void *grpc_channel_register_call(
+ grpc_channel *channel, const char *method, const char *host, void *reserved) nogil
+ grpc_call *grpc_channel_create_registered_call(
+ grpc_channel *channel, grpc_call *parent_call, uint32_t propagation_mask,
+ grpc_completion_queue *completion_queue, void* registered_call_handle,
+ gpr_timespec deadline, void *reserved) nogil
grpc_connectivity_state grpc_channel_check_connectivity_state(
grpc_channel *channel, int try_to_connect) nogil
void grpc_channel_watch_connectivity_state(
diff --git a/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/observability.pyx.pxi b/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/observability.pyx.pxi
new file mode 100644
index 00000000000..aa7dce5e8ac
--- /dev/null
+++ b/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/observability.pyx.pxi
@@ -0,0 +1,61 @@
+# Copyright 2023 gRPC authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import codecs
+from typing import Optional
+
+from libcpp.cast cimport static_cast
+
+from grpc import _observability
+
+
+cdef const char* CLIENT_CALL_TRACER = "client_call_tracer"
+cdef const char* SERVER_CALL_TRACER_FACTORY = "server_call_tracer_factory"
+
+
+def set_server_call_tracer_factory(object observability_plugin) -> None:
+ capsule = observability_plugin.create_server_call_tracer_factory()
+ capsule_ptr = cpython.PyCapsule_GetPointer(capsule, SERVER_CALL_TRACER_FACTORY)
+ _register_server_call_tracer_factory(capsule_ptr)
+
+
+def clear_server_call_tracer_factory() -> None:
+ _register_server_call_tracer_factory(NULL)
+
+
+def maybe_save_server_trace_context(RequestCallEvent event) -> None:
+ cdef ServerCallTracer* server_call_tracer
+ with _observability.get_plugin() as plugin:
+ if not (plugin and plugin.tracing_enabled):
+ return
+ server_call_tracer = static_cast['ServerCallTracer*'](_get_call_tracer(event.call.c_call))
+ # TraceId and SpanId is hex string, need to convert to str
+ trace_id = _decode(codecs.decode(server_call_tracer.TraceId(), 'hex_codec'))
+ span_id = _decode(codecs.decode(server_call_tracer.SpanId(), 'hex_codec'))
+ is_sampled = server_call_tracer.IsSampled()
+ plugin.save_trace_context(trace_id, span_id, is_sampled)
+
+
+cdef void _set_call_tracer(grpc_call* call, void* capsule_ptr):
+ cdef ClientCallTracer* call_tracer = <ClientCallTracer*>capsule_ptr
+ grpc_call_context_set(call, GRPC_CONTEXT_CALL_TRACER_ANNOTATION_INTERFACE, call_tracer, NULL)
+
+
+cdef void* _get_call_tracer(grpc_call* call):
+ cdef void* call_tracer = grpc_call_context_get(call, GRPC_CONTEXT_CALL_TRACER_ANNOTATION_INTERFACE)
+ return call_tracer
+
+
+cdef void _register_server_call_tracer_factory(void* capsule_ptr):
+ cdef ServerCallTracerFactory* call_tracer_factory = <ServerCallTracerFactory*>capsule_ptr
+ ServerCallTracerFactory.RegisterGlobal(call_tracer_factory)
diff --git a/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/vtable.pyx.pxi b/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/vtable.pyx.pxi
index da4b81bd97e..f59410073b7 100644
--- a/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/vtable.pyx.pxi
+++ b/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/vtable.pyx.pxi
@@ -13,16 +13,16 @@
# limitations under the License.
# TODO(https://github.com/grpc/grpc/issues/15662): Reform this.
-cdef void* _copy_pointer(void* pointer):
+cdef void* _copy_pointer(void* pointer) noexcept:
return pointer
# TODO(https://github.com/grpc/grpc/issues/15662): Reform this.
-cdef void _destroy_pointer(void* pointer):
+cdef void _destroy_pointer(void* pointer) noexcept:
pass
-cdef int _compare_pointer(void* first_pointer, void* second_pointer):
+cdef int _compare_pointer(void* first_pointer, void* second_pointer) noexcept:
if first_pointer < second_pointer:
return -1
elif first_pointer > second_pointer:
diff --git a/contrib/python/grpcio/py3/grpc/_cython/cygrpc.pyx b/contrib/python/grpcio/py3/grpc/_cython/cygrpc.pyx
index c7925676c34..9078f82fa31 100644
--- a/contrib/python/grpcio/py3/grpc/_cython/cygrpc.pyx
+++ b/contrib/python/grpcio/py3/grpc/_cython/cygrpc.pyx
@@ -55,6 +55,7 @@ include "_cygrpc/tag.pyx.pxi"
include "_cygrpc/time.pyx.pxi"
include "_cygrpc/vtable.pyx.pxi"
include "_cygrpc/_hooks.pyx.pxi"
+include "_cygrpc/observability.pyx.pxi"
include "_cygrpc/grpc_gevent.pyx.pxi"
@@ -79,16 +80,10 @@ include "_cygrpc/aio/server.pyx.pxi"
#
# initialize gRPC
#
-cdef extern from "Python.h":
-
- int PyEval_InitThreads()
-
cdef _initialize():
- # We have Python callbacks called by c-core threads, this ensures the GIL
- # is initialized.
- PyEval_InitThreads()
import ssl
grpc_dont_init_openssl()
# Load Arcadia certs in ComputePemRootCerts and do not override here.
+
_initialize()