diff options
author | robot-piglet <[email protected]> | 2025-08-29 14:19:24 +0300 |
---|---|---|
committer | robot-piglet <[email protected]> | 2025-08-29 14:40:38 +0300 |
commit | 5715939b5b1a1812ed85171fb519f9c1c3c326e8 (patch) | |
tree | 5d981253427e490749bbb50d3616507fa0d6d1bc /contrib/python/grpcio/py3/grpc/_cython | |
parent | c390a008ee5d15e1d8f49326671908f375e0851b (diff) |
Intermediate changes
commit_hash:88dd3a7e237f5ebeb9b302a0e6866042635fda83
Diffstat (limited to 'contrib/python/grpcio/py3/grpc/_cython')
16 files changed, 311 insertions, 89 deletions
diff --git a/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/_hooks.pyx.pxi b/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/_hooks.pyx.pxi index de4d71b8196..c2b37594f95 100644 --- a/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/_hooks.pyx.pxi +++ b/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/_hooks.pyx.pxi @@ -17,6 +17,9 @@ cdef object _custom_op_on_c_call(int op, grpc_call *call): raise NotImplementedError("No custom hooks are implemented") def install_context_from_request_call_event(RequestCallEvent event): + maybe_save_server_trace_context(event) + +def install_context_from_request_call_event_aio(GrpcCallWrapper event): pass def uninstall_context(): @@ -31,5 +34,8 @@ cdef class CensusContext: def set_census_context_on_call(_CallState call_state, CensusContext census_ctx): pass +def set_instrumentation_context_on_call_aio(GrpcCallWrapper call_state, CensusContext census_ctx): + pass + def get_deadline_from_context(): return None diff --git a/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/aio/call.pyx.pxi b/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/aio/call.pyx.pxi index 7bce1850dc8..00c0a29c2ab 100644 --- a/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/aio/call.pyx.pxi +++ b/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/aio/call.pyx.pxi @@ -285,14 +285,24 @@ cdef class _AioCall(GrpcCallWrapper): return False + def set_internal_error(self, str error_str): + self._set_status(AioRpcStatus( + StatusCode.internal, + 'Internal error from Core', + (), + error_str, + )) + async def unary_unary(self, bytes request, - tuple outbound_initial_metadata): + tuple outbound_initial_metadata, + object context = None): """Performs a unary unary RPC. Args: request: the serialized requests in bytes. outbound_initial_metadata: optional outbound metadata. + context: instrumentation context. """ cdef tuple ops @@ -305,6 +315,8 @@ cdef class _AioCall(GrpcCallWrapper): cdef ReceiveMessageOperation receive_message_op = ReceiveMessageOperation(_EMPTY_FLAGS) cdef ReceiveStatusOnClientOperation receive_status_on_client_op = ReceiveStatusOnClientOperation(_EMPTY_FLAGS) + if context is not None: + set_instrumentation_context_on_call_aio(self, context) ops = (initial_metadata_op, send_message_op, send_close_op, receive_initial_metadata_op, receive_message_op, receive_status_on_client_op) @@ -382,7 +394,8 @@ cdef class _AioCall(GrpcCallWrapper): async def initiate_unary_stream(self, bytes request, - tuple outbound_initial_metadata): + tuple outbound_initial_metadata, + object context = None): """Implementation of the start of a unary-stream call.""" # Peer may prematurely end this RPC at any point. We need a corutine # that watches if the server sends the final status. @@ -398,6 +411,8 @@ cdef class _AioCall(GrpcCallWrapper): cdef Operation send_close_op = SendCloseFromClientOperation( _EMPTY_FLAGS) + if context is not None: + set_instrumentation_context_on_call_aio(self, context) outbound_ops = ( initial_metadata_op, send_message_op, @@ -421,7 +436,8 @@ cdef class _AioCall(GrpcCallWrapper): async def stream_unary(self, tuple outbound_initial_metadata, - object metadata_sent_observer): + object metadata_sent_observer, + object context = None): """Actual implementation of the complete unary-stream call. Needs to pay extra attention to the raise mechanism. If we want to @@ -452,6 +468,9 @@ cdef class _AioCall(GrpcCallWrapper): cdef tuple inbound_ops cdef ReceiveMessageOperation receive_message_op = ReceiveMessageOperation(_EMPTY_FLAGS) cdef ReceiveStatusOnClientOperation receive_status_on_client_op = ReceiveStatusOnClientOperation(_EMPTY_FLAGS) + + if context is not None: + set_instrumentation_context_on_call_aio(self, context) inbound_ops = (receive_message_op, receive_status_on_client_op) # Executes all operations in one batch. @@ -476,7 +495,8 @@ cdef class _AioCall(GrpcCallWrapper): async def initiate_stream_stream(self, tuple outbound_initial_metadata, - object metadata_sent_observer): + object metadata_sent_observer, + object context = None): """Actual implementation of the complete stream-stream call. Needs to pay extra attention to the raise mechanism. If we want to @@ -487,6 +507,9 @@ cdef class _AioCall(GrpcCallWrapper): # that watches if the server sends the final status. status_task = self._loop.create_task(self._handle_status_once_received()) + if context is not None: + set_instrumentation_context_on_call_aio(self, context) + try: # Sends out initial_metadata ASAP. await _send_initial_metadata(self, @@ -505,4 +528,4 @@ cdef class _AioCall(GrpcCallWrapper): await status_task # Allow upper layer to proceed only if the status is set - metadata_sent_observer() + metadata_sent_observer()
\ No newline at end of file diff --git a/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/aio/callback_common.pxd.pxi b/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/aio/callback_common.pxd.pxi index e54e5107547..26edbdb917b 100644 --- a/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/aio/callback_common.pxd.pxi +++ b/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/aio/callback_common.pxd.pxi @@ -48,7 +48,7 @@ cdef class CallbackWrapper: @staticmethod cdef void functor_run( grpc_completion_queue_functor* functor, - int succeed) + int succeed) noexcept cdef grpc_completion_queue_functor *c_functor(self) diff --git a/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/aio/callback_common.pyx.pxi b/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/aio/callback_common.pyx.pxi index 14a0098fc20..2b0df0e5ce7 100644 --- a/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/aio/callback_common.pyx.pxi +++ b/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/aio/callback_common.pyx.pxi @@ -50,7 +50,7 @@ cdef class CallbackWrapper: @staticmethod cdef void functor_run( grpc_completion_queue_functor* functor, - int success): + int success) noexcept: cdef CallbackContext *context = <CallbackContext *>functor cdef object waiter = <object>context.waiter if not waiter.cancelled(): diff --git a/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/aio/common.pyx.pxi b/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/aio/common.pyx.pxi index f698390cd5c..0e3e8de00bf 100644 --- a/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/aio/common.pyx.pxi +++ b/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/aio/common.pyx.pxi @@ -12,6 +12,8 @@ # See the License for the specific language governing permissions and # limitations under the License. +import warnings + from cpython.version cimport PY_MAJOR_VERSION, PY_MINOR_VERSION TYPE_METADATA_STRING = "Tuple[Tuple[str, Union[str, bytes]]...]" @@ -181,7 +183,15 @@ if PY_MAJOR_VERSION >= 3 and PY_MINOR_VERSION >= 7: try: return asyncio.get_running_loop() except RuntimeError: - return asyncio.get_event_loop_policy().get_event_loop() + with warnings.catch_warnings(): + # Convert DeprecationWarning to errors so we can capture them with except + warnings.simplefilter("error", DeprecationWarning) + try: + return asyncio.get_event_loop_policy().get_event_loop() + # Since version 3.12, DeprecationWarning is emitted if there is no + # current event loop. + except DeprecationWarning: + return asyncio.get_event_loop_policy().new_event_loop() else: def get_working_loop(): """Returns a running event loop.""" diff --git a/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/aio/server.pxd.pxi b/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/aio/server.pxd.pxi index fe10c3883c3..15050e6b21e 100644 --- a/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/aio/server.pxd.pxi +++ b/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/aio/server.pxd.pxi @@ -29,6 +29,7 @@ cdef class RPCState(GrpcCallWrapper): cdef bint metadata_sent cdef bint status_sent cdef grpc_status_code status_code + cdef object py_status_code cdef str status_details cdef tuple trailing_metadata cdef object compression_algorithm @@ -71,8 +72,7 @@ cdef enum AioServerStatus: cdef class _ConcurrentRpcLimiter: cdef int _maximum_concurrent_rpcs cdef int _active_rpcs - cdef object _active_rpcs_condition # asyncio.Condition - cdef object _loop # asyncio.EventLoop + cdef bint limiter_concurrency_exceeded cdef class AioServer: diff --git a/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/aio/server.pyx.pxi b/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/aio/server.pyx.pxi index e85efdd0b9f..d166bd9fabf 100644 --- a/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/aio/server.pyx.pxi +++ b/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/aio/server.pyx.pxi @@ -55,6 +55,7 @@ cdef class RPCState: self.metadata_sent = False self.status_sent = False self.status_code = StatusCode.ok + self.py_status_code = None self.status_details = '' self.trailing_metadata = _IMMUTABLE_EMPTY_METADATA self.compression_algorithm = None @@ -184,6 +185,7 @@ cdef class _ServicerContext: self._rpc_state.status_details = details actual_code = get_status_code(code) + self._rpc_state.py_status_code = code self._rpc_state.status_code = actual_code self._rpc_state.status_sent = True @@ -213,9 +215,10 @@ cdef class _ServicerContext: def set_code(self, object code): self._rpc_state.status_code = get_status_code(code) + self._rpc_state.py_status_code = code def code(self): - return self._rpc_state.status_code + return self._rpc_state.py_status_code def set_details(self, str details): self._rpc_state.status_details = details @@ -398,6 +401,7 @@ async def _finish_handler_with_unary_response(RPCState rpc_state, # Executes application logic cdef object response_message cdef _SyncServicerContext sync_servicer_context + install_context_from_request_call_event_aio(rpc_state) if _is_async_handler(unary_handler): # Run async method handlers in this coroutine @@ -450,6 +454,7 @@ async def _finish_handler_with_unary_response(RPCState rpc_state, rpc_state.metadata_sent = True rpc_state.status_sent = True await execute_batch(rpc_state, finish_ops, loop) + uninstall_context() async def _finish_handler_with_stream_responses(RPCState rpc_state, @@ -465,6 +470,7 @@ async def _finish_handler_with_stream_responses(RPCState rpc_state, """ cdef object async_response_generator cdef object response_message + install_context_from_request_call_event_aio(rpc_state) if inspect.iscoroutinefunction(stream_handler): # Case 1: Coroutine async handler - using reader-writer API @@ -518,6 +524,7 @@ async def _finish_handler_with_stream_responses(RPCState rpc_state, rpc_state.metadata_sent = True rpc_state.status_sent = True await execute_batch(rpc_state, finish_ops, loop) + uninstall_context() async def _handle_unary_unary_rpc(object method_handler, @@ -702,7 +709,9 @@ async def _handle_exceptions(RPCState rpc_state, object rpc_coro, object loop): if rpc_state.client_closed: return else: - raise + _LOGGER.exception('ExecuteBatchError raised in core by servicer method [%s]' % ( + _decode(rpc_state.method()))) + return except Exception as e: _LOGGER.exception('Unexpected [%s] raised by servicer method [%s]' % ( type(e).__name__, @@ -774,7 +783,7 @@ async def _schedule_rpc_coro(object rpc_coro, async def _handle_rpc(list generic_handlers, tuple interceptors, - RPCState rpc_state, object loop): + RPCState rpc_state, object loop, bint concurrency_exceeded): cdef object method_handler # Finds the method handler (application logic) method_handler = await _find_method_handler( @@ -795,6 +804,18 @@ async def _handle_rpc(list generic_handlers, tuple interceptors, ) return + if concurrency_exceeded: + rpc_state.status_sent = True + await _send_error_status_from_server( + rpc_state, + StatusCode.resource_exhausted, + 'Concurrent RPC limit exceeded!', + _IMMUTABLE_EMPTY_METADATA, + rpc_state.create_send_initial_metadata_op_if_not_sent(), + loop + ) + return + # Handles unary-unary case if not method_handler.request_streaming and not method_handler.response_streaming: await _handle_unary_unary_rpc(method_handler, @@ -838,33 +859,23 @@ cdef CallbackFailureHandler SERVER_SHUTDOWN_FAILURE_HANDLER = CallbackFailureHan cdef class _ConcurrentRpcLimiter: - def __cinit__(self, int maximum_concurrent_rpcs, object loop): + def __cinit__(self, int maximum_concurrent_rpcs): if maximum_concurrent_rpcs <= 0: raise ValueError("maximum_concurrent_rpcs should be a postive integer") self._maximum_concurrent_rpcs = maximum_concurrent_rpcs self._active_rpcs = 0 - self._active_rpcs_condition = asyncio.Condition() - self._loop = loop + self.limiter_concurrency_exceeded = False - async def check_before_request_call(self): - await self._active_rpcs_condition.acquire() - try: - predicate = lambda: self._active_rpcs < self._maximum_concurrent_rpcs - await self._active_rpcs_condition.wait_for(predicate) + def check_before_request_call(self): + if self._active_rpcs >= self._maximum_concurrent_rpcs: + self.limiter_concurrency_exceeded = True + else: self._active_rpcs += 1 - finally: - self._active_rpcs_condition.release() - - async def _decrease_active_rpcs_count_with_lock(self): - await self._active_rpcs_condition.acquire() - try: - self._active_rpcs -= 1 - self._active_rpcs_condition.notify() - finally: - self._active_rpcs_condition.release() def _decrease_active_rpcs_count(self, unused_future): - self._loop.create_task(self._decrease_active_rpcs_count_with_lock()) + self._active_rpcs -= 1 + if self._active_rpcs < self._maximum_concurrent_rpcs: + self.limiter_concurrency_exceeded = False def decrease_once_finished(self, object rpc_task): rpc_task.add_done_callback(self._decrease_active_rpcs_count) @@ -906,8 +917,7 @@ cdef class AioServer: self._thread_pool = thread_pool if maximum_concurrent_rpcs is not None: - self._limiter = _ConcurrentRpcLimiter(maximum_concurrent_rpcs, - loop) + self._limiter = _ConcurrentRpcLimiter(maximum_concurrent_rpcs) def add_generic_rpc_handlers(self, object generic_rpc_handlers): self._generic_handlers.extend(generic_rpc_handlers) @@ -950,8 +960,10 @@ cdef class AioServer: if self._status != AIO_SERVER_STATUS_RUNNING: break + concurrency_exceeded = False if self._limiter is not None: - await self._limiter.check_before_request_call() + self._limiter.check_before_request_call() + concurrency_exceeded = self._limiter.limiter_concurrency_exceeded # Accepts new request from Core rpc_state = await self._request_call() @@ -964,7 +976,8 @@ cdef class AioServer: rpc_coro = _handle_rpc(self._generic_handlers, self._interceptors, rpc_state, - self._loop) + self._loop, + concurrency_exceeded) # Fires off a task that listens on the cancellation from client. rpc_task = self._loop.create_task( diff --git a/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/channel.pxd.pxi b/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/channel.pxd.pxi index eb27f2df7ad..96d03e181b9 100644 --- a/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/channel.pxd.pxi +++ b/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/channel.pxd.pxi @@ -26,10 +26,15 @@ cdef class _CallState: cdef grpc_call *c_call cdef set due + # call_tracer_capsule should have type of grpc._observability.ClientCallTracerCapsule + cdef object call_tracer_capsule + cdef void maybe_set_client_call_tracer_on_call(self, bytes method_name, bytes target) except * + cdef void maybe_delete_call_tracer(self) except * cdef class _ChannelState: + cdef bytes target cdef object condition cdef grpc_channel *c_channel # A boolean field indicating that the channel is open (if True) or is being @@ -69,6 +74,13 @@ cdef class SegregatedCall: cdef class Channel: cdef _ChannelState _state + cdef dict _registered_call_handles # TODO(https://github.com/grpc/grpc/issues/15662): Eliminate this. cdef tuple _arguments + + +cdef class CallHandle: + + cdef void *c_call_handle + cdef object method diff --git a/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/channel.pyx.pxi b/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/channel.pyx.pxi index d49a4210f7c..dde3b166789 100644 --- a/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/channel.pyx.pxi +++ b/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/channel.pyx.pxi @@ -12,6 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. +from grpc import _observability _INTERNAL_CALL_ERROR_MESSAGE_FORMAT = ( 'Internal gRPC call error %d. ' + @@ -71,10 +72,28 @@ cdef class _CallState: def __cinit__(self): self.due = set() + cdef void maybe_delete_call_tracer(self) except *: + if not self.call_tracer_capsule: + return + _observability.delete_call_tracer(self.call_tracer_capsule) + + cdef void maybe_set_client_call_tracer_on_call(self, bytes method_name, bytes target) except *: + # TODO(xuanwn): use channel args to exclude those metrics. + for exclude_prefix in _observability._SERVICES_TO_EXCLUDE: + if exclude_prefix in method_name: + return + with _observability.get_plugin() as plugin: + if not (plugin and plugin.observability_enabled): + return + capsule = plugin.create_client_call_tracer(method_name, target) + capsule_ptr = cpython.PyCapsule_GetPointer(capsule, CLIENT_CALL_TRACER) + _set_call_tracer(self.c_call, capsule_ptr) + self.call_tracer_capsule = capsule cdef class _ChannelState: - def __cinit__(self): + def __cinit__(self, target): + self.target = target self.condition = threading.Condition() self.open = True self.integrated_call_states = {} @@ -82,6 +101,25 @@ cdef class _ChannelState: self.connectivity_due = set() self.closed_reason = None +cdef class CallHandle: + + def __cinit__(self, _ChannelState channel_state, object method): + self.method = method + cpython.Py_INCREF(method) + # Note that since we always pass None for host, we set the + # second-to-last parameter of grpc_channel_register_call to a fixed + # NULL value. + self.c_call_handle = grpc_channel_register_call( + channel_state.c_channel, <const char *>method, NULL, NULL) + + def __dealloc__(self): + cpython.Py_DECREF(self.method) + + @property + def call_handle(self): + return cpython.PyLong_FromVoidPtr(self.c_call_handle) + + cdef tuple _operate(grpc_call *c_call, object operations, object user_tag): cdef grpc_call_error c_call_error @@ -180,7 +218,7 @@ cdef void _call( grpc_completion_queue *c_completion_queue, on_success, int flags, method, host, object deadline, CallCredentials credentials, object operationses_and_user_tags, object metadata, - object context) except *: + object context, object registered_call_handle) except *: """Invokes an RPC. Args: @@ -207,6 +245,8 @@ cdef void _call( must be present in the first element of this value. metadata: The metadata for this call. context: Context object for distributed tracing. + registered_call_handle: An int representing the call handle of the method, or + None if the method is not registered. """ cdef grpc_slice method_slice cdef grpc_slice host_slice @@ -223,13 +263,20 @@ cdef void _call( else: host_slice = _slice_from_bytes(host) host_slice_ptr = &host_slice - call_state.c_call = grpc_channel_create_call( - channel_state.c_channel, NULL, flags, - c_completion_queue, method_slice, host_slice_ptr, - _timespec_from_time(deadline), NULL) + if registered_call_handle: + call_state.c_call = grpc_channel_create_registered_call( + channel_state.c_channel, NULL, flags, + c_completion_queue, cpython.PyLong_AsVoidPtr(registered_call_handle), + _timespec_from_time(deadline), NULL) + else: + call_state.c_call = grpc_channel_create_call( + channel_state.c_channel, NULL, flags, + c_completion_queue, method_slice, host_slice_ptr, + _timespec_from_time(deadline), NULL) grpc_slice_unref(method_slice) if host_slice_ptr: grpc_slice_unref(host_slice) + call_state.maybe_set_client_call_tracer_on_call(method, channel_state.target) if context is not None: set_census_context_on_call(call_state, context) if credentials is not None: @@ -238,7 +285,9 @@ cdef void _call( call_state.c_call, c_call_credentials) grpc_call_credentials_release(c_call_credentials) if c_call_error != GRPC_CALL_OK: - grpc_call_unref(call_state.c_call) + #TODO(xuanwn): Expand the scope of nogil + with nogil: + grpc_call_unref(call_state.c_call) call_state.c_call = NULL _raise_call_error_no_metadata(c_call_error) started_tags = set() @@ -248,7 +297,9 @@ cdef void _call( started_tags.add(tag) else: grpc_call_cancel(call_state.c_call, NULL) - grpc_call_unref(call_state.c_call) + #TODO(xuanwn): Expand the scope of nogil + with nogil: + grpc_call_unref(call_state.c_call) call_state.c_call = NULL _raise_call_error(c_call_error, metadata) else: @@ -263,9 +314,10 @@ cdef void _process_integrated_call_tag( cdef _CallState call_state = state.integrated_call_states.pop(tag) call_state.due.remove(tag) if not call_state.due: - grpc_call_unref(call_state.c_call) + with nogil: + grpc_call_unref(call_state.c_call) call_state.c_call = NULL - + call_state.maybe_delete_call_tracer() cdef class IntegratedCall: @@ -284,7 +336,7 @@ cdef class IntegratedCall: cdef IntegratedCall _integrated_call( _ChannelState state, int flags, method, host, object deadline, object metadata, CallCredentials credentials, operationses_and_user_tags, - object context): + object context, object registered_call_handle): call_state = _CallState() def on_success(started_tags): @@ -293,7 +345,8 @@ cdef IntegratedCall _integrated_call( _call( state, call_state, state.c_call_completion_queue, on_success, flags, - method, host, deadline, credentials, operationses_and_user_tags, metadata, context) + method, host, deadline, credentials, operationses_and_user_tags, + metadata, context, registered_call_handle) return IntegratedCall(state, call_state) @@ -303,8 +356,11 @@ cdef object _process_segregated_call_tag( grpc_completion_queue *c_completion_queue, _BatchOperationTag tag): call_state.due.remove(tag) if not call_state.due: - grpc_call_unref(call_state.c_call) + #TODO(xuanwn): Expand the scope of nogil + with nogil: + grpc_call_unref(call_state.c_call) call_state.c_call = NULL + call_state.maybe_delete_call_tracer() state.segregated_call_states.remove(call_state) _destroy_c_completion_queue(c_completion_queue) return True @@ -331,7 +387,8 @@ cdef class SegregatedCall: self._channel_state, self._call_state, self._c_completion_queue, tag) def on_failure(): self._call_state.due.clear() - grpc_call_unref(self._call_state.c_call) + with nogil: + grpc_call_unref(self._call_state.c_call) self._call_state.c_call = NULL self._channel_state.segregated_call_states.remove(self._call_state) _destroy_c_completion_queue(self._c_completion_queue) @@ -342,7 +399,7 @@ cdef class SegregatedCall: cdef SegregatedCall _segregated_call( _ChannelState state, int flags, method, host, object deadline, object metadata, CallCredentials credentials, operationses_and_user_tags, - object context): + object context, object registered_call_handle): cdef _CallState call_state = _CallState() cdef SegregatedCall segregated_call cdef grpc_completion_queue *c_completion_queue @@ -360,7 +417,7 @@ cdef SegregatedCall _segregated_call( _call( state, call_state, c_completion_queue, on_success, flags, method, host, deadline, credentials, operationses_and_user_tags, metadata, - context) + context, registered_call_handle) except: _destroy_c_completion_queue(c_completion_queue) raise @@ -445,7 +502,7 @@ cdef class Channel: ChannelCredentials channel_credentials): arguments = () if arguments is None else tuple(arguments) fork_handlers_and_grpc_init() - self._state = _ChannelState() + self._state = _ChannelState(target) self._state.c_call_completion_queue = ( grpc_completion_queue_create_for_next(NULL)) self._state.c_connectivity_completion_queue = ( @@ -457,6 +514,7 @@ cdef class Channel: else grpc_insecure_credentials_create()) self._state.c_channel = grpc_channel_create( <char *>target, c_channel_credentials, channel_args.c_args()) + self._registered_call_handles = {} grpc_channel_credentials_release(c_channel_credentials) def target(self): @@ -470,10 +528,10 @@ cdef class Channel: def integrated_call( self, int flags, method, host, object deadline, object metadata, CallCredentials credentials, operationses_and_tags, - object context = None): + object context = None, object registered_call_handle = None): return _integrated_call( self._state, flags, method, host, deadline, metadata, credentials, - operationses_and_tags, context) + operationses_and_tags, context, registered_call_handle) def next_call_event(self): def on_success(tag): @@ -492,10 +550,10 @@ cdef class Channel: def segregated_call( self, int flags, method, host, object deadline, object metadata, CallCredentials credentials, operationses_and_tags, - object context = None): + object context = None, object registered_call_handle = None): return _segregated_call( self._state, flags, method, host, deadline, metadata, credentials, - operationses_and_tags, context) + operationses_and_tags, context, registered_call_handle) def check_connectivity_state(self, bint try_to_connect): with self._state.condition: @@ -514,3 +572,19 @@ cdef class Channel: def close_on_fork(self, code, details): _close(self, code, details, True) + + def get_registered_call_handle(self, method): + """ + Get or registers a call handler for a method. + + This method is not thread-safe. + + Args: + method: Required, the method name for the RPC. + + Returns: + The registered call handle pointer in the form of a Python Long. + """ + if method not in self._registered_call_handles.keys(): + self._registered_call_handles[method] = CallHandle(self._state, method) + return self._registered_call_handles[method].call_handle diff --git a/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/credentials.pyx.pxi b/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/credentials.pyx.pxi index 27b56aa378b..181704cb85a 100644 --- a/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/credentials.pyx.pxi +++ b/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/credentials.pyx.pxi @@ -153,8 +153,9 @@ cdef class SSLChannelCredentials(ChannelCredentials): else: c_pem_root_certificates = self._pem_root_certificates if self._private_key is None and self._certificate_chain is None: - return grpc_ssl_credentials_create( - c_pem_root_certificates, NULL, NULL, NULL) + with nogil: + return grpc_ssl_credentials_create( + c_pem_root_certificates, NULL, NULL, NULL) else: if self._private_key: c_pem_key_certificate_pair.private_key = self._private_key @@ -164,8 +165,9 @@ cdef class SSLChannelCredentials(ChannelCredentials): c_pem_key_certificate_pair.certificate_chain = self._certificate_chain else: c_pem_key_certificate_pair.certificate_chain = NULL - return grpc_ssl_credentials_create( - c_pem_root_certificates, &c_pem_key_certificate_pair, NULL, NULL) + with nogil: + return grpc_ssl_credentials_create( + c_pem_root_certificates, &c_pem_key_certificate_pair, NULL, NULL) cdef class CompositeChannelCredentials(ChannelCredentials): @@ -314,7 +316,7 @@ def server_credentials_ssl_dynamic_cert_config(initial_cert_config, return credentials cdef grpc_ssl_certificate_config_reload_status _server_cert_config_fetcher_wrapper( - void* user_data, grpc_ssl_server_certificate_config **config) with gil: + void* user_data, grpc_ssl_server_certificate_config **config) noexcept with gil: # This is a credentials.ServerCertificateConfig cdef ServerCertificateConfig cert_config = None if not user_data: @@ -435,8 +437,9 @@ cdef class ComputeEngineChannelCredentials(ChannelCredentials): raise ValueError("Call credentials may not be NULL.") cdef grpc_channel_credentials *c(self) except *: - self._c_creds = grpc_google_default_credentials_create(self._call_creds) - return self._c_creds + with nogil: + self._c_creds = grpc_google_default_credentials_create(self._call_creds) + return self._c_creds def channel_credentials_compute_engine(call_creds): diff --git a/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/fork_posix.pxd.pxi b/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/fork_posix.pxd.pxi index a925bdd2e69..b300883abae 100644 --- a/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/fork_posix.pxd.pxi +++ b/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/fork_posix.pxd.pxi @@ -12,18 +12,10 @@ # See the License for the specific language governing permissions and # limitations under the License. +cdef void __prefork() noexcept nogil -cdef extern from "pthread.h" nogil: - int pthread_atfork( - void (*prepare)() nogil, - void (*parent)() nogil, - void (*child)() nogil) +cdef void __postfork_parent() noexcept nogil -cdef void __prefork() nogil - -cdef void __postfork_parent() nogil - - -cdef void __postfork_child() nogil
\ No newline at end of file +cdef void __postfork_child() noexcept nogil diff --git a/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/fork_posix.pyx.pxi b/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/fork_posix.pyx.pxi index 53657e8b1a9..d901cfddf43 100644 --- a/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/fork_posix.pyx.pxi +++ b/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/fork_posix.pyx.pxi @@ -12,6 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. +import os _AWAIT_THREADS_TIMEOUT_SECONDS = 5 @@ -34,7 +35,7 @@ _GRPC_ENABLE_FORK_SUPPORT = ( _fork_handler_failed = False -cdef void __prefork() nogil: +cdef void __prefork() noexcept nogil: with gil: global _fork_handler_failed _fork_handler_failed = False @@ -48,14 +49,14 @@ cdef void __prefork() nogil: _fork_handler_failed = True -cdef void __postfork_parent() nogil: +cdef void __postfork_parent() noexcept nogil: with gil: with _fork_state.fork_in_progress_condition: _fork_state.fork_in_progress = False _fork_state.fork_in_progress_condition.notify_all() -cdef void __postfork_child() nogil: +cdef void __postfork_child() noexcept nogil: with gil: try: if _fork_handler_failed: @@ -90,7 +91,9 @@ def fork_handlers_and_grpc_init(): if _GRPC_ENABLE_FORK_SUPPORT: with _fork_state.fork_handler_registered_lock: if not _fork_state.fork_handler_registered: - pthread_atfork(&__prefork, &__postfork_parent, &__postfork_child) + os.register_at_fork(before=__prefork, + after_in_parent=__postfork_parent, + after_in_child=__postfork_child) _fork_state.fork_handler_registered = True diff --git a/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/grpc.pxi b/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/grpc.pxi index 6e04e0cbfd4..43e751ff89e 100644 --- a/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/grpc.pxi +++ b/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/grpc.pxi @@ -12,6 +12,8 @@ # See the License for the specific language governing permissions and # limitations under the License. +from libcpp.string cimport string + cimport libc.time ctypedef ssize_t intptr_t @@ -57,6 +59,28 @@ cdef extern from "<condition_variable>" namespace "std" nogil: # gRPC Core Declarations +cdef extern from "src/core/lib/channel/call_tracer.h" namespace "grpc_core": + cdef cppclass ClientCallTracer: + pass + + cdef cppclass ServerCallTracer: + string TraceId() nogil + string SpanId() nogil + bint IsSampled() nogil + + cdef cppclass ServerCallTracerFactory: + @staticmethod + void RegisterGlobal(ServerCallTracerFactory* factory) nogil + +cdef extern from "src/core/lib/channel/context.h": + ctypedef enum grpc_context_index: + GRPC_CONTEXT_CALL_TRACER_ANNOTATION_INTERFACE + +cdef extern from "src/core/lib/surface/call.h": + void grpc_call_context_set(grpc_call* call, grpc_context_index elem, + void* value, void (*destroy)(void* value)) nogil + void *grpc_call_context_get(grpc_call* call, grpc_context_index elem) nogil + cdef extern from "grpc/support/alloc.h": void *gpr_malloc(size_t size) nogil @@ -410,6 +434,12 @@ cdef extern from "grpc/grpc.h": grpc_channel *channel, grpc_call *parent_call, uint32_t propagation_mask, grpc_completion_queue *completion_queue, grpc_slice method, const grpc_slice *host, gpr_timespec deadline, void *reserved) nogil + void *grpc_channel_register_call( + grpc_channel *channel, const char *method, const char *host, void *reserved) nogil + grpc_call *grpc_channel_create_registered_call( + grpc_channel *channel, grpc_call *parent_call, uint32_t propagation_mask, + grpc_completion_queue *completion_queue, void* registered_call_handle, + gpr_timespec deadline, void *reserved) nogil grpc_connectivity_state grpc_channel_check_connectivity_state( grpc_channel *channel, int try_to_connect) nogil void grpc_channel_watch_connectivity_state( diff --git a/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/observability.pyx.pxi b/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/observability.pyx.pxi new file mode 100644 index 00000000000..aa7dce5e8ac --- /dev/null +++ b/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/observability.pyx.pxi @@ -0,0 +1,61 @@ +# Copyright 2023 gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import codecs +from typing import Optional + +from libcpp.cast cimport static_cast + +from grpc import _observability + + +cdef const char* CLIENT_CALL_TRACER = "client_call_tracer" +cdef const char* SERVER_CALL_TRACER_FACTORY = "server_call_tracer_factory" + + +def set_server_call_tracer_factory(object observability_plugin) -> None: + capsule = observability_plugin.create_server_call_tracer_factory() + capsule_ptr = cpython.PyCapsule_GetPointer(capsule, SERVER_CALL_TRACER_FACTORY) + _register_server_call_tracer_factory(capsule_ptr) + + +def clear_server_call_tracer_factory() -> None: + _register_server_call_tracer_factory(NULL) + + +def maybe_save_server_trace_context(RequestCallEvent event) -> None: + cdef ServerCallTracer* server_call_tracer + with _observability.get_plugin() as plugin: + if not (plugin and plugin.tracing_enabled): + return + server_call_tracer = static_cast['ServerCallTracer*'](_get_call_tracer(event.call.c_call)) + # TraceId and SpanId is hex string, need to convert to str + trace_id = _decode(codecs.decode(server_call_tracer.TraceId(), 'hex_codec')) + span_id = _decode(codecs.decode(server_call_tracer.SpanId(), 'hex_codec')) + is_sampled = server_call_tracer.IsSampled() + plugin.save_trace_context(trace_id, span_id, is_sampled) + + +cdef void _set_call_tracer(grpc_call* call, void* capsule_ptr): + cdef ClientCallTracer* call_tracer = <ClientCallTracer*>capsule_ptr + grpc_call_context_set(call, GRPC_CONTEXT_CALL_TRACER_ANNOTATION_INTERFACE, call_tracer, NULL) + + +cdef void* _get_call_tracer(grpc_call* call): + cdef void* call_tracer = grpc_call_context_get(call, GRPC_CONTEXT_CALL_TRACER_ANNOTATION_INTERFACE) + return call_tracer + + +cdef void _register_server_call_tracer_factory(void* capsule_ptr): + cdef ServerCallTracerFactory* call_tracer_factory = <ServerCallTracerFactory*>capsule_ptr + ServerCallTracerFactory.RegisterGlobal(call_tracer_factory) diff --git a/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/vtable.pyx.pxi b/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/vtable.pyx.pxi index da4b81bd97e..f59410073b7 100644 --- a/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/vtable.pyx.pxi +++ b/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/vtable.pyx.pxi @@ -13,16 +13,16 @@ # limitations under the License. # TODO(https://github.com/grpc/grpc/issues/15662): Reform this. -cdef void* _copy_pointer(void* pointer): +cdef void* _copy_pointer(void* pointer) noexcept: return pointer # TODO(https://github.com/grpc/grpc/issues/15662): Reform this. -cdef void _destroy_pointer(void* pointer): +cdef void _destroy_pointer(void* pointer) noexcept: pass -cdef int _compare_pointer(void* first_pointer, void* second_pointer): +cdef int _compare_pointer(void* first_pointer, void* second_pointer) noexcept: if first_pointer < second_pointer: return -1 elif first_pointer > second_pointer: diff --git a/contrib/python/grpcio/py3/grpc/_cython/cygrpc.pyx b/contrib/python/grpcio/py3/grpc/_cython/cygrpc.pyx index c7925676c34..9078f82fa31 100644 --- a/contrib/python/grpcio/py3/grpc/_cython/cygrpc.pyx +++ b/contrib/python/grpcio/py3/grpc/_cython/cygrpc.pyx @@ -55,6 +55,7 @@ include "_cygrpc/tag.pyx.pxi" include "_cygrpc/time.pyx.pxi" include "_cygrpc/vtable.pyx.pxi" include "_cygrpc/_hooks.pyx.pxi" +include "_cygrpc/observability.pyx.pxi" include "_cygrpc/grpc_gevent.pyx.pxi" @@ -79,16 +80,10 @@ include "_cygrpc/aio/server.pyx.pxi" # # initialize gRPC # -cdef extern from "Python.h": - - int PyEval_InitThreads() - cdef _initialize(): - # We have Python callbacks called by c-core threads, this ensures the GIL - # is initialized. - PyEval_InitThreads() import ssl grpc_dont_init_openssl() # Load Arcadia certs in ComputePemRootCerts and do not override here. + _initialize() |