diff options
author | nkozlovskiy <[email protected]> | 2023-09-29 12:24:06 +0300 |
---|---|---|
committer | nkozlovskiy <[email protected]> | 2023-09-29 12:41:34 +0300 |
commit | e0e3e1717e3d33762ce61950504f9637a6e669ed (patch) | |
tree | bca3ff6939b10ed60c3d5c12439963a1146b9711 /contrib/python/grpcio/py3/grpc/_cython | |
parent | 38f2c5852db84c7b4d83adfcb009eb61541d1ccd (diff) |
add ydb deps
Diffstat (limited to 'contrib/python/grpcio/py3/grpc/_cython')
61 files changed, 7364 insertions, 0 deletions
diff --git a/contrib/python/grpcio/py3/grpc/_cython/__init__.py b/contrib/python/grpcio/py3/grpc/_cython/__init__.py new file mode 100644 index 00000000000..5fb4f3c3cfd --- /dev/null +++ b/contrib/python/grpcio/py3/grpc/_cython/__init__.py @@ -0,0 +1,13 @@ +# Copyright 2015 gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. diff --git a/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/__init__.py b/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/__init__.py new file mode 100644 index 00000000000..5fb4f3c3cfd --- /dev/null +++ b/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/__init__.py @@ -0,0 +1,13 @@ +# Copyright 2015 gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. diff --git a/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/_hooks.pxd.pxi b/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/_hooks.pxd.pxi new file mode 100644 index 00000000000..3eb10f52757 --- /dev/null +++ b/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/_hooks.pxd.pxi @@ -0,0 +1,16 @@ +# Copyright 2018 The gRPC Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +cdef object _custom_op_on_c_call(int op, grpc_call *call) diff --git a/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/_hooks.pyx.pxi b/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/_hooks.pyx.pxi new file mode 100644 index 00000000000..de4d71b8196 --- /dev/null +++ b/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/_hooks.pyx.pxi @@ -0,0 +1,35 @@ +# Copyright 2018 The gRPC Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +cdef object _custom_op_on_c_call(int op, grpc_call *call): + raise NotImplementedError("No custom hooks are implemented") + +def install_context_from_request_call_event(RequestCallEvent event): + pass + +def uninstall_context(): + pass + +def build_census_context(): + pass + +cdef class CensusContext: + pass + +def set_census_context_on_call(_CallState call_state, CensusContext census_ctx): + pass + +def get_deadline_from_context(): + return None diff --git a/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/aio/call.pxd.pxi b/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/aio/call.pxd.pxi new file mode 100644 index 00000000000..867245a6944 --- /dev/null +++ b/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/aio/call.pxd.pxi @@ -0,0 +1,47 @@ +# Copyright 2019 gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +cdef class _AioCall(GrpcCallWrapper): + cdef: + readonly AioChannel _channel + list _references + object _deadline + list _done_callbacks + + # Caches the picked event loop, so we can avoid the 30ns overhead each + # time we need access to the event loop. + object _loop + + # Flag indicates whether cancel being called or not. Cancellation from + # Core or peer works perfectly fine with normal procedure. However, we + # need this flag to clean up resources for cancellation from the + # application layer. Directly cancelling tasks might cause segfault + # because Core is holding a pointer for the callback handler. + bint _is_locally_cancelled + + # Following attributes are used for storing the status of the call and + # the initial metadata. Waiters are used for pausing the execution of + # tasks that are asking for one of the field when they are not yet + # available. + readonly AioRpcStatus _status + readonly tuple _initial_metadata + list _waiters_status + list _waiters_initial_metadata + + int _send_initial_metadata_flags + + cdef void _create_grpc_call(self, object timeout, bytes method, CallCredentials credentials) except * + cdef void _set_status(self, AioRpcStatus status) except * + cdef void _set_initial_metadata(self, tuple initial_metadata) except * diff --git a/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/aio/call.pyx.pxi b/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/aio/call.pyx.pxi new file mode 100644 index 00000000000..7bce1850dc8 --- /dev/null +++ b/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/aio/call.pyx.pxi @@ -0,0 +1,508 @@ +# Copyright 2019 gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +_EMPTY_FLAGS = 0 +_EMPTY_MASK = 0 +_IMMUTABLE_EMPTY_METADATA = tuple() + +_UNKNOWN_CANCELLATION_DETAILS = 'RPC cancelled for unknown reason.' +_OK_CALL_REPRESENTATION = ('<{} of RPC that terminated with:\n' + '\tstatus = {}\n' + '\tdetails = "{}"\n' + '>') + +_NON_OK_CALL_REPRESENTATION = ('<{} of RPC that terminated with:\n' + '\tstatus = {}\n' + '\tdetails = "{}"\n' + '\tdebug_error_string = "{}"\n' + '>') + + +cdef int _get_send_initial_metadata_flags(object wait_for_ready) except *: + cdef int flags = 0 + # Wait-for-ready can be None, which means using default value in Core. + if wait_for_ready is not None: + flags |= InitialMetadataFlags.wait_for_ready_explicitly_set + if wait_for_ready: + flags |= InitialMetadataFlags.wait_for_ready + + flags &= InitialMetadataFlags.used_mask + return flags + + +cdef class _AioCall(GrpcCallWrapper): + + def __cinit__(self, AioChannel channel, object deadline, + bytes method, CallCredentials call_credentials, object wait_for_ready): + init_grpc_aio() + self.call = NULL + self._channel = channel + self._loop = channel.loop + self._references = [] + self._status = None + self._initial_metadata = None + self._waiters_status = [] + self._waiters_initial_metadata = [] + self._done_callbacks = [] + self._is_locally_cancelled = False + self._deadline = deadline + self._send_initial_metadata_flags = _get_send_initial_metadata_flags(wait_for_ready) + self._create_grpc_call(deadline, method, call_credentials) + + def __dealloc__(self): + if self.call: + grpc_call_unref(self.call) + shutdown_grpc_aio() + + def _repr(self) -> str: + """Assembles the RPC representation string.""" + # This needs to be loaded at run time once everything + # has been loaded. + from grpc import _common + + if not self.done(): + return '<{} object>'.format(self.__class__.__name__) + + if self._status.code() is StatusCode.ok: + return _OK_CALL_REPRESENTATION.format( + self.__class__.__name__, + _common.CYGRPC_STATUS_CODE_TO_STATUS_CODE[self._status.code()], + self._status.details()) + else: + return _NON_OK_CALL_REPRESENTATION.format( + self.__class__.__name__, + self._status.details(), + _common.CYGRPC_STATUS_CODE_TO_STATUS_CODE[self._status.code()], + self._status.debug_error_string()) + + def __repr__(self) -> str: + return self._repr() + + def __str__(self) -> str: + return self._repr() + + cdef void _create_grpc_call(self, + object deadline, + bytes method, + CallCredentials credentials) except *: + """Creates the corresponding Core object for this RPC. + + For unary calls, the grpc_call lives shortly and can be destroyed after + invoke start_batch. However, if either side is streaming, the grpc_call + life span will be longer than one function. So, it would better save it + as an instance variable than a stack variable, which reflects its + nature in Core. + """ + cdef grpc_slice method_slice + cdef gpr_timespec c_deadline = _timespec_from_time(deadline) + cdef grpc_call_error set_credentials_error + + method_slice = grpc_slice_from_copied_buffer( + <const char *> method, + <size_t> len(method) + ) + self.call = grpc_channel_create_call( + self._channel.channel, + NULL, + _EMPTY_MASK, + global_completion_queue(), + method_slice, + NULL, + c_deadline, + NULL + ) + + if credentials is not None: + set_credentials_error = grpc_call_set_credentials(self.call, credentials.c()) + if set_credentials_error != GRPC_CALL_OK: + raise InternalError("Credentials couldn't have been set: {0}".format(set_credentials_error)) + + grpc_slice_unref(method_slice) + + cdef void _set_status(self, AioRpcStatus status) except *: + cdef list waiters + + # No more waiters should be expected since status has been set. + self._status = status + + if self._initial_metadata is None: + self._set_initial_metadata(_IMMUTABLE_EMPTY_METADATA) + + for waiter in self._waiters_status: + if not waiter.done(): + waiter.set_result(None) + self._waiters_status = [] + + for callback in self._done_callbacks: + callback() + + cdef void _set_initial_metadata(self, tuple initial_metadata) except *: + if self._initial_metadata is not None: + # Some gRPC calls might end before the initial metadata arrived in + # the Call object. That causes this method to be invoked twice: 1. + # filled with an empty metadata; 2. updated with the actual user + # provided metadata. + return + + cdef list waiters + + # No more waiters should be expected since initial metadata has been + # set. + self._initial_metadata = initial_metadata + + for waiter in self._waiters_initial_metadata: + if not waiter.done(): + waiter.set_result(None) + self._waiters_initial_metadata = [] + + def add_done_callback(self, callback): + if self.done(): + callback() + else: + self._done_callbacks.append(callback) + + def time_remaining(self): + if self._deadline is None: + return None + else: + return max(0, self._deadline - time.time()) + + def cancel(self, str details): + """Cancels the RPC in Core with given RPC status. + + Above abstractions must invoke this method to set Core objects into + proper state. + """ + self._is_locally_cancelled = True + + cdef object details_bytes + cdef char *c_details + cdef grpc_call_error error + + self._set_status(AioRpcStatus( + StatusCode.cancelled, + details, + None, + None, + )) + + details_bytes = str_to_bytes(details) + self._references.append(details_bytes) + c_details = <char *>details_bytes + # By implementation, grpc_call_cancel_with_status always return OK + error = grpc_call_cancel_with_status( + self.call, + StatusCode.cancelled, + c_details, + NULL, + ) + assert error == GRPC_CALL_OK + + def done(self): + """Returns if the RPC call has finished. + + Checks if the status has been provided, either + because the RPC finished or because was cancelled.. + + Returns: + True if the RPC can be considered finished. + """ + return self._status is not None + + def cancelled(self): + """Returns if the RPC was cancelled. + + Returns: + True if the RPC was cancelled. + """ + if not self.done(): + return False + + return self._status.code() == StatusCode.cancelled + + async def status(self): + """Returns the status of the RPC call. + + It returns the finshed status of the RPC. If the RPC + has not finished yet this function will wait until the RPC + gets finished. + + Returns: + Finished status of the RPC as an AioRpcStatus object. + """ + if self._status is not None: + return self._status + + future = self._loop.create_future() + self._waiters_status.append(future) + await future + + return self._status + + def is_ok(self): + """Returns if the RPC is ended with ok.""" + return self.done() and self._status.code() == StatusCode.ok + + async def initial_metadata(self): + """Returns the initial metadata of the RPC call. + + If the initial metadata has not been received yet this function will + wait until the RPC gets finished. + + Returns: + The tuple object with the initial metadata. + """ + if self._initial_metadata is not None: + return self._initial_metadata + + future = self._loop.create_future() + self._waiters_initial_metadata.append(future) + await future + + return self._initial_metadata + + def is_locally_cancelled(self): + """Returns if the RPC was cancelled locally. + + Returns: + True when was cancelled locally, False when was cancelled remotelly or + is still ongoing. + """ + if self._is_locally_cancelled: + return True + + return False + + async def unary_unary(self, + bytes request, + tuple outbound_initial_metadata): + """Performs a unary unary RPC. + + Args: + request: the serialized requests in bytes. + outbound_initial_metadata: optional outbound metadata. + """ + cdef tuple ops + + cdef SendInitialMetadataOperation initial_metadata_op = SendInitialMetadataOperation( + outbound_initial_metadata, + self._send_initial_metadata_flags) + cdef SendMessageOperation send_message_op = SendMessageOperation(request, _EMPTY_FLAGS) + cdef SendCloseFromClientOperation send_close_op = SendCloseFromClientOperation(_EMPTY_FLAGS) + cdef ReceiveInitialMetadataOperation receive_initial_metadata_op = ReceiveInitialMetadataOperation(_EMPTY_FLAGS) + cdef ReceiveMessageOperation receive_message_op = ReceiveMessageOperation(_EMPTY_FLAGS) + cdef ReceiveStatusOnClientOperation receive_status_on_client_op = ReceiveStatusOnClientOperation(_EMPTY_FLAGS) + + ops = (initial_metadata_op, send_message_op, send_close_op, + receive_initial_metadata_op, receive_message_op, + receive_status_on_client_op) + + # Executes all operations in one batch. + # Might raise CancelledError, handling it in Python UnaryUnaryCall. + await execute_batch(self, + ops, + self._loop) + + self._set_initial_metadata(receive_initial_metadata_op.initial_metadata()) + + cdef grpc_status_code code + code = receive_status_on_client_op.code() + + self._set_status(AioRpcStatus( + code, + receive_status_on_client_op.details(), + receive_status_on_client_op.trailing_metadata(), + receive_status_on_client_op.error_string(), + )) + + if code == StatusCode.ok: + return receive_message_op.message() + else: + return None + + async def _handle_status_once_received(self): + """Handles the status sent by peer once received.""" + cdef ReceiveStatusOnClientOperation op = ReceiveStatusOnClientOperation(_EMPTY_FLAGS) + cdef tuple ops = (op,) + await execute_batch(self, ops, self._loop) + + # Halts if the RPC is locally cancelled + if self._is_locally_cancelled: + return + + self._set_status(AioRpcStatus( + op.code(), + op.details(), + op.trailing_metadata(), + op.error_string(), + )) + + async def receive_serialized_message(self): + """Receives one single raw message in bytes.""" + cdef bytes received_message + + # Receives a message. Returns None when failed: + # * EOF, no more messages to read; + # * The client application cancels; + # * The server sends final status. + received_message = await _receive_message( + self, + self._loop + ) + if received_message is not None: + return received_message + else: + return EOF + + async def send_serialized_message(self, bytes message): + """Sends one single raw message in bytes.""" + await _send_message(self, + message, + None, + False, + self._loop) + + async def send_receive_close(self): + """Half close the RPC on the client-side.""" + cdef SendCloseFromClientOperation op = SendCloseFromClientOperation(_EMPTY_FLAGS) + cdef tuple ops = (op,) + await execute_batch(self, ops, self._loop) + + async def initiate_unary_stream(self, + bytes request, + tuple outbound_initial_metadata): + """Implementation of the start of a unary-stream call.""" + # Peer may prematurely end this RPC at any point. We need a corutine + # that watches if the server sends the final status. + status_task = self._loop.create_task(self._handle_status_once_received()) + + cdef tuple outbound_ops + cdef Operation initial_metadata_op = SendInitialMetadataOperation( + outbound_initial_metadata, + self._send_initial_metadata_flags) + cdef Operation send_message_op = SendMessageOperation( + request, + _EMPTY_FLAGS) + cdef Operation send_close_op = SendCloseFromClientOperation( + _EMPTY_FLAGS) + + outbound_ops = ( + initial_metadata_op, + send_message_op, + send_close_op, + ) + + try: + # Sends out the request message. + await execute_batch(self, + outbound_ops, + self._loop) + + # Receives initial metadata. + self._set_initial_metadata( + await _receive_initial_metadata(self, + self._loop), + ) + except ExecuteBatchError as batch_error: + # Core should explain why this batch failed + await status_task + + async def stream_unary(self, + tuple outbound_initial_metadata, + object metadata_sent_observer): + """Actual implementation of the complete unary-stream call. + + Needs to pay extra attention to the raise mechanism. If we want to + propagate the final status exception, then we have to raise it. + Othersize, it would end normally and raise `StopAsyncIteration()`. + """ + try: + # Sends out initial_metadata ASAP. + await _send_initial_metadata(self, + outbound_initial_metadata, + self._send_initial_metadata_flags, + self._loop) + # Notify upper level that sending messages are allowed now. + metadata_sent_observer() + + # Receives initial metadata. + self._set_initial_metadata( + await _receive_initial_metadata(self, self._loop) + ) + except ExecuteBatchError: + # Core should explain why this batch failed + await self._handle_status_once_received() + + # Allow upper layer to proceed only if the status is set + metadata_sent_observer() + return None + + cdef tuple inbound_ops + cdef ReceiveMessageOperation receive_message_op = ReceiveMessageOperation(_EMPTY_FLAGS) + cdef ReceiveStatusOnClientOperation receive_status_on_client_op = ReceiveStatusOnClientOperation(_EMPTY_FLAGS) + inbound_ops = (receive_message_op, receive_status_on_client_op) + + # Executes all operations in one batch. + await execute_batch(self, + inbound_ops, + self._loop) + + cdef grpc_status_code code + code = receive_status_on_client_op.code() + + self._set_status(AioRpcStatus( + code, + receive_status_on_client_op.details(), + receive_status_on_client_op.trailing_metadata(), + receive_status_on_client_op.error_string(), + )) + + if code == StatusCode.ok: + return receive_message_op.message() + else: + return None + + async def initiate_stream_stream(self, + tuple outbound_initial_metadata, + object metadata_sent_observer): + """Actual implementation of the complete stream-stream call. + + Needs to pay extra attention to the raise mechanism. If we want to + propagate the final status exception, then we have to raise it. + Othersize, it would end normally and raise `StopAsyncIteration()`. + """ + # Peer may prematurely end this RPC at any point. We need a corutine + # that watches if the server sends the final status. + status_task = self._loop.create_task(self._handle_status_once_received()) + + try: + # Sends out initial_metadata ASAP. + await _send_initial_metadata(self, + outbound_initial_metadata, + self._send_initial_metadata_flags, + self._loop) + # Notify upper level that sending messages are allowed now. + metadata_sent_observer() + + # Receives initial metadata. + self._set_initial_metadata( + await _receive_initial_metadata(self, self._loop) + ) + except ExecuteBatchError as batch_error: + # Core should explain why this batch failed + await status_task + + # Allow upper layer to proceed only if the status is set + metadata_sent_observer() diff --git a/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/aio/callback_common.pxd.pxi b/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/aio/callback_common.pxd.pxi new file mode 100644 index 00000000000..e54e5107547 --- /dev/null +++ b/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/aio/callback_common.pxd.pxi @@ -0,0 +1,57 @@ +# Copyright 2019 gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +cdef class CallbackFailureHandler: + cdef str _core_function_name + cdef object _error_details + cdef object _exception_type + + cdef handle(self, object future) + + +cdef struct CallbackContext: + # C struct to store callback context in the form of pointers. + # + # Attributes: + # functor: A grpc_completion_queue_functor represents the + # callback function in the only way Core understands. + # waiter: An asyncio.Future object that fulfills when the callback is + # invoked by Core. + # failure_handler: A CallbackFailureHandler object that called when Core + # returns 'success == 0' state. + # wrapper: A self-reference to the CallbackWrapper to help life cycle + # management. + grpc_completion_queue_functor functor + cpython.PyObject *waiter + cpython.PyObject *loop + cpython.PyObject *failure_handler + cpython.PyObject *callback_wrapper + + +cdef class CallbackWrapper: + cdef CallbackContext context + cdef object _reference_of_future + cdef object _reference_of_failure_handler + + @staticmethod + cdef void functor_run( + grpc_completion_queue_functor* functor, + int succeed) + + cdef grpc_completion_queue_functor *c_functor(self) + + +cdef class GrpcCallWrapper: + cdef grpc_call* call diff --git a/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/aio/callback_common.pyx.pxi b/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/aio/callback_common.pyx.pxi new file mode 100644 index 00000000000..14a0098fc20 --- /dev/null +++ b/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/aio/callback_common.pyx.pxi @@ -0,0 +1,185 @@ +# Copyright 2019 gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +cdef class CallbackFailureHandler: + + def __cinit__(self, + str core_function_name, + object error_details, + object exception_type): + """Handles failure by raising exception.""" + self._core_function_name = core_function_name + self._error_details = error_details + self._exception_type = exception_type + + cdef handle(self, object future): + future.set_exception(self._exception_type( + 'Failed "%s": %s' % (self._core_function_name, self._error_details) + )) + + +cdef class CallbackWrapper: + + def __cinit__(self, object future, object loop, CallbackFailureHandler failure_handler): + self.context.functor.functor_run = self.functor_run + self.context.waiter = <cpython.PyObject*>future + self.context.loop = <cpython.PyObject*>loop + self.context.failure_handler = <cpython.PyObject*>failure_handler + self.context.callback_wrapper = <cpython.PyObject*>self + # NOTE(lidiz) Not using a list here, because this class is critical in + # data path. We should make it as efficient as possible. + self._reference_of_future = future + self._reference_of_failure_handler = failure_handler + # NOTE(lidiz) We need to ensure when Core invokes our callback, the + # callback function itself is not deallocated. Othersise, we will get + # a segfault. We can view this as Core holding a ref. + cpython.Py_INCREF(self) + + @staticmethod + cdef void functor_run( + grpc_completion_queue_functor* functor, + int success): + cdef CallbackContext *context = <CallbackContext *>functor + cdef object waiter = <object>context.waiter + if not waiter.cancelled(): + if success == 0: + (<CallbackFailureHandler>context.failure_handler).handle(waiter) + else: + waiter.set_result(None) + cpython.Py_DECREF(<object>context.callback_wrapper) + + cdef grpc_completion_queue_functor *c_functor(self): + return &self.context.functor + + +cdef CallbackFailureHandler CQ_SHUTDOWN_FAILURE_HANDLER = CallbackFailureHandler( + 'grpc_completion_queue_shutdown', + 'Unknown', + InternalError) + + +class ExecuteBatchError(InternalError): + """Raised when execute batch returns a failure from Core.""" + + +async def execute_batch(GrpcCallWrapper grpc_call_wrapper, + tuple operations, + object loop): + """The callback version of start batch operations.""" + cdef _BatchOperationTag batch_operation_tag = _BatchOperationTag(None, operations, None) + batch_operation_tag.prepare() + + cdef object future = loop.create_future() + cdef CallbackWrapper wrapper = CallbackWrapper( + future, + loop, + CallbackFailureHandler('execute_batch', operations, ExecuteBatchError)) + cdef grpc_call_error error = grpc_call_start_batch( + grpc_call_wrapper.call, + batch_operation_tag.c_ops, + batch_operation_tag.c_nops, + wrapper.c_functor(), NULL) + + if error != GRPC_CALL_OK: + grpc_call_error_string = grpc_call_error_to_string(error).decode() + raise ExecuteBatchError("Failed grpc_call_start_batch: {} with grpc_call_error value: '{}'".format(error, grpc_call_error_string)) + + await future + + cdef grpc_event c_event + # Tag.event must be called, otherwise messages won't be parsed from C + batch_operation_tag.event(c_event) + + +cdef prepend_send_initial_metadata_op(tuple ops, tuple metadata): + # Eventually, this function should be the only function that produces + # SendInitialMetadataOperation. So we have more control over the flag. + return (SendInitialMetadataOperation( + metadata, + _EMPTY_FLAG + ),) + ops + + +async def _receive_message(GrpcCallWrapper grpc_call_wrapper, + object loop): + """Retrives parsed messages from Core. + + The messages maybe already in Core's buffer, so there isn't a 1-to-1 + mapping between this and the underlying "socket.read()". Also, eventually, + this function will end with an EOF, which reads empty message. + """ + cdef ReceiveMessageOperation receive_op = ReceiveMessageOperation(_EMPTY_FLAG) + cdef tuple ops = (receive_op,) + try: + await execute_batch(grpc_call_wrapper, ops, loop) + except ExecuteBatchError as e: + # NOTE(lidiz) The receive message operation has two ways to indicate + # finish state : 1) returns empty message due to EOF; 2) fails inside + # the callback (e.g. cancelled). + # + # Since they all indicates finish, they are better be merged. + _LOGGER.debug('Failed to receive any message from Core') + # NOTE(lidiz) The returned message might be an empty bytes (aka. b''). + # Please explicitly check if it is None or falsey string object! + return receive_op.message() + + +async def _send_message(GrpcCallWrapper grpc_call_wrapper, + bytes message, + Operation send_initial_metadata_op, + int write_flag, + object loop): + cdef SendMessageOperation op = SendMessageOperation(message, write_flag) + cdef tuple ops = (op,) + if send_initial_metadata_op is not None: + ops = (send_initial_metadata_op,) + ops + await execute_batch(grpc_call_wrapper, ops, loop) + + +async def _send_initial_metadata(GrpcCallWrapper grpc_call_wrapper, + tuple metadata, + int flags, + object loop): + cdef SendInitialMetadataOperation op = SendInitialMetadataOperation( + metadata, + flags) + cdef tuple ops = (op,) + await execute_batch(grpc_call_wrapper, ops, loop) + + +async def _receive_initial_metadata(GrpcCallWrapper grpc_call_wrapper, + object loop): + cdef ReceiveInitialMetadataOperation op = ReceiveInitialMetadataOperation(_EMPTY_FLAGS) + cdef tuple ops = (op,) + await execute_batch(grpc_call_wrapper, ops, loop) + return op.initial_metadata() + +async def _send_error_status_from_server(GrpcCallWrapper grpc_call_wrapper, + grpc_status_code code, + str details, + tuple trailing_metadata, + Operation send_initial_metadata_op, + object loop): + assert code != StatusCode.ok, 'Expecting non-ok status code.' + cdef SendStatusFromServerOperation op = SendStatusFromServerOperation( + trailing_metadata, + code, + details, + _EMPTY_FLAGS, + ) + cdef tuple ops = (op,) + if send_initial_metadata_op is not None: + ops = (send_initial_metadata_op,) + ops + await execute_batch(grpc_call_wrapper, ops, loop) diff --git a/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/aio/channel.pxd.pxi b/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/aio/channel.pxd.pxi new file mode 100644 index 00000000000..03b4990e488 --- /dev/null +++ b/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/aio/channel.pxd.pxi @@ -0,0 +1,27 @@ +# Copyright 2019 gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +cdef enum AioChannelStatus: + AIO_CHANNEL_STATUS_UNKNOWN + AIO_CHANNEL_STATUS_READY + AIO_CHANNEL_STATUS_CLOSING + AIO_CHANNEL_STATUS_DESTROYED + +cdef class AioChannel: + cdef: + grpc_channel * channel + object loop + bytes _target + AioChannelStatus _status + bint _is_secure diff --git a/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/aio/channel.pyx.pxi b/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/aio/channel.pyx.pxi new file mode 100644 index 00000000000..4286ab1d271 --- /dev/null +++ b/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/aio/channel.pyx.pxi @@ -0,0 +1,135 @@ +# Copyright 2019 gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + + +class _WatchConnectivityFailed(Exception): + """Dedicated exception class for watch connectivity failed. + + It might be failed due to deadline exceeded. + """ +cdef CallbackFailureHandler _WATCH_CONNECTIVITY_FAILURE_HANDLER = CallbackFailureHandler( + 'watch_connectivity_state', + 'Timed out', + _WatchConnectivityFailed) + + +cdef class AioChannel: + def __cinit__(self, bytes target, tuple options, ChannelCredentials credentials, object loop): + init_grpc_aio() + if options is None: + options = () + cdef _ChannelArgs channel_args = _ChannelArgs(options) + self._target = target + self.loop = loop + self._status = AIO_CHANNEL_STATUS_READY + + if credentials is None: + self._is_secure = False + creds = grpc_insecure_credentials_create(); + self.channel = grpc_channel_create(<char *>target, + creds, + channel_args.c_args()) + grpc_channel_credentials_release(creds) + else: + self._is_secure = True + creds = <grpc_channel_credentials *> credentials.c() + self.channel = grpc_channel_create(<char *>target, + creds, + channel_args.c_args()) + grpc_channel_credentials_release(creds) + + def __dealloc__(self): + shutdown_grpc_aio() + + def __repr__(self): + class_name = self.__class__.__name__ + id_ = id(self) + return f"<{class_name} {id_}>" + + def check_connectivity_state(self, bint try_to_connect): + """A Cython wrapper for Core's check connectivity state API.""" + if self._status == AIO_CHANNEL_STATUS_DESTROYED: + return ConnectivityState.shutdown + else: + return grpc_channel_check_connectivity_state( + self.channel, + try_to_connect, + ) + + async def watch_connectivity_state(self, + grpc_connectivity_state last_observed_state, + object deadline): + """Watch for one connectivity state change. + + Keeps mirroring the behavior from Core, so we can easily switch to + other design of API if necessary. + """ + if self._status in (AIO_CHANNEL_STATUS_DESTROYED, AIO_CHANNEL_STATUS_CLOSING): + raise UsageError('Channel is closed.') + + cdef gpr_timespec c_deadline = _timespec_from_time(deadline) + + cdef object future = self.loop.create_future() + cdef CallbackWrapper wrapper = CallbackWrapper( + future, + self.loop, + _WATCH_CONNECTIVITY_FAILURE_HANDLER) + grpc_channel_watch_connectivity_state( + self.channel, + last_observed_state, + c_deadline, + global_completion_queue(), + wrapper.c_functor()) + + try: + await future + except _WatchConnectivityFailed: + return False + else: + return True + + def closing(self): + self._status = AIO_CHANNEL_STATUS_CLOSING + + def close(self): + self._status = AIO_CHANNEL_STATUS_DESTROYED + grpc_channel_destroy(self.channel) + + def closed(self): + return self._status in (AIO_CHANNEL_STATUS_CLOSING, AIO_CHANNEL_STATUS_DESTROYED) + + def call(self, + bytes method, + object deadline, + object python_call_credentials, + object wait_for_ready): + """Assembles a Cython Call object. + + Returns: + An _AioCall object. + """ + if self.closed(): + raise UsageError('Channel is closed.') + + cdef CallCredentials cython_call_credentials + if python_call_credentials is not None: + if not self._is_secure: + raise UsageError("Call credentials are only valid on secure channels") + + cython_call_credentials = python_call_credentials._credentials + else: + cython_call_credentials = None + + return _AioCall(self, deadline, method, cython_call_credentials, wait_for_ready) diff --git a/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/aio/common.pyx.pxi b/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/aio/common.pyx.pxi new file mode 100644 index 00000000000..f698390cd5c --- /dev/null +++ b/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/aio/common.pyx.pxi @@ -0,0 +1,202 @@ +# Copyright 2019 The gRPC Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from cpython.version cimport PY_MAJOR_VERSION, PY_MINOR_VERSION + +TYPE_METADATA_STRING = "Tuple[Tuple[str, Union[str, bytes]]...]" + + +cdef grpc_status_code get_status_code(object code) except *: + if isinstance(code, int): + if code >= StatusCode.ok and code <= StatusCode.data_loss: + return code + else: + return StatusCode.unknown + else: + try: + return code.value[0] + except (KeyError, AttributeError): + return StatusCode.unknown + + +cdef object deserialize(object deserializer, bytes raw_message): + """Perform deserialization on raw bytes. + + Failure to deserialize is a fatal error. + """ + if deserializer: + return deserializer(raw_message) + else: + return raw_message + + +cdef bytes serialize(object serializer, object message): + """Perform serialization on a message. + + Failure to serialize is a fatal error. + """ + if isinstance(message, str): + message = message.encode('utf-8') + if serializer: + return serializer(message) + else: + return message + + +class _EOF: + + def __bool__(self): + return False + + def __len__(self): + return 0 + + def _repr(self) -> str: + return '<grpc.aio.EOF>' + + def __repr__(self) -> str: + return self._repr() + + def __str__(self) -> str: + return self._repr() + + +EOF = _EOF() + +_COMPRESSION_METADATA_STRING_MAPPING = { + CompressionAlgorithm.none: 'identity', + CompressionAlgorithm.deflate: 'deflate', + CompressionAlgorithm.gzip: 'gzip', +} + +class BaseError(Exception): + """The base class for exceptions generated by gRPC AsyncIO stack.""" + + +class UsageError(BaseError): + """Raised when the usage of API by applications is inappropriate. + + For example, trying to invoke RPC on a closed channel, mixing two styles + of streaming API on the client side. This exception should not be + suppressed. + """ + + +class AbortError(BaseError): + """Raised when calling abort in servicer methods. + + This exception should not be suppressed. Applications may catch it to + perform certain clean-up logic, and then re-raise it. + """ + + +class InternalError(BaseError): + """Raised upon unexpected errors in native code.""" + + +def schedule_coro_threadsafe(object coro, object loop): + try: + return loop.create_task(coro) + except RuntimeError as runtime_error: + if 'Non-thread-safe operation' in str(runtime_error): + return asyncio.run_coroutine_threadsafe( + coro, + loop, + ) + else: + raise + + +def async_generator_to_generator(object agen, object loop): + """Converts an async generator into generator.""" + try: + while True: + future = asyncio.run_coroutine_threadsafe( + agen.__anext__(), + loop + ) + response = future.result() + if response is EOF: + break + else: + yield response + except StopAsyncIteration: + # If StopAsyncIteration is raised, end this generator. + pass + + +async def generator_to_async_generator(object gen, object loop, object thread_pool): + """Converts a generator into async generator. + + The generator might block, so we need to delegate the iteration to thread + pool. Also, we can't simply delegate __next__ to the thread pool, otherwise + we will see following error: + + TypeError: StopIteration interacts badly with generators and cannot be + raised into a Future + """ + queue = asyncio.Queue(maxsize=1) + + def yield_to_queue(): + try: + for item in gen: + asyncio.run_coroutine_threadsafe(queue.put(item), loop).result() + finally: + asyncio.run_coroutine_threadsafe(queue.put(EOF), loop).result() + + future = loop.run_in_executor( + thread_pool, + yield_to_queue, + ) + + while True: + response = await queue.get() + if response is EOF: + break + else: + yield response + + # Port the exception if there is any + await future + + +if PY_MAJOR_VERSION >= 3 and PY_MINOR_VERSION >= 7: + def get_working_loop(): + """Returns a running event loop. + + Due to a defect of asyncio.get_event_loop, its returned event loop might + not be set as the default event loop for the main thread. + """ + try: + return asyncio.get_running_loop() + except RuntimeError: + return asyncio.get_event_loop_policy().get_event_loop() +else: + def get_working_loop(): + """Returns a running event loop.""" + return asyncio.get_event_loop() + + +def raise_if_not_valid_trailing_metadata(object metadata): + if not hasattr(metadata, '__iter__') or isinstance(metadata, dict): + raise TypeError(f'Invalid trailing metadata type, expected {TYPE_METADATA_STRING}: {metadata}') + for item in metadata: + if not isinstance(item, tuple): + raise TypeError(f'Invalid trailing metadata type, expected {TYPE_METADATA_STRING}: {metadata}') + if len(item) != 2: + raise TypeError(f'Invalid trailing metadata type, expected {TYPE_METADATA_STRING}: {metadata}') + if not isinstance(item[0], str): + raise TypeError(f'Invalid trailing metadata type, expected {TYPE_METADATA_STRING}: {metadata}') + if not isinstance(item[1], str) and not isinstance(item[1], bytes): + raise TypeError(f'Invalid trailing metadata type, expected {TYPE_METADATA_STRING}: {metadata}') diff --git a/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/aio/completion_queue.pxd.pxi b/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/aio/completion_queue.pxd.pxi new file mode 100644 index 00000000000..578131f7eef --- /dev/null +++ b/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/aio/completion_queue.pxd.pxi @@ -0,0 +1,52 @@ +# Copyright 2020 The gRPC Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +ctypedef queue[grpc_event] cpp_event_queue + + +IF UNAME_SYSNAME == "Windows": + cdef extern from "winsock2.h" nogil: + ctypedef uint32_t WIN_SOCKET "SOCKET" + WIN_SOCKET win_socket "socket" (int af, int type, int protocol) + int win_socket_send "send" (WIN_SOCKET s, const char *buf, int len, int flags) + + +cdef void _unified_socket_write(int fd) nogil + + +cdef class BaseCompletionQueue: + cdef grpc_completion_queue *_cq + + cdef grpc_completion_queue* c_ptr(self) + + +cdef class _BoundEventLoop: + cdef readonly object loop + cdef readonly object read_socket # socket.socket + cdef bint _has_reader + + +cdef class PollerCompletionQueue(BaseCompletionQueue): + cdef bint _shutdown + cdef cpp_event_queue _queue + cdef mutex _queue_mutex + cdef object _poller_thread # threading.Thread + cdef int _write_fd + cdef object _read_socket # socket.socket + cdef object _write_socket # socket.socket + cdef dict _loops # Mapping[asyncio.AbstractLoop, _BoundEventLoop] + + cdef void _poll(self) nogil + cdef shutdown(self) diff --git a/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/aio/completion_queue.pyx.pxi b/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/aio/completion_queue.pyx.pxi new file mode 100644 index 00000000000..b9132c85602 --- /dev/null +++ b/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/aio/completion_queue.pyx.pxi @@ -0,0 +1,174 @@ +# Copyright 2020 The gRPC Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import socket + +cdef gpr_timespec _GPR_INF_FUTURE = gpr_inf_future(GPR_CLOCK_REALTIME) +cdef float _POLL_AWAKE_INTERVAL_S = 0.2 + +# This bool indicates if the event loop impl can monitor a given fd, or has +# loop.add_reader method. +cdef bint _has_fd_monitoring = True + +IF UNAME_SYSNAME == "Windows": + cdef void _unified_socket_write(int fd) nogil: + win_socket_send(<WIN_SOCKET>fd, b"1", 1, 0) +ELSE: + cimport posix.unistd as unistd + + cdef void _unified_socket_write(int fd) nogil: + unistd.write(fd, b"1", 1) + + +def _handle_callback_wrapper(CallbackWrapper callback_wrapper, int success): + CallbackWrapper.functor_run(callback_wrapper.c_functor(), success) + + +cdef class BaseCompletionQueue: + + cdef grpc_completion_queue* c_ptr(self): + return self._cq + + +cdef class _BoundEventLoop: + + def __cinit__(self, object loop, object read_socket, object handler): + global _has_fd_monitoring + self.loop = loop + self.read_socket = read_socket + reader_function = functools.partial( + handler, + loop + ) + # NOTE(lidiz) There isn't a way to cleanly pre-check if fd monitoring + # support is available or not. Checking the event loop policy is not + # good enough. The application can has its own loop implementation, or + # uses different types of event loops (e.g., 1 Proactor, 3 Selectors). + if _has_fd_monitoring: + try: + self.loop.add_reader(self.read_socket, reader_function) + self._has_reader = True + except NotImplementedError: + _has_fd_monitoring = False + self._has_reader = False + + def close(self): + if self.loop: + if self._has_reader: + self.loop.remove_reader(self.read_socket) + + +cdef class PollerCompletionQueue(BaseCompletionQueue): + + def __cinit__(self): + self._cq = grpc_completion_queue_create_for_next(NULL) + self._shutdown = False + self._poller_thread = threading.Thread(target=self._poll_wrapper, daemon=True) + self._poller_thread.start() + + self._read_socket, self._write_socket = socket.socketpair() + self._write_fd = self._write_socket.fileno() + self._loops = {} + + # The read socket might be read by multiple threads. But only one of them will + # read the 1 byte sent by the poller thread. This setting is essential to allow + # multiple loops in multiple threads bound to the same poller. + self._read_socket.setblocking(False) + + self._queue = cpp_event_queue() + + def bind_loop(self, object loop): + if loop in self._loops: + return + else: + self._loops[loop] = _BoundEventLoop(loop, self._read_socket, self._handle_events) + + cdef void _poll(self) nogil: + cdef grpc_event event + cdef CallbackContext *context + + while not self._shutdown: + event = grpc_completion_queue_next(self._cq, + _GPR_INF_FUTURE, + NULL) + + if event.type == GRPC_QUEUE_TIMEOUT: + with gil: + raise AssertionError("Core should not return GRPC_QUEUE_TIMEOUT!") + elif event.type == GRPC_QUEUE_SHUTDOWN: + self._shutdown = True + else: + self._queue_mutex.lock() + self._queue.push(event) + self._queue_mutex.unlock() + if _has_fd_monitoring: + _unified_socket_write(self._write_fd) + else: + with gil: + # Event loops can be paused or killed at any time. So, + # instead of deligate to any thread, the polling thread + # should handle the distribution of the event. + self._handle_events(None) + + def _poll_wrapper(self): + with nogil: + self._poll() + + cdef shutdown(self): + # Removes the socket hook from loops + for loop in self._loops: + self._loops.get(loop).close() + + # TODO(https://github.com/grpc/grpc/issues/22365) perform graceful shutdown + grpc_completion_queue_shutdown(self._cq) + while not self._shutdown: + self._poller_thread.join(timeout=_POLL_AWAKE_INTERVAL_S) + grpc_completion_queue_destroy(self._cq) + + # Clean up socket resources + self._read_socket.close() + self._write_socket.close() + + def _handle_events(self, object context_loop): + cdef bytes data + if _has_fd_monitoring: + # If fd monitoring is working, clean the socket without blocking. + data = self._read_socket.recv(1) + cdef grpc_event event + cdef CallbackContext *context + + while True: + self._queue_mutex.lock() + if self._queue.empty(): + self._queue_mutex.unlock() + break + else: + event = self._queue.front() + self._queue.pop() + self._queue_mutex.unlock() + + context = <CallbackContext *>event.tag + loop = <object>context.loop + if loop is context_loop: + # Executes callbacks: complete the future + CallbackWrapper.functor_run( + <grpc_completion_queue_functor *>event.tag, + event.success + ) + else: + loop.call_soon_threadsafe( + _handle_callback_wrapper, + <CallbackWrapper>context.callback_wrapper, + event.success + ) diff --git a/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/aio/grpc_aio.pxd.pxi b/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/aio/grpc_aio.pxd.pxi new file mode 100644 index 00000000000..ebf0660174d --- /dev/null +++ b/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/aio/grpc_aio.pxd.pxi @@ -0,0 +1,43 @@ +# Copyright 2019 gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# distutils: language=c++ + +cdef class _AioState: + cdef object lock # threading.RLock + cdef int refcount + cdef object engine # AsyncIOEngine + cdef BaseCompletionQueue cq + + +cdef grpc_completion_queue *global_completion_queue() + + +cpdef init_grpc_aio() + + +cpdef shutdown_grpc_aio() + + +cdef extern from "src/core/lib/iomgr/timer_manager.h": + void grpc_timer_manager_set_threading(bint enabled) + + +cdef extern from "src/core/lib/iomgr/iomgr_internal.h": + void grpc_set_default_iomgr_platform() + + +cdef extern from "src/core/lib/iomgr/executor.h" namespace "grpc_core": + cdef cppclass Executor: + @staticmethod + void SetThreadingAll(bint enable) diff --git a/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/aio/grpc_aio.pyx.pxi b/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/aio/grpc_aio.pyx.pxi new file mode 100644 index 00000000000..7f9f52da7c0 --- /dev/null +++ b/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/aio/grpc_aio.pyx.pxi @@ -0,0 +1,114 @@ +# Copyright 2019 gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import enum + +cdef str _GRPC_ASYNCIO_ENGINE = os.environ.get('GRPC_ASYNCIO_ENGINE', 'poller').upper() +cdef _AioState _global_aio_state = _AioState() + + +class AsyncIOEngine(enum.Enum): + # NOTE(lidiz) the support for custom_io_manager is removed in favor of the + # EventEngine project, which will be the only IO platform in Core. + CUSTOM_IO_MANAGER = 'custom_io_manager' + POLLER = 'poller' + + +cdef _default_asyncio_engine(): + return AsyncIOEngine.POLLER + + +cdef grpc_completion_queue *global_completion_queue(): + return _global_aio_state.cq.c_ptr() + + +cdef class _AioState: + + def __cinit__(self): + self.lock = threading.RLock() + self.refcount = 0 + self.engine = None + self.cq = None + + +cdef _initialize_poller(): + # Initializes gRPC Core, must be called before other Core API + grpc_init() + + # Creates the only completion queue + _global_aio_state.cq = PollerCompletionQueue() + + +cdef _actual_aio_initialization(): + # Picks the engine for gRPC AsyncIO Stack + _global_aio_state.engine = AsyncIOEngine.__members__.get( + _GRPC_ASYNCIO_ENGINE, + _default_asyncio_engine(), + ) + _LOGGER.debug('Using %s as I/O engine', _global_aio_state.engine) + + # Initializes the process-level state accordingly + if _global_aio_state.engine is AsyncIOEngine.POLLER: + _initialize_poller() + else: + raise ValueError('Unsupported engine type [%s]' % _global_aio_state.engine) + + +def _grpc_shutdown_wrapper(_): + """A thin Python wrapper of Core's shutdown function. + + Define functions are not allowed in "cdef" functions, and Cython complains + about a simple lambda with a C function. + """ + grpc_shutdown() + + +cdef _actual_aio_shutdown(): + if _global_aio_state.engine is AsyncIOEngine.POLLER: + (<PollerCompletionQueue>_global_aio_state.cq).shutdown() + grpc_shutdown() + else: + raise ValueError('Unsupported engine type [%s]' % _global_aio_state.engine) + + +cdef _initialize_per_loop(): + cdef object loop = get_working_loop() + if _global_aio_state.engine is AsyncIOEngine.POLLER: + _global_aio_state.cq.bind_loop(loop) + + +cpdef init_grpc_aio(): + """Initializes the gRPC AsyncIO module. + + Expected to be invoked on critical class constructors. + E.g., AioChannel, AioServer. + """ + with _global_aio_state.lock: + _global_aio_state.refcount += 1 + if _global_aio_state.refcount == 1: + _actual_aio_initialization() + _initialize_per_loop() + + +cpdef shutdown_grpc_aio(): + """Shuts down the gRPC AsyncIO module. + + Expected to be invoked on critical class destructors. + E.g., AioChannel, AioServer. + """ + with _global_aio_state.lock: + assert _global_aio_state.refcount > 0 + _global_aio_state.refcount -= 1 + if not _global_aio_state.refcount: + _actual_aio_shutdown() diff --git a/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/aio/rpc_status.pxd.pxi b/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/aio/rpc_status.pxd.pxi new file mode 100644 index 00000000000..3780d8ddf2f --- /dev/null +++ b/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/aio/rpc_status.pxd.pxi @@ -0,0 +1,29 @@ +# Copyright 2019 gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Exceptions for the aio version of the RPC calls.""" + + +cdef class AioRpcStatus(Exception): + cdef readonly: + grpc_status_code _code + str _details + # Per the spec, only client-side status has trailing metadata. + tuple _trailing_metadata + str _debug_error_string + + cpdef grpc_status_code code(self) + cpdef str details(self) + cpdef tuple trailing_metadata(self) + cpdef str debug_error_string(self) + cdef grpc_status_code c_code(self) diff --git a/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/aio/rpc_status.pyx.pxi b/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/aio/rpc_status.pyx.pxi new file mode 100644 index 00000000000..07669fc1575 --- /dev/null +++ b/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/aio/rpc_status.pyx.pxi @@ -0,0 +1,44 @@ +# Copyright 2019 gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Exceptions for the aio version of the RPC calls.""" + + +cdef class AioRpcStatus(Exception): + + # The final status of gRPC is represented by three trailing metadata: + # `grpc-status`, `grpc-status-message`, abd `grpc-status-details`. + def __cinit__(self, + grpc_status_code code, + str details, + tuple trailing_metadata, + str debug_error_string): + self._code = code + self._details = details + self._trailing_metadata = trailing_metadata + self._debug_error_string = debug_error_string + + cpdef grpc_status_code code(self): + return self._code + + cpdef str details(self): + return self._details + + cpdef tuple trailing_metadata(self): + return self._trailing_metadata + + cpdef str debug_error_string(self): + return self._debug_error_string + + cdef grpc_status_code c_code(self): + return <grpc_status_code>self._code diff --git a/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/aio/server.pxd.pxi b/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/aio/server.pxd.pxi new file mode 100644 index 00000000000..fe10c3883c3 --- /dev/null +++ b/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/aio/server.pxd.pxi @@ -0,0 +1,92 @@ +# Copyright 2019 The gRPC Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +cdef class _HandlerCallDetails: + cdef readonly str method + cdef readonly tuple invocation_metadata + + +cdef class RPCState(GrpcCallWrapper): + cdef grpc_call_details details + cdef grpc_metadata_array request_metadata + cdef AioServer server + # NOTE(lidiz) Under certain corner case, receiving the client close + # operation won't immediately fail ongoing RECV_MESSAGE operations. Here I + # added a flag to workaround this unexpected behavior. + cdef bint client_closed + cdef object abort_exception + cdef bint metadata_sent + cdef bint status_sent + cdef grpc_status_code status_code + cdef str status_details + cdef tuple trailing_metadata + cdef object compression_algorithm + cdef bint disable_next_compression + cdef object callbacks + + cdef bytes method(self) + cdef tuple invocation_metadata(self) + cdef void raise_for_termination(self) except * + cdef int get_write_flag(self) + cdef Operation create_send_initial_metadata_op_if_not_sent(self) + + +cdef class _ServicerContext: + cdef RPCState _rpc_state + cdef object _loop # asyncio.AbstractEventLoop + cdef object _request_deserializer # Callable[[bytes], Any] + cdef object _response_serializer # Callable[[Any], bytes] + + +cdef class _SyncServicerContext: + cdef _ServicerContext _context + cdef list _callbacks + cdef object _loop # asyncio.AbstractEventLoop + + +cdef class _MessageReceiver: + cdef _ServicerContext _servicer_context + cdef object _agen + + +cdef enum AioServerStatus: + AIO_SERVER_STATUS_UNKNOWN + AIO_SERVER_STATUS_READY + AIO_SERVER_STATUS_RUNNING + AIO_SERVER_STATUS_STOPPED + AIO_SERVER_STATUS_STOPPING + + +cdef class _ConcurrentRpcLimiter: + cdef int _maximum_concurrent_rpcs + cdef int _active_rpcs + cdef object _active_rpcs_condition # asyncio.Condition + cdef object _loop # asyncio.EventLoop + + +cdef class AioServer: + cdef Server _server + cdef list _generic_handlers + cdef AioServerStatus _status + cdef object _loop # asyncio.EventLoop + cdef object _serving_task # asyncio.Task + cdef object _shutdown_lock # asyncio.Lock + cdef object _shutdown_completed # asyncio.Future + cdef CallbackWrapper _shutdown_callback_wrapper + cdef object _crash_exception # Exception + cdef tuple _interceptors + cdef object _thread_pool # concurrent.futures.ThreadPoolExecutor + cdef _ConcurrentRpcLimiter _limiter + + cdef thread_pool(self) diff --git a/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/aio/server.pyx.pxi b/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/aio/server.pyx.pxi new file mode 100644 index 00000000000..e85efdd0b9f --- /dev/null +++ b/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/aio/server.pyx.pxi @@ -0,0 +1,1097 @@ +# Copyright 2019 The gRPC Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +import inspect +import traceback +import functools + + +cdef int _EMPTY_FLAG = 0 +cdef str _RPC_FINISHED_DETAILS = 'RPC already finished.' +cdef str _SERVER_STOPPED_DETAILS = 'Server already stopped.' + +cdef _augment_metadata(tuple metadata, object compression): + if compression is None: + return metadata + else: + return (( + GRPC_COMPRESSION_REQUEST_ALGORITHM_MD_KEY, + _COMPRESSION_METADATA_STRING_MAPPING[compression] + ),) + metadata + + +cdef class _HandlerCallDetails: + def __cinit__(self, str method, tuple invocation_metadata): + self.method = method + self.invocation_metadata = invocation_metadata + + +class _ServerStoppedError(BaseError): + """Raised if the server is stopped.""" + + +cdef class RPCState: + + def __cinit__(self, AioServer server): + init_grpc_aio() + self.call = NULL + self.server = server + grpc_metadata_array_init(&self.request_metadata) + grpc_call_details_init(&self.details) + self.client_closed = False + self.abort_exception = None + self.metadata_sent = False + self.status_sent = False + self.status_code = StatusCode.ok + self.status_details = '' + self.trailing_metadata = _IMMUTABLE_EMPTY_METADATA + self.compression_algorithm = None + self.disable_next_compression = False + self.callbacks = [] + + cdef bytes method(self): + return _slice_bytes(self.details.method) + + cdef tuple invocation_metadata(self): + return _metadata(&self.request_metadata) + + cdef void raise_for_termination(self) except *: + """Raise exceptions if RPC is not running. + + Server method handlers may suppress the abort exception. We need to halt + the RPC execution in that case. This function needs to be called after + running application code. + + Also, the server may stop unexpected. We need to check before calling + into Core functions, otherwise, segfault. + """ + if self.abort_exception is not None: + raise self.abort_exception + if self.status_sent: + raise UsageError(_RPC_FINISHED_DETAILS) + if self.server._status == AIO_SERVER_STATUS_STOPPED: + raise _ServerStoppedError(_SERVER_STOPPED_DETAILS) + + cdef int get_write_flag(self): + if self.disable_next_compression: + self.disable_next_compression = False + return WriteFlag.no_compress + else: + return _EMPTY_FLAG + + cdef Operation create_send_initial_metadata_op_if_not_sent(self): + cdef SendInitialMetadataOperation op + if self.metadata_sent: + return None + else: + op = SendInitialMetadataOperation( + _augment_metadata(_IMMUTABLE_EMPTY_METADATA, self.compression_algorithm), + _EMPTY_FLAG + ) + return op + + def __dealloc__(self): + """Cleans the Core objects.""" + grpc_call_details_destroy(&self.details) + grpc_metadata_array_destroy(&self.request_metadata) + if self.call: + grpc_call_unref(self.call) + shutdown_grpc_aio() + + +cdef class _ServicerContext: + + def __cinit__(self, + RPCState rpc_state, + object request_deserializer, + object response_serializer, + object loop): + self._rpc_state = rpc_state + self._request_deserializer = request_deserializer + self._response_serializer = response_serializer + self._loop = loop + + async def read(self): + cdef bytes raw_message + self._rpc_state.raise_for_termination() + + raw_message = await _receive_message(self._rpc_state, self._loop) + self._rpc_state.raise_for_termination() + + if raw_message is None: + return EOF + else: + return deserialize(self._request_deserializer, + raw_message) + + async def write(self, object message): + self._rpc_state.raise_for_termination() + + await _send_message(self._rpc_state, + serialize(self._response_serializer, message), + self._rpc_state.create_send_initial_metadata_op_if_not_sent(), + self._rpc_state.get_write_flag(), + self._loop) + self._rpc_state.metadata_sent = True + + async def send_initial_metadata(self, object metadata): + self._rpc_state.raise_for_termination() + + if self._rpc_state.metadata_sent: + raise UsageError('Send initial metadata failed: already sent') + else: + await _send_initial_metadata( + self._rpc_state, + _augment_metadata(tuple(metadata), self._rpc_state.compression_algorithm), + _EMPTY_FLAG, + self._loop + ) + self._rpc_state.metadata_sent = True + + async def abort(self, + object code, + str details='', + tuple trailing_metadata=_IMMUTABLE_EMPTY_METADATA): + if self._rpc_state.abort_exception is not None: + raise UsageError('Abort already called!') + else: + # Keeps track of the exception object. After abort happen, the RPC + # should stop execution. However, if users decided to suppress it, it + # could lead to undefined behavior. + self._rpc_state.abort_exception = AbortError('Locally aborted.') + + if trailing_metadata == _IMMUTABLE_EMPTY_METADATA and self._rpc_state.trailing_metadata: + trailing_metadata = self._rpc_state.trailing_metadata + else: + raise_if_not_valid_trailing_metadata(trailing_metadata) + self._rpc_state.trailing_metadata = trailing_metadata + + if details == '' and self._rpc_state.status_details: + details = self._rpc_state.status_details + else: + self._rpc_state.status_details = details + + actual_code = get_status_code(code) + self._rpc_state.status_code = actual_code + + self._rpc_state.status_sent = True + await _send_error_status_from_server( + self._rpc_state, + actual_code, + details, + trailing_metadata, + self._rpc_state.create_send_initial_metadata_op_if_not_sent(), + self._loop + ) + + raise self._rpc_state.abort_exception + + async def abort_with_status(self, object status): + await self.abort(status.code, status.details, status.trailing_metadata) + + def set_trailing_metadata(self, object metadata): + raise_if_not_valid_trailing_metadata(metadata) + self._rpc_state.trailing_metadata = tuple(metadata) + + def trailing_metadata(self): + return self._rpc_state.trailing_metadata + + def invocation_metadata(self): + return self._rpc_state.invocation_metadata() + + def set_code(self, object code): + self._rpc_state.status_code = get_status_code(code) + + def code(self): + return self._rpc_state.status_code + + def set_details(self, str details): + self._rpc_state.status_details = details + + def details(self): + return self._rpc_state.status_details + + def set_compression(self, object compression): + if self._rpc_state.metadata_sent: + raise RuntimeError('Compression setting must be specified before sending initial metadata') + else: + self._rpc_state.compression_algorithm = compression + + def disable_next_message_compression(self): + self._rpc_state.disable_next_compression = True + + def peer(self): + cdef char *c_peer = NULL + c_peer = grpc_call_get_peer(self._rpc_state.call) + peer = (<bytes>c_peer).decode('utf8') + gpr_free(c_peer) + return peer + + def peer_identities(self): + cdef Call query_call = Call() + query_call.c_call = self._rpc_state.call + identities = peer_identities(query_call) + query_call.c_call = NULL + return identities + + def peer_identity_key(self): + cdef Call query_call = Call() + query_call.c_call = self._rpc_state.call + identity_key = peer_identity_key(query_call) + query_call.c_call = NULL + if identity_key: + return identity_key.decode('utf8') + else: + return None + + def auth_context(self): + cdef Call query_call = Call() + query_call.c_call = self._rpc_state.call + bytes_ctx = auth_context(query_call) + query_call.c_call = NULL + if bytes_ctx: + ctx = {} + for key in bytes_ctx: + ctx[key.decode('utf8')] = bytes_ctx[key] + return ctx + else: + return {} + + def time_remaining(self): + if self._rpc_state.details.deadline.seconds == _GPR_INF_FUTURE.seconds: + return None + else: + return max(_time_from_timespec(self._rpc_state.details.deadline) - time.time(), 0) + + def add_done_callback(self, callback): + cb = functools.partial(callback, self) + self._rpc_state.callbacks.append(cb) + + def done(self): + return self._rpc_state.status_sent + + def cancelled(self): + return self._rpc_state.status_code == StatusCode.cancelled + + +cdef class _SyncServicerContext: + """Sync servicer context for sync handler compatibility.""" + + def __cinit__(self, + _ServicerContext context): + self._context = context + self._callbacks = [] + self._loop = context._loop + + def abort(self, + object code, + str details='', + tuple trailing_metadata=_IMMUTABLE_EMPTY_METADATA): + future = asyncio.run_coroutine_threadsafe( + self._context.abort(code, details, trailing_metadata), + self._loop) + # Abort should raise an AbortError + future.exception() + + def send_initial_metadata(self, object metadata): + future = asyncio.run_coroutine_threadsafe( + self._context.send_initial_metadata(metadata), + self._loop) + future.result() + + def set_trailing_metadata(self, object metadata): + self._context.set_trailing_metadata(metadata) + + def invocation_metadata(self): + return self._context.invocation_metadata() + + def set_code(self, object code): + self._context.set_code(code) + + def set_details(self, str details): + self._context.set_details(details) + + def set_compression(self, object compression): + self._context.set_compression(compression) + + def disable_next_message_compression(self): + self._context.disable_next_message_compression() + + def add_callback(self, object callback): + self._callbacks.append(callback) + + def peer(self): + return self._context.peer() + + def peer_identities(self): + return self._context.peer_identities() + + def peer_identity_key(self): + return self._context.peer_identity_key() + + def auth_context(self): + return self._context.auth_context() + + def time_remaining(self): + return self._context.time_remaining() + + +async def _run_interceptor(object interceptors, object query_handler, + object handler_call_details): + interceptor = next(interceptors, None) + if interceptor: + continuation = functools.partial(_run_interceptor, interceptors, + query_handler) + return await interceptor.intercept_service(continuation, handler_call_details) + else: + return query_handler(handler_call_details) + + +def _is_async_handler(object handler): + """Inspect if a method handler is async or sync.""" + return inspect.isawaitable(handler) or inspect.iscoroutinefunction(handler) or inspect.isasyncgenfunction(handler) + + +async def _find_method_handler(str method, tuple metadata, list generic_handlers, + tuple interceptors): + def query_handlers(handler_call_details): + for generic_handler in generic_handlers: + method_handler = generic_handler.service(handler_call_details) + if method_handler is not None: + return method_handler + return None + + cdef _HandlerCallDetails handler_call_details = _HandlerCallDetails(method, + metadata) + # interceptor + if interceptors: + return await _run_interceptor(iter(interceptors), query_handlers, + handler_call_details) + else: + return query_handlers(handler_call_details) + + +async def _finish_handler_with_unary_response(RPCState rpc_state, + object unary_handler, + object request, + _ServicerContext servicer_context, + object response_serializer, + object loop): + """Finishes server method handler with a single response. + + This function executes the application handler, and handles response + sending, as well as errors. It is shared between unary-unary and + stream-unary handlers. + """ + # Executes application logic + cdef object response_message + cdef _SyncServicerContext sync_servicer_context + + if _is_async_handler(unary_handler): + # Run async method handlers in this coroutine + response_message = await unary_handler( + request, + servicer_context, + ) + else: + # Run sync method handlers in the thread pool + sync_servicer_context = _SyncServicerContext(servicer_context) + response_message = await loop.run_in_executor( + rpc_state.server.thread_pool(), + unary_handler, + request, + sync_servicer_context, + ) + # Support sync-stack callback + for callback in sync_servicer_context._callbacks: + callback() + + # Raises exception if aborted + rpc_state.raise_for_termination() + + # Serializes the response message + cdef bytes response_raw + if rpc_state.status_code == StatusCode.ok: + response_raw = serialize( + response_serializer, + response_message, + ) + else: + # Discards the response message if the status code is non-OK. + response_raw = b'' + + # Assembles the batch operations + cdef tuple finish_ops + finish_ops = ( + SendMessageOperation(response_raw, rpc_state.get_write_flag()), + SendStatusFromServerOperation( + rpc_state.trailing_metadata, + rpc_state.status_code, + rpc_state.status_details, + _EMPTY_FLAGS, + ), + ) + if not rpc_state.metadata_sent: + finish_ops = prepend_send_initial_metadata_op( + finish_ops, + None) + rpc_state.metadata_sent = True + rpc_state.status_sent = True + await execute_batch(rpc_state, finish_ops, loop) + + +async def _finish_handler_with_stream_responses(RPCState rpc_state, + object stream_handler, + object request, + _ServicerContext servicer_context, + object loop): + """Finishes server method handler with multiple responses. + + This function executes the application handler, and handles response + sending, as well as errors. It is shared between unary-stream and + stream-stream handlers. + """ + cdef object async_response_generator + cdef object response_message + + if inspect.iscoroutinefunction(stream_handler): + # Case 1: Coroutine async handler - using reader-writer API + # The handler uses reader / writer API, returns None. + await stream_handler( + request, + servicer_context, + ) + else: + if inspect.isasyncgenfunction(stream_handler): + # Case 2: Async handler - async generator + # The handler uses async generator API + async_response_generator = stream_handler( + request, + servicer_context, + ) + else: + # Case 3: Sync handler - normal generator + # NOTE(lidiz) Streaming handler in sync stack is either a generator + # function or a function returns a generator. + sync_servicer_context = _SyncServicerContext(servicer_context) + gen = stream_handler(request, sync_servicer_context) + async_response_generator = generator_to_async_generator(gen, + loop, + rpc_state.server.thread_pool()) + + # Consumes messages from the generator + async for response_message in async_response_generator: + # Raises exception if aborted + rpc_state.raise_for_termination() + + await servicer_context.write(response_message) + + # Raises exception if aborted + rpc_state.raise_for_termination() + + # Sends the final status of this RPC + cdef SendStatusFromServerOperation op = SendStatusFromServerOperation( + rpc_state.trailing_metadata, + rpc_state.status_code, + rpc_state.status_details, + _EMPTY_FLAGS, + ) + + cdef tuple finish_ops = (op,) + if not rpc_state.metadata_sent: + finish_ops = prepend_send_initial_metadata_op( + finish_ops, + None + ) + rpc_state.metadata_sent = True + rpc_state.status_sent = True + await execute_batch(rpc_state, finish_ops, loop) + + +async def _handle_unary_unary_rpc(object method_handler, + RPCState rpc_state, + object loop): + # Receives request message + cdef bytes request_raw = await _receive_message(rpc_state, loop) + if request_raw is None: + # The RPC was cancelled immediately after start on client side. + return + + # Deserializes the request message + cdef object request_message = deserialize( + method_handler.request_deserializer, + request_raw, + ) + + # Creates a dedecated ServicerContext + cdef _ServicerContext servicer_context = _ServicerContext( + rpc_state, + None, + None, + loop, + ) + + # Finishes the application handler + await _finish_handler_with_unary_response( + rpc_state, + method_handler.unary_unary, + request_message, + servicer_context, + method_handler.response_serializer, + loop + ) + + +async def _handle_unary_stream_rpc(object method_handler, + RPCState rpc_state, + object loop): + # Receives request message + cdef bytes request_raw = await _receive_message(rpc_state, loop) + if request_raw is None: + return + + # Deserializes the request message + cdef object request_message = deserialize( + method_handler.request_deserializer, + request_raw, + ) + + # Creates a dedecated ServicerContext + cdef _ServicerContext servicer_context = _ServicerContext( + rpc_state, + method_handler.request_deserializer, + method_handler.response_serializer, + loop, + ) + + # Finishes the application handler + await _finish_handler_with_stream_responses( + rpc_state, + method_handler.unary_stream, + request_message, + servicer_context, + loop, + ) + + +cdef class _MessageReceiver: + """Bridge between the async generator API and the reader-writer API.""" + + def __cinit__(self, _ServicerContext servicer_context): + self._servicer_context = servicer_context + self._agen = None + + async def _async_message_receiver(self): + """An async generator that receives messages.""" + cdef object message + while True: + message = await self._servicer_context.read() + if message is not EOF: + yield message + else: + break + + def __aiter__(self): + # Prevents never awaited warning if application never used the async generator + if self._agen is None: + self._agen = self._async_message_receiver() + return self._agen + + async def __anext__(self): + return await self.__aiter__().__anext__() + + +async def _handle_stream_unary_rpc(object method_handler, + RPCState rpc_state, + object loop): + # Creates a dedecated ServicerContext + cdef _ServicerContext servicer_context = _ServicerContext( + rpc_state, + method_handler.request_deserializer, + None, + loop, + ) + + # Prepares the request generator + cdef object request_iterator + if _is_async_handler(method_handler.stream_unary): + request_iterator = _MessageReceiver(servicer_context) + else: + request_iterator = async_generator_to_generator( + _MessageReceiver(servicer_context), + loop + ) + + # Finishes the application handler + await _finish_handler_with_unary_response( + rpc_state, + method_handler.stream_unary, + request_iterator, + servicer_context, + method_handler.response_serializer, + loop + ) + + +async def _handle_stream_stream_rpc(object method_handler, + RPCState rpc_state, + object loop): + # Creates a dedecated ServicerContext + cdef _ServicerContext servicer_context = _ServicerContext( + rpc_state, + method_handler.request_deserializer, + method_handler.response_serializer, + loop, + ) + + # Prepares the request generator + cdef object request_iterator + if _is_async_handler(method_handler.stream_stream): + request_iterator = _MessageReceiver(servicer_context) + else: + request_iterator = async_generator_to_generator( + _MessageReceiver(servicer_context), + loop + ) + + # Finishes the application handler + await _finish_handler_with_stream_responses( + rpc_state, + method_handler.stream_stream, + request_iterator, + servicer_context, + loop, + ) + + +async def _handle_exceptions(RPCState rpc_state, object rpc_coro, object loop): + try: + try: + await rpc_coro + except AbortError as e: + # Caught AbortError check if it is the same one + assert rpc_state.abort_exception is e, 'Abort error has been replaced!' + return + else: + # Check if the abort exception got suppressed + if rpc_state.abort_exception is not None: + _LOGGER.error( + 'Abort error unexpectedly suppressed: %s', + traceback.format_exception(rpc_state.abort_exception) + ) + except (KeyboardInterrupt, SystemExit): + raise + except asyncio.CancelledError: + _LOGGER.debug('RPC cancelled for servicer method [%s]', _decode(rpc_state.method())) + except _ServerStoppedError: + _LOGGER.warning('Aborting method [%s] due to server stop.', _decode(rpc_state.method())) + except ExecuteBatchError: + # If client closed (aka. cancelled), ignore the failed batch operations. + if rpc_state.client_closed: + return + else: + raise + except Exception as e: + _LOGGER.exception('Unexpected [%s] raised by servicer method [%s]' % ( + type(e).__name__, + _decode(rpc_state.method()), + )) + if not rpc_state.status_sent and rpc_state.server._status != AIO_SERVER_STATUS_STOPPED: + # Allows users to raise other types of exception with specified status code + if rpc_state.status_code == StatusCode.ok: + status_code = StatusCode.unknown + else: + status_code = rpc_state.status_code + + rpc_state.status_sent = True + try: + await _send_error_status_from_server( + rpc_state, + status_code, + 'Unexpected %s: %s' % (type(e), e), + rpc_state.trailing_metadata, + rpc_state.create_send_initial_metadata_op_if_not_sent(), + loop + ) + except ExecuteBatchError: + _LOGGER.exception('Failed sending error status from server') + traceback.print_exc() + + +cdef _add_callback_handler(object rpc_task, RPCState rpc_state): + + def handle_callbacks(object unused_task): + try: + for callback in rpc_state.callbacks: + # The _ServicerContext object is bound in add_done_callback. + callback() + except: + _LOGGER.exception('Error in callback for method [%s]', _decode(rpc_state.method())) + + rpc_task.add_done_callback(handle_callbacks) + + +async def _handle_cancellation_from_core(object rpc_task, + RPCState rpc_state, + object loop): + cdef ReceiveCloseOnServerOperation op = ReceiveCloseOnServerOperation(_EMPTY_FLAG) + cdef tuple ops = (op,) + + # Awaits cancellation from peer. + await execute_batch(rpc_state, ops, loop) + rpc_state.client_closed = True + # If 1) received cancel signal; 2) the Task is not finished; 3) the server + # wasn't replying final status. For condition 3, it might cause inaccurate + # log that an RPC is both aborted and cancelled. + if op.cancelled() and not rpc_task.done() and not rpc_state.status_sent: + # Injects `CancelledError` to halt the RPC coroutine + rpc_task.cancel() + + +async def _schedule_rpc_coro(object rpc_coro, + RPCState rpc_state, + object loop): + # Schedules the RPC coroutine. + cdef object rpc_task = loop.create_task(_handle_exceptions( + rpc_state, + rpc_coro, + loop, + )) + _add_callback_handler(rpc_task, rpc_state) + await _handle_cancellation_from_core(rpc_task, rpc_state, loop) + + +async def _handle_rpc(list generic_handlers, tuple interceptors, + RPCState rpc_state, object loop): + cdef object method_handler + # Finds the method handler (application logic) + method_handler = await _find_method_handler( + rpc_state.method().decode(), + rpc_state.invocation_metadata(), + generic_handlers, + interceptors, + ) + if method_handler is None: + rpc_state.status_sent = True + await _send_error_status_from_server( + rpc_state, + StatusCode.unimplemented, + 'Method not found!', + _IMMUTABLE_EMPTY_METADATA, + rpc_state.create_send_initial_metadata_op_if_not_sent(), + loop + ) + return + + # Handles unary-unary case + if not method_handler.request_streaming and not method_handler.response_streaming: + await _handle_unary_unary_rpc(method_handler, + rpc_state, + loop) + return + + # Handles unary-stream case + if not method_handler.request_streaming and method_handler.response_streaming: + await _handle_unary_stream_rpc(method_handler, + rpc_state, + loop) + return + + # Handles stream-unary case + if method_handler.request_streaming and not method_handler.response_streaming: + await _handle_stream_unary_rpc(method_handler, + rpc_state, + loop) + return + + # Handles stream-stream case + if method_handler.request_streaming and method_handler.response_streaming: + await _handle_stream_stream_rpc(method_handler, + rpc_state, + loop) + return + + +class _RequestCallError(Exception): pass + +cdef CallbackFailureHandler REQUEST_CALL_FAILURE_HANDLER = CallbackFailureHandler( + 'grpc_server_request_call', None, _RequestCallError) + + +cdef CallbackFailureHandler SERVER_SHUTDOWN_FAILURE_HANDLER = CallbackFailureHandler( + 'grpc_server_shutdown_and_notify', + None, + InternalError) + + +cdef class _ConcurrentRpcLimiter: + + def __cinit__(self, int maximum_concurrent_rpcs, object loop): + if maximum_concurrent_rpcs <= 0: + raise ValueError("maximum_concurrent_rpcs should be a postive integer") + self._maximum_concurrent_rpcs = maximum_concurrent_rpcs + self._active_rpcs = 0 + self._active_rpcs_condition = asyncio.Condition() + self._loop = loop + + async def check_before_request_call(self): + await self._active_rpcs_condition.acquire() + try: + predicate = lambda: self._active_rpcs < self._maximum_concurrent_rpcs + await self._active_rpcs_condition.wait_for(predicate) + self._active_rpcs += 1 + finally: + self._active_rpcs_condition.release() + + async def _decrease_active_rpcs_count_with_lock(self): + await self._active_rpcs_condition.acquire() + try: + self._active_rpcs -= 1 + self._active_rpcs_condition.notify() + finally: + self._active_rpcs_condition.release() + + def _decrease_active_rpcs_count(self, unused_future): + self._loop.create_task(self._decrease_active_rpcs_count_with_lock()) + + def decrease_once_finished(self, object rpc_task): + rpc_task.add_done_callback(self._decrease_active_rpcs_count) + + +cdef class AioServer: + + def __init__(self, loop, thread_pool, generic_handlers, interceptors, + options, maximum_concurrent_rpcs): + init_grpc_aio() + # NOTE(lidiz) Core objects won't be deallocated automatically. + # If AioServer.shutdown is not called, those objects will leak. + # TODO(rbellevi): Support xDS in aio server. + self._server = Server(options, False) + grpc_server_register_completion_queue( + self._server.c_server, + global_completion_queue(), + NULL + ) + + self._loop = loop + self._status = AIO_SERVER_STATUS_READY + self._generic_handlers = [] + self.add_generic_rpc_handlers(generic_handlers) + self._serving_task = None + + self._shutdown_lock = asyncio.Lock() + self._shutdown_completed = self._loop.create_future() + self._shutdown_callback_wrapper = CallbackWrapper( + self._shutdown_completed, + self._loop, + SERVER_SHUTDOWN_FAILURE_HANDLER) + self._crash_exception = None + + if interceptors: + self._interceptors = tuple(interceptors) + else: + self._interceptors = () + + self._thread_pool = thread_pool + if maximum_concurrent_rpcs is not None: + self._limiter = _ConcurrentRpcLimiter(maximum_concurrent_rpcs, + loop) + + def add_generic_rpc_handlers(self, object generic_rpc_handlers): + self._generic_handlers.extend(generic_rpc_handlers) + + def add_insecure_port(self, address): + return self._server.add_http2_port(address) + + def add_secure_port(self, address, server_credentials): + return self._server.add_http2_port(address, + server_credentials._credentials) + + async def _request_call(self): + cdef grpc_call_error error + cdef RPCState rpc_state = RPCState(self) + cdef object future = self._loop.create_future() + cdef CallbackWrapper wrapper = CallbackWrapper( + future, + self._loop, + REQUEST_CALL_FAILURE_HANDLER) + error = grpc_server_request_call( + self._server.c_server, &rpc_state.call, &rpc_state.details, + &rpc_state.request_metadata, + global_completion_queue(), global_completion_queue(), + wrapper.c_functor() + ) + if error != GRPC_CALL_OK: + raise InternalError("Error in grpc_server_request_call: %s" % error) + + await future + return rpc_state + + async def _server_main_loop(self, + object server_started): + self._server.start(backup_queue=False) + cdef RPCState rpc_state + server_started.set_result(True) + + while True: + # When shutdown begins, no more new connections. + if self._status != AIO_SERVER_STATUS_RUNNING: + break + + if self._limiter is not None: + await self._limiter.check_before_request_call() + + # Accepts new request from Core + rpc_state = await self._request_call() + + # Creates the dedicated RPC coroutine. If we schedule it right now, + # there is no guarantee if the cancellation listening coroutine is + # ready or not. So, we should control the ordering by scheduling + # the coroutine onto event loop inside of the cancellation + # coroutine. + rpc_coro = _handle_rpc(self._generic_handlers, + self._interceptors, + rpc_state, + self._loop) + + # Fires off a task that listens on the cancellation from client. + rpc_task = self._loop.create_task( + _schedule_rpc_coro( + rpc_coro, + rpc_state, + self._loop + ) + ) + + if self._limiter is not None: + self._limiter.decrease_once_finished(rpc_task) + + def _serving_task_crash_handler(self, object task): + """Shutdown the server immediately if unexpectedly exited.""" + if task.cancelled(): + return + if task.exception() is None: + return + if self._status != AIO_SERVER_STATUS_STOPPING: + self._crash_exception = task.exception() + _LOGGER.exception(self._crash_exception) + self._loop.create_task(self.shutdown(None)) + + async def start(self): + if self._status == AIO_SERVER_STATUS_RUNNING: + return + elif self._status != AIO_SERVER_STATUS_READY: + raise UsageError('Server not in ready state') + + self._status = AIO_SERVER_STATUS_RUNNING + cdef object server_started = self._loop.create_future() + self._serving_task = self._loop.create_task(self._server_main_loop(server_started)) + self._serving_task.add_done_callback(self._serving_task_crash_handler) + # Needs to explicitly wait for the server to start up. + # Otherwise, the actual start time of the server is un-controllable. + await server_started + + async def _start_shutting_down(self): + """Prepares the server to shutting down. + + This coroutine function is NOT coroutine-safe. + """ + # The shutdown callback won't be called until there is no live RPC. + grpc_server_shutdown_and_notify( + self._server.c_server, + global_completion_queue(), + self._shutdown_callback_wrapper.c_functor()) + + # Ensures the serving task (coroutine) exits. + try: + await self._serving_task + except _RequestCallError: + pass + + async def shutdown(self, grace): + """Gracefully shutdown the Core server. + + Application should only call shutdown once. + + Args: + grace: An optional float indicating the length of grace period in + seconds. + """ + if self._status == AIO_SERVER_STATUS_READY or self._status == AIO_SERVER_STATUS_STOPPED: + return + + async with self._shutdown_lock: + if self._status == AIO_SERVER_STATUS_RUNNING: + self._server.is_shutting_down = True + self._status = AIO_SERVER_STATUS_STOPPING + await self._start_shutting_down() + + if grace is None: + # Directly cancels all calls + grpc_server_cancel_all_calls(self._server.c_server) + await self._shutdown_completed + else: + try: + await asyncio.wait_for( + asyncio.shield(self._shutdown_completed), + grace, + ) + except asyncio.TimeoutError: + # Cancels all ongoing calls by the end of grace period. + grpc_server_cancel_all_calls(self._server.c_server) + await self._shutdown_completed + + async with self._shutdown_lock: + if self._status == AIO_SERVER_STATUS_STOPPING: + grpc_server_destroy(self._server.c_server) + self._server.c_server = NULL + self._server.is_shutdown = True + self._status = AIO_SERVER_STATUS_STOPPED + + async def wait_for_termination(self, object timeout): + if timeout is None: + await self._shutdown_completed + else: + try: + await asyncio.wait_for( + asyncio.shield(self._shutdown_completed), + timeout, + ) + except asyncio.TimeoutError: + if self._crash_exception is not None: + raise self._crash_exception + return True + if self._crash_exception is not None: + raise self._crash_exception + return False + + def __dealloc__(self): + """Deallocation of Core objects are ensured by Python layer.""" + # TODO(lidiz) if users create server, and then dealloc it immediately. + # There is a potential memory leak of created Core server. + if self._status != AIO_SERVER_STATUS_STOPPED: + _LOGGER.debug( + '__dealloc__ called on running server %s with status %d', + self, + self._status + ) + shutdown_grpc_aio() + + cdef thread_pool(self): + """Access the thread pool instance.""" + return self._thread_pool + + def is_running(self): + return self._status == AIO_SERVER_STATUS_RUNNING diff --git a/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/arguments.pxd.pxi b/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/arguments.pxd.pxi new file mode 100644 index 00000000000..251efe15b39 --- /dev/null +++ b/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/arguments.pxd.pxi @@ -0,0 +1,36 @@ +# Copyright 2018 The gRPC Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +cdef tuple _wrap_grpc_arg(grpc_arg arg) + + +cdef grpc_arg _unwrap_grpc_arg(tuple wrapped_arg) + + +cdef class _ChannelArg: + + cdef grpc_arg c_argument + + cdef void c(self, argument, references) except * + + +cdef class _ChannelArgs: + + cdef readonly tuple _arguments + cdef list _channel_args + cdef readonly list _references + cdef grpc_channel_args _c_arguments + + cdef grpc_channel_args *c_args(self) except * diff --git a/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/arguments.pyx.pxi b/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/arguments.pyx.pxi new file mode 100644 index 00000000000..9df308cdbcd --- /dev/null +++ b/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/arguments.pyx.pxi @@ -0,0 +1,85 @@ +# Copyright 2018 gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +cdef class _GrpcArgWrapper: + + cdef grpc_arg arg + + +cdef tuple _wrap_grpc_arg(grpc_arg arg): + wrapped = _GrpcArgWrapper() + wrapped.arg = arg + return ("grpc.python._cygrpc._GrpcArgWrapper", wrapped) + + +cdef grpc_arg _unwrap_grpc_arg(tuple wrapped_arg): + cdef _GrpcArgWrapper wrapped = wrapped_arg[1] + return wrapped.arg + + +cdef class _ChannelArg: + + cdef void c(self, argument, references) except *: + key, value = argument + cdef bytes encoded_key = _encode(key) + if encoded_key is not key: + references.append(encoded_key) + self.c_argument.key = encoded_key + if isinstance(value, int): + self.c_argument.type = GRPC_ARG_INTEGER + self.c_argument.value.integer = value + elif isinstance(value, (bytes, str, unicode,)): + self.c_argument.type = GRPC_ARG_STRING + encoded_value = _encode(value) + if encoded_value is not value: + references.append(encoded_value) + self.c_argument.value.string = encoded_value + elif isinstance(value, _GrpcArgWrapper): + self.c_argument = (<_GrpcArgWrapper>value).arg + elif hasattr(value, '__int__'): + # Pointer objects must override __int__() to return + # the underlying C address (Python ints are word size). The + # lifecycle of the pointer is fixed to the lifecycle of the + # python object wrapping it. + self.c_argument.type = GRPC_ARG_POINTER + self.c_argument.value.pointer.vtable = &default_vtable + self.c_argument.value.pointer.address = <void*>(<intptr_t>int(value)) + else: + raise TypeError( + 'Expected int, bytes, or behavior, got {}'.format(type(value))) + + +cdef class _ChannelArgs: + + def __cinit__(self, arguments): + self._arguments = () if arguments is None else tuple(arguments) + self._channel_args = [] + self._references = [] + self._c_arguments.arguments_length = len(self._arguments) + if self._c_arguments.arguments_length != 0: + self._c_arguments.arguments = <grpc_arg *>gpr_malloc( + self._c_arguments.arguments_length * sizeof(grpc_arg)) + for index, argument in enumerate(self._arguments): + channel_arg = _ChannelArg() + channel_arg.c(argument, self._references) + self._c_arguments.arguments[index] = channel_arg.c_argument + self._channel_args.append(channel_arg) + + cdef grpc_channel_args *c_args(self) except *: + return &self._c_arguments + + def __dealloc__(self): + if self._c_arguments.arguments != NULL: + gpr_free(self._c_arguments.arguments) diff --git a/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/call.pxd.pxi b/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/call.pxd.pxi new file mode 100644 index 00000000000..8babeb45361 --- /dev/null +++ b/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/call.pxd.pxi @@ -0,0 +1,20 @@ +# Copyright 2015 gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +cdef class Call: + + cdef grpc_call *c_call + cdef list references + diff --git a/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/call.pyx.pxi b/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/call.pyx.pxi new file mode 100644 index 00000000000..f68e166b17a --- /dev/null +++ b/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/call.pyx.pxi @@ -0,0 +1,97 @@ +# Copyright 2015 gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +cdef class Call: + + def __cinit__(self): + # Create an *empty* call + fork_handlers_and_grpc_init() + self.c_call = NULL + self.references = [] + + def _start_batch(self, operations, tag, retain_self): + if not self.is_valid: + raise ValueError("invalid call object cannot be used from Python") + cdef _BatchOperationTag batch_operation_tag = _BatchOperationTag( + tag, operations, self if retain_self else None) + batch_operation_tag.prepare() + cpython.Py_INCREF(batch_operation_tag) + cdef grpc_call_error error + with nogil: + error = grpc_call_start_batch( + self.c_call, batch_operation_tag.c_ops, batch_operation_tag.c_nops, + <cpython.PyObject *>batch_operation_tag, NULL) + return error + + def start_client_batch(self, operations, tag): + # We don't reference this call in the operations tag because + # it should be cancelled when it goes out of scope + return self._start_batch(operations, tag, False) + + def start_server_batch(self, operations, tag): + return self._start_batch(operations, tag, True) + + def cancel( + self, grpc_status_code error_code=GRPC_STATUS__DO_NOT_USE, + details=None): + details = str_to_bytes(details) + if not self.is_valid: + raise ValueError("invalid call object cannot be used from Python") + if (details is None) != (error_code == GRPC_STATUS__DO_NOT_USE): + raise ValueError("if error_code is specified, so must details " + "(and vice-versa)") + cdef grpc_call_error result + cdef char *c_details = NULL + if error_code != GRPC_STATUS__DO_NOT_USE: + self.references.append(details) + c_details = details + with nogil: + result = grpc_call_cancel_with_status( + self.c_call, error_code, c_details, NULL) + return result + else: + with nogil: + result = grpc_call_cancel(self.c_call, NULL) + return result + + def set_credentials(self, CallCredentials call_credentials not None): + cdef grpc_call_credentials *c_call_credentials = call_credentials.c() + cdef grpc_call_error call_error = grpc_call_set_credentials( + self.c_call, c_call_credentials) + grpc_call_credentials_release(c_call_credentials) + return call_error + + def peer(self): + cdef char *peer = NULL + with nogil: + peer = grpc_call_get_peer(self.c_call) + result = <bytes>peer + with nogil: + gpr_free(peer) + return result + + def __dealloc__(self): + with nogil: + if self.c_call != NULL: + grpc_call_unref(self.c_call) + grpc_shutdown() + + # The object *should* always be valid from Python. Used for debugging. + @property + def is_valid(self): + return self.c_call != NULL + + def _custom_op_on_c_call(self, int op): + return _custom_op_on_c_call(op, self.c_call) diff --git a/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/channel.pxd.pxi b/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/channel.pxd.pxi new file mode 100644 index 00000000000..eb27f2df7ad --- /dev/null +++ b/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/channel.pxd.pxi @@ -0,0 +1,74 @@ +# Copyright 2015 gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +cdef _check_call_error_no_metadata(c_call_error) + + +cdef _check_and_raise_call_error_no_metadata(c_call_error) + + +cdef _check_call_error(c_call_error, metadata) + + +cdef class _CallState: + + cdef grpc_call *c_call + cdef set due + + +cdef class _ChannelState: + + cdef object condition + cdef grpc_channel *c_channel + # A boolean field indicating that the channel is open (if True) or is being + # closed (i.e. a call to close is currently executing) or is closed (if + # False). + # TODO(https://github.com/grpc/grpc/issues/3064): Eliminate "is being closed" + # a state in which condition may be acquired by any thread, eliminate this + # field and just use the NULLness of c_channel as an indication that the + # channel is closed. + cdef object open + cdef object closed_reason + + # A dict from _BatchOperationTag to _CallState + cdef dict integrated_call_states + cdef grpc_completion_queue *c_call_completion_queue + + # A set of _CallState + cdef set segregated_call_states + + cdef set connectivity_due + cdef grpc_completion_queue *c_connectivity_completion_queue + + +cdef class IntegratedCall: + + cdef _ChannelState _channel_state + cdef _CallState _call_state + + +cdef class SegregatedCall: + + cdef _ChannelState _channel_state + cdef _CallState _call_state + cdef grpc_completion_queue *_c_completion_queue + + +cdef class Channel: + + cdef _ChannelState _state + + # TODO(https://github.com/grpc/grpc/issues/15662): Eliminate this. + cdef tuple _arguments diff --git a/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/channel.pyx.pxi b/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/channel.pyx.pxi new file mode 100644 index 00000000000..d49a4210f7c --- /dev/null +++ b/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/channel.pyx.pxi @@ -0,0 +1,516 @@ +# Copyright 2015 gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +_INTERNAL_CALL_ERROR_MESSAGE_FORMAT = ( + 'Internal gRPC call error %d. ' + + 'Please report to https://github.com/grpc/grpc/issues') + + +cdef str _call_error_metadata(metadata): + return 'metadata was invalid: %s' % metadata + + +cdef str _call_error_no_metadata(c_call_error): + return _INTERNAL_CALL_ERROR_MESSAGE_FORMAT % c_call_error + + +cdef str _call_error(c_call_error, metadata): + if c_call_error == GRPC_CALL_ERROR_INVALID_METADATA: + return _call_error_metadata(metadata) + else: + return _call_error_no_metadata(c_call_error) + + +cdef _check_call_error_no_metadata(c_call_error): + if c_call_error != GRPC_CALL_OK: + return _INTERNAL_CALL_ERROR_MESSAGE_FORMAT % c_call_error + else: + return None + + +cdef _check_and_raise_call_error_no_metadata(c_call_error): + error = _check_call_error_no_metadata(c_call_error) + if error is not None: + raise ValueError(error) + + +cdef _check_call_error(c_call_error, metadata): + if c_call_error == GRPC_CALL_ERROR_INVALID_METADATA: + return _call_error_metadata(metadata) + else: + return _check_call_error_no_metadata(c_call_error) + + +cdef void _raise_call_error_no_metadata(c_call_error) except *: + raise ValueError(_call_error_no_metadata(c_call_error)) + + +cdef void _raise_call_error(c_call_error, metadata) except *: + raise ValueError(_call_error(c_call_error, metadata)) + + +cdef _destroy_c_completion_queue(grpc_completion_queue *c_completion_queue): + grpc_completion_queue_shutdown(c_completion_queue) + grpc_completion_queue_destroy(c_completion_queue) + + +cdef class _CallState: + + def __cinit__(self): + self.due = set() + + +cdef class _ChannelState: + + def __cinit__(self): + self.condition = threading.Condition() + self.open = True + self.integrated_call_states = {} + self.segregated_call_states = set() + self.connectivity_due = set() + self.closed_reason = None + + +cdef tuple _operate(grpc_call *c_call, object operations, object user_tag): + cdef grpc_call_error c_call_error + cdef _BatchOperationTag tag = _BatchOperationTag(user_tag, operations, None) + tag.prepare() + cpython.Py_INCREF(tag) + with nogil: + c_call_error = grpc_call_start_batch( + c_call, tag.c_ops, tag.c_nops, <cpython.PyObject *>tag, NULL) + return c_call_error, tag + + +cdef object _operate_from_integrated_call( + _ChannelState channel_state, _CallState call_state, object operations, + object user_tag): + cdef grpc_call_error c_call_error + cdef _BatchOperationTag tag + with channel_state.condition: + if call_state.due: + c_call_error, tag = _operate(call_state.c_call, operations, user_tag) + if c_call_error == GRPC_CALL_OK: + call_state.due.add(tag) + channel_state.integrated_call_states[tag] = call_state + return True + else: + _raise_call_error_no_metadata(c_call_error) + else: + return False + + +cdef object _operate_from_segregated_call( + _ChannelState channel_state, _CallState call_state, object operations, + object user_tag): + cdef grpc_call_error c_call_error + cdef _BatchOperationTag tag + with channel_state.condition: + if call_state.due: + c_call_error, tag = _operate(call_state.c_call, operations, user_tag) + if c_call_error == GRPC_CALL_OK: + call_state.due.add(tag) + return True + else: + _raise_call_error_no_metadata(c_call_error) + else: + return False + + +cdef _cancel( + _ChannelState channel_state, _CallState call_state, grpc_status_code code, + str details): + cdef grpc_call_error c_call_error + with channel_state.condition: + if call_state.due: + c_call_error = grpc_call_cancel_with_status( + call_state.c_call, code, _encode(details), NULL) + _check_and_raise_call_error_no_metadata(c_call_error) + + +cdef _next_call_event( + _ChannelState channel_state, grpc_completion_queue *c_completion_queue, + on_success, on_failure, deadline): + """Block on the next event out of the completion queue. + + On success, `on_success` will be invoked with the tag taken from the CQ. + In the case of a failure due to an exception raised in a signal handler, + `on_failure` will be invoked with no arguments. Note that this situation + can only occur on the main thread. + + Args: + channel_state: The state for the channel on which the RPC is running. + c_completion_queue: The CQ which will be polled. + on_success: A callable object to be invoked upon successful receipt of a + tag from the CQ. + on_failure: A callable object to be invoked in case a Python exception is + raised from a signal handler during polling. + deadline: The point after which the RPC will time out. + """ + try: + tag, event = _latent_event(c_completion_queue, deadline) + # NOTE(rbellevi): This broad except enables us to clean up resources before + # propagating any exceptions raised by signal handlers to the application. + except: + if on_failure is not None: + on_failure() + raise + else: + with channel_state.condition: + on_success(tag) + channel_state.condition.notify_all() + return event + + +# TODO(https://github.com/grpc/grpc/issues/14569): This could be a lot simpler. +cdef void _call( + _ChannelState channel_state, _CallState call_state, + grpc_completion_queue *c_completion_queue, on_success, int flags, method, + host, object deadline, CallCredentials credentials, + object operationses_and_user_tags, object metadata, + object context) except *: + """Invokes an RPC. + + Args: + channel_state: A _ChannelState with its "open" attribute set to True. RPCs + may not be invoked on a closed channel. + call_state: An empty _CallState to be altered (specifically assigned a + c_call and having its due set populated) if the RPC invocation is + successful. + c_completion_queue: A grpc_completion_queue to be used for the call's + operations. + on_success: A behavior to be called if attempting to start operations for + the call succeeds. If called the behavior will be called while holding the + channel_state condition and passed the tags associated with operations + that were successfully started for the call. + flags: Flags to be passed to gRPC Core as part of call creation. + method: The fully-qualified name of the RPC method being invoked. + host: A "host" string to be passed to gRPC Core as part of call creation. + deadline: A float for the deadline of the RPC, or None if the RPC is to have + no deadline. + credentials: A _CallCredentials for the RPC or None. + operationses_and_user_tags: A sequence of length-two sequences the first + element of which is a sequence of Operations and the second element of + which is an object to be used as a tag. A SendInitialMetadataOperation + must be present in the first element of this value. + metadata: The metadata for this call. + context: Context object for distributed tracing. + """ + cdef grpc_slice method_slice + cdef grpc_slice host_slice + cdef grpc_slice *host_slice_ptr + cdef grpc_call_credentials *c_call_credentials + cdef grpc_call_error c_call_error + cdef tuple error_and_wrapper_tag + cdef _BatchOperationTag wrapper_tag + with channel_state.condition: + if channel_state.open: + method_slice = _slice_from_bytes(method) + if host is None: + host_slice_ptr = NULL + else: + host_slice = _slice_from_bytes(host) + host_slice_ptr = &host_slice + call_state.c_call = grpc_channel_create_call( + channel_state.c_channel, NULL, flags, + c_completion_queue, method_slice, host_slice_ptr, + _timespec_from_time(deadline), NULL) + grpc_slice_unref(method_slice) + if host_slice_ptr: + grpc_slice_unref(host_slice) + if context is not None: + set_census_context_on_call(call_state, context) + if credentials is not None: + c_call_credentials = credentials.c() + c_call_error = grpc_call_set_credentials( + call_state.c_call, c_call_credentials) + grpc_call_credentials_release(c_call_credentials) + if c_call_error != GRPC_CALL_OK: + grpc_call_unref(call_state.c_call) + call_state.c_call = NULL + _raise_call_error_no_metadata(c_call_error) + started_tags = set() + for operations, user_tag in operationses_and_user_tags: + c_call_error, tag = _operate(call_state.c_call, operations, user_tag) + if c_call_error == GRPC_CALL_OK: + started_tags.add(tag) + else: + grpc_call_cancel(call_state.c_call, NULL) + grpc_call_unref(call_state.c_call) + call_state.c_call = NULL + _raise_call_error(c_call_error, metadata) + else: + call_state.due.update(started_tags) + on_success(started_tags) + else: + raise ValueError('Cannot invoke RPC: %s' % channel_state.closed_reason) + + +cdef void _process_integrated_call_tag( + _ChannelState state, _BatchOperationTag tag) except *: + cdef _CallState call_state = state.integrated_call_states.pop(tag) + call_state.due.remove(tag) + if not call_state.due: + grpc_call_unref(call_state.c_call) + call_state.c_call = NULL + + +cdef class IntegratedCall: + + def __cinit__(self, _ChannelState channel_state, _CallState call_state): + self._channel_state = channel_state + self._call_state = call_state + + def operate(self, operations, tag): + return _operate_from_integrated_call( + self._channel_state, self._call_state, operations, tag) + + def cancel(self, code, details): + _cancel(self._channel_state, self._call_state, code, details) + + +cdef IntegratedCall _integrated_call( + _ChannelState state, int flags, method, host, object deadline, + object metadata, CallCredentials credentials, operationses_and_user_tags, + object context): + call_state = _CallState() + + def on_success(started_tags): + for started_tag in started_tags: + state.integrated_call_states[started_tag] = call_state + + _call( + state, call_state, state.c_call_completion_queue, on_success, flags, + method, host, deadline, credentials, operationses_and_user_tags, metadata, context) + + return IntegratedCall(state, call_state) + + +cdef object _process_segregated_call_tag( + _ChannelState state, _CallState call_state, + grpc_completion_queue *c_completion_queue, _BatchOperationTag tag): + call_state.due.remove(tag) + if not call_state.due: + grpc_call_unref(call_state.c_call) + call_state.c_call = NULL + state.segregated_call_states.remove(call_state) + _destroy_c_completion_queue(c_completion_queue) + return True + else: + return False + + +cdef class SegregatedCall: + + def __cinit__(self, _ChannelState channel_state, _CallState call_state): + self._channel_state = channel_state + self._call_state = call_state + + def operate(self, operations, tag): + return _operate_from_segregated_call( + self._channel_state, self._call_state, operations, tag) + + def cancel(self, code, details): + _cancel(self._channel_state, self._call_state, code, details) + + def next_event(self): + def on_success(tag): + _process_segregated_call_tag( + self._channel_state, self._call_state, self._c_completion_queue, tag) + def on_failure(): + self._call_state.due.clear() + grpc_call_unref(self._call_state.c_call) + self._call_state.c_call = NULL + self._channel_state.segregated_call_states.remove(self._call_state) + _destroy_c_completion_queue(self._c_completion_queue) + return _next_call_event( + self._channel_state, self._c_completion_queue, on_success, on_failure, None) + + +cdef SegregatedCall _segregated_call( + _ChannelState state, int flags, method, host, object deadline, + object metadata, CallCredentials credentials, operationses_and_user_tags, + object context): + cdef _CallState call_state = _CallState() + cdef SegregatedCall segregated_call + cdef grpc_completion_queue *c_completion_queue + + def on_success(started_tags): + state.segregated_call_states.add(call_state) + + with state.condition: + if state.open: + c_completion_queue = (grpc_completion_queue_create_for_next(NULL)) + else: + raise ValueError('Cannot invoke RPC on closed channel!') + + try: + _call( + state, call_state, c_completion_queue, on_success, flags, method, host, + deadline, credentials, operationses_and_user_tags, metadata, + context) + except: + _destroy_c_completion_queue(c_completion_queue) + raise + + segregated_call = SegregatedCall(state, call_state) + segregated_call._c_completion_queue = c_completion_queue + return segregated_call + + +cdef object _watch_connectivity_state( + _ChannelState state, grpc_connectivity_state last_observed_state, + object deadline): + cdef _ConnectivityTag tag = _ConnectivityTag(object()) + with state.condition: + if state.open: + cpython.Py_INCREF(tag) + grpc_channel_watch_connectivity_state( + state.c_channel, last_observed_state, _timespec_from_time(deadline), + state.c_connectivity_completion_queue, <cpython.PyObject *>tag) + state.connectivity_due.add(tag) + else: + raise ValueError('Cannot monitor channel state: %s' % state.closed_reason) + completed_tag, event = _latent_event( + state.c_connectivity_completion_queue, None) + with state.condition: + state.connectivity_due.remove(completed_tag) + state.condition.notify_all() + return event + + +cdef _close(Channel channel, grpc_status_code code, object details, + drain_calls): + cdef _ChannelState state = channel._state + cdef _CallState call_state + encoded_details = _encode(details) + with state.condition: + if state.open: + state.open = False + state.closed_reason = details + for call_state in set(state.integrated_call_states.values()): + grpc_call_cancel_with_status( + call_state.c_call, code, encoded_details, NULL) + for call_state in state.segregated_call_states: + grpc_call_cancel_with_status( + call_state.c_call, code, encoded_details, NULL) + # TODO(https://github.com/grpc/grpc/issues/3064): Cancel connectivity + # watching. + + if drain_calls: + while not _calls_drained(state): + event = channel.next_call_event() + if event.completion_type == CompletionType.queue_timeout: + continue + event.tag(event) + else: + while state.integrated_call_states: + state.condition.wait() + while state.connectivity_due: + state.condition.wait() + + _destroy_c_completion_queue(state.c_call_completion_queue) + _destroy_c_completion_queue(state.c_connectivity_completion_queue) + grpc_channel_destroy(state.c_channel) + state.c_channel = NULL + grpc_shutdown() + state.condition.notify_all() + else: + # Another call to close already completed in the past or is currently + # being executed in another thread. + while state.c_channel != NULL: + state.condition.wait() + + +cdef _calls_drained(_ChannelState state): + return not (state.integrated_call_states or state.segregated_call_states or + state.connectivity_due) + +cdef class Channel: + + def __cinit__( + self, bytes target, object arguments, + ChannelCredentials channel_credentials): + arguments = () if arguments is None else tuple(arguments) + fork_handlers_and_grpc_init() + self._state = _ChannelState() + self._state.c_call_completion_queue = ( + grpc_completion_queue_create_for_next(NULL)) + self._state.c_connectivity_completion_queue = ( + grpc_completion_queue_create_for_next(NULL)) + self._arguments = arguments + cdef _ChannelArgs channel_args = _ChannelArgs(arguments) + c_channel_credentials = ( + channel_credentials.c() if channel_credentials is not None + else grpc_insecure_credentials_create()) + self._state.c_channel = grpc_channel_create( + <char *>target, c_channel_credentials, channel_args.c_args()) + grpc_channel_credentials_release(c_channel_credentials) + + def target(self): + cdef char *c_target + with self._state.condition: + c_target = grpc_channel_get_target(self._state.c_channel) + target = <bytes>c_target + gpr_free(c_target) + return target + + def integrated_call( + self, int flags, method, host, object deadline, object metadata, + CallCredentials credentials, operationses_and_tags, + object context = None): + return _integrated_call( + self._state, flags, method, host, deadline, metadata, credentials, + operationses_and_tags, context) + + def next_call_event(self): + def on_success(tag): + if tag is not None: + _process_integrated_call_tag(self._state, tag) + if is_fork_support_enabled(): + queue_deadline = time.time() + 1.0 + else: + queue_deadline = None + # NOTE(gnossen): It is acceptable for on_failure to be None here because + # failure conditions can only ever happen on the main thread and this + # method is only ever invoked on the channel spin thread. + return _next_call_event(self._state, self._state.c_call_completion_queue, + on_success, None, queue_deadline) + + def segregated_call( + self, int flags, method, host, object deadline, object metadata, + CallCredentials credentials, operationses_and_tags, + object context = None): + return _segregated_call( + self._state, flags, method, host, deadline, metadata, credentials, + operationses_and_tags, context) + + def check_connectivity_state(self, bint try_to_connect): + with self._state.condition: + if self._state.open: + return grpc_channel_check_connectivity_state( + self._state.c_channel, try_to_connect) + else: + raise ValueError('Cannot invoke RPC: %s' % self._state.closed_reason) + + def watch_connectivity_state( + self, grpc_connectivity_state last_observed_state, object deadline): + return _watch_connectivity_state(self._state, last_observed_state, deadline) + + def close(self, code, details): + _close(self, code, details, False) + + def close_on_fork(self, code, details): + _close(self, code, details, True) diff --git a/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/channelz.pyx.pxi b/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/channelz.pyx.pxi new file mode 100644 index 00000000000..36c8cd121c7 --- /dev/null +++ b/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/channelz.pyx.pxi @@ -0,0 +1,71 @@ +# Copyright 2018 The gRPC Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +def channelz_get_top_channels(start_channel_id): + cdef char *c_returned_str = grpc_channelz_get_top_channels( + start_channel_id, + ) + if c_returned_str == NULL: + raise ValueError('Failed to get top channels, please ensure your' \ + ' start_channel_id==%s is valid' % start_channel_id) + return c_returned_str + +def channelz_get_servers(start_server_id): + cdef char *c_returned_str = grpc_channelz_get_servers(start_server_id) + if c_returned_str == NULL: + raise ValueError('Failed to get servers, please ensure your' \ + ' start_server_id==%s is valid' % start_server_id) + return c_returned_str + +def channelz_get_server(server_id): + cdef char *c_returned_str = grpc_channelz_get_server(server_id) + if c_returned_str == NULL: + raise ValueError('Failed to get the server, please ensure your' \ + ' server_id==%s is valid' % server_id) + return c_returned_str + +def channelz_get_server_sockets(server_id, start_socket_id, max_results): + cdef char *c_returned_str = grpc_channelz_get_server_sockets( + server_id, + start_socket_id, + max_results, + ) + if c_returned_str == NULL: + raise ValueError('Failed to get server sockets, please ensure your' \ + ' server_id==%s and start_socket_id==%s and' \ + ' max_results==%s is valid' % + (server_id, start_socket_id, max_results)) + return c_returned_str + +def channelz_get_channel(channel_id): + cdef char *c_returned_str = grpc_channelz_get_channel(channel_id) + if c_returned_str == NULL: + raise ValueError('Failed to get the channel, please ensure your' \ + ' channel_id==%s is valid' % (channel_id)) + return c_returned_str + +def channelz_get_subchannel(subchannel_id): + cdef char *c_returned_str = grpc_channelz_get_subchannel(subchannel_id) + if c_returned_str == NULL: + raise ValueError('Failed to get the subchannel, please ensure your' \ + ' subchannel_id==%s is valid' % (subchannel_id)) + return c_returned_str + +def channelz_get_socket(socket_id): + cdef char *c_returned_str = grpc_channelz_get_socket(socket_id) + if c_returned_str == NULL: + raise ValueError('Failed to get the socket, please ensure your' \ + ' socket_id==%s is valid' % (socket_id)) + return c_returned_str diff --git a/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/completion_queue.pxd.pxi b/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/completion_queue.pxd.pxi new file mode 100644 index 00000000000..ec13e60f9d3 --- /dev/null +++ b/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/completion_queue.pxd.pxi @@ -0,0 +1,32 @@ +# Copyright 2015 gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +cdef int g_interrupt_check_period_ms + +cdef grpc_event _next(grpc_completion_queue *c_completion_queue, deadline) except * + + +cdef _interpret_event(grpc_event c_event) + +cdef class _LatentEventArg: + cdef grpc_completion_queue *c_completion_queue + cdef object deadline + +cdef class CompletionQueue: + + cdef grpc_completion_queue *c_completion_queue + cdef bint is_shutting_down + cdef bint is_shutdown + + cdef _interpret_event(self, grpc_event c_event) diff --git a/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/completion_queue.pyx.pxi b/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/completion_queue.pyx.pxi new file mode 100644 index 00000000000..2e4e0107739 --- /dev/null +++ b/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/completion_queue.pyx.pxi @@ -0,0 +1,139 @@ +# Copyright 2015 gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +g_interrupt_check_period_ms = 200 + +cdef grpc_event _next(grpc_completion_queue *c_completion_queue, deadline) except *: + global g_interrupt_check_period_ms + cdef gpr_timespec c_increment + cdef gpr_timespec c_timeout + cdef gpr_timespec c_deadline + c_increment = gpr_time_from_millis(g_interrupt_check_period_ms, GPR_TIMESPAN) + if deadline is None: + c_deadline = gpr_inf_future(GPR_CLOCK_REALTIME) + else: + c_deadline = _timespec_from_time(deadline) + + while True: + with nogil: + c_timeout = gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), c_increment) + if gpr_time_cmp(c_timeout, c_deadline) > 0: + c_timeout = c_deadline + + c_event = grpc_completion_queue_next(c_completion_queue, c_timeout, NULL) + + if (c_event.type != GRPC_QUEUE_TIMEOUT or + gpr_time_cmp(c_timeout, c_deadline) == 0): + break + + # Handle any signals + cpython.PyErr_CheckSignals() + return c_event + +cdef _interpret_event(grpc_event c_event): + cdef _Tag tag + if c_event.type == GRPC_QUEUE_TIMEOUT: + # TODO(ericgribkoff) Do not coopt ConnectivityEvent here. + return None, ConnectivityEvent(GRPC_QUEUE_TIMEOUT, False, None) + elif c_event.type == GRPC_QUEUE_SHUTDOWN: + # NOTE(nathaniel): For now we coopt ConnectivityEvent here. + return None, ConnectivityEvent(GRPC_QUEUE_SHUTDOWN, False, None) + else: + tag = <_Tag>c_event.tag + # We receive event tags only after they've been inc-ref'd elsewhere in + # the code. + cpython.Py_DECREF(tag) + return tag, tag.event(c_event) + +cdef _internal_latent_event(_LatentEventArg latent_event_arg): + cdef grpc_event c_event = _next(latent_event_arg.c_completion_queue, latent_event_arg.deadline) + return _interpret_event(c_event) + +cdef _latent_event(grpc_completion_queue *c_completion_queue, object deadline): + global g_gevent_activated + + latent_event_arg = _LatentEventArg() + latent_event_arg.c_completion_queue = c_completion_queue + latent_event_arg.deadline = deadline + + if g_gevent_activated: + # For gevent, completion_queue_next is run in a native thread pool. + global g_gevent_threadpool + + result = g_gevent_threadpool.apply(_internal_latent_event, (latent_event_arg,)) + return result + else: + return _internal_latent_event(latent_event_arg) + +cdef class CompletionQueue: + + def __cinit__(self, shutdown_cq=False): + cdef grpc_completion_queue_attributes c_attrs + fork_handlers_and_grpc_init() + if shutdown_cq: + c_attrs.version = 1 + c_attrs.cq_completion_type = GRPC_CQ_NEXT + c_attrs.cq_polling_type = GRPC_CQ_NON_LISTENING + c_attrs.cq_shutdown_cb = NULL + self.c_completion_queue = grpc_completion_queue_create( + grpc_completion_queue_factory_lookup(&c_attrs), &c_attrs, NULL); + else: + self.c_completion_queue = grpc_completion_queue_create_for_next(NULL) + self.is_shutting_down = False + self.is_shutdown = False + + cdef _interpret_event(self, grpc_event c_event): + unused_tag, event = _interpret_event(c_event) + if event.completion_type == GRPC_QUEUE_SHUTDOWN: + self.is_shutdown = True + return event + + def _internal_poll(self, deadline): + return self._interpret_event(_next(self.c_completion_queue, deadline)) + + # We name this 'poll' to avoid problems with CPython's expectations for + # 'special' methods (like next and __next__). + def poll(self, deadline=None): + global g_gevent_activated + if g_gevent_activated: + return g_gevent_threadpool.apply(CompletionQueue._internal_poll, (self, deadline)) + else: + return self._internal_poll(deadline) + + def shutdown(self): + with nogil: + grpc_completion_queue_shutdown(self.c_completion_queue) + self.is_shutting_down = True + + def clear(self): + if not self.is_shutting_down: + raise ValueError('queue must be shutting down to be cleared') + while self.poll().type != GRPC_QUEUE_SHUTDOWN: + pass + + def __dealloc__(self): + cdef gpr_timespec c_deadline + c_deadline = gpr_inf_future(GPR_CLOCK_REALTIME) + if self.c_completion_queue != NULL: + # Ensure shutdown + if not self.is_shutting_down: + grpc_completion_queue_shutdown(self.c_completion_queue) + # Pump the queue (All outstanding calls should have been cancelled) + while not self.is_shutdown: + event = grpc_completion_queue_next( + self.c_completion_queue, c_deadline, NULL) + self._interpret_event(event) + grpc_completion_queue_destroy(self.c_completion_queue) + grpc_shutdown() diff --git a/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/credentials.pxd.pxi b/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/credentials.pxd.pxi new file mode 100644 index 00000000000..827f6f17ca3 --- /dev/null +++ b/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/credentials.pxd.pxi @@ -0,0 +1,117 @@ +# Copyright 2015 gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +cdef class CallCredentials: + + cdef grpc_call_credentials *c(self) except * + + # TODO(https://github.com/grpc/grpc/issues/12531): remove. + cdef grpc_call_credentials *c_credentials + + +cdef int _get_metadata( + void *state, grpc_auth_metadata_context context, + grpc_credentials_plugin_metadata_cb cb, void *user_data, + grpc_metadata creds_md[GRPC_METADATA_CREDENTIALS_PLUGIN_SYNC_MAX], + size_t *num_creds_md, grpc_status_code *status, + const char **error_details) except * with gil + +cdef void _destroy(void *state) except * with gil + + +cdef class MetadataPluginCallCredentials(CallCredentials): + + cdef readonly object _metadata_plugin + cdef readonly bytes _name + + cdef grpc_call_credentials *c(self) except * + + +cdef grpc_call_credentials *_composition(call_credentialses) + + +cdef class CompositeCallCredentials(CallCredentials): + + cdef readonly tuple _call_credentialses + + cdef grpc_call_credentials *c(self) except * + + +cdef class ChannelCredentials: + + cdef grpc_channel_credentials *c(self) except * + + +cdef class SSLSessionCacheLRU: + + cdef grpc_ssl_session_cache *_cache + + +cdef class SSLChannelCredentials(ChannelCredentials): + + cdef readonly object _pem_root_certificates + cdef readonly object _private_key + cdef readonly object _certificate_chain + + cdef grpc_channel_credentials *c(self) except * + + +cdef class CompositeChannelCredentials(ChannelCredentials): + + cdef readonly tuple _call_credentialses + cdef readonly ChannelCredentials _channel_credentials + + cdef grpc_channel_credentials *c(self) except * + + +cdef class XDSChannelCredentials(ChannelCredentials): + + cdef readonly ChannelCredentials _fallback_credentials + + cdef grpc_channel_credentials *c(self) except * + + +cdef class ServerCertificateConfig: + + cdef grpc_ssl_server_certificate_config *c_cert_config + cdef const char *c_pem_root_certs + cdef grpc_ssl_pem_key_cert_pair *c_ssl_pem_key_cert_pairs + cdef size_t c_ssl_pem_key_cert_pairs_count + cdef list references + + +cdef class ServerCredentials: + + cdef grpc_server_credentials *c_credentials + cdef grpc_ssl_pem_key_cert_pair *c_ssl_pem_key_cert_pairs + cdef size_t c_ssl_pem_key_cert_pairs_count + cdef list references + # the cert config related state is used only if this credentials is + # created with cert config/fetcher + cdef object initial_cert_config + cdef object cert_config_fetcher + # whether C-core has asked for the initial_cert_config + cdef bint initial_cert_config_fetched + + +cdef class LocalChannelCredentials(ChannelCredentials): + + cdef grpc_local_connect_type _local_connect_type + + +cdef class ALTSChannelCredentials(ChannelCredentials): + cdef grpc_alts_credentials_options *c_options + + cdef grpc_channel_credentials *c(self) except * diff --git a/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/credentials.pyx.pxi b/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/credentials.pyx.pxi new file mode 100644 index 00000000000..27b56aa378b --- /dev/null +++ b/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/credentials.pyx.pxi @@ -0,0 +1,443 @@ +# Copyright 2015 gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +def _spawn_callback_in_thread(cb_func, args): + t = ForkManagedThread(target=cb_func, args=args) + t.setDaemon(True) + t.start() + +async_callback_func = _spawn_callback_in_thread + +def set_async_callback_func(callback_func): + global async_callback_func + async_callback_func = callback_func + +def _spawn_callback_async(callback, args): + async_callback_func(callback, args) + + +cdef class CallCredentials: + + cdef grpc_call_credentials *c(self) except *: + raise NotImplementedError() + + +cdef int _get_metadata(void *state, + grpc_auth_metadata_context context, + grpc_credentials_plugin_metadata_cb cb, + void *user_data, + grpc_metadata creds_md[GRPC_METADATA_CREDENTIALS_PLUGIN_SYNC_MAX], + size_t *num_creds_md, + grpc_status_code *status, + const char **error_details) except * with gil: + cdef size_t metadata_count + cdef grpc_metadata *c_metadata + def callback(metadata, grpc_status_code status, bytes error_details): + cdef char* c_error_details = NULL + if error_details is not None: + c_error_details = <char*> error_details + if status == StatusCode.ok: + _store_c_metadata(metadata, &c_metadata, &metadata_count) + with nogil: + cb(user_data, c_metadata, metadata_count, status, NULL) + _release_c_metadata(c_metadata, metadata_count) + else: + with nogil: + cb(user_data, NULL, 0, status, c_error_details) + args = context.service_url, context.method_name, callback, + plugin = <object>state + if plugin._stored_ctx is not None: + plugin._stored_ctx.copy().run(_spawn_callback_async, plugin, args) + else: + _spawn_callback_async(<object>state, args) + return 0 # Asynchronous return + + +cdef void _destroy(void *state) except * with gil: + cpython.Py_DECREF(<object>state) + grpc_shutdown() + + +cdef class MetadataPluginCallCredentials(CallCredentials): + + def __cinit__(self, metadata_plugin, name): + self._metadata_plugin = metadata_plugin + self._name = name + + cdef grpc_call_credentials *c(self) except *: + cdef grpc_metadata_credentials_plugin c_metadata_plugin + c_metadata_plugin.get_metadata = _get_metadata + c_metadata_plugin.destroy = _destroy + c_metadata_plugin.state = <void *>self._metadata_plugin + c_metadata_plugin.type = self._name + cpython.Py_INCREF(self._metadata_plugin) + fork_handlers_and_grpc_init() + # TODO(yihuazhang): Expose min_security_level via the Python API so that + # applications can decide what minimum security level their plugins require. + return grpc_metadata_credentials_create_from_plugin(c_metadata_plugin, GRPC_PRIVACY_AND_INTEGRITY, NULL) + + +cdef grpc_call_credentials *_composition(call_credentialses): + call_credentials_iterator = iter(call_credentialses) + cdef CallCredentials composition = next(call_credentials_iterator) + cdef grpc_call_credentials *c_composition = composition.c() + cdef CallCredentials additional_call_credentials + cdef grpc_call_credentials *c_additional_call_credentials + cdef grpc_call_credentials *c_next_composition + for additional_call_credentials in call_credentials_iterator: + c_additional_call_credentials = additional_call_credentials.c() + c_next_composition = grpc_composite_call_credentials_create( + c_composition, c_additional_call_credentials, NULL) + grpc_call_credentials_release(c_composition) + grpc_call_credentials_release(c_additional_call_credentials) + c_composition = c_next_composition + return c_composition + + +cdef class CompositeCallCredentials(CallCredentials): + + def __cinit__(self, call_credentialses): + self._call_credentialses = call_credentialses + + cdef grpc_call_credentials *c(self) except *: + return _composition(self._call_credentialses) + + +cdef class ChannelCredentials: + + cdef grpc_channel_credentials *c(self) except *: + raise NotImplementedError() + + +cdef class SSLSessionCacheLRU: + + def __cinit__(self, capacity): + fork_handlers_and_grpc_init() + self._cache = grpc_ssl_session_cache_create_lru(capacity) + + def __int__(self): + return <uintptr_t>self._cache + + def __dealloc__(self): + if self._cache != NULL: + grpc_ssl_session_cache_destroy(self._cache) + grpc_shutdown() + + +cdef class SSLChannelCredentials(ChannelCredentials): + + def __cinit__(self, pem_root_certificates, private_key, certificate_chain): + if pem_root_certificates is not None and not isinstance(pem_root_certificates, bytes): + raise TypeError('expected certificate to be bytes, got %s' % (type(pem_root_certificates))) + self._pem_root_certificates = pem_root_certificates + self._private_key = private_key + self._certificate_chain = certificate_chain + + cdef grpc_channel_credentials *c(self) except *: + cdef const char *c_pem_root_certificates + cdef grpc_ssl_pem_key_cert_pair c_pem_key_certificate_pair + if self._pem_root_certificates is None: + c_pem_root_certificates = NULL + else: + c_pem_root_certificates = self._pem_root_certificates + if self._private_key is None and self._certificate_chain is None: + return grpc_ssl_credentials_create( + c_pem_root_certificates, NULL, NULL, NULL) + else: + if self._private_key: + c_pem_key_certificate_pair.private_key = self._private_key + else: + c_pem_key_certificate_pair.private_key = NULL + if self._certificate_chain: + c_pem_key_certificate_pair.certificate_chain = self._certificate_chain + else: + c_pem_key_certificate_pair.certificate_chain = NULL + return grpc_ssl_credentials_create( + c_pem_root_certificates, &c_pem_key_certificate_pair, NULL, NULL) + + +cdef class CompositeChannelCredentials(ChannelCredentials): + + def __cinit__(self, call_credentialses, channel_credentials): + self._call_credentialses = call_credentialses + self._channel_credentials = channel_credentials + + cdef grpc_channel_credentials *c(self) except *: + cdef grpc_channel_credentials *c_channel_credentials + c_channel_credentials = self._channel_credentials.c() + cdef grpc_call_credentials *c_call_credentials_composition = _composition( + self._call_credentialses) + cdef grpc_channel_credentials *composition + c_composition = grpc_composite_channel_credentials_create( + c_channel_credentials, c_call_credentials_composition, NULL) + grpc_channel_credentials_release(c_channel_credentials) + grpc_call_credentials_release(c_call_credentials_composition) + return c_composition + + +cdef class XDSChannelCredentials(ChannelCredentials): + + def __cinit__(self, fallback_credentials): + self._fallback_credentials = fallback_credentials + + cdef grpc_channel_credentials *c(self) except *: + cdef grpc_channel_credentials *c_fallback_creds = self._fallback_credentials.c() + cdef grpc_channel_credentials *xds_creds = grpc_xds_credentials_create(c_fallback_creds) + grpc_channel_credentials_release(c_fallback_creds) + return xds_creds + + +cdef class ServerCertificateConfig: + + def __cinit__(self): + fork_handlers_and_grpc_init() + self.c_cert_config = NULL + self.c_pem_root_certs = NULL + self.c_ssl_pem_key_cert_pairs = NULL + self.references = [] + + def __dealloc__(self): + grpc_ssl_server_certificate_config_destroy(self.c_cert_config) + gpr_free(self.c_ssl_pem_key_cert_pairs) + grpc_shutdown() + + +cdef class ServerCredentials: + + def __cinit__(self): + fork_handlers_and_grpc_init() + self.c_credentials = NULL + self.references = [] + self.initial_cert_config = None + self.cert_config_fetcher = None + self.initial_cert_config_fetched = False + + def __dealloc__(self): + if self.c_credentials != NULL: + grpc_server_credentials_release(self.c_credentials) + grpc_shutdown() + +cdef const char* _get_c_pem_root_certs(pem_root_certs): + if pem_root_certs is None: + return NULL + else: + return pem_root_certs + +cdef grpc_ssl_pem_key_cert_pair* _create_c_ssl_pem_key_cert_pairs(pem_key_cert_pairs): + # return a malloc'ed grpc_ssl_pem_key_cert_pair from a _list_ of SslPemKeyCertPair + for pair in pem_key_cert_pairs: + if not isinstance(pair, SslPemKeyCertPair): + raise TypeError("expected pem_key_cert_pairs to be sequence of " + "SslPemKeyCertPair") + cdef size_t c_ssl_pem_key_cert_pairs_count = len(pem_key_cert_pairs) + cdef grpc_ssl_pem_key_cert_pair* c_ssl_pem_key_cert_pairs = NULL + with nogil: + c_ssl_pem_key_cert_pairs = ( + <grpc_ssl_pem_key_cert_pair *>gpr_malloc( + sizeof(grpc_ssl_pem_key_cert_pair) * c_ssl_pem_key_cert_pairs_count)) + for i in range(c_ssl_pem_key_cert_pairs_count): + c_ssl_pem_key_cert_pairs[i] = ( + (<SslPemKeyCertPair>pem_key_cert_pairs[i]).c_pair) + return c_ssl_pem_key_cert_pairs + +def server_credentials_ssl(pem_root_certs, pem_key_cert_pairs, + bint force_client_auth): + pem_root_certs = str_to_bytes(pem_root_certs) + pem_key_cert_pairs = list(pem_key_cert_pairs) + cdef ServerCredentials credentials = ServerCredentials() + credentials.references.append(pem_root_certs) + credentials.references.append(pem_key_cert_pairs) + cdef const char * c_pem_root_certs = _get_c_pem_root_certs(pem_root_certs) + credentials.c_ssl_pem_key_cert_pairs_count = len(pem_key_cert_pairs) + credentials.c_ssl_pem_key_cert_pairs = _create_c_ssl_pem_key_cert_pairs(pem_key_cert_pairs) + cdef grpc_ssl_server_certificate_config *c_cert_config = NULL + c_cert_config = grpc_ssl_server_certificate_config_create( + c_pem_root_certs, credentials.c_ssl_pem_key_cert_pairs, + credentials.c_ssl_pem_key_cert_pairs_count) + cdef grpc_ssl_server_credentials_options* c_options = NULL + # C-core assumes ownership of c_cert_config + c_options = grpc_ssl_server_credentials_create_options_using_config( + GRPC_SSL_REQUEST_AND_REQUIRE_CLIENT_CERTIFICATE_AND_VERIFY + if force_client_auth else + GRPC_SSL_DONT_REQUEST_CLIENT_CERTIFICATE, + c_cert_config) + # C-core assumes ownership of c_options + credentials.c_credentials = grpc_ssl_server_credentials_create_with_options(c_options) + return credentials + +def server_certificate_config_ssl(pem_root_certs, pem_key_cert_pairs): + pem_root_certs = str_to_bytes(pem_root_certs) + pem_key_cert_pairs = list(pem_key_cert_pairs) + cdef ServerCertificateConfig cert_config = ServerCertificateConfig() + cert_config.references.append(pem_root_certs) + cert_config.references.append(pem_key_cert_pairs) + cert_config.c_pem_root_certs = _get_c_pem_root_certs(pem_root_certs) + cert_config.c_ssl_pem_key_cert_pairs_count = len(pem_key_cert_pairs) + cert_config.c_ssl_pem_key_cert_pairs = _create_c_ssl_pem_key_cert_pairs(pem_key_cert_pairs) + cert_config.c_cert_config = grpc_ssl_server_certificate_config_create( + cert_config.c_pem_root_certs, cert_config.c_ssl_pem_key_cert_pairs, + cert_config.c_ssl_pem_key_cert_pairs_count) + return cert_config + +def server_credentials_ssl_dynamic_cert_config(initial_cert_config, + cert_config_fetcher, + bint force_client_auth): + if not isinstance(initial_cert_config, grpc.ServerCertificateConfiguration): + raise TypeError( + 'initial_cert_config must be a grpc.ServerCertificateConfiguration') + if not callable(cert_config_fetcher): + raise TypeError('cert_config_fetcher must be callable') + cdef ServerCredentials credentials = ServerCredentials() + credentials.initial_cert_config = initial_cert_config + credentials.cert_config_fetcher = cert_config_fetcher + cdef grpc_ssl_server_credentials_options* c_options = NULL + c_options = grpc_ssl_server_credentials_create_options_using_config_fetcher( + GRPC_SSL_REQUEST_AND_REQUIRE_CLIENT_CERTIFICATE_AND_VERIFY + if force_client_auth else + GRPC_SSL_DONT_REQUEST_CLIENT_CERTIFICATE, + _server_cert_config_fetcher_wrapper, + <void*>credentials) + # C-core assumes ownership of c_options + credentials.c_credentials = grpc_ssl_server_credentials_create_with_options(c_options) + return credentials + +cdef grpc_ssl_certificate_config_reload_status _server_cert_config_fetcher_wrapper( + void* user_data, grpc_ssl_server_certificate_config **config) with gil: + # This is a credentials.ServerCertificateConfig + cdef ServerCertificateConfig cert_config = None + if not user_data: + raise ValueError('internal error: user_data must be specified') + credentials = <ServerCredentials>user_data + if not credentials.initial_cert_config_fetched: + # C-core is asking for the initial cert config + credentials.initial_cert_config_fetched = True + cert_config = credentials.initial_cert_config._certificate_configuration + else: + user_cb = credentials.cert_config_fetcher + try: + cert_config_wrapper = user_cb() + except Exception: + _LOGGER.exception('Error fetching certificate config') + return GRPC_SSL_CERTIFICATE_CONFIG_RELOAD_FAIL + if cert_config_wrapper is None: + return GRPC_SSL_CERTIFICATE_CONFIG_RELOAD_UNCHANGED + elif not isinstance( + cert_config_wrapper, grpc.ServerCertificateConfiguration): + _LOGGER.error( + 'Error fetching certificate configuration: certificate ' + 'configuration must be of type grpc.ServerCertificateConfiguration, ' + 'not %s' % type(cert_config_wrapper).__name__) + return GRPC_SSL_CERTIFICATE_CONFIG_RELOAD_FAIL + else: + cert_config = cert_config_wrapper._certificate_configuration + config[0] = <grpc_ssl_server_certificate_config*>cert_config.c_cert_config + # our caller will assume ownership of memory, so we have to recreate + # a copy of c_cert_config here + cert_config.c_cert_config = grpc_ssl_server_certificate_config_create( + cert_config.c_pem_root_certs, cert_config.c_ssl_pem_key_cert_pairs, + cert_config.c_ssl_pem_key_cert_pairs_count) + return GRPC_SSL_CERTIFICATE_CONFIG_RELOAD_NEW + + +class LocalConnectionType: + uds = UDS + local_tcp = LOCAL_TCP + +cdef class LocalChannelCredentials(ChannelCredentials): + + def __cinit__(self, grpc_local_connect_type local_connect_type): + self._local_connect_type = local_connect_type + + cdef grpc_channel_credentials *c(self) except *: + cdef grpc_local_connect_type local_connect_type + local_connect_type = self._local_connect_type + return grpc_local_credentials_create(local_connect_type) + +def channel_credentials_local(grpc_local_connect_type local_connect_type): + return LocalChannelCredentials(local_connect_type) + +cdef class InsecureChannelCredentials(ChannelCredentials): + + cdef grpc_channel_credentials *c(self) except *: + return grpc_insecure_credentials_create() + +def channel_credentials_insecure(): + return InsecureChannelCredentials() + +def server_credentials_local(grpc_local_connect_type local_connect_type): + cdef ServerCredentials credentials = ServerCredentials() + credentials.c_credentials = grpc_local_server_credentials_create(local_connect_type) + return credentials + +def xds_server_credentials(ServerCredentials fallback_credentials): + cdef ServerCredentials credentials = ServerCredentials() + credentials.c_credentials = grpc_xds_server_credentials_create(fallback_credentials.c_credentials) + # NOTE: We do not need to call grpc_server_credentials_release on the + # fallback credentials here because this will be done by the __dealloc__ + # method of its Cython wrapper. + return credentials + +def insecure_server_credentials(): + cdef ServerCredentials credentials = ServerCredentials() + credentials.c_credentials = grpc_insecure_server_credentials_create() + return credentials + +cdef class ALTSChannelCredentials(ChannelCredentials): + + def __cinit__(self, list service_accounts): + self.c_options = grpc_alts_credentials_client_options_create() + cdef str account + for account in service_accounts: + grpc_alts_credentials_client_options_add_target_service_account( + self.c_options, str_to_bytes(account)) + + def __dealloc__(self): + if self.c_options != NULL: + grpc_alts_credentials_options_destroy(self.c_options) + + cdef grpc_channel_credentials *c(self) except *: + return grpc_alts_credentials_create(self.c_options) + + +def channel_credentials_alts(list service_accounts): + return ALTSChannelCredentials(service_accounts) + + +def server_credentials_alts(): + cdef ServerCredentials credentials = ServerCredentials() + cdef grpc_alts_credentials_options* c_options = grpc_alts_credentials_server_options_create() + credentials.c_credentials = grpc_alts_server_credentials_create(c_options) + # Options can be destroyed as deep copy was performed. + grpc_alts_credentials_options_destroy(c_options) + return credentials + + +cdef class ComputeEngineChannelCredentials(ChannelCredentials): + cdef grpc_channel_credentials* _c_creds + cdef grpc_call_credentials* _call_creds + + def __cinit__(self, CallCredentials call_creds): + self._c_creds = NULL + self._call_creds = call_creds.c() + if self._call_creds == NULL: + raise ValueError("Call credentials may not be NULL.") + + cdef grpc_channel_credentials *c(self) except *: + self._c_creds = grpc_google_default_credentials_create(self._call_creds) + return self._c_creds + + +def channel_credentials_compute_engine(call_creds): + return ComputeEngineChannelCredentials(call_creds) diff --git a/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/csds.pyx.pxi b/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/csds.pyx.pxi new file mode 100644 index 00000000000..c33eb76e47f --- /dev/null +++ b/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/csds.pyx.pxi @@ -0,0 +1,21 @@ +# Copyright 2021 The gRPC Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +def dump_xds_configs(): + cdef grpc_slice client_config_in_slice + with nogil: + client_config_in_slice = grpc_dump_xds_configs() + cdef bytes result = _slice_bytes(client_config_in_slice) + return result diff --git a/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/event.pxd.pxi b/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/event.pxd.pxi new file mode 100644 index 00000000000..0f173c6bd21 --- /dev/null +++ b/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/event.pxd.pxi @@ -0,0 +1,47 @@ +# Copyright 2017 gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +cdef class BaseEvent: + pass + +cdef class ConnectivityEvent(BaseEvent): + + cdef readonly grpc_completion_type completion_type + cdef readonly bint success + cdef readonly object tag + + +cdef class RequestCallEvent(BaseEvent): + + cdef readonly grpc_completion_type completion_type + cdef readonly bint success + cdef readonly object tag + cdef readonly Call call + cdef readonly CallDetails call_details + cdef readonly tuple invocation_metadata + + +cdef class BatchOperationEvent(BaseEvent): + + cdef readonly grpc_completion_type completion_type + cdef readonly bint success + cdef readonly object tag + cdef readonly object batch_operations + + +cdef class ServerShutdownEvent(BaseEvent): + + cdef readonly grpc_completion_type completion_type + cdef readonly bint success + cdef readonly object tag diff --git a/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/event.pyx.pxi b/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/event.pyx.pxi new file mode 100644 index 00000000000..49cd0392553 --- /dev/null +++ b/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/event.pyx.pxi @@ -0,0 +1,54 @@ +# Copyright 2017 gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +cdef class ConnectivityEvent(BaseEvent): + + def __cinit__( + self, grpc_completion_type completion_type, bint success, object tag): + self.completion_type = completion_type + self.success = success + self.tag = tag + + +cdef class RequestCallEvent(BaseEvent): + + def __cinit__( + self, grpc_completion_type completion_type, bint success, object tag, + Call call, CallDetails call_details, tuple invocation_metadata): + self.completion_type = completion_type + self.success = success + self.tag = tag + self.call = call + self.call_details = call_details + self.invocation_metadata = invocation_metadata + + +cdef class BatchOperationEvent(BaseEvent): + + def __cinit__( + self, grpc_completion_type completion_type, bint success, object tag, + object batch_operations): + self.completion_type = completion_type + self.success = success + self.tag = tag + self.batch_operations = batch_operations + + +cdef class ServerShutdownEvent(BaseEvent): + + def __cinit__( + self, grpc_completion_type completion_type, bint success, object tag): + self.completion_type = completion_type + self.success = success + self.tag = tag diff --git a/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/fork_posix.pxd.pxi b/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/fork_posix.pxd.pxi new file mode 100644 index 00000000000..a925bdd2e69 --- /dev/null +++ b/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/fork_posix.pxd.pxi @@ -0,0 +1,29 @@ +# Copyright 2018 gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +cdef extern from "pthread.h" nogil: + int pthread_atfork( + void (*prepare)() nogil, + void (*parent)() nogil, + void (*child)() nogil) + + +cdef void __prefork() nogil + + +cdef void __postfork_parent() nogil + + +cdef void __postfork_child() nogil
\ No newline at end of file diff --git a/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/fork_posix.pyx.pxi b/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/fork_posix.pyx.pxi new file mode 100644 index 00000000000..53657e8b1a9 --- /dev/null +++ b/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/fork_posix.pyx.pxi @@ -0,0 +1,208 @@ +# Copyright 2018 gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +_AWAIT_THREADS_TIMEOUT_SECONDS = 5 + +_TRUE_VALUES = ['yes', 'Yes', 'YES', 'true', 'True', 'TRUE', '1'] + +# This flag enables experimental support within gRPC Python for applications +# that will fork() without exec(). When enabled, gRPC Python will attempt to +# pause all of its internally created threads before the fork syscall proceeds. +# +# For this to be successful, the application must not have multiple threads of +# its own calling into gRPC when fork is invoked. Any callbacks from gRPC +# Python-spawned threads into user code (e.g., callbacks for asynchronous RPCs) +# must not block and should execute quickly. +# +# This flag is not supported on Windows. +# This flag is also not supported for non-native IO manager. +_GRPC_ENABLE_FORK_SUPPORT = ( + os.environ.get('GRPC_ENABLE_FORK_SUPPORT', '0') + .lower() in _TRUE_VALUES) + +_fork_handler_failed = False + +cdef void __prefork() nogil: + with gil: + global _fork_handler_failed + _fork_handler_failed = False + with _fork_state.fork_in_progress_condition: + _fork_state.fork_in_progress = True + if not _fork_state.active_thread_count.await_zero_threads( + _AWAIT_THREADS_TIMEOUT_SECONDS): + _LOGGER.error( + 'Failed to shutdown gRPC Python threads prior to fork. ' + 'Behavior after fork will be undefined.') + _fork_handler_failed = True + + +cdef void __postfork_parent() nogil: + with gil: + with _fork_state.fork_in_progress_condition: + _fork_state.fork_in_progress = False + _fork_state.fork_in_progress_condition.notify_all() + + +cdef void __postfork_child() nogil: + with gil: + try: + if _fork_handler_failed: + return + # Thread could be holding the fork_in_progress_condition inside of + # block_if_fork_in_progress() when fork occurs. Reset the lock here. + _fork_state.fork_in_progress_condition = threading.Condition() + # A thread in return_from_user_request_generator() may hold this lock + # when fork occurs. + _fork_state.active_thread_count = _ActiveThreadCount() + for state_to_reset in _fork_state.postfork_states_to_reset: + state_to_reset.reset_postfork_child() + _fork_state.postfork_states_to_reset = [] + _fork_state.fork_epoch += 1 + for channel in _fork_state.channels: + channel._close_on_fork() + with _fork_state.fork_in_progress_condition: + _fork_state.fork_in_progress = False + except: + _LOGGER.error('Exiting child due to raised exception') + _LOGGER.error(sys.exc_info()[0]) + os._exit(os.EX_USAGE) + + if grpc_is_initialized() > 0: + with gil: + _LOGGER.error('Failed to shutdown gRPC Core after fork()') + os._exit(os.EX_USAGE) + + +def fork_handlers_and_grpc_init(): + grpc_init() + if _GRPC_ENABLE_FORK_SUPPORT: + with _fork_state.fork_handler_registered_lock: + if not _fork_state.fork_handler_registered: + pthread_atfork(&__prefork, &__postfork_parent, &__postfork_child) + _fork_state.fork_handler_registered = True + + + + +class ForkManagedThread(object): + def __init__(self, target, args=()): + if _GRPC_ENABLE_FORK_SUPPORT: + def managed_target(*args): + try: + target(*args) + finally: + _fork_state.active_thread_count.decrement() + self._thread = threading.Thread(target=_run_with_context(managed_target), args=args) + else: + self._thread = threading.Thread(target=_run_with_context(target), args=args) + + def setDaemon(self, daemonic): + self._thread.daemon = daemonic + + def start(self): + if _GRPC_ENABLE_FORK_SUPPORT: + _fork_state.active_thread_count.increment() + self._thread.start() + + def join(self): + self._thread.join() + + +def block_if_fork_in_progress(postfork_state_to_reset=None): + if _GRPC_ENABLE_FORK_SUPPORT: + with _fork_state.fork_in_progress_condition: + if not _fork_state.fork_in_progress: + return + if postfork_state_to_reset is not None: + _fork_state.postfork_states_to_reset.append(postfork_state_to_reset) + _fork_state.active_thread_count.decrement() + _fork_state.fork_in_progress_condition.wait() + _fork_state.active_thread_count.increment() + + +def enter_user_request_generator(): + if _GRPC_ENABLE_FORK_SUPPORT: + _fork_state.active_thread_count.decrement() + + +def return_from_user_request_generator(): + if _GRPC_ENABLE_FORK_SUPPORT: + _fork_state.active_thread_count.increment() + block_if_fork_in_progress() + + +def get_fork_epoch(): + return _fork_state.fork_epoch + + +def is_fork_support_enabled(): + return _GRPC_ENABLE_FORK_SUPPORT + + +def fork_register_channel(channel): + if _GRPC_ENABLE_FORK_SUPPORT: + _fork_state.channels.add(channel) + + +def fork_unregister_channel(channel): + if _GRPC_ENABLE_FORK_SUPPORT: + _fork_state.channels.discard(channel) + + +class _ActiveThreadCount(object): + def __init__(self): + self._num_active_threads = 0 + self._condition = threading.Condition() + + def increment(self): + with self._condition: + self._num_active_threads += 1 + + def decrement(self): + with self._condition: + self._num_active_threads -= 1 + if self._num_active_threads == 0: + self._condition.notify_all() + + def await_zero_threads(self, timeout_secs): + end_time = time.time() + timeout_secs + wait_time = timeout_secs + with self._condition: + while True: + if self._num_active_threads > 0: + self._condition.wait(wait_time) + if self._num_active_threads == 0: + return True + # Thread count may have increased before this re-obtains the + # lock after a notify(). Wait again until timeout_secs has + # elapsed. + wait_time = end_time - time.time() + if wait_time <= 0: + return False + + +class _ForkState(object): + def __init__(self): + self.fork_in_progress_condition = threading.Condition() + self.fork_in_progress = False + self.postfork_states_to_reset = [] + self.fork_handler_registered_lock = threading.Lock() + self.fork_handler_registered = False + self.active_thread_count = _ActiveThreadCount() + self.fork_epoch = 0 + self.channels = set() + + +_fork_state = _ForkState() diff --git a/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/fork_windows.pyx.pxi b/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/fork_windows.pyx.pxi new file mode 100644 index 00000000000..67aaf4d033d --- /dev/null +++ b/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/fork_windows.pyx.pxi @@ -0,0 +1,61 @@ +# Copyright 2018 gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +# No-op implementations for Windows. + +def fork_handlers_and_grpc_init(): + grpc_init() + + +class ForkManagedThread(object): + def __init__(self, target, args=()): + self._thread = threading.Thread(target=_run_with_context(target), args=args) + + def setDaemon(self, daemonic): + self._thread.daemon = daemonic + + def start(self): + self._thread.start() + + def join(self): + self._thread.join() + + +def block_if_fork_in_progress(postfork_state_to_reset=None): + pass + + +def enter_user_request_generator(): + pass + + +def return_from_user_request_generator(): + pass + + +def get_fork_epoch(): + return 0 + + +def is_fork_support_enabled(): + return False + + +def fork_register_channel(channel): + pass + + +def fork_unregister_channel(channel): + pass diff --git a/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/grpc.pxi b/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/grpc.pxi new file mode 100644 index 00000000000..6e04e0cbfd4 --- /dev/null +++ b/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/grpc.pxi @@ -0,0 +1,735 @@ +# Copyright 2015 gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +cimport libc.time + +ctypedef ssize_t intptr_t +ctypedef size_t uintptr_t +ctypedef signed char int8_t +ctypedef signed short int16_t +ctypedef signed int int32_t +ctypedef signed long long int64_t +ctypedef unsigned char uint8_t +ctypedef unsigned short uint16_t +ctypedef unsigned int uint32_t +ctypedef unsigned long long uint64_t + +# C++ Utilities + +# NOTE(lidiz) Unfortunately, we can't use "cimport" here because Cython +# links it with exception handling. It introduces new dependencies. +cdef extern from "<queue>" namespace "std" nogil: + cdef cppclass queue[T]: + queue() + bint empty() + T& front() + T& back() + void pop() + void push(T&) + size_t size() + + +cdef extern from "<mutex>" namespace "std" nogil: + cdef cppclass mutex: + mutex() + void lock() + void unlock() + + cdef cppclass unique_lock[Mutex]: + unique_lock(Mutex&) + +cdef extern from "<condition_variable>" namespace "std" nogil: + cdef cppclass condition_variable: + condition_variable() + void notify_all() + void wait(unique_lock[mutex]&) + +# gRPC Core Declarations + +cdef extern from "grpc/support/alloc.h": + + void *gpr_malloc(size_t size) nogil + void *gpr_zalloc(size_t size) nogil + void gpr_free(void *ptr) nogil + void *gpr_realloc(void *p, size_t size) nogil + + +cdef extern from "grpc/byte_buffer_reader.h": + + struct grpc_byte_buffer_reader: + # We don't care about the internals + pass + + +cdef extern from "grpc/impl/codegen/grpc_types.h": + ctypedef struct grpc_completion_queue_functor: + void (*functor_run)(grpc_completion_queue_functor*, int); + + +cdef extern from "grpc/grpc.h": + + ctypedef struct grpc_slice: + # don't worry about writing out the members of grpc_slice; we never access + # them directly. + pass + + grpc_slice grpc_slice_ref(grpc_slice s) nogil + void grpc_slice_unref(grpc_slice s) nogil + grpc_slice grpc_empty_slice() nogil + grpc_slice grpc_slice_new(void *p, size_t len, void (*destroy)(void *)) nogil + grpc_slice grpc_slice_new_with_len( + void *p, size_t len, void (*destroy)(void *, size_t)) nogil + grpc_slice grpc_slice_malloc(size_t length) nogil + grpc_slice grpc_slice_from_copied_string(const char *source) nogil + grpc_slice grpc_slice_from_copied_buffer(const char *source, size_t len) nogil + grpc_slice grpc_slice_copy(grpc_slice s) nogil + + # Declare functions for function-like macros (because Cython)... + void *grpc_slice_start_ptr "GRPC_SLICE_START_PTR" (grpc_slice s) nogil + size_t grpc_slice_length "GRPC_SLICE_LENGTH" (grpc_slice s) nogil + + const int GPR_MS_PER_SEC + const int GPR_US_PER_SEC + const int GPR_NS_PER_SEC + + ctypedef enum gpr_clock_type: + GPR_CLOCK_MONOTONIC + GPR_CLOCK_REALTIME + GPR_CLOCK_PRECISE + GPR_TIMESPAN + + ctypedef struct gpr_timespec: + int64_t seconds "tv_sec" + int32_t nanoseconds "tv_nsec" + gpr_clock_type clock_type + + gpr_timespec gpr_time_0(gpr_clock_type type) nogil + gpr_timespec gpr_inf_future(gpr_clock_type type) nogil + gpr_timespec gpr_inf_past(gpr_clock_type type) nogil + + gpr_timespec gpr_now(gpr_clock_type clock) nogil + + gpr_timespec gpr_convert_clock_type(gpr_timespec t, + gpr_clock_type target_clock) nogil + + gpr_timespec gpr_time_from_millis(int64_t ms, gpr_clock_type type) nogil + gpr_timespec gpr_time_from_nanos(int64_t ns, gpr_clock_type type) nogil + double gpr_timespec_to_micros(gpr_timespec t) nogil + + gpr_timespec gpr_time_add(gpr_timespec a, gpr_timespec b) nogil + + int gpr_time_cmp(gpr_timespec a, gpr_timespec b) nogil + + ctypedef struct grpc_byte_buffer: + # We don't care about the internals. + pass + + grpc_byte_buffer *grpc_raw_byte_buffer_create(grpc_slice *slices, + size_t nslices) nogil + size_t grpc_byte_buffer_length(grpc_byte_buffer *bb) nogil + void grpc_byte_buffer_destroy(grpc_byte_buffer *byte_buffer) nogil + + int grpc_byte_buffer_reader_init(grpc_byte_buffer_reader *reader, + grpc_byte_buffer *buffer) nogil + int grpc_byte_buffer_reader_next(grpc_byte_buffer_reader *reader, + grpc_slice *slice) nogil + void grpc_byte_buffer_reader_destroy(grpc_byte_buffer_reader *reader) nogil + + ctypedef enum grpc_status_code: + GRPC_STATUS_OK + GRPC_STATUS_CANCELLED + GRPC_STATUS_UNKNOWN + GRPC_STATUS_INVALID_ARGUMENT + GRPC_STATUS_DEADLINE_EXCEEDED + GRPC_STATUS_NOT_FOUND + GRPC_STATUS_ALREADY_EXISTS + GRPC_STATUS_PERMISSION_DENIED + GRPC_STATUS_UNAUTHENTICATED + GRPC_STATUS_RESOURCE_EXHAUSTED + GRPC_STATUS_FAILED_PRECONDITION + GRPC_STATUS_ABORTED + GRPC_STATUS_OUT_OF_RANGE + GRPC_STATUS_UNIMPLEMENTED + GRPC_STATUS_INTERNAL + GRPC_STATUS_UNAVAILABLE + GRPC_STATUS_DATA_LOSS + GRPC_STATUS__DO_NOT_USE + + const char *GRPC_ARG_ENABLE_CENSUS + const char *GRPC_ARG_MAX_CONCURRENT_STREAMS + const char *GRPC_ARG_MAX_RECEIVE_MESSAGE_LENGTH + const char *GRPC_ARG_MAX_SEND_MESSAGE_LENGTH + const char *GRPC_ARG_HTTP2_INITIAL_SEQUENCE_NUMBER + const char *GRPC_ARG_DEFAULT_AUTHORITY + const char *GRPC_ARG_PRIMARY_USER_AGENT_STRING + const char *GRPC_ARG_SECONDARY_USER_AGENT_STRING + const char *GRPC_SSL_TARGET_NAME_OVERRIDE_ARG + const char *GRPC_SSL_SESSION_CACHE_ARG + const char *_GRPC_COMPRESSION_CHANNEL_DEFAULT_ALGORITHM \ + "GRPC_COMPRESSION_CHANNEL_DEFAULT_ALGORITHM" + const char *GRPC_COMPRESSION_CHANNEL_DEFAULT_LEVEL + const char *GRPC_COMPRESSION_CHANNEL_ENABLED_ALGORITHMS_BITSET + + const int GRPC_WRITE_BUFFER_HINT + const int GRPC_WRITE_NO_COMPRESS + const int GRPC_WRITE_USED_MASK + + const int GRPC_INITIAL_METADATA_WAIT_FOR_READY + const int GRPC_INITIAL_METADATA_WAIT_FOR_READY_EXPLICITLY_SET + const int GRPC_INITIAL_METADATA_USED_MASK + + const int GRPC_MAX_COMPLETION_QUEUE_PLUCKERS + + ctypedef struct grpc_completion_queue: + # We don't care about the internals (and in fact don't know them) + pass + + ctypedef struct grpc_channel: + # We don't care about the internals (and in fact don't know them) + pass + + ctypedef struct grpc_server: + # We don't care about the internals (and in fact don't know them) + pass + + ctypedef struct grpc_call: + # We don't care about the internals (and in fact don't know them) + pass + + ctypedef enum grpc_arg_type: + GRPC_ARG_STRING + GRPC_ARG_INTEGER + GRPC_ARG_POINTER + + ctypedef struct grpc_arg_pointer_vtable: + void *(*copy)(void *) + void (*destroy)(void *) + int (*cmp)(void *, void *) + + ctypedef struct grpc_arg_value_pointer: + void *address "p" + grpc_arg_pointer_vtable *vtable + + union grpc_arg_value: + char *string + int integer + grpc_arg_value_pointer pointer + + ctypedef struct grpc_arg: + grpc_arg_type type + char *key + grpc_arg_value value + + ctypedef struct grpc_channel_args: + size_t arguments_length "num_args" + grpc_arg *arguments "args" + + ctypedef enum grpc_stream_compression_level: + GRPC_STREAM_COMPRESS_LEVEL_NONE + GRPC_STREAM_COMPRESS_LEVEL_LOW + GRPC_STREAM_COMPRESS_LEVEL_MED + GRPC_STREAM_COMPRESS_LEVEL_HIGH + + ctypedef enum grpc_call_error: + GRPC_CALL_OK + GRPC_CALL_ERROR + GRPC_CALL_ERROR_NOT_ON_SERVER + GRPC_CALL_ERROR_NOT_ON_CLIENT + GRPC_CALL_ERROR_ALREADY_ACCEPTED + GRPC_CALL_ERROR_ALREADY_INVOKED + GRPC_CALL_ERROR_NOT_INVOKED + GRPC_CALL_ERROR_ALREADY_FINISHED + GRPC_CALL_ERROR_TOO_MANY_OPERATIONS + GRPC_CALL_ERROR_INVALID_FLAGS + GRPC_CALL_ERROR_INVALID_METADATA + + ctypedef enum grpc_cq_completion_type: + GRPC_CQ_NEXT + GRPC_CQ_PLUCK + + ctypedef enum grpc_cq_polling_type: + GRPC_CQ_DEFAULT_POLLING + GRPC_CQ_NON_LISTENING + GRPC_CQ_NON_POLLING + + ctypedef struct grpc_completion_queue_attributes: + int version + grpc_cq_completion_type cq_completion_type + grpc_cq_polling_type cq_polling_type + void* cq_shutdown_cb + + ctypedef enum grpc_connectivity_state: + GRPC_CHANNEL_IDLE + GRPC_CHANNEL_CONNECTING + GRPC_CHANNEL_READY + GRPC_CHANNEL_TRANSIENT_FAILURE + GRPC_CHANNEL_SHUTDOWN + + ctypedef struct grpc_metadata: + grpc_slice key + grpc_slice value + # ignore the 'internal_data.obfuscated' fields. + + ctypedef enum grpc_completion_type: + GRPC_QUEUE_SHUTDOWN + GRPC_QUEUE_TIMEOUT + GRPC_OP_COMPLETE + + ctypedef struct grpc_event: + grpc_completion_type type + int success + void *tag + + ctypedef struct grpc_metadata_array: + size_t count + size_t capacity + grpc_metadata *metadata + + void grpc_metadata_array_init(grpc_metadata_array *array) nogil + void grpc_metadata_array_destroy(grpc_metadata_array *array) nogil + + ctypedef struct grpc_call_details: + grpc_slice method + grpc_slice host + gpr_timespec deadline + + void grpc_call_details_init(grpc_call_details *details) nogil + void grpc_call_details_destroy(grpc_call_details *details) nogil + + ctypedef enum grpc_op_type: + GRPC_OP_SEND_INITIAL_METADATA + GRPC_OP_SEND_MESSAGE + GRPC_OP_SEND_CLOSE_FROM_CLIENT + GRPC_OP_SEND_STATUS_FROM_SERVER + GRPC_OP_RECV_INITIAL_METADATA + GRPC_OP_RECV_MESSAGE + GRPC_OP_RECV_STATUS_ON_CLIENT + GRPC_OP_RECV_CLOSE_ON_SERVER + + ctypedef struct grpc_op_send_initial_metadata_maybe_compression_level: + uint8_t is_set + grpc_compression_level level + + ctypedef struct grpc_op_data_send_initial_metadata: + size_t count + grpc_metadata *metadata + grpc_op_send_initial_metadata_maybe_compression_level maybe_compression_level + + ctypedef struct grpc_op_data_send_status_from_server: + size_t trailing_metadata_count + grpc_metadata *trailing_metadata + grpc_status_code status + grpc_slice *status_details + + ctypedef struct grpc_op_data_recv_status_on_client: + grpc_metadata_array *trailing_metadata + grpc_status_code *status + grpc_slice *status_details + char** error_string + + ctypedef struct grpc_op_data_recv_close_on_server: + int *cancelled + + ctypedef struct grpc_op_data_send_message: + grpc_byte_buffer *send_message + + ctypedef struct grpc_op_data_receive_message: + grpc_byte_buffer **receive_message "recv_message" + + ctypedef struct grpc_op_data_receive_initial_metadata: + grpc_metadata_array *receive_initial_metadata "recv_initial_metadata" + + union grpc_op_data: + grpc_op_data_send_initial_metadata send_initial_metadata + grpc_op_data_send_message send_message + grpc_op_data_send_status_from_server send_status_from_server + grpc_op_data_receive_initial_metadata receive_initial_metadata "recv_initial_metadata" + grpc_op_data_receive_message receive_message "recv_message" + grpc_op_data_recv_status_on_client receive_status_on_client "recv_status_on_client" + grpc_op_data_recv_close_on_server receive_close_on_server "recv_close_on_server" + + ctypedef struct grpc_op: + grpc_op_type type "op" + uint32_t flags + void * reserved + grpc_op_data data + + void grpc_dont_init_openssl() nogil + void grpc_init() nogil + void grpc_shutdown() nogil + void grpc_shutdown_blocking() nogil + int grpc_is_initialized() nogil + + ctypedef struct grpc_completion_queue_factory: + pass + + grpc_completion_queue_factory *grpc_completion_queue_factory_lookup( + const grpc_completion_queue_attributes* attributes) nogil + grpc_completion_queue *grpc_completion_queue_create( + const grpc_completion_queue_factory* factory, + const grpc_completion_queue_attributes* attr, void* reserved) nogil + grpc_completion_queue *grpc_completion_queue_create_for_next(void *reserved) nogil + + grpc_event grpc_completion_queue_next(grpc_completion_queue *cq, + gpr_timespec deadline, + void *reserved) nogil + grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cq, void *tag, + gpr_timespec deadline, + void *reserved) nogil + void grpc_completion_queue_shutdown(grpc_completion_queue *cq) nogil + void grpc_completion_queue_destroy(grpc_completion_queue *cq) nogil + + grpc_completion_queue *grpc_completion_queue_create_for_callback( + grpc_completion_queue_functor* shutdown_callback, + void *reserved) nogil + + grpc_call_error grpc_call_start_batch( + grpc_call *call, const grpc_op *ops, size_t nops, void *tag, + void *reserved) nogil + const char* grpc_call_error_to_string(grpc_call_error error) nogil + grpc_call_error grpc_call_cancel(grpc_call *call, void *reserved) nogil + grpc_call_error grpc_call_cancel_with_status(grpc_call *call, + grpc_status_code status, + const char *description, + void *reserved) nogil + char *grpc_call_get_peer(grpc_call *call) nogil + void grpc_call_unref(grpc_call *call) nogil + + grpc_call *grpc_channel_create_call( + grpc_channel *channel, grpc_call *parent_call, uint32_t propagation_mask, + grpc_completion_queue *completion_queue, grpc_slice method, + const grpc_slice *host, gpr_timespec deadline, void *reserved) nogil + grpc_connectivity_state grpc_channel_check_connectivity_state( + grpc_channel *channel, int try_to_connect) nogil + void grpc_channel_watch_connectivity_state( + grpc_channel *channel, grpc_connectivity_state last_observed_state, + gpr_timespec deadline, grpc_completion_queue *cq, void *tag) nogil + char *grpc_channel_get_target(grpc_channel *channel) nogil + void grpc_channel_destroy(grpc_channel *channel) nogil + + grpc_server *grpc_server_create( + const grpc_channel_args *args, void *reserved) nogil + grpc_call_error grpc_server_request_call( + grpc_server *server, grpc_call **call, grpc_call_details *details, + grpc_metadata_array *request_metadata, grpc_completion_queue + *cq_bound_to_call, grpc_completion_queue *cq_for_notification, void + *tag_new) nogil + void grpc_server_register_completion_queue(grpc_server *server, + grpc_completion_queue *cq, + void *reserved) nogil + + ctypedef struct grpc_server_config_fetcher: + pass + + void grpc_server_set_config_fetcher( + grpc_server* server, grpc_server_config_fetcher* config_fetcher) nogil + + ctypedef struct grpc_server_xds_status_notifier: + void (*on_serving_status_update)(void* user_data, const char* uri, + grpc_status_code code, + const char* error_message) + void* user_data; + + grpc_server_config_fetcher* grpc_server_config_fetcher_xds_create( + grpc_server_xds_status_notifier notifier, + const grpc_channel_args* args) nogil + + + void grpc_server_start(grpc_server *server) nogil + void grpc_server_shutdown_and_notify( + grpc_server *server, grpc_completion_queue *cq, void *tag) nogil + void grpc_server_cancel_all_calls(grpc_server *server) nogil + void grpc_server_destroy(grpc_server *server) nogil + + char* grpc_channelz_get_top_channels(intptr_t start_channel_id) + char* grpc_channelz_get_servers(intptr_t start_server_id) + char* grpc_channelz_get_server(intptr_t server_id) + char* grpc_channelz_get_server_sockets(intptr_t server_id, + intptr_t start_socket_id, + intptr_t max_results) + char* grpc_channelz_get_channel(intptr_t channel_id) + char* grpc_channelz_get_subchannel(intptr_t subchannel_id) + char* grpc_channelz_get_socket(intptr_t socket_id) + + grpc_slice grpc_dump_xds_configs() nogil + + +cdef extern from "grpc/grpc_security.h": + + # Declare this as an enum, this is the only way to make it a const in + # cython + enum: GRPC_METADATA_CREDENTIALS_PLUGIN_SYNC_MAX + + ctypedef enum grpc_ssl_roots_override_result: + GRPC_SSL_ROOTS_OVERRIDE_OK + GRPC_SSL_ROOTS_OVERRIDE_FAILED_PERMANENTLY + GRPC_SSL_ROOTS_OVERRIDE_FAILED + + ctypedef enum grpc_ssl_client_certificate_request_type: + GRPC_SSL_DONT_REQUEST_CLIENT_CERTIFICATE, + GRPC_SSL_REQUEST_CLIENT_CERTIFICATE_BUT_DONT_VERIFY + GRPC_SSL_REQUEST_CLIENT_CERTIFICATE_AND_VERIFY + GRPC_SSL_REQUEST_AND_REQUIRE_CLIENT_CERTIFICATE_BUT_DONT_VERIFY + GRPC_SSL_REQUEST_AND_REQUIRE_CLIENT_CERTIFICATE_AND_VERIFY + + ctypedef enum grpc_security_level: + GRPC_SECURITY_MIN + GRPC_SECURITY_NONE = GRPC_SECURITY_MIN + GRPC_INTEGRITY_ONLY + GRPC_PRIVACY_AND_INTEGRITY + GRPC_SECURITY_MAX = GRPC_PRIVACY_AND_INTEGRITY + + ctypedef enum grpc_ssl_certificate_config_reload_status: + GRPC_SSL_CERTIFICATE_CONFIG_RELOAD_UNCHANGED + GRPC_SSL_CERTIFICATE_CONFIG_RELOAD_NEW + GRPC_SSL_CERTIFICATE_CONFIG_RELOAD_FAIL + + ctypedef struct grpc_ssl_server_certificate_config: + # We don't care about the internals + pass + + ctypedef struct grpc_ssl_server_credentials_options: + # We don't care about the internals + pass + + grpc_ssl_server_certificate_config * grpc_ssl_server_certificate_config_create( + const char *pem_root_certs, + const grpc_ssl_pem_key_cert_pair *pem_key_cert_pairs, + size_t num_key_cert_pairs) + + void grpc_ssl_server_certificate_config_destroy(grpc_ssl_server_certificate_config *config) + + ctypedef grpc_ssl_certificate_config_reload_status (*grpc_ssl_server_certificate_config_callback)( + void *user_data, + grpc_ssl_server_certificate_config **config) + + grpc_ssl_server_credentials_options *grpc_ssl_server_credentials_create_options_using_config( + grpc_ssl_client_certificate_request_type client_certificate_request, + grpc_ssl_server_certificate_config *certificate_config) + + grpc_ssl_server_credentials_options* grpc_ssl_server_credentials_create_options_using_config_fetcher( + grpc_ssl_client_certificate_request_type client_certificate_request, + grpc_ssl_server_certificate_config_callback cb, + void *user_data) + + grpc_server_credentials *grpc_ssl_server_credentials_create_with_options( + grpc_ssl_server_credentials_options *options) + + ctypedef struct grpc_ssl_pem_key_cert_pair: + const char *private_key + const char *certificate_chain "cert_chain" + + ctypedef struct grpc_channel_credentials: + # We don't care about the internals (and in fact don't know them) + pass + + ctypedef struct grpc_call_credentials: + # We don't care about the internals (and in fact don't know them) + pass + + ctypedef struct grpc_ssl_session_cache: + # We don't care about the internals (and in fact don't know them) + pass + + ctypedef struct verify_peer_options: + # We don't care about the internals (and in fact don't know them) + pass + + ctypedef void (*grpc_ssl_roots_override_callback)(char **pem_root_certs) + + grpc_ssl_session_cache *grpc_ssl_session_cache_create_lru(size_t capacity) + void grpc_ssl_session_cache_destroy(grpc_ssl_session_cache* cache) + + void grpc_set_ssl_roots_override_callback( + grpc_ssl_roots_override_callback cb) nogil + + grpc_channel_credentials *grpc_google_default_credentials_create(grpc_call_credentials* call_credentials) nogil + grpc_channel_credentials *grpc_ssl_credentials_create( + const char *pem_root_certs, grpc_ssl_pem_key_cert_pair *pem_key_cert_pair, + verify_peer_options *verify_options, void *reserved) nogil + grpc_channel_credentials *grpc_composite_channel_credentials_create( + grpc_channel_credentials *creds1, grpc_call_credentials *creds2, + void *reserved) nogil + void grpc_channel_credentials_release(grpc_channel_credentials *creds) nogil + + grpc_channel_credentials *grpc_xds_credentials_create( + grpc_channel_credentials *fallback_creds) nogil + + grpc_channel_credentials *grpc_insecure_credentials_create() nogil + + grpc_server_credentials *grpc_xds_server_credentials_create( + grpc_server_credentials *fallback_creds) nogil + + grpc_server_credentials *grpc_insecure_server_credentials_create() nogil + + grpc_call_credentials *grpc_composite_call_credentials_create( + grpc_call_credentials *creds1, grpc_call_credentials *creds2, + void *reserved) nogil + grpc_call_credentials *grpc_google_compute_engine_credentials_create( + void *reserved) nogil + grpc_call_credentials *grpc_service_account_jwt_access_credentials_create( + const char *json_key, + gpr_timespec token_lifetime, void *reserved) nogil + grpc_call_credentials *grpc_google_refresh_token_credentials_create( + const char *json_refresh_token, void *reserved) nogil + grpc_call_credentials *grpc_google_iam_credentials_create( + const char *authorization_token, const char *authority_selector, + void *reserved) nogil + void grpc_call_credentials_release(grpc_call_credentials *creds) nogil + + grpc_channel *grpc_channel_create( + const char *target, grpc_channel_credentials *creds, + const grpc_channel_args *args) nogil + + ctypedef struct grpc_server_credentials: + # We don't care about the internals (and in fact don't know them) + pass + + void grpc_server_credentials_release(grpc_server_credentials *creds) nogil + + int grpc_server_add_http2_port(grpc_server *server, const char *addr, + grpc_server_credentials *creds) nogil + + grpc_call_error grpc_call_set_credentials(grpc_call *call, + grpc_call_credentials *creds) nogil + + ctypedef struct grpc_auth_context: + # We don't care about the internals (and in fact don't know them) + pass + + ctypedef struct grpc_auth_metadata_context: + const char *service_url + const char *method_name + const grpc_auth_context *channel_auth_context + + ctypedef void (*grpc_credentials_plugin_metadata_cb)( + void *user_data, const grpc_metadata *creds_md, size_t num_creds_md, + grpc_status_code status, const char *error_details) nogil + + ctypedef struct grpc_metadata_credentials_plugin: + int (*get_metadata)( + void *state, grpc_auth_metadata_context context, + grpc_credentials_plugin_metadata_cb cb, void *user_data, + grpc_metadata creds_md[GRPC_METADATA_CREDENTIALS_PLUGIN_SYNC_MAX], + size_t *num_creds_md, grpc_status_code *status, + const char **error_details) except * + void (*destroy)(void *state) except * + void *state + const char *type + + grpc_call_credentials *grpc_metadata_credentials_create_from_plugin( + grpc_metadata_credentials_plugin plugin, grpc_security_level min_security_level, void *reserved) nogil + + ctypedef struct grpc_auth_property_iterator: + pass + + ctypedef struct grpc_auth_property: + char *name + char *value + size_t value_length + + grpc_auth_property *grpc_auth_property_iterator_next( + grpc_auth_property_iterator *it) + + grpc_auth_property_iterator grpc_auth_context_property_iterator( + const grpc_auth_context *ctx) + + grpc_auth_property_iterator grpc_auth_context_peer_identity( + const grpc_auth_context *ctx) + + char *grpc_auth_context_peer_identity_property_name( + const grpc_auth_context *ctx) + + grpc_auth_property_iterator grpc_auth_context_find_properties_by_name( + const grpc_auth_context *ctx, const char *name) + + grpc_auth_context_peer_is_authenticated( + const grpc_auth_context *ctx) + + grpc_auth_context *grpc_call_auth_context(grpc_call *call) + + void grpc_auth_context_release(grpc_auth_context *context) + + grpc_channel_credentials *grpc_local_credentials_create( + grpc_local_connect_type type) + grpc_server_credentials *grpc_local_server_credentials_create( + grpc_local_connect_type type) + + ctypedef struct grpc_alts_credentials_options: + # We don't care about the internals (and in fact don't know them) + pass + + grpc_channel_credentials *grpc_alts_credentials_create( + const grpc_alts_credentials_options *options) + grpc_server_credentials *grpc_alts_server_credentials_create( + const grpc_alts_credentials_options *options) + + grpc_alts_credentials_options* grpc_alts_credentials_client_options_create() + grpc_alts_credentials_options* grpc_alts_credentials_server_options_create() + void grpc_alts_credentials_options_destroy(grpc_alts_credentials_options *options) + void grpc_alts_credentials_client_options_add_target_service_account(grpc_alts_credentials_options *options, const char *service_account) + + + +cdef extern from "grpc/compression.h": + + ctypedef enum grpc_compression_algorithm: + GRPC_COMPRESS_NONE + GRPC_COMPRESS_DEFLATE + GRPC_COMPRESS_GZIP + GRPC_COMPRESS_STREAM_GZIP + GRPC_COMPRESS_ALGORITHMS_COUNT + + ctypedef enum grpc_compression_level: + GRPC_COMPRESS_LEVEL_NONE + GRPC_COMPRESS_LEVEL_LOW + GRPC_COMPRESS_LEVEL_MED + GRPC_COMPRESS_LEVEL_HIGH + GRPC_COMPRESS_LEVEL_COUNT + + ctypedef struct grpc_compression_options: + uint32_t enabled_algorithms_bitset + + int grpc_compression_algorithm_parse( + grpc_slice value, grpc_compression_algorithm *algorithm) nogil + int grpc_compression_algorithm_name(grpc_compression_algorithm algorithm, + const char **name) nogil + grpc_compression_algorithm grpc_compression_algorithm_for_level( + grpc_compression_level level, uint32_t accepted_encodings) nogil + void grpc_compression_options_init(grpc_compression_options *opts) nogil + void grpc_compression_options_enable_algorithm( + grpc_compression_options *opts, + grpc_compression_algorithm algorithm) nogil + void grpc_compression_options_disable_algorithm( + grpc_compression_options *opts, + grpc_compression_algorithm algorithm) nogil + int grpc_compression_options_is_algorithm_enabled( + const grpc_compression_options *opts, + grpc_compression_algorithm algorithm) nogil + +cdef extern from "grpc/impl/codegen/compression_types.h": + + const char *_GRPC_COMPRESSION_REQUEST_ALGORITHM_MD_KEY \ + "GRPC_COMPRESSION_REQUEST_ALGORITHM_MD_KEY" + + +cdef extern from "grpc/grpc_security_constants.h": + ctypedef enum grpc_local_connect_type: + UDS + LOCAL_TCP + +cdef extern from "src/core/lib/config/config_vars.h" namespace "grpc_core": + cdef cppclass ConfigVars: + @staticmethod + void Reset() diff --git a/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/grpc_gevent.pxd.pxi b/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/grpc_gevent.pxd.pxi new file mode 100644 index 00000000000..baa9fb54a3e --- /dev/null +++ b/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/grpc_gevent.pxd.pxi @@ -0,0 +1,21 @@ +# Copyright 2017 gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# distutils: language=c++ + +g_gevent_threadpool = None +g_gevent_activated = False + +cpdef void gevent_increment_channel_count() + +cpdef void gevent_decrement_channel_count() diff --git a/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/grpc_gevent.pyx.pxi b/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/grpc_gevent.pyx.pxi new file mode 100644 index 00000000000..41d27df5948 --- /dev/null +++ b/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/grpc_gevent.pyx.pxi @@ -0,0 +1,137 @@ +# Copyright 2018 gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# distutils: language=c++ + +from libc cimport string +from cython.operator cimport dereference + +from cpython cimport Py_INCREF, Py_DECREF + +import atexit +import errno +import sys + +gevent_hub = None +g_gevent_pool = None +g_gevent_threadpool = None +g_gevent_activated = False + + +cdef queue[void*] g_greenlets_to_run +cdef condition_variable g_greenlets_cv +cdef mutex g_greenlets_mu +cdef bint g_shutdown_greenlets_to_run_queue = False +cdef int g_channel_count = 0 + + +cdef _submit_to_greenlet_queue(object cb, tuple args): + cdef tuple to_call = (cb,) + args + cdef unique_lock[mutex]* lk + Py_INCREF(to_call) + with nogil: + lk = new unique_lock[mutex](g_greenlets_mu) + g_greenlets_to_run.push(<void*>(to_call)) + del lk + g_greenlets_cv.notify_all() + + +cpdef void gevent_increment_channel_count(): + global g_channel_count + cdef int old_channel_count + with nogil: + lk = new unique_lock[mutex](g_greenlets_mu) + old_channel_count = g_channel_count + g_channel_count += 1 + del lk + if old_channel_count == 0: + run_spawn_greenlets() + + +cpdef void gevent_decrement_channel_count(): + global g_channel_count + with nogil: + lk = new unique_lock[mutex](g_greenlets_mu) + g_channel_count -= 1 + if g_channel_count == 0: + g_greenlets_cv.notify_all() + del lk + + +cdef object await_next_greenlet(): + cdef unique_lock[mutex]* lk + with nogil: + # Cython doesn't allow us to do proper stack allocations, so we can't take + # advantage of RAII. + lk = new unique_lock[mutex](g_greenlets_mu) + while not g_shutdown_greenlets_to_run_queue and g_channel_count != 0: + if not g_greenlets_to_run.empty(): + break + g_greenlets_cv.wait(dereference(lk)) + if g_channel_count == 0: + del lk + return None + if g_shutdown_greenlets_to_run_queue: + del lk + return None + cdef object to_call = <object>g_greenlets_to_run.front() + Py_DECREF(to_call) + g_greenlets_to_run.pop() + del lk + return to_call + +def spawn_greenlets(): + while True: + to_call = g_gevent_threadpool.apply(await_next_greenlet, ()) + if to_call is None: + break + fn = to_call[0] + args = to_call[1:] + fn(*args) + +def run_spawn_greenlets(): + g_gevent_pool.spawn(spawn_greenlets) + +def shutdown_await_next_greenlet(): + global g_shutdown_greenlets_to_run_queue + cdef unique_lock[mutex]* lk + with nogil: + lk = new unique_lock[mutex](g_greenlets_mu) + g_shutdown_greenlets_to_run_queue = True + del lk + g_greenlets_cv.notify_all() + +def init_grpc_gevent(): + # Lazily import gevent + global gevent_hub + global g_gevent_threadpool + global g_gevent_activated + global g_interrupt_check_period_ms + global g_gevent_pool + + import gevent + import gevent.pool + + gevent_hub = gevent.hub + g_gevent_threadpool = gevent_hub.get_hub().threadpool + + g_gevent_activated = True + g_interrupt_check_period_ms = 2000 + + g_gevent_pool = gevent.pool.Group() + + + set_async_callback_func(_submit_to_greenlet_queue) + + # TODO: Document how this all works. + atexit.register(shutdown_await_next_greenlet) diff --git a/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/grpc_string.pyx.pxi b/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/grpc_string.pyx.pxi new file mode 100644 index 00000000000..5c1e0679a97 --- /dev/null +++ b/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/grpc_string.pyx.pxi @@ -0,0 +1,51 @@ +# Copyright 2016 gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +# This function will ascii encode unicode string inputs if necessary. +# In Python3, unicode strings are the default str type. +cdef bytes str_to_bytes(object s): + if s is None or isinstance(s, bytes): + return s + elif isinstance(s, unicode): + return s.encode('ascii') + else: + raise TypeError('Expected bytes, str, or unicode, not {}'.format(type(s))) + + +# TODO(https://github.com/grpc/grpc/issues/13782): It would be nice for us if +# the type of metadata that we accept were exactly the same as the type of +# metadata that we deliver to our users (so "str" for this function's +# parameter rather than "object"), but would it be nice for our users? Right +# now we haven't yet heard from enough users to know one way or another. +cdef bytes _encode(object string_or_none): + if string_or_none is None: + return b'' + elif isinstance(string_or_none, (bytes,)): + return <bytes>string_or_none + elif isinstance(string_or_none, (unicode,)): + return string_or_none.encode('utf8') + else: + raise TypeError('Expected str, not {}'.format(type(string_or_none))) + + +cdef str _decode(bytes bytestring): + if isinstance(bytestring, (str,)): + return <str>bytestring + else: + try: + return bytestring.decode('utf8') + except UnicodeDecodeError: + _LOGGER.exception('Invalid encoding on %s', bytestring) + return bytestring.decode('latin1') diff --git a/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/metadata.pxd.pxi b/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/metadata.pxd.pxi new file mode 100644 index 00000000000..fc72ac1576b --- /dev/null +++ b/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/metadata.pxd.pxi @@ -0,0 +1,26 @@ +# Copyright 2017 gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +cdef void _store_c_metadata( + metadata, grpc_metadata **c_metadata, size_t *c_count) except * + + +cdef void _release_c_metadata(grpc_metadata *c_metadata, int count) except * + + +cdef tuple _metadatum(grpc_slice key_slice, grpc_slice value_slice) + + +cdef tuple _metadata(grpc_metadata_array *c_metadata_array) diff --git a/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/metadata.pyx.pxi b/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/metadata.pyx.pxi new file mode 100644 index 00000000000..b2dd1e33808 --- /dev/null +++ b/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/metadata.pyx.pxi @@ -0,0 +1,73 @@ +# Copyright 2017 gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import collections + + +class InitialMetadataFlags: + used_mask = GRPC_INITIAL_METADATA_USED_MASK + wait_for_ready = GRPC_INITIAL_METADATA_WAIT_FOR_READY + wait_for_ready_explicitly_set = GRPC_INITIAL_METADATA_WAIT_FOR_READY_EXPLICITLY_SET + + +_Metadatum = collections.namedtuple('_Metadatum', ('key', 'value',)) + + +cdef void _store_c_metadata( + metadata, grpc_metadata **c_metadata, size_t *c_count) except *: + if metadata is None: + c_count[0] = 0 + c_metadata[0] = NULL + else: + metadatum_count = len(metadata) + if metadatum_count == 0: + c_count[0] = 0 + c_metadata[0] = NULL + else: + c_count[0] = metadatum_count + c_metadata[0] = <grpc_metadata *>gpr_malloc( + metadatum_count * sizeof(grpc_metadata)) + for index, (key, value) in enumerate(metadata): + encoded_key = _encode(key) + encoded_value = value if encoded_key[-4:] == b'-bin' else _encode(value) + if not isinstance(encoded_value, bytes): + raise TypeError('Binary metadata key="%s" expected bytes, got %s' % ( + key, + type(encoded_value) + )) + c_metadata[0][index].key = _slice_from_bytes(encoded_key) + c_metadata[0][index].value = _slice_from_bytes(encoded_value) + + +cdef void _release_c_metadata(grpc_metadata *c_metadata, int count) except *: + if 0 < count: + for index in range(count): + grpc_slice_unref(c_metadata[index].key) + grpc_slice_unref(c_metadata[index].value) + gpr_free(c_metadata) + + +cdef tuple _metadatum(grpc_slice key_slice, grpc_slice value_slice): + cdef bytes key = _slice_bytes(key_slice) + cdef bytes value = _slice_bytes(value_slice) + return <tuple>_Metadatum( + _decode(key), value if key[-4:] == b'-bin' else _decode(value)) + + +cdef tuple _metadata(grpc_metadata_array *c_metadata_array): + return tuple( + _metadatum( + c_metadata_array.metadata[index].key, + c_metadata_array.metadata[index].value) + for index in range(c_metadata_array.count)) diff --git a/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/operation.pxd.pxi b/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/operation.pxd.pxi new file mode 100644 index 00000000000..c9df32dadf5 --- /dev/null +++ b/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/operation.pxd.pxi @@ -0,0 +1,111 @@ +# Copyright 2017 gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +cdef class Operation: + + cdef void c(self) except * + cdef void un_c(self) except * + + # TODO(https://github.com/grpc/grpc/issues/7950): Eliminate this! + cdef grpc_op c_op + + +cdef class SendInitialMetadataOperation(Operation): + + cdef readonly object _initial_metadata; + cdef readonly int _flags + cdef grpc_metadata *_c_initial_metadata + cdef size_t _c_initial_metadata_count + + cdef void c(self) except * + cdef void un_c(self) except * + + +cdef class SendMessageOperation(Operation): + + cdef readonly bytes _message + cdef readonly int _flags + cdef grpc_byte_buffer *_c_message_byte_buffer + + cdef void c(self) except * + cdef void un_c(self) except * + + +cdef class SendCloseFromClientOperation(Operation): + + cdef readonly int _flags + + cdef void c(self) except * + cdef void un_c(self) except * + + +cdef class SendStatusFromServerOperation(Operation): + + cdef readonly object _trailing_metadata + cdef readonly object _code + cdef readonly object _details + cdef readonly int _flags + cdef grpc_metadata *_c_trailing_metadata + cdef size_t _c_trailing_metadata_count + cdef grpc_slice _c_details + + cdef void c(self) except * + cdef void un_c(self) except * + + +cdef class ReceiveInitialMetadataOperation(Operation): + + cdef readonly int _flags + cdef tuple _initial_metadata + cdef grpc_metadata_array _c_initial_metadata + + cdef void c(self) except * + cdef void un_c(self) except * + + +cdef class ReceiveMessageOperation(Operation): + + cdef readonly int _flags + cdef grpc_byte_buffer *_c_message_byte_buffer + cdef bytes _message + + cdef void c(self) except * + cdef void un_c(self) except * + + +cdef class ReceiveStatusOnClientOperation(Operation): + + cdef readonly int _flags + cdef grpc_metadata_array _c_trailing_metadata + cdef grpc_status_code _c_code + cdef grpc_slice _c_details + cdef const char* _c_error_string + cdef tuple _trailing_metadata + cdef object _code + cdef str _details + cdef str _error_string + + cdef void c(self) except * + cdef void un_c(self) except * + + +cdef class ReceiveCloseOnServerOperation(Operation): + + cdef readonly int _flags + cdef object _cancelled + cdef int _c_cancelled + + cdef void c(self) except * + cdef void un_c(self) except * diff --git a/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/operation.pyx.pxi b/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/operation.pyx.pxi new file mode 100644 index 00000000000..3f3fd75407c --- /dev/null +++ b/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/operation.pyx.pxi @@ -0,0 +1,250 @@ +# Copyright 2017 gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +cdef class Operation: + + cdef void c(self) except *: + raise NotImplementedError() + + cdef void un_c(self) except *: + raise NotImplementedError() + + +cdef class SendInitialMetadataOperation(Operation): + + def __cinit__(self, initial_metadata, flags): + self._initial_metadata = initial_metadata + self._flags = flags + + def type(self): + return GRPC_OP_SEND_INITIAL_METADATA + + cdef void c(self) except *: + self.c_op.type = GRPC_OP_SEND_INITIAL_METADATA + self.c_op.flags = self._flags + _store_c_metadata( + self._initial_metadata, &self._c_initial_metadata, + &self._c_initial_metadata_count) + self.c_op.data.send_initial_metadata.metadata = self._c_initial_metadata + self.c_op.data.send_initial_metadata.count = self._c_initial_metadata_count + self.c_op.data.send_initial_metadata.maybe_compression_level.is_set = 0 + + cdef void un_c(self) except *: + _release_c_metadata( + self._c_initial_metadata, self._c_initial_metadata_count) + + +cdef class SendMessageOperation(Operation): + + def __cinit__(self, bytes message, int flags): + if message is None: + self._message = b'' + else: + self._message = message + self._flags = flags + + def type(self): + return GRPC_OP_SEND_MESSAGE + + cdef void c(self) except *: + self.c_op.type = GRPC_OP_SEND_MESSAGE + self.c_op.flags = self._flags + cdef grpc_slice message_slice = grpc_slice_from_copied_buffer( + self._message, len(self._message)) + self._c_message_byte_buffer = grpc_raw_byte_buffer_create( + &message_slice, 1) + grpc_slice_unref(message_slice) + self.c_op.data.send_message.send_message = self._c_message_byte_buffer + + cdef void un_c(self) except *: + grpc_byte_buffer_destroy(self._c_message_byte_buffer) + + +cdef class SendCloseFromClientOperation(Operation): + + def __cinit__(self, int flags): + self._flags = flags + + def type(self): + return GRPC_OP_SEND_CLOSE_FROM_CLIENT + + cdef void c(self) except *: + self.c_op.type = GRPC_OP_SEND_CLOSE_FROM_CLIENT + self.c_op.flags = self._flags + + cdef void un_c(self) except *: + pass + + +cdef class SendStatusFromServerOperation(Operation): + + def __cinit__(self, trailing_metadata, code, object details, int flags): + self._trailing_metadata = trailing_metadata + self._code = code + self._details = details + self._flags = flags + + def type(self): + return GRPC_OP_SEND_STATUS_FROM_SERVER + + cdef void c(self) except *: + self.c_op.type = GRPC_OP_SEND_STATUS_FROM_SERVER + self.c_op.flags = self._flags + _store_c_metadata( + self._trailing_metadata, &self._c_trailing_metadata, + &self._c_trailing_metadata_count) + self.c_op.data.send_status_from_server.trailing_metadata = ( + self._c_trailing_metadata) + self.c_op.data.send_status_from_server.trailing_metadata_count = ( + self._c_trailing_metadata_count) + self.c_op.data.send_status_from_server.status = self._code + self._c_details = _slice_from_bytes(_encode(self._details)) + self.c_op.data.send_status_from_server.status_details = &self._c_details + + cdef void un_c(self) except *: + grpc_slice_unref(self._c_details) + _release_c_metadata( + self._c_trailing_metadata, self._c_trailing_metadata_count) + + +cdef class ReceiveInitialMetadataOperation(Operation): + + def __cinit__(self, flags): + self._flags = flags + + def type(self): + return GRPC_OP_RECV_INITIAL_METADATA + + cdef void c(self) except *: + self.c_op.type = GRPC_OP_RECV_INITIAL_METADATA + self.c_op.flags = self._flags + grpc_metadata_array_init(&self._c_initial_metadata) + self.c_op.data.receive_initial_metadata.receive_initial_metadata = ( + &self._c_initial_metadata) + + cdef void un_c(self) except *: + self._initial_metadata = _metadata(&self._c_initial_metadata) + grpc_metadata_array_destroy(&self._c_initial_metadata) + + def initial_metadata(self): + return self._initial_metadata + + +cdef class ReceiveMessageOperation(Operation): + + def __cinit__(self, flags): + self._flags = flags + + def type(self): + return GRPC_OP_RECV_MESSAGE + + cdef void c(self) except *: + self.c_op.type = GRPC_OP_RECV_MESSAGE + self.c_op.flags = self._flags + self.c_op.data.receive_message.receive_message = ( + &self._c_message_byte_buffer) + + cdef void un_c(self) except *: + cdef grpc_byte_buffer_reader message_reader + cdef bint message_reader_status + cdef grpc_slice message_slice + cdef size_t message_slice_length + cdef void *message_slice_pointer + if self._c_message_byte_buffer != NULL: + message_reader_status = grpc_byte_buffer_reader_init( + &message_reader, self._c_message_byte_buffer) + if message_reader_status: + message = bytearray() + while grpc_byte_buffer_reader_next(&message_reader, &message_slice): + message_slice_pointer = grpc_slice_start_ptr(message_slice) + message_slice_length = grpc_slice_length(message_slice) + message += (<char *>message_slice_pointer)[:message_slice_length] + grpc_slice_unref(message_slice) + grpc_byte_buffer_reader_destroy(&message_reader) + self._message = bytes(message) + else: + self._message = None + grpc_byte_buffer_destroy(self._c_message_byte_buffer) + else: + self._message = None + + def message(self): + return self._message + + +cdef class ReceiveStatusOnClientOperation(Operation): + + def __cinit__(self, flags): + self._flags = flags + + def type(self): + return GRPC_OP_RECV_STATUS_ON_CLIENT + + cdef void c(self) except *: + self.c_op.type = GRPC_OP_RECV_STATUS_ON_CLIENT + self.c_op.flags = self._flags + grpc_metadata_array_init(&self._c_trailing_metadata) + self.c_op.data.receive_status_on_client.trailing_metadata = ( + &self._c_trailing_metadata) + self.c_op.data.receive_status_on_client.status = ( + &self._c_code) + self.c_op.data.receive_status_on_client.status_details = ( + &self._c_details) + self.c_op.data.receive_status_on_client.error_string = ( + &self._c_error_string) + + cdef void un_c(self) except *: + self._trailing_metadata = _metadata(&self._c_trailing_metadata) + grpc_metadata_array_destroy(&self._c_trailing_metadata) + self._code = self._c_code + self._details = _decode(_slice_bytes(self._c_details)) + grpc_slice_unref(self._c_details) + if self._c_error_string != NULL: + self._error_string = _decode(self._c_error_string) + gpr_free(<void*>self._c_error_string) + else: + self._error_string = "" + + def trailing_metadata(self): + return self._trailing_metadata + + def code(self): + return self._code + + def details(self): + return self._details + + def error_string(self): + return self._error_string + + +cdef class ReceiveCloseOnServerOperation(Operation): + + def __cinit__(self, flags): + self._flags = flags + + def type(self): + return GRPC_OP_RECV_CLOSE_ON_SERVER + + cdef void c(self) except *: + self.c_op.type = GRPC_OP_RECV_CLOSE_ON_SERVER + self.c_op.flags = self._flags + self.c_op.data.receive_close_on_server.cancelled = &self._c_cancelled + + cdef void un_c(self) except *: + self._cancelled = bool(self._c_cancelled) + + def cancelled(self): + return self._cancelled diff --git a/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/propagation_bits.pxd.pxi b/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/propagation_bits.pxd.pxi new file mode 100644 index 00000000000..3182aa54de5 --- /dev/null +++ b/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/propagation_bits.pxd.pxi @@ -0,0 +1,20 @@ +# Copyright 2018 The gRPC Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +cdef extern from "grpc/impl/propagation_bits.h": + cdef int _GRPC_PROPAGATE_DEADLINE "GRPC_PROPAGATE_DEADLINE" + cdef int _GRPC_PROPAGATE_CENSUS_STATS_CONTEXT "GRPC_PROPAGATE_CENSUS_STATS_CONTEXT" + cdef int _GRPC_PROPAGATE_CENSUS_TRACING_CONTEXT "GRPC_PROPAGATE_CENSUS_TRACING_CONTEXT" + cdef int _GRPC_PROPAGATE_CANCELLATION "GRPC_PROPAGATE_CANCELLATION" + cdef int _GRPC_PROPAGATE_DEFAULTS "GRPC_PROPAGATE_DEFAULTS" diff --git a/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/propagation_bits.pyx.pxi b/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/propagation_bits.pyx.pxi new file mode 100644 index 00000000000..2dcc76a2db2 --- /dev/null +++ b/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/propagation_bits.pyx.pxi @@ -0,0 +1,20 @@ +# Copyright 2018 The gRPC Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +class PropagationConstants: + GRPC_PROPAGATE_DEADLINE = _GRPC_PROPAGATE_DEADLINE + GRPC_PROPAGATE_CENSUS_STATS_CONTEXT = _GRPC_PROPAGATE_CENSUS_STATS_CONTEXT + GRPC_PROPAGATE_CENSUS_TRACING_CONTEXT = _GRPC_PROPAGATE_CENSUS_TRACING_CONTEXT + GRPC_PROPAGATE_CANCELLATION = _GRPC_PROPAGATE_CANCELLATION + GRPC_PROPAGATE_DEFAULTS = _GRPC_PROPAGATE_DEFAULTS diff --git a/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/records.pxd.pxi b/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/records.pxd.pxi new file mode 100644 index 00000000000..35e1bdb0aeb --- /dev/null +++ b/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/records.pxd.pxi @@ -0,0 +1,34 @@ +# Copyright 2015 gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +cdef bytes _slice_bytes(grpc_slice slice) +cdef grpc_slice _copy_slice(grpc_slice slice) nogil +cdef grpc_slice _slice_from_bytes(bytes value) nogil + + +cdef class CallDetails: + + cdef grpc_call_details c_details + + +cdef class SslPemKeyCertPair: + + cdef grpc_ssl_pem_key_cert_pair c_pair + cdef readonly object private_key, certificate_chain + + +cdef class CompressionOptions: + + cdef grpc_compression_options c_options diff --git a/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/records.pyx.pxi b/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/records.pyx.pxi new file mode 100644 index 00000000000..281cf8613f0 --- /dev/null +++ b/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/records.pyx.pxi @@ -0,0 +1,201 @@ +# Copyright 2015 gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +cdef bytes _slice_bytes(grpc_slice slice): + cdef void *start = grpc_slice_start_ptr(slice) + cdef size_t length = grpc_slice_length(slice) + return (<const char *>start)[:length] + +cdef grpc_slice _copy_slice(grpc_slice slice) nogil: + cdef void *start = grpc_slice_start_ptr(slice) + cdef size_t length = grpc_slice_length(slice) + return grpc_slice_from_copied_buffer(<const char *>start, length) + +cdef grpc_slice _slice_from_bytes(bytes value) nogil: + cdef const char *value_ptr + cdef size_t length + with gil: + value_ptr = <const char *>value + length = len(value) + return grpc_slice_from_copied_buffer(value_ptr, length) + + +class ConnectivityState: + idle = GRPC_CHANNEL_IDLE + connecting = GRPC_CHANNEL_CONNECTING + ready = GRPC_CHANNEL_READY + transient_failure = GRPC_CHANNEL_TRANSIENT_FAILURE + shutdown = GRPC_CHANNEL_SHUTDOWN + + +class ChannelArgKey: + enable_census = GRPC_ARG_ENABLE_CENSUS + max_concurrent_streams = GRPC_ARG_MAX_CONCURRENT_STREAMS + max_receive_message_length = GRPC_ARG_MAX_RECEIVE_MESSAGE_LENGTH + max_send_message_length = GRPC_ARG_MAX_SEND_MESSAGE_LENGTH + http2_initial_sequence_number = GRPC_ARG_HTTP2_INITIAL_SEQUENCE_NUMBER + default_authority = GRPC_ARG_DEFAULT_AUTHORITY + primary_user_agent_string = GRPC_ARG_PRIMARY_USER_AGENT_STRING + secondary_user_agent_string = GRPC_ARG_SECONDARY_USER_AGENT_STRING + ssl_session_cache = GRPC_SSL_SESSION_CACHE_ARG + ssl_target_name_override = GRPC_SSL_TARGET_NAME_OVERRIDE_ARG + + +class WriteFlag: + buffer_hint = GRPC_WRITE_BUFFER_HINT + no_compress = GRPC_WRITE_NO_COMPRESS + + +class StatusCode: + ok = GRPC_STATUS_OK + cancelled = GRPC_STATUS_CANCELLED + unknown = GRPC_STATUS_UNKNOWN + invalid_argument = GRPC_STATUS_INVALID_ARGUMENT + deadline_exceeded = GRPC_STATUS_DEADLINE_EXCEEDED + not_found = GRPC_STATUS_NOT_FOUND + already_exists = GRPC_STATUS_ALREADY_EXISTS + permission_denied = GRPC_STATUS_PERMISSION_DENIED + unauthenticated = GRPC_STATUS_UNAUTHENTICATED + resource_exhausted = GRPC_STATUS_RESOURCE_EXHAUSTED + failed_precondition = GRPC_STATUS_FAILED_PRECONDITION + aborted = GRPC_STATUS_ABORTED + out_of_range = GRPC_STATUS_OUT_OF_RANGE + unimplemented = GRPC_STATUS_UNIMPLEMENTED + internal = GRPC_STATUS_INTERNAL + unavailable = GRPC_STATUS_UNAVAILABLE + data_loss = GRPC_STATUS_DATA_LOSS + + +class CallError: + ok = GRPC_CALL_OK + error = GRPC_CALL_ERROR + not_on_server = GRPC_CALL_ERROR_NOT_ON_SERVER + not_on_client = GRPC_CALL_ERROR_NOT_ON_CLIENT + already_accepted = GRPC_CALL_ERROR_ALREADY_ACCEPTED + already_invoked = GRPC_CALL_ERROR_ALREADY_INVOKED + not_invoked = GRPC_CALL_ERROR_NOT_INVOKED + already_finished = GRPC_CALL_ERROR_ALREADY_FINISHED + too_many_operations = GRPC_CALL_ERROR_TOO_MANY_OPERATIONS + invalid_flags = GRPC_CALL_ERROR_INVALID_FLAGS + invalid_metadata = GRPC_CALL_ERROR_INVALID_METADATA + + +class CompletionType: + queue_shutdown = GRPC_QUEUE_SHUTDOWN + queue_timeout = GRPC_QUEUE_TIMEOUT + operation_complete = GRPC_OP_COMPLETE + + +class OperationType: + send_initial_metadata = GRPC_OP_SEND_INITIAL_METADATA + send_message = GRPC_OP_SEND_MESSAGE + send_close_from_client = GRPC_OP_SEND_CLOSE_FROM_CLIENT + send_status_from_server = GRPC_OP_SEND_STATUS_FROM_SERVER + receive_initial_metadata = GRPC_OP_RECV_INITIAL_METADATA + receive_message = GRPC_OP_RECV_MESSAGE + receive_status_on_client = GRPC_OP_RECV_STATUS_ON_CLIENT + receive_close_on_server = GRPC_OP_RECV_CLOSE_ON_SERVER + +GRPC_COMPRESSION_CHANNEL_DEFAULT_ALGORITHM= ( + _GRPC_COMPRESSION_CHANNEL_DEFAULT_ALGORITHM) + +GRPC_COMPRESSION_REQUEST_ALGORITHM_MD_KEY = ( + _GRPC_COMPRESSION_REQUEST_ALGORITHM_MD_KEY) + +class CompressionAlgorithm: + none = GRPC_COMPRESS_NONE + deflate = GRPC_COMPRESS_DEFLATE + gzip = GRPC_COMPRESS_GZIP + + +class CompressionLevel: + none = GRPC_COMPRESS_LEVEL_NONE + low = GRPC_COMPRESS_LEVEL_LOW + medium = GRPC_COMPRESS_LEVEL_MED + high = GRPC_COMPRESS_LEVEL_HIGH + + +cdef class CallDetails: + + def __cinit__(self): + fork_handlers_and_grpc_init() + with nogil: + grpc_call_details_init(&self.c_details) + + def __dealloc__(self): + with nogil: + grpc_call_details_destroy(&self.c_details) + grpc_shutdown() + + @property + def method(self): + return _slice_bytes(self.c_details.method) + + @property + def host(self): + return _slice_bytes(self.c_details.host) + + @property + def deadline(self): + return _time_from_timespec(self.c_details.deadline) + + +cdef class SslPemKeyCertPair: + + def __cinit__(self, bytes private_key, bytes certificate_chain): + self.private_key = private_key + self.certificate_chain = certificate_chain + self.c_pair.private_key = self.private_key + self.c_pair.certificate_chain = self.certificate_chain + + +cdef class CompressionOptions: + + def __cinit__(self): + with nogil: + grpc_compression_options_init(&self.c_options) + + def enable_algorithm(self, grpc_compression_algorithm algorithm): + with nogil: + grpc_compression_options_enable_algorithm(&self.c_options, algorithm) + + def disable_algorithm(self, grpc_compression_algorithm algorithm): + with nogil: + grpc_compression_options_disable_algorithm(&self.c_options, algorithm) + + def is_algorithm_enabled(self, grpc_compression_algorithm algorithm): + cdef int result + with nogil: + result = grpc_compression_options_is_algorithm_enabled( + &self.c_options, algorithm) + return result + + def to_channel_arg(self): + return ( + GRPC_COMPRESSION_CHANNEL_ENABLED_ALGORITHMS_BITSET, + self.c_options.enabled_algorithms_bitset, + ) + + +def compression_algorithm_name(grpc_compression_algorithm algorithm): + cdef const char* name + with nogil: + grpc_compression_algorithm_name(algorithm, &name) + # Let Cython do the right thing with string casting + return name + + +def reset_grpc_config_vars(): + ConfigVars.Reset() diff --git a/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/security.pxd.pxi b/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/security.pxd.pxi new file mode 100644 index 00000000000..e6e79536bbe --- /dev/null +++ b/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/security.pxd.pxi @@ -0,0 +1,17 @@ +# Copyright 2016 gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +cdef grpc_ssl_roots_override_result ssl_roots_override_callback( + char **pem_root_certs) nogil diff --git a/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/security.pyx.pxi b/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/security.pyx.pxi new file mode 100644 index 00000000000..9cc3fd5a211 --- /dev/null +++ b/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/security.pyx.pxi @@ -0,0 +1,85 @@ +# Copyright 2016 gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from libc.string cimport memcpy + +cdef grpc_ssl_roots_override_result ssl_roots_override_callback( + char **pem_root_certs) nogil: + with gil: + temporary_pem_root_certs = '' + pem_root_certs[0] = <char *>gpr_malloc(len(temporary_pem_root_certs) + 1) + memcpy( + pem_root_certs[0], <char *>temporary_pem_root_certs, + len(temporary_pem_root_certs)) + pem_root_certs[0][len(temporary_pem_root_certs)] = '\0' + + return GRPC_SSL_ROOTS_OVERRIDE_OK + + +def peer_identities(Call call): + cdef grpc_auth_context* auth_context + cdef grpc_auth_property_iterator properties + cdef const grpc_auth_property* property + + auth_context = grpc_call_auth_context(call.c_call) + if auth_context == NULL: + return None + properties = grpc_auth_context_peer_identity(auth_context) + identities = [] + while True: + property = grpc_auth_property_iterator_next(&properties) + if property == NULL: + break + if property.value != NULL: + identities.append(<bytes>(property.value)) + grpc_auth_context_release(auth_context) + return identities if identities else None + +def peer_identity_key(Call call): + cdef grpc_auth_context* auth_context + cdef const char* c_key + auth_context = grpc_call_auth_context(call.c_call) + if auth_context == NULL: + return None + c_key = grpc_auth_context_peer_identity_property_name(auth_context) + if c_key == NULL: + key = None + else: + key = <bytes> grpc_auth_context_peer_identity_property_name(auth_context) + grpc_auth_context_release(auth_context) + return key + +def auth_context(Call call): + cdef grpc_auth_context* auth_context + cdef grpc_auth_property_iterator properties + cdef const grpc_auth_property* property + + auth_context = grpc_call_auth_context(call.c_call) + if auth_context == NULL: + return {} + properties = grpc_auth_context_property_iterator(auth_context) + py_auth_context = {} + while True: + property = grpc_auth_property_iterator_next(&properties) + if property == NULL: + break + if property.name != NULL and property.value != NULL: + key = <bytes> property.name + if key in py_auth_context: + py_auth_context[key].append(<bytes>(property.value)) + else: + py_auth_context[key] = [<bytes> property.value] + grpc_auth_context_release(auth_context) + return py_auth_context + diff --git a/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/server.pxd.pxi b/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/server.pxd.pxi new file mode 100644 index 00000000000..b89ed99d97b --- /dev/null +++ b/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/server.pxd.pxi @@ -0,0 +1,29 @@ +# Copyright 2015 gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +cdef class Server: + + cdef grpc_server *c_server + + cdef bint is_started # start has been called + cdef bint is_shutting_down # shutdown has been called + cdef bint is_shutdown # notification of complete shutdown received + # used at dealloc when user forgets to shutdown + cdef CompletionQueue backup_shutdown_queue + # TODO(https://github.com/grpc/grpc/issues/15662): Elide this. + cdef list references + cdef list registered_completion_queues + + cdef _c_shutdown(self, CompletionQueue queue, tag) + cdef notify_shutdown_complete(self) diff --git a/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/server.pyx.pxi b/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/server.pyx.pxi new file mode 100644 index 00000000000..29dabec61d9 --- /dev/null +++ b/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/server.pyx.pxi @@ -0,0 +1,165 @@ +# Copyright 2015 gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +cdef class Server: + + def __cinit__(self, object arguments, bint xds): + fork_handlers_and_grpc_init() + self.references = [] + self.registered_completion_queues = [] + self.is_started = False + self.is_shutting_down = False + self.is_shutdown = False + self.c_server = NULL + cdef _ChannelArgs channel_args = _ChannelArgs(arguments) + self.c_server = grpc_server_create(channel_args.c_args(), NULL) + cdef grpc_server_xds_status_notifier notifier + notifier.on_serving_status_update = NULL + notifier.user_data = NULL + if xds: + grpc_server_set_config_fetcher(self.c_server, + grpc_server_config_fetcher_xds_create(notifier, channel_args.c_args())) + self.references.append(arguments) + + def request_call( + self, CompletionQueue call_queue not None, + CompletionQueue server_queue not None, tag): + if not self.is_started or self.is_shutting_down: + raise ValueError("server must be started and not shutting down") + if server_queue not in self.registered_completion_queues: + raise ValueError("server_queue must be a registered completion queue") + cdef _RequestCallTag request_call_tag = _RequestCallTag(tag) + request_call_tag.prepare() + cpython.Py_INCREF(request_call_tag) + return grpc_server_request_call( + self.c_server, &request_call_tag.call.c_call, + &request_call_tag.call_details.c_details, + &request_call_tag.c_invocation_metadata, + call_queue.c_completion_queue, server_queue.c_completion_queue, + <cpython.PyObject *>request_call_tag) + + def register_completion_queue( + self, CompletionQueue queue not None): + if self.is_started: + raise ValueError("cannot register completion queues after start") + with nogil: + grpc_server_register_completion_queue( + self.c_server, queue.c_completion_queue, NULL) + self.registered_completion_queues.append(queue) + + def start(self, backup_queue=True): + """Start the Cython gRPC Server. + + Args: + backup_queue: a bool indicates whether to spawn a backup completion + queue. In the case that no CQ is bound to the server, and the shutdown + of server becomes un-observable. + """ + if self.is_started: + raise ValueError("the server has already started") + if backup_queue: + self.backup_shutdown_queue = CompletionQueue(shutdown_cq=True) + self.register_completion_queue(self.backup_shutdown_queue) + self.is_started = True + with nogil: + grpc_server_start(self.c_server) + if backup_queue: + # Ensure the core has gotten a chance to do the start-up work + self.backup_shutdown_queue.poll(deadline=time.time()) + + def add_http2_port(self, bytes address, + ServerCredentials server_credentials=None): + address = str_to_bytes(address) + self.references.append(address) + cdef int result + cdef char *address_c_string = address + if server_credentials is not None: + self.references.append(server_credentials) + with nogil: + result = grpc_server_add_http2_port( + self.c_server, address_c_string, server_credentials.c_credentials) + else: + with nogil: + creds = grpc_insecure_server_credentials_create() + result = grpc_server_add_http2_port(self.c_server, + address_c_string, creds) + grpc_server_credentials_release(creds) + return result + + cdef _c_shutdown(self, CompletionQueue queue, tag): + self.is_shutting_down = True + cdef _ServerShutdownTag server_shutdown_tag = _ServerShutdownTag(tag, self) + cpython.Py_INCREF(server_shutdown_tag) + with nogil: + grpc_server_shutdown_and_notify( + self.c_server, queue.c_completion_queue, + <cpython.PyObject *>server_shutdown_tag) + + def shutdown(self, CompletionQueue queue not None, tag): + if queue.is_shutting_down: + raise ValueError("queue must be live") + elif not self.is_started: + raise ValueError("the server hasn't started yet") + elif self.is_shutting_down: + return + elif queue not in self.registered_completion_queues: + raise ValueError("expected registered completion queue") + else: + self._c_shutdown(queue, tag) + + cdef notify_shutdown_complete(self): + # called only after our server shutdown tag has emerged from a completion + # queue. + self.is_shutdown = True + + def cancel_all_calls(self): + if not self.is_shutting_down: + raise UsageError("the server must be shutting down to cancel all calls") + elif self.is_shutdown: + return + else: + with nogil: + grpc_server_cancel_all_calls(self.c_server) + + # TODO(https://github.com/grpc/grpc/issues/17515) Determine what, if any, + # portion of this is safe to call from __dealloc__, and potentially remove + # backup_shutdown_queue. + def destroy(self): + if self.c_server != NULL: + if not self.is_started: + pass + elif self.is_shutdown: + pass + elif not self.is_shutting_down: + if self.backup_shutdown_queue is None: + raise InternalError('Server shutdown failed: no completion queue.') + else: + # the user didn't call shutdown - use our backup queue + self._c_shutdown(self.backup_shutdown_queue, None) + # and now we wait + while not self.is_shutdown: + self.backup_shutdown_queue.poll() + else: + # We're in the process of shutting down, but have not shutdown; can't do + # much but repeatedly release the GIL and wait + while not self.is_shutdown: + time.sleep(0) + with nogil: + grpc_server_destroy(self.c_server) + self.c_server = NULL + + def __dealloc__(self): + if self.c_server == NULL: + grpc_shutdown() diff --git a/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/tag.pxd.pxi b/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/tag.pxd.pxi new file mode 100644 index 00000000000..7af169fa3f9 --- /dev/null +++ b/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/tag.pxd.pxi @@ -0,0 +1,58 @@ +# Copyright 2017 gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +cdef class _Tag: + + cdef BaseEvent event(self, grpc_event c_event) + + +cdef class _ConnectivityTag(_Tag): + + cdef readonly object _user_tag + + cdef ConnectivityEvent event(self, grpc_event c_event) + + +cdef class _RequestCallTag(_Tag): + + cdef readonly object _user_tag + cdef Call call + cdef CallDetails call_details + cdef grpc_metadata_array c_invocation_metadata + + cdef void prepare(self) except * + cdef RequestCallEvent event(self, grpc_event c_event) + + +cdef class _BatchOperationTag(_Tag): + + cdef object _user_tag + cdef readonly object _operations + cdef readonly object _retained_call + cdef grpc_op *c_ops + cdef size_t c_nops + + cdef void prepare(self) except * + cdef BatchOperationEvent event(self, grpc_event c_event) + + +cdef class _ServerShutdownTag(_Tag): + + cdef readonly object _user_tag + # This allows CompletionQueue to notify the Python Server object that the + # underlying GRPC core server has shutdown + cdef readonly Server _shutting_down_server + + cdef ServerShutdownEvent event(self, grpc_event c_event) diff --git a/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/tag.pyx.pxi b/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/tag.pyx.pxi new file mode 100644 index 00000000000..7e62ff63897 --- /dev/null +++ b/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/tag.pyx.pxi @@ -0,0 +1,88 @@ +# Copyright 2017 gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +cdef class _Tag: + + cdef BaseEvent event(self, grpc_event c_event): + raise NotImplementedError() + + +cdef class _ConnectivityTag(_Tag): + + def __cinit__(self, user_tag): + self._user_tag = user_tag + + cdef ConnectivityEvent event(self, grpc_event c_event): + return ConnectivityEvent(c_event.type, c_event.success, self._user_tag) + + +cdef class _RequestCallTag(_Tag): + + def __cinit__(self, user_tag): + self._user_tag = user_tag + self.call = None + self.call_details = None + + cdef void prepare(self) except *: + self.call = Call() + self.call_details = CallDetails() + grpc_metadata_array_init(&self.c_invocation_metadata) + + cdef RequestCallEvent event(self, grpc_event c_event): + cdef tuple invocation_metadata = _metadata(&self.c_invocation_metadata) + grpc_metadata_array_destroy(&self.c_invocation_metadata) + return RequestCallEvent( + c_event.type, c_event.success, self._user_tag, self.call, + self.call_details, invocation_metadata) + + +cdef class _BatchOperationTag: + + def __cinit__(self, user_tag, operations, call): + self._user_tag = user_tag + self._operations = operations + self._retained_call = call + + cdef void prepare(self) except *: + cdef Operation operation + self.c_nops = 0 if self._operations is None else len(self._operations) + if 0 < self.c_nops: + self.c_ops = <grpc_op *>gpr_malloc(sizeof(grpc_op) * self.c_nops) + for index, operation in enumerate(self._operations): + operation.c() + self.c_ops[index] = operation.c_op + + cdef BatchOperationEvent event(self, grpc_event c_event): + cdef Operation operation + if 0 < self.c_nops: + for operation in self._operations: + operation.un_c() + gpr_free(self.c_ops) + return BatchOperationEvent( + c_event.type, c_event.success, self._user_tag, self._operations) + else: + return BatchOperationEvent( + c_event.type, c_event.success, self._user_tag, ()) + + +cdef class _ServerShutdownTag(_Tag): + + def __cinit__(self, user_tag, shutting_down_server): + self._user_tag = user_tag + self._shutting_down_server = shutting_down_server + + cdef ServerShutdownEvent event(self, grpc_event c_event): + self._shutting_down_server.notify_shutdown_complete() + return ServerShutdownEvent(c_event.type, c_event.success, self._user_tag) diff --git a/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/thread.pyx.pxi b/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/thread.pyx.pxi new file mode 100644 index 00000000000..be4cb8b9a8e --- /dev/null +++ b/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/thread.pyx.pxi @@ -0,0 +1,59 @@ +# Copyright 2020 The gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +def _contextvars_supported(): + """Determines if the contextvars module is supported. + + We use a 'try it and see if it works approach' here rather than predicting + based on interpreter version in order to support older interpreters that + may have a backported module based on, e.g. `threading.local`. + + Returns: + A bool indicating whether `contextvars` are supported in the current + environment. + """ + try: + import contextvars + return True + except ImportError: + return False + + +def _run_with_context(target): + """Runs a callable with contextvars propagated. + + If contextvars are supported, the calling thread's context will be copied + and propagated. If they are not supported, this function is equivalent + to the identity function. + + Args: + target: A callable object to wrap. + Returns: + A callable object with the same signature as `target` but with + contextvars propagated. + """ + + +if _contextvars_supported(): + import contextvars + def _run_with_context(target): + ctx = contextvars.copy_context() + def _run(*args): + ctx.run(target, *args) + return _run +else: + def _run_with_context(target): + def _run(*args): + target(*args) + return _run diff --git a/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/time.pxd.pxi b/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/time.pxd.pxi new file mode 100644 index 00000000000..c46e8a98b04 --- /dev/null +++ b/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/time.pxd.pxi @@ -0,0 +1,19 @@ +# Copyright 2018 gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +cdef gpr_timespec _timespec_from_time(object time) except * + + +cdef double _time_from_timespec(gpr_timespec timespec) except * diff --git a/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/time.pyx.pxi b/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/time.pyx.pxi new file mode 100644 index 00000000000..6d181bb1d60 --- /dev/null +++ b/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/time.pyx.pxi @@ -0,0 +1,29 @@ +# Copyright 2018 gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +cdef gpr_timespec _timespec_from_time(object time) except *: + if time is None: + return gpr_inf_future(GPR_CLOCK_REALTIME) + else: + return gpr_time_from_nanos( + <int64_t>(<double>time * GPR_NS_PER_SEC), + GPR_CLOCK_REALTIME, + ) + + +cdef double _time_from_timespec(gpr_timespec timespec) except *: + cdef gpr_timespec real_timespec = gpr_convert_clock_type( + timespec, GPR_CLOCK_REALTIME) + return gpr_timespec_to_micros(real_timespec) / GPR_US_PER_SEC diff --git a/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/vtable.pxd.pxi b/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/vtable.pxd.pxi new file mode 100644 index 00000000000..c96e5cb6696 --- /dev/null +++ b/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/vtable.pxd.pxi @@ -0,0 +1,23 @@ +# Copyright 2019 gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +cdef void* _copy_pointer(void* pointer) + +cdef void _destroy_pointer(void* pointer) + +cdef int _compare_pointer(void* first_pointer, void* second_pointer) + + +cdef grpc_arg_pointer_vtable default_vtable diff --git a/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/vtable.pyx.pxi b/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/vtable.pyx.pxi new file mode 100644 index 00000000000..da4b81bd97e --- /dev/null +++ b/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/vtable.pyx.pxi @@ -0,0 +1,36 @@ +# Copyright 2019 gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# TODO(https://github.com/grpc/grpc/issues/15662): Reform this. +cdef void* _copy_pointer(void* pointer): + return pointer + + +# TODO(https://github.com/grpc/grpc/issues/15662): Reform this. +cdef void _destroy_pointer(void* pointer): + pass + + +cdef int _compare_pointer(void* first_pointer, void* second_pointer): + if first_pointer < second_pointer: + return -1 + elif first_pointer > second_pointer: + return 1 + else: + return 0 + +cdef grpc_arg_pointer_vtable default_vtable +default_vtable.copy = &_copy_pointer +default_vtable.destroy = &_destroy_pointer +default_vtable.cmp = &_compare_pointer diff --git a/contrib/python/grpcio/py3/grpc/_cython/cygrpc.pxd b/contrib/python/grpcio/py3/grpc/_cython/cygrpc.pxd new file mode 100644 index 00000000000..ed04119143a --- /dev/null +++ b/contrib/python/grpcio/py3/grpc/_cython/cygrpc.pxd @@ -0,0 +1,50 @@ +# Copyright 2015 gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# distutils: language=c++ + +cimport cpython + +include "_cygrpc/grpc.pxi" + +include "_cygrpc/arguments.pxd.pxi" +include "_cygrpc/call.pxd.pxi" +include "_cygrpc/channel.pxd.pxi" +include "_cygrpc/credentials.pxd.pxi" +include "_cygrpc/completion_queue.pxd.pxi" +include "_cygrpc/event.pxd.pxi" +include "_cygrpc/metadata.pxd.pxi" +include "_cygrpc/operation.pxd.pxi" +include "_cygrpc/propagation_bits.pxd.pxi" +include "_cygrpc/records.pxd.pxi" +include "_cygrpc/security.pxd.pxi" +include "_cygrpc/server.pxd.pxi" +include "_cygrpc/tag.pxd.pxi" +include "_cygrpc/time.pxd.pxi" +include "_cygrpc/vtable.pxd.pxi" +include "_cygrpc/_hooks.pxd.pxi" + + +include "_cygrpc/grpc_gevent.pxd.pxi" + +IF UNAME_SYSNAME != "Windows": + include "_cygrpc/fork_posix.pxd.pxi" + +# Following pxi files are part of the Aio module +include "_cygrpc/aio/completion_queue.pxd.pxi" +include "_cygrpc/aio/rpc_status.pxd.pxi" +include "_cygrpc/aio/grpc_aio.pxd.pxi" +include "_cygrpc/aio/callback_common.pxd.pxi" +include "_cygrpc/aio/call.pxd.pxi" +include "_cygrpc/aio/channel.pxd.pxi" +include "_cygrpc/aio/server.pxd.pxi" diff --git a/contrib/python/grpcio/py3/grpc/_cython/cygrpc.pyx b/contrib/python/grpcio/py3/grpc/_cython/cygrpc.pyx new file mode 100644 index 00000000000..c7925676c34 --- /dev/null +++ b/contrib/python/grpcio/py3/grpc/_cython/cygrpc.pyx @@ -0,0 +1,94 @@ +# Copyright 2015 gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# distutils: language=c++ + +cimport cpython + +import logging +import os +import sys +import threading +import time + +import grpc + +try: + import asyncio +except ImportError: + # TODO(https://github.com/grpc/grpc/issues/19728) Improve how Aio Cython is + # distributed without breaking none compatible Python versions. For now, if + # Asyncio package is not available we just skip it. + pass + +# The only copy of Python logger for the Cython extension +_LOGGER = logging.getLogger(__name__) + +# TODO(atash): figure out why the coverage tool gets confused about the Cython +# coverage plugin when the following files don't have a '.pxi' suffix. +include "_cygrpc/grpc_string.pyx.pxi" +include "_cygrpc/arguments.pyx.pxi" +include "_cygrpc/call.pyx.pxi" +include "_cygrpc/channel.pyx.pxi" +include "_cygrpc/channelz.pyx.pxi" +include "_cygrpc/csds.pyx.pxi" +include "_cygrpc/credentials.pyx.pxi" +include "_cygrpc/completion_queue.pyx.pxi" +include "_cygrpc/event.pyx.pxi" +include "_cygrpc/metadata.pyx.pxi" +include "_cygrpc/operation.pyx.pxi" +include "_cygrpc/propagation_bits.pyx.pxi" +include "_cygrpc/records.pyx.pxi" +include "_cygrpc/security.pyx.pxi" +include "_cygrpc/server.pyx.pxi" +include "_cygrpc/tag.pyx.pxi" +include "_cygrpc/time.pyx.pxi" +include "_cygrpc/vtable.pyx.pxi" +include "_cygrpc/_hooks.pyx.pxi" + +include "_cygrpc/grpc_gevent.pyx.pxi" + +include "_cygrpc/thread.pyx.pxi" + +IF UNAME_SYSNAME == "Windows": + include "_cygrpc/fork_windows.pyx.pxi" +ELSE: + include "_cygrpc/fork_posix.pyx.pxi" + +# Following pxi files are part of the Aio module +include "_cygrpc/aio/common.pyx.pxi" +include "_cygrpc/aio/rpc_status.pyx.pxi" +include "_cygrpc/aio/completion_queue.pyx.pxi" +include "_cygrpc/aio/callback_common.pyx.pxi" +include "_cygrpc/aio/grpc_aio.pyx.pxi" +include "_cygrpc/aio/call.pyx.pxi" +include "_cygrpc/aio/channel.pyx.pxi" +include "_cygrpc/aio/server.pyx.pxi" + + +# +# initialize gRPC +# +cdef extern from "Python.h": + + int PyEval_InitThreads() + +cdef _initialize(): + # We have Python callbacks called by c-core threads, this ensures the GIL + # is initialized. + PyEval_InitThreads() + import ssl + grpc_dont_init_openssl() + # Load Arcadia certs in ComputePemRootCerts and do not override here. + +_initialize() |