author     nkozlovskiy <[email protected]>   2023-09-29 12:24:06 +0300
committer  nkozlovskiy <[email protected]>   2023-09-29 12:41:34 +0300
commit     e0e3e1717e3d33762ce61950504f9637a6e669ed (patch)
tree       bca3ff6939b10ed60c3d5c12439963a1146b9711 /contrib/python/grpcio/py3/grpc/_cython
parent     38f2c5852db84c7b4d83adfcb009eb61541d1ccd (diff)
add ydb deps
Diffstat (limited to 'contrib/python/grpcio/py3/grpc/_cython')
-rw-r--r--  contrib/python/grpcio/py3/grpc/_cython/__init__.py  13
-rw-r--r--  contrib/python/grpcio/py3/grpc/_cython/_cygrpc/__init__.py  13
-rw-r--r--  contrib/python/grpcio/py3/grpc/_cython/_cygrpc/_hooks.pxd.pxi  16
-rw-r--r--  contrib/python/grpcio/py3/grpc/_cython/_cygrpc/_hooks.pyx.pxi  35
-rw-r--r--  contrib/python/grpcio/py3/grpc/_cython/_cygrpc/aio/call.pxd.pxi  47
-rw-r--r--  contrib/python/grpcio/py3/grpc/_cython/_cygrpc/aio/call.pyx.pxi  508
-rw-r--r--  contrib/python/grpcio/py3/grpc/_cython/_cygrpc/aio/callback_common.pxd.pxi  57
-rw-r--r--  contrib/python/grpcio/py3/grpc/_cython/_cygrpc/aio/callback_common.pyx.pxi  185
-rw-r--r--  contrib/python/grpcio/py3/grpc/_cython/_cygrpc/aio/channel.pxd.pxi  27
-rw-r--r--  contrib/python/grpcio/py3/grpc/_cython/_cygrpc/aio/channel.pyx.pxi  135
-rw-r--r--  contrib/python/grpcio/py3/grpc/_cython/_cygrpc/aio/common.pyx.pxi  202
-rw-r--r--  contrib/python/grpcio/py3/grpc/_cython/_cygrpc/aio/completion_queue.pxd.pxi  52
-rw-r--r--  contrib/python/grpcio/py3/grpc/_cython/_cygrpc/aio/completion_queue.pyx.pxi  174
-rw-r--r--  contrib/python/grpcio/py3/grpc/_cython/_cygrpc/aio/grpc_aio.pxd.pxi  43
-rw-r--r--  contrib/python/grpcio/py3/grpc/_cython/_cygrpc/aio/grpc_aio.pyx.pxi  114
-rw-r--r--  contrib/python/grpcio/py3/grpc/_cython/_cygrpc/aio/rpc_status.pxd.pxi  29
-rw-r--r--  contrib/python/grpcio/py3/grpc/_cython/_cygrpc/aio/rpc_status.pyx.pxi  44
-rw-r--r--  contrib/python/grpcio/py3/grpc/_cython/_cygrpc/aio/server.pxd.pxi  92
-rw-r--r--  contrib/python/grpcio/py3/grpc/_cython/_cygrpc/aio/server.pyx.pxi  1097
-rw-r--r--  contrib/python/grpcio/py3/grpc/_cython/_cygrpc/arguments.pxd.pxi  36
-rw-r--r--  contrib/python/grpcio/py3/grpc/_cython/_cygrpc/arguments.pyx.pxi  85
-rw-r--r--  contrib/python/grpcio/py3/grpc/_cython/_cygrpc/call.pxd.pxi  20
-rw-r--r--  contrib/python/grpcio/py3/grpc/_cython/_cygrpc/call.pyx.pxi  97
-rw-r--r--  contrib/python/grpcio/py3/grpc/_cython/_cygrpc/channel.pxd.pxi  74
-rw-r--r--  contrib/python/grpcio/py3/grpc/_cython/_cygrpc/channel.pyx.pxi  516
-rw-r--r--  contrib/python/grpcio/py3/grpc/_cython/_cygrpc/channelz.pyx.pxi  71
-rw-r--r--  contrib/python/grpcio/py3/grpc/_cython/_cygrpc/completion_queue.pxd.pxi  32
-rw-r--r--  contrib/python/grpcio/py3/grpc/_cython/_cygrpc/completion_queue.pyx.pxi  139
-rw-r--r--  contrib/python/grpcio/py3/grpc/_cython/_cygrpc/credentials.pxd.pxi  117
-rw-r--r--  contrib/python/grpcio/py3/grpc/_cython/_cygrpc/credentials.pyx.pxi  443
-rw-r--r--  contrib/python/grpcio/py3/grpc/_cython/_cygrpc/csds.pyx.pxi  21
-rw-r--r--  contrib/python/grpcio/py3/grpc/_cython/_cygrpc/event.pxd.pxi  47
-rw-r--r--  contrib/python/grpcio/py3/grpc/_cython/_cygrpc/event.pyx.pxi  54
-rw-r--r--  contrib/python/grpcio/py3/grpc/_cython/_cygrpc/fork_posix.pxd.pxi  29
-rw-r--r--  contrib/python/grpcio/py3/grpc/_cython/_cygrpc/fork_posix.pyx.pxi  208
-rw-r--r--  contrib/python/grpcio/py3/grpc/_cython/_cygrpc/fork_windows.pyx.pxi  61
-rw-r--r--  contrib/python/grpcio/py3/grpc/_cython/_cygrpc/grpc.pxi  735
-rw-r--r--  contrib/python/grpcio/py3/grpc/_cython/_cygrpc/grpc_gevent.pxd.pxi  21
-rw-r--r--  contrib/python/grpcio/py3/grpc/_cython/_cygrpc/grpc_gevent.pyx.pxi  137
-rw-r--r--  contrib/python/grpcio/py3/grpc/_cython/_cygrpc/grpc_string.pyx.pxi  51
-rw-r--r--  contrib/python/grpcio/py3/grpc/_cython/_cygrpc/metadata.pxd.pxi  26
-rw-r--r--  contrib/python/grpcio/py3/grpc/_cython/_cygrpc/metadata.pyx.pxi  73
-rw-r--r--  contrib/python/grpcio/py3/grpc/_cython/_cygrpc/operation.pxd.pxi  111
-rw-r--r--  contrib/python/grpcio/py3/grpc/_cython/_cygrpc/operation.pyx.pxi  250
-rw-r--r--  contrib/python/grpcio/py3/grpc/_cython/_cygrpc/propagation_bits.pxd.pxi  20
-rw-r--r--  contrib/python/grpcio/py3/grpc/_cython/_cygrpc/propagation_bits.pyx.pxi  20
-rw-r--r--  contrib/python/grpcio/py3/grpc/_cython/_cygrpc/records.pxd.pxi  34
-rw-r--r--  contrib/python/grpcio/py3/grpc/_cython/_cygrpc/records.pyx.pxi  201
-rw-r--r--  contrib/python/grpcio/py3/grpc/_cython/_cygrpc/security.pxd.pxi  17
-rw-r--r--  contrib/python/grpcio/py3/grpc/_cython/_cygrpc/security.pyx.pxi  85
-rw-r--r--  contrib/python/grpcio/py3/grpc/_cython/_cygrpc/server.pxd.pxi  29
-rw-r--r--  contrib/python/grpcio/py3/grpc/_cython/_cygrpc/server.pyx.pxi  165
-rw-r--r--  contrib/python/grpcio/py3/grpc/_cython/_cygrpc/tag.pxd.pxi  58
-rw-r--r--  contrib/python/grpcio/py3/grpc/_cython/_cygrpc/tag.pyx.pxi  88
-rw-r--r--  contrib/python/grpcio/py3/grpc/_cython/_cygrpc/thread.pyx.pxi  59
-rw-r--r--  contrib/python/grpcio/py3/grpc/_cython/_cygrpc/time.pxd.pxi  19
-rw-r--r--  contrib/python/grpcio/py3/grpc/_cython/_cygrpc/time.pyx.pxi  29
-rw-r--r--  contrib/python/grpcio/py3/grpc/_cython/_cygrpc/vtable.pxd.pxi  23
-rw-r--r--  contrib/python/grpcio/py3/grpc/_cython/_cygrpc/vtable.pyx.pxi  36
-rw-r--r--  contrib/python/grpcio/py3/grpc/_cython/cygrpc.pxd  50
-rw-r--r--  contrib/python/grpcio/py3/grpc/_cython/cygrpc.pyx  94
61 files changed, 7364 insertions, 0 deletions
diff --git a/contrib/python/grpcio/py3/grpc/_cython/__init__.py b/contrib/python/grpcio/py3/grpc/_cython/__init__.py
new file mode 100644
index 00000000000..5fb4f3c3cfd
--- /dev/null
+++ b/contrib/python/grpcio/py3/grpc/_cython/__init__.py
@@ -0,0 +1,13 @@
+# Copyright 2015 gRPC authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
diff --git a/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/__init__.py b/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/__init__.py
new file mode 100644
index 00000000000..5fb4f3c3cfd
--- /dev/null
+++ b/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/__init__.py
@@ -0,0 +1,13 @@
+# Copyright 2015 gRPC authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
diff --git a/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/_hooks.pxd.pxi b/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/_hooks.pxd.pxi
new file mode 100644
index 00000000000..3eb10f52757
--- /dev/null
+++ b/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/_hooks.pxd.pxi
@@ -0,0 +1,16 @@
+# Copyright 2018 The gRPC Authors
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+cdef object _custom_op_on_c_call(int op, grpc_call *call)
diff --git a/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/_hooks.pyx.pxi b/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/_hooks.pyx.pxi
new file mode 100644
index 00000000000..de4d71b8196
--- /dev/null
+++ b/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/_hooks.pyx.pxi
@@ -0,0 +1,35 @@
+# Copyright 2018 The gRPC Authors
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+cdef object _custom_op_on_c_call(int op, grpc_call *call):
+ raise NotImplementedError("No custom hooks are implemented")
+
+def install_context_from_request_call_event(RequestCallEvent event):
+ pass
+
+def uninstall_context():
+ pass
+
+def build_census_context():
+ pass
+
+cdef class CensusContext:
+ pass
+
+def set_census_context_on_call(_CallState call_state, CensusContext census_ctx):
+ pass
+
+def get_deadline_from_context():
+ return None
diff --git a/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/aio/call.pxd.pxi b/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/aio/call.pxd.pxi
new file mode 100644
index 00000000000..867245a6944
--- /dev/null
+++ b/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/aio/call.pxd.pxi
@@ -0,0 +1,47 @@
+# Copyright 2019 gRPC authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+cdef class _AioCall(GrpcCallWrapper):
+ cdef:
+ readonly AioChannel _channel
+ list _references
+ object _deadline
+ list _done_callbacks
+
+ # Caches the picked event loop, so we can avoid the 30ns overhead each
+ # time we need access to the event loop.
+ object _loop
+
+ # Flag that indicates whether cancel has been called. Cancellation from
+ # Core or from the peer works fine with the normal procedure. However, we
+ # need this flag to clean up resources for cancellation from the
+ # application layer. Directly cancelling tasks might cause a segfault,
+ # because Core is holding a pointer to the callback handler.
+ bint _is_locally_cancelled
+
+ # The following attributes store the status of the call and the initial
+ # metadata. Waiters are used to pause the execution of tasks that ask
+ # for one of these fields before it is available.
+ readonly AioRpcStatus _status
+ readonly tuple _initial_metadata
+ list _waiters_status
+ list _waiters_initial_metadata
+
+ int _send_initial_metadata_flags
+
+ cdef void _create_grpc_call(self, object timeout, bytes method, CallCredentials credentials) except *
+ cdef void _set_status(self, AioRpcStatus status) except *
+ cdef void _set_initial_metadata(self, tuple initial_metadata) except *
diff --git a/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/aio/call.pyx.pxi b/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/aio/call.pyx.pxi
new file mode 100644
index 00000000000..7bce1850dc8
--- /dev/null
+++ b/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/aio/call.pyx.pxi
@@ -0,0 +1,508 @@
+# Copyright 2019 gRPC authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+_EMPTY_FLAGS = 0
+_EMPTY_MASK = 0
+_IMMUTABLE_EMPTY_METADATA = tuple()
+
+_UNKNOWN_CANCELLATION_DETAILS = 'RPC cancelled for unknown reason.'
+_OK_CALL_REPRESENTATION = ('<{} of RPC that terminated with:\n'
+ '\tstatus = {}\n'
+ '\tdetails = "{}"\n'
+ '>')
+
+_NON_OK_CALL_REPRESENTATION = ('<{} of RPC that terminated with:\n'
+ '\tstatus = {}\n'
+ '\tdetails = "{}"\n'
+ '\tdebug_error_string = "{}"\n'
+ '>')
+
+
+cdef int _get_send_initial_metadata_flags(object wait_for_ready) except *:
+ cdef int flags = 0
+ # Wait-for-ready can be None, which means using default value in Core.
+ if wait_for_ready is not None:
+ flags |= InitialMetadataFlags.wait_for_ready_explicitly_set
+ if wait_for_ready:
+ flags |= InitialMetadataFlags.wait_for_ready
+
+ flags &= InitialMetadataFlags.used_mask
+ return flags
+
+
+cdef class _AioCall(GrpcCallWrapper):
+
+ def __cinit__(self, AioChannel channel, object deadline,
+ bytes method, CallCredentials call_credentials, object wait_for_ready):
+ init_grpc_aio()
+ self.call = NULL
+ self._channel = channel
+ self._loop = channel.loop
+ self._references = []
+ self._status = None
+ self._initial_metadata = None
+ self._waiters_status = []
+ self._waiters_initial_metadata = []
+ self._done_callbacks = []
+ self._is_locally_cancelled = False
+ self._deadline = deadline
+ self._send_initial_metadata_flags = _get_send_initial_metadata_flags(wait_for_ready)
+ self._create_grpc_call(deadline, method, call_credentials)
+
+ def __dealloc__(self):
+ if self.call:
+ grpc_call_unref(self.call)
+ shutdown_grpc_aio()
+
+ def _repr(self) -> str:
+ """Assembles the RPC representation string."""
+ # This needs to be loaded at run time once everything
+ # has been loaded.
+ from grpc import _common
+
+ if not self.done():
+ return '<{} object>'.format(self.__class__.__name__)
+
+ if self._status.code() is StatusCode.ok:
+ return _OK_CALL_REPRESENTATION.format(
+ self.__class__.__name__,
+ _common.CYGRPC_STATUS_CODE_TO_STATUS_CODE[self._status.code()],
+ self._status.details())
+ else:
+ return _NON_OK_CALL_REPRESENTATION.format(
+ self.__class__.__name__,
+ _common.CYGRPC_STATUS_CODE_TO_STATUS_CODE[self._status.code()],
+ self._status.details(),
+ self._status.debug_error_string())
+
+ def __repr__(self) -> str:
+ return self._repr()
+
+ def __str__(self) -> str:
+ return self._repr()
+
+ cdef void _create_grpc_call(self,
+ object deadline,
+ bytes method,
+ CallCredentials credentials) except *:
+ """Creates the corresponding Core object for this RPC.
+
+ For unary calls, the grpc_call is short-lived and could be destroyed right
+ after start_batch is invoked. However, if either side is streaming, the
+ grpc_call's life span is longer than a single function call. So it is
+ better to store it as an instance variable rather than a stack variable,
+ which also reflects its nature in Core.
+ """
+ cdef grpc_slice method_slice
+ cdef gpr_timespec c_deadline = _timespec_from_time(deadline)
+ cdef grpc_call_error set_credentials_error
+
+ method_slice = grpc_slice_from_copied_buffer(
+ <const char *> method,
+ <size_t> len(method)
+ )
+ self.call = grpc_channel_create_call(
+ self._channel.channel,
+ NULL,
+ _EMPTY_MASK,
+ global_completion_queue(),
+ method_slice,
+ NULL,
+ c_deadline,
+ NULL
+ )
+
+ if credentials is not None:
+ set_credentials_error = grpc_call_set_credentials(self.call, credentials.c())
+ if set_credentials_error != GRPC_CALL_OK:
+ raise InternalError("Credentials couldn't have been set: {0}".format(set_credentials_error))
+
+ grpc_slice_unref(method_slice)
+
+ cdef void _set_status(self, AioRpcStatus status) except *:
+ cdef list waiters
+
+ # No more waiters should be expected since status has been set.
+ self._status = status
+
+ if self._initial_metadata is None:
+ self._set_initial_metadata(_IMMUTABLE_EMPTY_METADATA)
+
+ for waiter in self._waiters_status:
+ if not waiter.done():
+ waiter.set_result(None)
+ self._waiters_status = []
+
+ for callback in self._done_callbacks:
+ callback()
+
+ cdef void _set_initial_metadata(self, tuple initial_metadata) except *:
+ if self._initial_metadata is not None:
+ # Some gRPC calls might end before the initial metadata arrives in
+ # the Call object. That causes this method to be invoked twice:
+ # 1. filled with empty metadata by _set_status; 2. invoked again with
+ # the metadata actually received, which is ignored by this guard.
+ return
+
+ cdef list waiters
+
+ # No more waiters should be expected since initial metadata has been
+ # set.
+ self._initial_metadata = initial_metadata
+
+ for waiter in self._waiters_initial_metadata:
+ if not waiter.done():
+ waiter.set_result(None)
+ self._waiters_initial_metadata = []
+
+ def add_done_callback(self, callback):
+ if self.done():
+ callback()
+ else:
+ self._done_callbacks.append(callback)
+
+ def time_remaining(self):
+ if self._deadline is None:
+ return None
+ else:
+ return max(0, self._deadline - time.time())
+
+ def cancel(self, str details):
+ """Cancels the RPC in Core with given RPC status.
+
+ Above abstractions must invoke this method to set Core objects into
+ proper state.
+ """
+ self._is_locally_cancelled = True
+
+ cdef object details_bytes
+ cdef char *c_details
+ cdef grpc_call_error error
+
+ self._set_status(AioRpcStatus(
+ StatusCode.cancelled,
+ details,
+ None,
+ None,
+ ))
+
+ details_bytes = str_to_bytes(details)
+ self._references.append(details_bytes)
+ c_details = <char *>details_bytes
+ # By implementation, grpc_call_cancel_with_status always returns GRPC_CALL_OK.
+ error = grpc_call_cancel_with_status(
+ self.call,
+ StatusCode.cancelled,
+ c_details,
+ NULL,
+ )
+ assert error == GRPC_CALL_OK
+
+ def done(self):
+ """Returns if the RPC call has finished.
+
+ Checks if the status has been provided, either
+ because the RPC finished or because was cancelled..
+
+ Returns:
+ True if the RPC can be considered finished.
+ """
+ return self._status is not None
+
+ def cancelled(self):
+ """Returns if the RPC was cancelled.
+
+ Returns:
+ True if the RPC was cancelled.
+ """
+ if not self.done():
+ return False
+
+ return self._status.code() == StatusCode.cancelled
+
+ async def status(self):
+ """Returns the status of the RPC call.
+
+ It returns the finished status of the RPC. If the RPC has not finished
+ yet, this coroutine waits until the RPC is completed.
+
+ Returns:
+ Finished status of the RPC as an AioRpcStatus object.
+ """
+ if self._status is not None:
+ return self._status
+
+ future = self._loop.create_future()
+ self._waiters_status.append(future)
+ await future
+
+ return self._status
+
+ def is_ok(self):
+ """Returns if the RPC is ended with ok."""
+ return self.done() and self._status.code() == StatusCode.ok
+
+ async def initial_metadata(self):
+ """Returns the initial metadata of the RPC call.
+
+ If the initial metadata has not been received yet, this coroutine waits
+ until it becomes available.
+
+ Returns:
+ The tuple object with the initial metadata.
+ """
+ if self._initial_metadata is not None:
+ return self._initial_metadata
+
+ future = self._loop.create_future()
+ self._waiters_initial_metadata.append(future)
+ await future
+
+ return self._initial_metadata
+
+ def is_locally_cancelled(self):
+ """Returns if the RPC was cancelled locally.
+
+ Returns:
+ True when was cancelled locally, False when was cancelled remotelly or
+ is still ongoing.
+ """
+ if self._is_locally_cancelled:
+ return True
+
+ return False
+
+ async def unary_unary(self,
+ bytes request,
+ tuple outbound_initial_metadata):
+ """Performs a unary unary RPC.
+
+ Args:
+ request: the serialized requests in bytes.
+ outbound_initial_metadata: optional outbound metadata.
+ """
+ cdef tuple ops
+
+ cdef SendInitialMetadataOperation initial_metadata_op = SendInitialMetadataOperation(
+ outbound_initial_metadata,
+ self._send_initial_metadata_flags)
+ cdef SendMessageOperation send_message_op = SendMessageOperation(request, _EMPTY_FLAGS)
+ cdef SendCloseFromClientOperation send_close_op = SendCloseFromClientOperation(_EMPTY_FLAGS)
+ cdef ReceiveInitialMetadataOperation receive_initial_metadata_op = ReceiveInitialMetadataOperation(_EMPTY_FLAGS)
+ cdef ReceiveMessageOperation receive_message_op = ReceiveMessageOperation(_EMPTY_FLAGS)
+ cdef ReceiveStatusOnClientOperation receive_status_on_client_op = ReceiveStatusOnClientOperation(_EMPTY_FLAGS)
+
+ ops = (initial_metadata_op, send_message_op, send_close_op,
+ receive_initial_metadata_op, receive_message_op,
+ receive_status_on_client_op)
+
+ # Executes all operations in one batch.
+ # Might raise CancelledError, handling it in Python UnaryUnaryCall.
+ await execute_batch(self,
+ ops,
+ self._loop)
+
+ self._set_initial_metadata(receive_initial_metadata_op.initial_metadata())
+
+ cdef grpc_status_code code
+ code = receive_status_on_client_op.code()
+
+ self._set_status(AioRpcStatus(
+ code,
+ receive_status_on_client_op.details(),
+ receive_status_on_client_op.trailing_metadata(),
+ receive_status_on_client_op.error_string(),
+ ))
+
+ if code == StatusCode.ok:
+ return receive_message_op.message()
+ else:
+ return None
+
+ async def _handle_status_once_received(self):
+ """Handles the status sent by peer once received."""
+ cdef ReceiveStatusOnClientOperation op = ReceiveStatusOnClientOperation(_EMPTY_FLAGS)
+ cdef tuple ops = (op,)
+ await execute_batch(self, ops, self._loop)
+
+ # Halts if the RPC is locally cancelled
+ if self._is_locally_cancelled:
+ return
+
+ self._set_status(AioRpcStatus(
+ op.code(),
+ op.details(),
+ op.trailing_metadata(),
+ op.error_string(),
+ ))
+
+ async def receive_serialized_message(self):
+ """Receives one single raw message in bytes."""
+ cdef bytes received_message
+
+ # Receives a message. Returns None when failed:
+ # * EOF, no more messages to read;
+ # * The client application cancels;
+ # * The server sends final status.
+ received_message = await _receive_message(
+ self,
+ self._loop
+ )
+ if received_message is not None:
+ return received_message
+ else:
+ return EOF
+
+ async def send_serialized_message(self, bytes message):
+ """Sends one single raw message in bytes."""
+ await _send_message(self,
+ message,
+ None,
+ False,
+ self._loop)
+
+ async def send_receive_close(self):
+ """Half close the RPC on the client-side."""
+ cdef SendCloseFromClientOperation op = SendCloseFromClientOperation(_EMPTY_FLAGS)
+ cdef tuple ops = (op,)
+ await execute_batch(self, ops, self._loop)
+
+ async def initiate_unary_stream(self,
+ bytes request,
+ tuple outbound_initial_metadata):
+ """Implementation of the start of a unary-stream call."""
+ # The peer may prematurely end this RPC at any point. We need a coroutine
+ # that watches whether the server sends the final status.
+ status_task = self._loop.create_task(self._handle_status_once_received())
+
+ cdef tuple outbound_ops
+ cdef Operation initial_metadata_op = SendInitialMetadataOperation(
+ outbound_initial_metadata,
+ self._send_initial_metadata_flags)
+ cdef Operation send_message_op = SendMessageOperation(
+ request,
+ _EMPTY_FLAGS)
+ cdef Operation send_close_op = SendCloseFromClientOperation(
+ _EMPTY_FLAGS)
+
+ outbound_ops = (
+ initial_metadata_op,
+ send_message_op,
+ send_close_op,
+ )
+
+ try:
+ # Sends out the request message.
+ await execute_batch(self,
+ outbound_ops,
+ self._loop)
+
+ # Receives initial metadata.
+ self._set_initial_metadata(
+ await _receive_initial_metadata(self,
+ self._loop),
+ )
+ except ExecuteBatchError as batch_error:
+ # Core should explain why this batch failed
+ await status_task
+
+ async def stream_unary(self,
+ tuple outbound_initial_metadata,
+ object metadata_sent_observer):
+ """Actual implementation of the complete unary-stream call.
+
+ Needs to pay extra attention to the raise mechanism. If we want to
+ propagate the final status exception, then we have to raise it.
+ Othersize, it would end normally and raise `StopAsyncIteration()`.
+ """
+ try:
+ # Sends out initial_metadata ASAP.
+ await _send_initial_metadata(self,
+ outbound_initial_metadata,
+ self._send_initial_metadata_flags,
+ self._loop)
+ # Notify the upper level that sending messages is allowed now.
+ metadata_sent_observer()
+
+ # Receives initial metadata.
+ self._set_initial_metadata(
+ await _receive_initial_metadata(self, self._loop)
+ )
+ except ExecuteBatchError:
+ # Core should explain why this batch failed
+ await self._handle_status_once_received()
+
+ # Allow upper layer to proceed only if the status is set
+ metadata_sent_observer()
+ return None
+
+ cdef tuple inbound_ops
+ cdef ReceiveMessageOperation receive_message_op = ReceiveMessageOperation(_EMPTY_FLAGS)
+ cdef ReceiveStatusOnClientOperation receive_status_on_client_op = ReceiveStatusOnClientOperation(_EMPTY_FLAGS)
+ inbound_ops = (receive_message_op, receive_status_on_client_op)
+
+ # Executes all operations in one batch.
+ await execute_batch(self,
+ inbound_ops,
+ self._loop)
+
+ cdef grpc_status_code code
+ code = receive_status_on_client_op.code()
+
+ self._set_status(AioRpcStatus(
+ code,
+ receive_status_on_client_op.details(),
+ receive_status_on_client_op.trailing_metadata(),
+ receive_status_on_client_op.error_string(),
+ ))
+
+ if code == StatusCode.ok:
+ return receive_message_op.message()
+ else:
+ return None
+
+ async def initiate_stream_stream(self,
+ tuple outbound_initial_metadata,
+ object metadata_sent_observer):
+ """Actual implementation of the complete stream-stream call.
+
+ Needs to pay extra attention to the raise mechanism. If we want to
+ propagate the final status exception, then we have to raise it.
+ Otherwise, it would end normally and raise `StopAsyncIteration()`.
+ """
+ # The peer may prematurely end this RPC at any point. We need a coroutine
+ # that watches whether the server sends the final status.
+ status_task = self._loop.create_task(self._handle_status_once_received())
+
+ try:
+ # Sends out initial_metadata ASAP.
+ await _send_initial_metadata(self,
+ outbound_initial_metadata,
+ self._send_initial_metadata_flags,
+ self._loop)
+ # Notify the upper level that sending messages is allowed now.
+ metadata_sent_observer()
+
+ # Receives initial metadata.
+ self._set_initial_metadata(
+ await _receive_initial_metadata(self, self._loop)
+ )
+ except ExecuteBatchError as batch_error:
+ # Core should explain why this batch failed
+ await status_task
+
+ # Allow upper layer to proceed only if the status is set
+ metadata_sent_observer()
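The batch-based flow in _AioCall above is what backs the public grpc.aio unary API: the deadline passed to __cinit__ becomes the Core call deadline in _create_grpc_call, and wait_for_ready is folded into the send-initial-metadata flags by _get_send_initial_metadata_flags. A minimal sketch of how this surfaces at the public grpc.aio layer follows; the method path '/demo.Echo/Hello' and the identity serializers are hypothetical and not part of this diff.

import asyncio
import grpc

async def main():
    async with grpc.aio.insecure_channel('localhost:50051') as channel:
        hello = channel.unary_unary(
            '/demo.Echo/Hello',                 # hypothetical method path
            request_serializer=lambda x: x,     # pass raw bytes through
            response_deserializer=lambda x: x,
        )
        # timeout becomes the Core deadline in _AioCall._create_grpc_call;
        # wait_for_ready is translated by _get_send_initial_metadata_flags.
        reply = await hello(b'ping', timeout=5.0, wait_for_ready=True)
        print(reply)

if __name__ == '__main__':
    asyncio.run(main())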
diff --git a/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/aio/callback_common.pxd.pxi b/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/aio/callback_common.pxd.pxi
new file mode 100644
index 00000000000..e54e5107547
--- /dev/null
+++ b/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/aio/callback_common.pxd.pxi
@@ -0,0 +1,57 @@
+# Copyright 2019 gRPC authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+cdef class CallbackFailureHandler:
+ cdef str _core_function_name
+ cdef object _error_details
+ cdef object _exception_type
+
+ cdef handle(self, object future)
+
+
+cdef struct CallbackContext:
+ # C struct to store callback context in the form of pointers.
+ #
+ # Attributes:
+ # functor: A grpc_completion_queue_functor represents the
+ # callback function in the only way Core understands.
+ # waiter: An asyncio.Future object that fulfills when the callback is
+ # invoked by Core.
+ # failure_handler: A CallbackFailureHandler object that is called when
+ # Core returns a 'success == 0' state.
+ # wrapper: A self-reference to the CallbackWrapper to help life cycle
+ # management.
+ grpc_completion_queue_functor functor
+ cpython.PyObject *waiter
+ cpython.PyObject *loop
+ cpython.PyObject *failure_handler
+ cpython.PyObject *callback_wrapper
+
+
+cdef class CallbackWrapper:
+ cdef CallbackContext context
+ cdef object _reference_of_future
+ cdef object _reference_of_failure_handler
+
+ @staticmethod
+ cdef void functor_run(
+ grpc_completion_queue_functor* functor,
+ int succeed)
+
+ cdef grpc_completion_queue_functor *c_functor(self)
+
+
+cdef class GrpcCallWrapper:
+ cdef grpc_call* call
diff --git a/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/aio/callback_common.pyx.pxi b/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/aio/callback_common.pyx.pxi
new file mode 100644
index 00000000000..14a0098fc20
--- /dev/null
+++ b/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/aio/callback_common.pyx.pxi
@@ -0,0 +1,185 @@
+# Copyright 2019 gRPC authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+cdef class CallbackFailureHandler:
+
+ def __cinit__(self,
+ str core_function_name,
+ object error_details,
+ object exception_type):
+ """Handles failure by raising exception."""
+ self._core_function_name = core_function_name
+ self._error_details = error_details
+ self._exception_type = exception_type
+
+ cdef handle(self, object future):
+ future.set_exception(self._exception_type(
+ 'Failed "%s": %s' % (self._core_function_name, self._error_details)
+ ))
+
+
+cdef class CallbackWrapper:
+
+ def __cinit__(self, object future, object loop, CallbackFailureHandler failure_handler):
+ self.context.functor.functor_run = self.functor_run
+ self.context.waiter = <cpython.PyObject*>future
+ self.context.loop = <cpython.PyObject*>loop
+ self.context.failure_handler = <cpython.PyObject*>failure_handler
+ self.context.callback_wrapper = <cpython.PyObject*>self
+ # NOTE(lidiz) Not using a list here, because this class is critical in
+ # the data path. We should make it as efficient as possible.
+ self._reference_of_future = future
+ self._reference_of_failure_handler = failure_handler
+ # NOTE(lidiz) We need to ensure that when Core invokes our callback, the
+ # callback function itself is not deallocated. Otherwise, we will get
+ # a segfault. We can view this as Core holding a ref.
+ cpython.Py_INCREF(self)
+
+ @staticmethod
+ cdef void functor_run(
+ grpc_completion_queue_functor* functor,
+ int success):
+ cdef CallbackContext *context = <CallbackContext *>functor
+ cdef object waiter = <object>context.waiter
+ if not waiter.cancelled():
+ if success == 0:
+ (<CallbackFailureHandler>context.failure_handler).handle(waiter)
+ else:
+ waiter.set_result(None)
+ cpython.Py_DECREF(<object>context.callback_wrapper)
+
+ cdef grpc_completion_queue_functor *c_functor(self):
+ return &self.context.functor
+
+
+cdef CallbackFailureHandler CQ_SHUTDOWN_FAILURE_HANDLER = CallbackFailureHandler(
+ 'grpc_completion_queue_shutdown',
+ 'Unknown',
+ InternalError)
+
+
+class ExecuteBatchError(InternalError):
+ """Raised when execute batch returns a failure from Core."""
+
+
+async def execute_batch(GrpcCallWrapper grpc_call_wrapper,
+ tuple operations,
+ object loop):
+ """The callback version of start batch operations."""
+ cdef _BatchOperationTag batch_operation_tag = _BatchOperationTag(None, operations, None)
+ batch_operation_tag.prepare()
+
+ cdef object future = loop.create_future()
+ cdef CallbackWrapper wrapper = CallbackWrapper(
+ future,
+ loop,
+ CallbackFailureHandler('execute_batch', operations, ExecuteBatchError))
+ cdef grpc_call_error error = grpc_call_start_batch(
+ grpc_call_wrapper.call,
+ batch_operation_tag.c_ops,
+ batch_operation_tag.c_nops,
+ wrapper.c_functor(), NULL)
+
+ if error != GRPC_CALL_OK:
+ grpc_call_error_string = grpc_call_error_to_string(error).decode()
+ raise ExecuteBatchError("Failed grpc_call_start_batch: {} with grpc_call_error value: '{}'".format(error, grpc_call_error_string))
+
+ await future
+
+ cdef grpc_event c_event
+ # Tag.event must be called, otherwise messages won't be parsed from C
+ batch_operation_tag.event(c_event)
+
+
+cdef prepend_send_initial_metadata_op(tuple ops, tuple metadata):
+ # Eventually, this function should be the only function that produces
+ # SendInitialMetadataOperation. So we have more control over the flag.
+ return (SendInitialMetadataOperation(
+ metadata,
+ _EMPTY_FLAG
+ ),) + ops
+
+
+async def _receive_message(GrpcCallWrapper grpc_call_wrapper,
+ object loop):
+ """Retrives parsed messages from Core.
+
+ The messages maybe already in Core's buffer, so there isn't a 1-to-1
+ mapping between this and the underlying "socket.read()". Also, eventually,
+ this function will end with an EOF, which reads empty message.
+ """
+ cdef ReceiveMessageOperation receive_op = ReceiveMessageOperation(_EMPTY_FLAG)
+ cdef tuple ops = (receive_op,)
+ try:
+ await execute_batch(grpc_call_wrapper, ops, loop)
+ except ExecuteBatchError as e:
+ # NOTE(lidiz) The receive message operation has two ways to indicate
+ # the finished state: 1) it returns an empty message due to EOF;
+ # 2) it fails inside the callback (e.g. cancelled).
+ #
+ # Since both of them indicate the finished state, they are merged here.
+ _LOGGER.debug('Failed to receive any message from Core')
+ # NOTE(lidiz) The returned message might be empty bytes (i.e. b'').
+ # Please explicitly check whether it is None rather than testing truthiness!
+ return receive_op.message()
+
+
+async def _send_message(GrpcCallWrapper grpc_call_wrapper,
+ bytes message,
+ Operation send_initial_metadata_op,
+ int write_flag,
+ object loop):
+ cdef SendMessageOperation op = SendMessageOperation(message, write_flag)
+ cdef tuple ops = (op,)
+ if send_initial_metadata_op is not None:
+ ops = (send_initial_metadata_op,) + ops
+ await execute_batch(grpc_call_wrapper, ops, loop)
+
+
+async def _send_initial_metadata(GrpcCallWrapper grpc_call_wrapper,
+ tuple metadata,
+ int flags,
+ object loop):
+ cdef SendInitialMetadataOperation op = SendInitialMetadataOperation(
+ metadata,
+ flags)
+ cdef tuple ops = (op,)
+ await execute_batch(grpc_call_wrapper, ops, loop)
+
+
+async def _receive_initial_metadata(GrpcCallWrapper grpc_call_wrapper,
+ object loop):
+ cdef ReceiveInitialMetadataOperation op = ReceiveInitialMetadataOperation(_EMPTY_FLAGS)
+ cdef tuple ops = (op,)
+ await execute_batch(grpc_call_wrapper, ops, loop)
+ return op.initial_metadata()
+
+async def _send_error_status_from_server(GrpcCallWrapper grpc_call_wrapper,
+ grpc_status_code code,
+ str details,
+ tuple trailing_metadata,
+ Operation send_initial_metadata_op,
+ object loop):
+ assert code != StatusCode.ok, 'Expecting non-ok status code.'
+ cdef SendStatusFromServerOperation op = SendStatusFromServerOperation(
+ trailing_metadata,
+ code,
+ details,
+ _EMPTY_FLAGS,
+ )
+ cdef tuple ops = (op,)
+ if send_initial_metadata_op is not None:
+ ops = (send_initial_metadata_op,) + ops
+ await execute_batch(grpc_call_wrapper, ops, loop)
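CallbackWrapper above bridges Core's completion-queue functor, which fires on an arbitrary thread, to an asyncio.Future awaited on the event loop, with CallbackFailureHandler turning a 'success == 0' result into an exception. A rough pure-Python analogue of that pattern, using hypothetical names and a thread-safe hand-off in place of the Cython/C pointers, might look like this:

import asyncio
import threading

class FailureHandler:
    # Mirrors CallbackFailureHandler: remembers what failed and which
    # exception type to raise on the waiting future.
    def __init__(self, core_function_name, error_details, exception_type):
        self._name = core_function_name
        self._details = error_details
        self._exception_type = exception_type

    def handle(self, future):
        future.set_exception(self._exception_type(
            'Failed "%s": %s' % (self._name, self._details)))

def make_functor(loop, future, failure_handler):
    # Core would invoke the returned callable from its own thread with
    # success == 0 or 1; we hop back onto the loop before touching the future.
    def functor_run(success):
        def resolve():
            if future.cancelled():
                return
            if success:
                future.set_result(None)
            else:
                failure_handler.handle(future)
        loop.call_soon_threadsafe(resolve)
    return functor_run

async def demo():
    loop = asyncio.get_running_loop()
    future = loop.create_future()
    functor = make_functor(loop, future,
                           FailureHandler('start_batch', 'operations', RuntimeError))
    # Simulate Core firing the completion callback from another thread.
    threading.Thread(target=functor, args=(1,)).start()
    await future

asyncio.run(demo())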
diff --git a/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/aio/channel.pxd.pxi b/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/aio/channel.pxd.pxi
new file mode 100644
index 00000000000..03b4990e488
--- /dev/null
+++ b/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/aio/channel.pxd.pxi
@@ -0,0 +1,27 @@
+# Copyright 2019 gRPC authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+cdef enum AioChannelStatus:
+ AIO_CHANNEL_STATUS_UNKNOWN
+ AIO_CHANNEL_STATUS_READY
+ AIO_CHANNEL_STATUS_CLOSING
+ AIO_CHANNEL_STATUS_DESTROYED
+
+cdef class AioChannel:
+ cdef:
+ grpc_channel * channel
+ object loop
+ bytes _target
+ AioChannelStatus _status
+ bint _is_secure
diff --git a/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/aio/channel.pyx.pxi b/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/aio/channel.pyx.pxi
new file mode 100644
index 00000000000..4286ab1d271
--- /dev/null
+++ b/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/aio/channel.pyx.pxi
@@ -0,0 +1,135 @@
+# Copyright 2019 gRPC authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+
+class _WatchConnectivityFailed(Exception):
+ """Dedicated exception class for watch connectivity failed.
+
+ It might be failed due to deadline exceeded.
+ """
+cdef CallbackFailureHandler _WATCH_CONNECTIVITY_FAILURE_HANDLER = CallbackFailureHandler(
+ 'watch_connectivity_state',
+ 'Timed out',
+ _WatchConnectivityFailed)
+
+
+cdef class AioChannel:
+ def __cinit__(self, bytes target, tuple options, ChannelCredentials credentials, object loop):
+ init_grpc_aio()
+ if options is None:
+ options = ()
+ cdef _ChannelArgs channel_args = _ChannelArgs(options)
+ self._target = target
+ self.loop = loop
+ self._status = AIO_CHANNEL_STATUS_READY
+
+ if credentials is None:
+ self._is_secure = False
+ creds = grpc_insecure_credentials_create();
+ self.channel = grpc_channel_create(<char *>target,
+ creds,
+ channel_args.c_args())
+ grpc_channel_credentials_release(creds)
+ else:
+ self._is_secure = True
+ creds = <grpc_channel_credentials *> credentials.c()
+ self.channel = grpc_channel_create(<char *>target,
+ creds,
+ channel_args.c_args())
+ grpc_channel_credentials_release(creds)
+
+ def __dealloc__(self):
+ shutdown_grpc_aio()
+
+ def __repr__(self):
+ class_name = self.__class__.__name__
+ id_ = id(self)
+ return f"<{class_name} {id_}>"
+
+ def check_connectivity_state(self, bint try_to_connect):
+ """A Cython wrapper for Core's check connectivity state API."""
+ if self._status == AIO_CHANNEL_STATUS_DESTROYED:
+ return ConnectivityState.shutdown
+ else:
+ return grpc_channel_check_connectivity_state(
+ self.channel,
+ try_to_connect,
+ )
+
+ async def watch_connectivity_state(self,
+ grpc_connectivity_state last_observed_state,
+ object deadline):
+ """Watch for one connectivity state change.
+
+ Keeps mirroring the behavior of Core, so we can easily switch to
+ another API design if necessary.
+ """
+ if self._status in (AIO_CHANNEL_STATUS_DESTROYED, AIO_CHANNEL_STATUS_CLOSING):
+ raise UsageError('Channel is closed.')
+
+ cdef gpr_timespec c_deadline = _timespec_from_time(deadline)
+
+ cdef object future = self.loop.create_future()
+ cdef CallbackWrapper wrapper = CallbackWrapper(
+ future,
+ self.loop,
+ _WATCH_CONNECTIVITY_FAILURE_HANDLER)
+ grpc_channel_watch_connectivity_state(
+ self.channel,
+ last_observed_state,
+ c_deadline,
+ global_completion_queue(),
+ wrapper.c_functor())
+
+ try:
+ await future
+ except _WatchConnectivityFailed:
+ return False
+ else:
+ return True
+
+ def closing(self):
+ self._status = AIO_CHANNEL_STATUS_CLOSING
+
+ def close(self):
+ self._status = AIO_CHANNEL_STATUS_DESTROYED
+ grpc_channel_destroy(self.channel)
+
+ def closed(self):
+ return self._status in (AIO_CHANNEL_STATUS_CLOSING, AIO_CHANNEL_STATUS_DESTROYED)
+
+ def call(self,
+ bytes method,
+ object deadline,
+ object python_call_credentials,
+ object wait_for_ready):
+ """Assembles a Cython Call object.
+
+ Returns:
+ An _AioCall object.
+ """
+ if self.closed():
+ raise UsageError('Channel is closed.')
+
+ cdef CallCredentials cython_call_credentials
+ if python_call_credentials is not None:
+ if not self._is_secure:
+ raise UsageError("Call credentials are only valid on secure channels")
+
+ cython_call_credentials = python_call_credentials._credentials
+ else:
+ cython_call_credentials = None
+
+ return _AioCall(self, deadline, method, cython_call_credentials, wait_for_ready)
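check_connectivity_state and watch_connectivity_state above are the building blocks for the "wait until ready" behavior in grpc.aio. A hedged sketch of the loop an upper layer might run on top of them follows; it assumes the compiled extension is importable as grpc._cython.cygrpc and that ConnectivityState (defined in records.pyx.pxi, listed in the diffstat but not shown here) exposes a 'ready' constant.

import asyncio
import time
from grpc._cython import cygrpc

async def wait_until_ready(target=b'localhost:50051', timeout=5.0):
    loop = asyncio.get_running_loop()
    channel = cygrpc.AioChannel(target, (), None, loop)  # insecure channel
    deadline = time.time() + timeout
    state = channel.check_connectivity_state(True)       # try_to_connect=True
    try:
        while state != cygrpc.ConnectivityState.ready:
            # Returns False when the deadline passes before a state change.
            if not await channel.watch_connectivity_state(state, deadline):
                raise TimeoutError('channel did not become ready in time')
            state = channel.check_connectivity_state(True)
    finally:
        channel.close()

asyncio.run(wait_until_ready())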
diff --git a/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/aio/common.pyx.pxi b/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/aio/common.pyx.pxi
new file mode 100644
index 00000000000..f698390cd5c
--- /dev/null
+++ b/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/aio/common.pyx.pxi
@@ -0,0 +1,202 @@
+# Copyright 2019 The gRPC Authors
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from cpython.version cimport PY_MAJOR_VERSION, PY_MINOR_VERSION
+
+TYPE_METADATA_STRING = "Tuple[Tuple[str, Union[str, bytes]]...]"
+
+
+cdef grpc_status_code get_status_code(object code) except *:
+ if isinstance(code, int):
+ if code >= StatusCode.ok and code <= StatusCode.data_loss:
+ return code
+ else:
+ return StatusCode.unknown
+ else:
+ try:
+ return code.value[0]
+ except (KeyError, AttributeError):
+ return StatusCode.unknown
+
+
+cdef object deserialize(object deserializer, bytes raw_message):
+ """Perform deserialization on raw bytes.
+
+ Failure to deserialize is a fatal error.
+ """
+ if deserializer:
+ return deserializer(raw_message)
+ else:
+ return raw_message
+
+
+cdef bytes serialize(object serializer, object message):
+ """Perform serialization on a message.
+
+ Failure to serialize is a fatal error.
+ """
+ if isinstance(message, str):
+ message = message.encode('utf-8')
+ if serializer:
+ return serializer(message)
+ else:
+ return message
+
+
+class _EOF:
+
+ def __bool__(self):
+ return False
+
+ def __len__(self):
+ return 0
+
+ def _repr(self) -> str:
+ return '<grpc.aio.EOF>'
+
+ def __repr__(self) -> str:
+ return self._repr()
+
+ def __str__(self) -> str:
+ return self._repr()
+
+
+EOF = _EOF()
+
+_COMPRESSION_METADATA_STRING_MAPPING = {
+ CompressionAlgorithm.none: 'identity',
+ CompressionAlgorithm.deflate: 'deflate',
+ CompressionAlgorithm.gzip: 'gzip',
+}
+
+class BaseError(Exception):
+ """The base class for exceptions generated by gRPC AsyncIO stack."""
+
+
+class UsageError(BaseError):
+ """Raised when the usage of API by applications is inappropriate.
+
+ For example, trying to invoke RPC on a closed channel, mixing two styles
+ of streaming API on the client side. This exception should not be
+ suppressed.
+ """
+
+
+class AbortError(BaseError):
+ """Raised when calling abort in servicer methods.
+
+ This exception should not be suppressed. Applications may catch it to
+ perform certain clean-up logic, and then re-raise it.
+ """
+
+
+class InternalError(BaseError):
+ """Raised upon unexpected errors in native code."""
+
+
+def schedule_coro_threadsafe(object coro, object loop):
+ try:
+ return loop.create_task(coro)
+ except RuntimeError as runtime_error:
+ if 'Non-thread-safe operation' in str(runtime_error):
+ return asyncio.run_coroutine_threadsafe(
+ coro,
+ loop,
+ )
+ else:
+ raise
+
+
+def async_generator_to_generator(object agen, object loop):
+ """Converts an async generator into generator."""
+ try:
+ while True:
+ future = asyncio.run_coroutine_threadsafe(
+ agen.__anext__(),
+ loop
+ )
+ response = future.result()
+ if response is EOF:
+ break
+ else:
+ yield response
+ except StopAsyncIteration:
+ # If StopAsyncIteration is raised, end this generator.
+ pass
+
+
+async def generator_to_async_generator(object gen, object loop, object thread_pool):
+ """Converts a generator into async generator.
+
+ The generator might block, so we need to delegate the iteration to thread
+ pool. Also, we can't simply delegate __next__ to the thread pool, otherwise
+ we will see following error:
+
+ TypeError: StopIteration interacts badly with generators and cannot be
+ raised into a Future
+ """
+ queue = asyncio.Queue(maxsize=1)
+
+ def yield_to_queue():
+ try:
+ for item in gen:
+ asyncio.run_coroutine_threadsafe(queue.put(item), loop).result()
+ finally:
+ asyncio.run_coroutine_threadsafe(queue.put(EOF), loop).result()
+
+ future = loop.run_in_executor(
+ thread_pool,
+ yield_to_queue,
+ )
+
+ while True:
+ response = await queue.get()
+ if response is EOF:
+ break
+ else:
+ yield response
+
+ # Propagate the exception from the generator thread, if there is any.
+ await future
+
+
+if PY_MAJOR_VERSION >= 3 and PY_MINOR_VERSION >= 7:
+ def get_working_loop():
+ """Returns a running event loop.
+
+ Due to a defect of asyncio.get_event_loop, its returned event loop might
+ not be set as the default event loop for the main thread.
+ """
+ try:
+ return asyncio.get_running_loop()
+ except RuntimeError:
+ return asyncio.get_event_loop_policy().get_event_loop()
+else:
+ def get_working_loop():
+ """Returns a running event loop."""
+ return asyncio.get_event_loop()
+
+
+def raise_if_not_valid_trailing_metadata(object metadata):
+ if not hasattr(metadata, '__iter__') or isinstance(metadata, dict):
+ raise TypeError(f'Invalid trailing metadata type, expected {TYPE_METADATA_STRING}: {metadata}')
+ for item in metadata:
+ if not isinstance(item, tuple):
+ raise TypeError(f'Invalid trailing metadata type, expected {TYPE_METADATA_STRING}: {metadata}')
+ if len(item) != 2:
+ raise TypeError(f'Invalid trailing metadata type, expected {TYPE_METADATA_STRING}: {metadata}')
+ if not isinstance(item[0], str):
+ raise TypeError(f'Invalid trailing metadata type, expected {TYPE_METADATA_STRING}: {metadata}')
+ if not isinstance(item[1], str) and not isinstance(item[1], bytes):
+ raise TypeError(f'Invalid trailing metadata type, expected {TYPE_METADATA_STRING}: {metadata}')
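generator_to_async_generator above bridges a blocking generator into the asyncio world by running the iteration on a thread pool and handing items over through an asyncio.Queue; async_generator_to_generator goes the other way by driving __anext__ with run_coroutine_threadsafe. A standalone illustration of the first pattern, independent of cygrpc and with a hypothetical _EOF sentinel standing in for the module's EOF object:

import asyncio
from concurrent.futures import ThreadPoolExecutor

_EOF = object()  # stands in for the EOF sentinel defined in this module

async def generator_to_async_generator(gen, loop, thread_pool):
    queue = asyncio.Queue(maxsize=1)

    def yield_to_queue():
        # Runs on the thread pool; each put is marshalled back to the loop.
        try:
            for item in gen:
                asyncio.run_coroutine_threadsafe(queue.put(item), loop).result()
        finally:
            asyncio.run_coroutine_threadsafe(queue.put(_EOF), loop).result()

    future = loop.run_in_executor(thread_pool, yield_to_queue)
    while True:
        item = await queue.get()
        if item is _EOF:
            break
        yield item
    await future  # surfaces any exception raised inside the generator

async def main():
    loop = asyncio.get_running_loop()
    with ThreadPoolExecutor(max_workers=1) as pool:
        async for value in generator_to_async_generator(iter(range(3)), loop, pool):
            print(value)

asyncio.run(main())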
diff --git a/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/aio/completion_queue.pxd.pxi b/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/aio/completion_queue.pxd.pxi
new file mode 100644
index 00000000000..578131f7eef
--- /dev/null
+++ b/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/aio/completion_queue.pxd.pxi
@@ -0,0 +1,52 @@
+# Copyright 2020 The gRPC Authors
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+ctypedef queue[grpc_event] cpp_event_queue
+
+
+IF UNAME_SYSNAME == "Windows":
+ cdef extern from "winsock2.h" nogil:
+ ctypedef uint32_t WIN_SOCKET "SOCKET"
+ WIN_SOCKET win_socket "socket" (int af, int type, int protocol)
+ int win_socket_send "send" (WIN_SOCKET s, const char *buf, int len, int flags)
+
+
+cdef void _unified_socket_write(int fd) nogil
+
+
+cdef class BaseCompletionQueue:
+ cdef grpc_completion_queue *_cq
+
+ cdef grpc_completion_queue* c_ptr(self)
+
+
+cdef class _BoundEventLoop:
+ cdef readonly object loop
+ cdef readonly object read_socket # socket.socket
+ cdef bint _has_reader
+
+
+cdef class PollerCompletionQueue(BaseCompletionQueue):
+ cdef bint _shutdown
+ cdef cpp_event_queue _queue
+ cdef mutex _queue_mutex
+ cdef object _poller_thread # threading.Thread
+ cdef int _write_fd
+ cdef object _read_socket # socket.socket
+ cdef object _write_socket # socket.socket
+ cdef dict _loops # Mapping[asyncio.AbstractLoop, _BoundEventLoop]
+
+ cdef void _poll(self) nogil
+ cdef shutdown(self)
diff --git a/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/aio/completion_queue.pyx.pxi b/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/aio/completion_queue.pyx.pxi
new file mode 100644
index 00000000000..b9132c85602
--- /dev/null
+++ b/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/aio/completion_queue.pyx.pxi
@@ -0,0 +1,174 @@
+# Copyright 2020 The gRPC Authors
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import socket
+
+cdef gpr_timespec _GPR_INF_FUTURE = gpr_inf_future(GPR_CLOCK_REALTIME)
+cdef float _POLL_AWAKE_INTERVAL_S = 0.2
+
+# This bool indicates if the event loop impl can monitor a given fd, or has
+# loop.add_reader method.
+cdef bint _has_fd_monitoring = True
+
+IF UNAME_SYSNAME == "Windows":
+ cdef void _unified_socket_write(int fd) nogil:
+ win_socket_send(<WIN_SOCKET>fd, b"1", 1, 0)
+ELSE:
+ cimport posix.unistd as unistd
+
+ cdef void _unified_socket_write(int fd) nogil:
+ unistd.write(fd, b"1", 1)
+
+
+def _handle_callback_wrapper(CallbackWrapper callback_wrapper, int success):
+ CallbackWrapper.functor_run(callback_wrapper.c_functor(), success)
+
+
+cdef class BaseCompletionQueue:
+
+ cdef grpc_completion_queue* c_ptr(self):
+ return self._cq
+
+
+cdef class _BoundEventLoop:
+
+ def __cinit__(self, object loop, object read_socket, object handler):
+ global _has_fd_monitoring
+ self.loop = loop
+ self.read_socket = read_socket
+ reader_function = functools.partial(
+ handler,
+ loop
+ )
+ # NOTE(lidiz) There isn't a way to cleanly pre-check whether fd monitoring
+ # support is available or not. Checking the event loop policy is not
+ # good enough. The application can have its own loop implementation, or
+ # use different types of event loops (e.g., 1 Proactor, 3 Selectors).
+ if _has_fd_monitoring:
+ try:
+ self.loop.add_reader(self.read_socket, reader_function)
+ self._has_reader = True
+ except NotImplementedError:
+ _has_fd_monitoring = False
+ self._has_reader = False
+
+ def close(self):
+ if self.loop:
+ if self._has_reader:
+ self.loop.remove_reader(self.read_socket)
+
+
+cdef class PollerCompletionQueue(BaseCompletionQueue):
+
+ def __cinit__(self):
+ self._cq = grpc_completion_queue_create_for_next(NULL)
+ self._shutdown = False
+ self._poller_thread = threading.Thread(target=self._poll_wrapper, daemon=True)
+ self._poller_thread.start()
+
+ self._read_socket, self._write_socket = socket.socketpair()
+ self._write_fd = self._write_socket.fileno()
+ self._loops = {}
+
+ # The read socket might be read by multiple threads. But only one of them will
+ # read the 1 byte sent by the poller thread. This setting is essential to allow
+ # multiple loops in multiple threads bound to the same poller.
+ self._read_socket.setblocking(False)
+
+ self._queue = cpp_event_queue()
+
+ def bind_loop(self, object loop):
+ if loop in self._loops:
+ return
+ else:
+ self._loops[loop] = _BoundEventLoop(loop, self._read_socket, self._handle_events)
+
+ cdef void _poll(self) nogil:
+ cdef grpc_event event
+ cdef CallbackContext *context
+
+ while not self._shutdown:
+ event = grpc_completion_queue_next(self._cq,
+ _GPR_INF_FUTURE,
+ NULL)
+
+ if event.type == GRPC_QUEUE_TIMEOUT:
+ with gil:
+ raise AssertionError("Core should not return GRPC_QUEUE_TIMEOUT!")
+ elif event.type == GRPC_QUEUE_SHUTDOWN:
+ self._shutdown = True
+ else:
+ self._queue_mutex.lock()
+ self._queue.push(event)
+ self._queue_mutex.unlock()
+ if _has_fd_monitoring:
+ _unified_socket_write(self._write_fd)
+ else:
+ with gil:
+ # Event loops can be paused or killed at any time. So,
+ # instead of delegating to any particular thread, the polling thread
+ # should handle the distribution of the event.
+ self._handle_events(None)
+
+ def _poll_wrapper(self):
+ with nogil:
+ self._poll()
+
+ cdef shutdown(self):
+ # Removes the socket hook from loops
+ for loop in self._loops:
+ self._loops.get(loop).close()
+
+ # TODO(https://github.com/grpc/grpc/issues/22365) perform graceful shutdown
+ grpc_completion_queue_shutdown(self._cq)
+ while not self._shutdown:
+ self._poller_thread.join(timeout=_POLL_AWAKE_INTERVAL_S)
+ grpc_completion_queue_destroy(self._cq)
+
+ # Clean up socket resources
+ self._read_socket.close()
+ self._write_socket.close()
+
+ def _handle_events(self, object context_loop):
+ cdef bytes data
+ if _has_fd_monitoring:
+ # If fd monitoring is working, clean the socket without blocking.
+ data = self._read_socket.recv(1)
+ cdef grpc_event event
+ cdef CallbackContext *context
+
+ while True:
+ self._queue_mutex.lock()
+ if self._queue.empty():
+ self._queue_mutex.unlock()
+ break
+ else:
+ event = self._queue.front()
+ self._queue.pop()
+ self._queue_mutex.unlock()
+
+ context = <CallbackContext *>event.tag
+ loop = <object>context.loop
+ if loop is context_loop:
+ # Executes callbacks: complete the future
+ CallbackWrapper.functor_run(
+ <grpc_completion_queue_functor *>event.tag,
+ event.success
+ )
+ else:
+ loop.call_soon_threadsafe(
+ _handle_callback_wrapper,
+ <CallbackWrapper>context.callback_wrapper,
+ event.success
+ )
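The PollerCompletionQueue/_BoundEventLoop pair above uses a classic socketpair wake-up: a dedicated thread blocks in grpc_completion_queue_next, pushes each event onto a mutex-protected queue, and writes one byte to the write end of a socketpair; every bound asyncio loop has add_reader installed on the read end and drains the queue when that byte arrives. A minimal pure-Python sketch of the same wake-up pattern, with no gRPC involved and hypothetical names:

import asyncio
import queue
import socket
import threading

events = queue.Queue()                       # stands in for the C++ event queue
read_sock, write_sock = socket.socketpair()
read_sock.setblocking(False)

def poller():
    # Stands in for the thread blocked in grpc_completion_queue_next().
    for i in range(3):
        events.put('event-%d' % i)
        write_sock.send(b'1')                # wake the event loop
    events.put(None)                         # shutdown marker
    write_sock.send(b'1')

async def main():
    loop = asyncio.get_running_loop()
    done = loop.create_future()

    def on_wakeup():
        read_sock.recv(1)                    # drain one wake-up byte
        while True:
            try:
                event = events.get_nowait()
            except queue.Empty:
                return
            if event is None:
                done.set_result(None)
                return
            print('dispatched', event)

    loop.add_reader(read_sock, on_wakeup)
    threading.Thread(target=poller, daemon=True).start()
    await done
    loop.remove_reader(read_sock)

asyncio.run(main())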
diff --git a/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/aio/grpc_aio.pxd.pxi b/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/aio/grpc_aio.pxd.pxi
new file mode 100644
index 00000000000..ebf0660174d
--- /dev/null
+++ b/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/aio/grpc_aio.pxd.pxi
@@ -0,0 +1,43 @@
+# Copyright 2019 gRPC authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# distutils: language=c++
+
+cdef class _AioState:
+ cdef object lock # threading.RLock
+ cdef int refcount
+ cdef object engine # AsyncIOEngine
+ cdef BaseCompletionQueue cq
+
+
+cdef grpc_completion_queue *global_completion_queue()
+
+
+cpdef init_grpc_aio()
+
+
+cpdef shutdown_grpc_aio()
+
+
+cdef extern from "src/core/lib/iomgr/timer_manager.h":
+ void grpc_timer_manager_set_threading(bint enabled)
+
+
+cdef extern from "src/core/lib/iomgr/iomgr_internal.h":
+ void grpc_set_default_iomgr_platform()
+
+
+cdef extern from "src/core/lib/iomgr/executor.h" namespace "grpc_core":
+ cdef cppclass Executor:
+ @staticmethod
+ void SetThreadingAll(bint enable)
diff --git a/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/aio/grpc_aio.pyx.pxi b/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/aio/grpc_aio.pyx.pxi
new file mode 100644
index 00000000000..7f9f52da7c0
--- /dev/null
+++ b/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/aio/grpc_aio.pyx.pxi
@@ -0,0 +1,114 @@
+# Copyright 2019 gRPC authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import enum
+
+cdef str _GRPC_ASYNCIO_ENGINE = os.environ.get('GRPC_ASYNCIO_ENGINE', 'poller').upper()
+cdef _AioState _global_aio_state = _AioState()
+
+
+class AsyncIOEngine(enum.Enum):
+ # NOTE(lidiz) the support for custom_io_manager is removed in favor of the
+ # EventEngine project, which will be the only IO platform in Core.
+ CUSTOM_IO_MANAGER = 'custom_io_manager'
+ POLLER = 'poller'
+
+
+cdef _default_asyncio_engine():
+ return AsyncIOEngine.POLLER
+
+
+cdef grpc_completion_queue *global_completion_queue():
+ return _global_aio_state.cq.c_ptr()
+
+
+cdef class _AioState:
+
+ def __cinit__(self):
+ self.lock = threading.RLock()
+ self.refcount = 0
+ self.engine = None
+ self.cq = None
+
+
+cdef _initialize_poller():
+ # Initializes gRPC Core, must be called before other Core API
+ grpc_init()
+
+ # Creates the only completion queue
+ _global_aio_state.cq = PollerCompletionQueue()
+
+
+cdef _actual_aio_initialization():
+ # Picks the engine for gRPC AsyncIO Stack
+ _global_aio_state.engine = AsyncIOEngine.__members__.get(
+ _GRPC_ASYNCIO_ENGINE,
+ _default_asyncio_engine(),
+ )
+ _LOGGER.debug('Using %s as I/O engine', _global_aio_state.engine)
+
+ # Initializes the process-level state accordingly
+ if _global_aio_state.engine is AsyncIOEngine.POLLER:
+ _initialize_poller()
+ else:
+ raise ValueError('Unsupported engine type [%s]' % _global_aio_state.engine)
+
+
+def _grpc_shutdown_wrapper(_):
+ """A thin Python wrapper of Core's shutdown function.
+
+    Defining functions inside "cdef" functions is not allowed, and Cython
+    complains about passing a simple lambda where a C function is expected.
+ """
+ grpc_shutdown()
+
+
+cdef _actual_aio_shutdown():
+ if _global_aio_state.engine is AsyncIOEngine.POLLER:
+ (<PollerCompletionQueue>_global_aio_state.cq).shutdown()
+ grpc_shutdown()
+ else:
+ raise ValueError('Unsupported engine type [%s]' % _global_aio_state.engine)
+
+
+cdef _initialize_per_loop():
+ cdef object loop = get_working_loop()
+ if _global_aio_state.engine is AsyncIOEngine.POLLER:
+ _global_aio_state.cq.bind_loop(loop)
+
+
+cpdef init_grpc_aio():
+ """Initializes the gRPC AsyncIO module.
+
+ Expected to be invoked on critical class constructors.
+ E.g., AioChannel, AioServer.
+ """
+ with _global_aio_state.lock:
+ _global_aio_state.refcount += 1
+ if _global_aio_state.refcount == 1:
+ _actual_aio_initialization()
+ _initialize_per_loop()
+
+
+cpdef shutdown_grpc_aio():
+ """Shuts down the gRPC AsyncIO module.
+
+ Expected to be invoked on critical class destructors.
+ E.g., AioChannel, AioServer.
+ """
+ with _global_aio_state.lock:
+ assert _global_aio_state.refcount > 0
+ _global_aio_state.refcount -= 1
+ if not _global_aio_state.refcount:
+ _actual_aio_shutdown()
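
init_grpc_aio() and shutdown_grpc_aio() above implement a lock-protected reference count: only the first caller triggers the actual initialization and only the last caller triggers the teardown. Reduced to plain Python, the pattern looks roughly like this (class and callback names are illustrative):

    import threading

    class RefCountedResource:
        """First acquire() initializes the resource; last release() shuts it down."""

        def __init__(self, initialize, shutdown):
            self._lock = threading.RLock()
            self._refcount = 0
            self._initialize = initialize
            self._shutdown = shutdown

        def acquire(self):
            with self._lock:
                self._refcount += 1
                if self._refcount == 1:
                    self._initialize()

        def release(self):
            with self._lock:
                assert self._refcount > 0, 'release() without matching acquire()'
                self._refcount -= 1
                if self._refcount == 0:
                    self._shutdown()

    # Usage mirroring init_grpc_aio()/shutdown_grpc_aio():
    core = RefCountedResource(lambda: print('init'), lambda: print('shutdown'))
    core.acquire()   # prints 'init'
    core.acquire()   # no-op, refcount goes to 2
    core.release()   # no-op, refcount back to 1
    core.release()   # prints 'shutdown'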
diff --git a/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/aio/rpc_status.pxd.pxi b/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/aio/rpc_status.pxd.pxi
new file mode 100644
index 00000000000..3780d8ddf2f
--- /dev/null
+++ b/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/aio/rpc_status.pxd.pxi
@@ -0,0 +1,29 @@
+# Copyright 2019 gRPC authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+"""Exceptions for the aio version of the RPC calls."""
+
+
+cdef class AioRpcStatus(Exception):
+ cdef readonly:
+ grpc_status_code _code
+ str _details
+ # Per the spec, only client-side status has trailing metadata.
+ tuple _trailing_metadata
+ str _debug_error_string
+
+ cpdef grpc_status_code code(self)
+ cpdef str details(self)
+ cpdef tuple trailing_metadata(self)
+ cpdef str debug_error_string(self)
+ cdef grpc_status_code c_code(self)
diff --git a/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/aio/rpc_status.pyx.pxi b/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/aio/rpc_status.pyx.pxi
new file mode 100644
index 00000000000..07669fc1575
--- /dev/null
+++ b/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/aio/rpc_status.pyx.pxi
@@ -0,0 +1,44 @@
+# Copyright 2019 gRPC authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+"""Exceptions for the aio version of the RPC calls."""
+
+
+cdef class AioRpcStatus(Exception):
+
+    # The final status of an RPC is represented by three trailing metadata
+    # entries: `grpc-status`, `grpc-status-message`, and `grpc-status-details`.
+ def __cinit__(self,
+ grpc_status_code code,
+ str details,
+ tuple trailing_metadata,
+ str debug_error_string):
+ self._code = code
+ self._details = details
+ self._trailing_metadata = trailing_metadata
+ self._debug_error_string = debug_error_string
+
+ cpdef grpc_status_code code(self):
+ return self._code
+
+ cpdef str details(self):
+ return self._details
+
+ cpdef tuple trailing_metadata(self):
+ return self._trailing_metadata
+
+ cpdef str debug_error_string(self):
+ return self._debug_error_string
+
+ cdef grpc_status_code c_code(self):
+ return <grpc_status_code>self._code
diff --git a/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/aio/server.pxd.pxi b/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/aio/server.pxd.pxi
new file mode 100644
index 00000000000..fe10c3883c3
--- /dev/null
+++ b/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/aio/server.pxd.pxi
@@ -0,0 +1,92 @@
+# Copyright 2019 The gRPC Authors
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+cdef class _HandlerCallDetails:
+ cdef readonly str method
+ cdef readonly tuple invocation_metadata
+
+
+cdef class RPCState(GrpcCallWrapper):
+ cdef grpc_call_details details
+ cdef grpc_metadata_array request_metadata
+ cdef AioServer server
+    # NOTE(lidiz) In certain corner cases, receiving the client close
+    # operation won't immediately fail ongoing RECV_MESSAGE operations. This
+    # flag is a workaround for that unexpected behavior.
+ cdef bint client_closed
+ cdef object abort_exception
+ cdef bint metadata_sent
+ cdef bint status_sent
+ cdef grpc_status_code status_code
+ cdef str status_details
+ cdef tuple trailing_metadata
+ cdef object compression_algorithm
+ cdef bint disable_next_compression
+ cdef object callbacks
+
+ cdef bytes method(self)
+ cdef tuple invocation_metadata(self)
+ cdef void raise_for_termination(self) except *
+ cdef int get_write_flag(self)
+ cdef Operation create_send_initial_metadata_op_if_not_sent(self)
+
+
+cdef class _ServicerContext:
+ cdef RPCState _rpc_state
+ cdef object _loop # asyncio.AbstractEventLoop
+ cdef object _request_deserializer # Callable[[bytes], Any]
+ cdef object _response_serializer # Callable[[Any], bytes]
+
+
+cdef class _SyncServicerContext:
+ cdef _ServicerContext _context
+ cdef list _callbacks
+ cdef object _loop # asyncio.AbstractEventLoop
+
+
+cdef class _MessageReceiver:
+ cdef _ServicerContext _servicer_context
+ cdef object _agen
+
+
+cdef enum AioServerStatus:
+ AIO_SERVER_STATUS_UNKNOWN
+ AIO_SERVER_STATUS_READY
+ AIO_SERVER_STATUS_RUNNING
+ AIO_SERVER_STATUS_STOPPED
+ AIO_SERVER_STATUS_STOPPING
+
+
+cdef class _ConcurrentRpcLimiter:
+ cdef int _maximum_concurrent_rpcs
+ cdef int _active_rpcs
+ cdef object _active_rpcs_condition # asyncio.Condition
+ cdef object _loop # asyncio.EventLoop
+
+
+cdef class AioServer:
+ cdef Server _server
+ cdef list _generic_handlers
+ cdef AioServerStatus _status
+ cdef object _loop # asyncio.EventLoop
+ cdef object _serving_task # asyncio.Task
+ cdef object _shutdown_lock # asyncio.Lock
+ cdef object _shutdown_completed # asyncio.Future
+ cdef CallbackWrapper _shutdown_callback_wrapper
+ cdef object _crash_exception # Exception
+ cdef tuple _interceptors
+ cdef object _thread_pool # concurrent.futures.ThreadPoolExecutor
+ cdef _ConcurrentRpcLimiter _limiter
+
+ cdef thread_pool(self)
diff --git a/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/aio/server.pyx.pxi b/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/aio/server.pyx.pxi
new file mode 100644
index 00000000000..e85efdd0b9f
--- /dev/null
+++ b/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/aio/server.pyx.pxi
@@ -0,0 +1,1097 @@
+# Copyright 2019 The gRPC Authors
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+import inspect
+import traceback
+import functools
+
+
+cdef int _EMPTY_FLAG = 0
+cdef str _RPC_FINISHED_DETAILS = 'RPC already finished.'
+cdef str _SERVER_STOPPED_DETAILS = 'Server already stopped.'
+
+cdef _augment_metadata(tuple metadata, object compression):
+ if compression is None:
+ return metadata
+ else:
+ return ((
+ GRPC_COMPRESSION_REQUEST_ALGORITHM_MD_KEY,
+ _COMPRESSION_METADATA_STRING_MAPPING[compression]
+ ),) + metadata
+
+
+cdef class _HandlerCallDetails:
+ def __cinit__(self, str method, tuple invocation_metadata):
+ self.method = method
+ self.invocation_metadata = invocation_metadata
+
+
+class _ServerStoppedError(BaseError):
+ """Raised if the server is stopped."""
+
+
+cdef class RPCState:
+
+ def __cinit__(self, AioServer server):
+ init_grpc_aio()
+ self.call = NULL
+ self.server = server
+ grpc_metadata_array_init(&self.request_metadata)
+ grpc_call_details_init(&self.details)
+ self.client_closed = False
+ self.abort_exception = None
+ self.metadata_sent = False
+ self.status_sent = False
+ self.status_code = StatusCode.ok
+ self.status_details = ''
+ self.trailing_metadata = _IMMUTABLE_EMPTY_METADATA
+ self.compression_algorithm = None
+ self.disable_next_compression = False
+ self.callbacks = []
+
+ cdef bytes method(self):
+ return _slice_bytes(self.details.method)
+
+ cdef tuple invocation_metadata(self):
+ return _metadata(&self.request_metadata)
+
+ cdef void raise_for_termination(self) except *:
+ """Raise exceptions if RPC is not running.
+
+ Server method handlers may suppress the abort exception. We need to halt
+ the RPC execution in that case. This function needs to be called after
+ running application code.
+
+        Also, the server may stop unexpectedly. We need to check before calling
+        into Core functions; otherwise, a segfault may occur.
+ """
+ if self.abort_exception is not None:
+ raise self.abort_exception
+ if self.status_sent:
+ raise UsageError(_RPC_FINISHED_DETAILS)
+ if self.server._status == AIO_SERVER_STATUS_STOPPED:
+ raise _ServerStoppedError(_SERVER_STOPPED_DETAILS)
+
+ cdef int get_write_flag(self):
+ if self.disable_next_compression:
+ self.disable_next_compression = False
+ return WriteFlag.no_compress
+ else:
+ return _EMPTY_FLAG
+
+ cdef Operation create_send_initial_metadata_op_if_not_sent(self):
+ cdef SendInitialMetadataOperation op
+ if self.metadata_sent:
+ return None
+ else:
+ op = SendInitialMetadataOperation(
+ _augment_metadata(_IMMUTABLE_EMPTY_METADATA, self.compression_algorithm),
+ _EMPTY_FLAG
+ )
+ return op
+
+ def __dealloc__(self):
+ """Cleans the Core objects."""
+ grpc_call_details_destroy(&self.details)
+ grpc_metadata_array_destroy(&self.request_metadata)
+ if self.call:
+ grpc_call_unref(self.call)
+ shutdown_grpc_aio()
+
+
+cdef class _ServicerContext:
+
+ def __cinit__(self,
+ RPCState rpc_state,
+ object request_deserializer,
+ object response_serializer,
+ object loop):
+ self._rpc_state = rpc_state
+ self._request_deserializer = request_deserializer
+ self._response_serializer = response_serializer
+ self._loop = loop
+
+ async def read(self):
+ cdef bytes raw_message
+ self._rpc_state.raise_for_termination()
+
+ raw_message = await _receive_message(self._rpc_state, self._loop)
+ self._rpc_state.raise_for_termination()
+
+ if raw_message is None:
+ return EOF
+ else:
+ return deserialize(self._request_deserializer,
+ raw_message)
+
+ async def write(self, object message):
+ self._rpc_state.raise_for_termination()
+
+ await _send_message(self._rpc_state,
+ serialize(self._response_serializer, message),
+ self._rpc_state.create_send_initial_metadata_op_if_not_sent(),
+ self._rpc_state.get_write_flag(),
+ self._loop)
+ self._rpc_state.metadata_sent = True
+
+ async def send_initial_metadata(self, object metadata):
+ self._rpc_state.raise_for_termination()
+
+ if self._rpc_state.metadata_sent:
+ raise UsageError('Send initial metadata failed: already sent')
+ else:
+ await _send_initial_metadata(
+ self._rpc_state,
+ _augment_metadata(tuple(metadata), self._rpc_state.compression_algorithm),
+ _EMPTY_FLAG,
+ self._loop
+ )
+ self._rpc_state.metadata_sent = True
+
+ async def abort(self,
+ object code,
+ str details='',
+ tuple trailing_metadata=_IMMUTABLE_EMPTY_METADATA):
+ if self._rpc_state.abort_exception is not None:
+ raise UsageError('Abort already called!')
+ else:
+            # Keeps track of the exception object. After abort happens, the RPC
+            # should stop executing. However, if users decide to suppress it, it
+            # could lead to undefined behavior.
+ self._rpc_state.abort_exception = AbortError('Locally aborted.')
+
+ if trailing_metadata == _IMMUTABLE_EMPTY_METADATA and self._rpc_state.trailing_metadata:
+ trailing_metadata = self._rpc_state.trailing_metadata
+ else:
+ raise_if_not_valid_trailing_metadata(trailing_metadata)
+ self._rpc_state.trailing_metadata = trailing_metadata
+
+ if details == '' and self._rpc_state.status_details:
+ details = self._rpc_state.status_details
+ else:
+ self._rpc_state.status_details = details
+
+ actual_code = get_status_code(code)
+ self._rpc_state.status_code = actual_code
+
+ self._rpc_state.status_sent = True
+ await _send_error_status_from_server(
+ self._rpc_state,
+ actual_code,
+ details,
+ trailing_metadata,
+ self._rpc_state.create_send_initial_metadata_op_if_not_sent(),
+ self._loop
+ )
+
+ raise self._rpc_state.abort_exception
+
+ async def abort_with_status(self, object status):
+ await self.abort(status.code, status.details, status.trailing_metadata)
+
+ def set_trailing_metadata(self, object metadata):
+ raise_if_not_valid_trailing_metadata(metadata)
+ self._rpc_state.trailing_metadata = tuple(metadata)
+
+ def trailing_metadata(self):
+ return self._rpc_state.trailing_metadata
+
+ def invocation_metadata(self):
+ return self._rpc_state.invocation_metadata()
+
+ def set_code(self, object code):
+ self._rpc_state.status_code = get_status_code(code)
+
+ def code(self):
+ return self._rpc_state.status_code
+
+ def set_details(self, str details):
+ self._rpc_state.status_details = details
+
+ def details(self):
+ return self._rpc_state.status_details
+
+ def set_compression(self, object compression):
+ if self._rpc_state.metadata_sent:
+ raise RuntimeError('Compression setting must be specified before sending initial metadata')
+ else:
+ self._rpc_state.compression_algorithm = compression
+
+ def disable_next_message_compression(self):
+ self._rpc_state.disable_next_compression = True
+
+ def peer(self):
+ cdef char *c_peer = NULL
+ c_peer = grpc_call_get_peer(self._rpc_state.call)
+ peer = (<bytes>c_peer).decode('utf8')
+ gpr_free(c_peer)
+ return peer
+
+ def peer_identities(self):
+ cdef Call query_call = Call()
+ query_call.c_call = self._rpc_state.call
+ identities = peer_identities(query_call)
+ query_call.c_call = NULL
+ return identities
+
+ def peer_identity_key(self):
+ cdef Call query_call = Call()
+ query_call.c_call = self._rpc_state.call
+ identity_key = peer_identity_key(query_call)
+ query_call.c_call = NULL
+ if identity_key:
+ return identity_key.decode('utf8')
+ else:
+ return None
+
+ def auth_context(self):
+ cdef Call query_call = Call()
+ query_call.c_call = self._rpc_state.call
+ bytes_ctx = auth_context(query_call)
+ query_call.c_call = NULL
+ if bytes_ctx:
+ ctx = {}
+ for key in bytes_ctx:
+ ctx[key.decode('utf8')] = bytes_ctx[key]
+ return ctx
+ else:
+ return {}
+
+ def time_remaining(self):
+ if self._rpc_state.details.deadline.seconds == _GPR_INF_FUTURE.seconds:
+ return None
+ else:
+ return max(_time_from_timespec(self._rpc_state.details.deadline) - time.time(), 0)
+
+ def add_done_callback(self, callback):
+ cb = functools.partial(callback, self)
+ self._rpc_state.callbacks.append(cb)
+
+ def done(self):
+ return self._rpc_state.status_sent
+
+ def cancelled(self):
+ return self._rpc_state.status_code == StatusCode.cancelled
+
+
+cdef class _SyncServicerContext:
+ """Sync servicer context for sync handler compatibility."""
+
+ def __cinit__(self,
+ _ServicerContext context):
+ self._context = context
+ self._callbacks = []
+ self._loop = context._loop
+
+ def abort(self,
+ object code,
+ str details='',
+ tuple trailing_metadata=_IMMUTABLE_EMPTY_METADATA):
+ future = asyncio.run_coroutine_threadsafe(
+ self._context.abort(code, details, trailing_metadata),
+ self._loop)
+ # Abort should raise an AbortError
+ future.exception()
+
+ def send_initial_metadata(self, object metadata):
+ future = asyncio.run_coroutine_threadsafe(
+ self._context.send_initial_metadata(metadata),
+ self._loop)
+ future.result()
+
+ def set_trailing_metadata(self, object metadata):
+ self._context.set_trailing_metadata(metadata)
+
+ def invocation_metadata(self):
+ return self._context.invocation_metadata()
+
+ def set_code(self, object code):
+ self._context.set_code(code)
+
+ def set_details(self, str details):
+ self._context.set_details(details)
+
+ def set_compression(self, object compression):
+ self._context.set_compression(compression)
+
+ def disable_next_message_compression(self):
+ self._context.disable_next_message_compression()
+
+ def add_callback(self, object callback):
+ self._callbacks.append(callback)
+
+ def peer(self):
+ return self._context.peer()
+
+ def peer_identities(self):
+ return self._context.peer_identities()
+
+ def peer_identity_key(self):
+ return self._context.peer_identity_key()
+
+ def auth_context(self):
+ return self._context.auth_context()
+
+ def time_remaining(self):
+ return self._context.time_remaining()
+
+
+async def _run_interceptor(object interceptors, object query_handler,
+ object handler_call_details):
+ interceptor = next(interceptors, None)
+ if interceptor:
+ continuation = functools.partial(_run_interceptor, interceptors,
+ query_handler)
+ return await interceptor.intercept_service(continuation, handler_call_details)
+ else:
+ return query_handler(handler_call_details)
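
_run_interceptor builds the server interceptor chain recursively: each interceptor receives a continuation which, when awaited, invokes the next interceptor, and the terminal step falls through to the handler lookup. A self-contained sketch of the same chaining idea with a made-up interceptor (names and the example method path are illustrative):

    import asyncio
    import functools

    async def run_interceptor(interceptors, terminal, details):
        """Recursively threads `details` through an iterator of interceptors."""
        interceptor = next(interceptors, None)
        if interceptor is None:
            return terminal(details)
        continuation = functools.partial(run_interceptor, interceptors, terminal)
        return await interceptor.intercept_service(continuation, details)

    class LoggingInterceptor:
        """Illustrative interceptor: logs the method, then defers to the chain."""
        async def intercept_service(self, continuation, details):
            print('intercepting', details)
            return await continuation(details)

    async def main():
        handler = await run_interceptor(
            iter([LoggingInterceptor()]),
            lambda details: 'handler-for-' + details,
            '/pkg.Service/Method')
        print(handler)

    asyncio.run(main())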
+
+
+def _is_async_handler(object handler):
+ """Inspect if a method handler is async or sync."""
+ return inspect.isawaitable(handler) or inspect.iscoroutinefunction(handler) or inspect.isasyncgenfunction(handler)
+
+
+async def _find_method_handler(str method, tuple metadata, list generic_handlers,
+ tuple interceptors):
+ def query_handlers(handler_call_details):
+ for generic_handler in generic_handlers:
+ method_handler = generic_handler.service(handler_call_details)
+ if method_handler is not None:
+ return method_handler
+ return None
+
+ cdef _HandlerCallDetails handler_call_details = _HandlerCallDetails(method,
+ metadata)
+ # interceptor
+ if interceptors:
+ return await _run_interceptor(iter(interceptors), query_handlers,
+ handler_call_details)
+ else:
+ return query_handlers(handler_call_details)
+
+
+async def _finish_handler_with_unary_response(RPCState rpc_state,
+ object unary_handler,
+ object request,
+ _ServicerContext servicer_context,
+ object response_serializer,
+ object loop):
+ """Finishes server method handler with a single response.
+
+ This function executes the application handler, and handles response
+ sending, as well as errors. It is shared between unary-unary and
+ stream-unary handlers.
+ """
+ # Executes application logic
+ cdef object response_message
+ cdef _SyncServicerContext sync_servicer_context
+
+ if _is_async_handler(unary_handler):
+ # Run async method handlers in this coroutine
+ response_message = await unary_handler(
+ request,
+ servicer_context,
+ )
+ else:
+ # Run sync method handlers in the thread pool
+ sync_servicer_context = _SyncServicerContext(servicer_context)
+ response_message = await loop.run_in_executor(
+ rpc_state.server.thread_pool(),
+ unary_handler,
+ request,
+ sync_servicer_context,
+ )
+ # Support sync-stack callback
+ for callback in sync_servicer_context._callbacks:
+ callback()
+
+ # Raises exception if aborted
+ rpc_state.raise_for_termination()
+
+ # Serializes the response message
+ cdef bytes response_raw
+ if rpc_state.status_code == StatusCode.ok:
+ response_raw = serialize(
+ response_serializer,
+ response_message,
+ )
+ else:
+ # Discards the response message if the status code is non-OK.
+ response_raw = b''
+
+ # Assembles the batch operations
+ cdef tuple finish_ops
+ finish_ops = (
+ SendMessageOperation(response_raw, rpc_state.get_write_flag()),
+ SendStatusFromServerOperation(
+ rpc_state.trailing_metadata,
+ rpc_state.status_code,
+ rpc_state.status_details,
+ _EMPTY_FLAGS,
+ ),
+ )
+ if not rpc_state.metadata_sent:
+ finish_ops = prepend_send_initial_metadata_op(
+ finish_ops,
+ None)
+ rpc_state.metadata_sent = True
+ rpc_state.status_sent = True
+ await execute_batch(rpc_state, finish_ops, loop)
+
+
+async def _finish_handler_with_stream_responses(RPCState rpc_state,
+ object stream_handler,
+ object request,
+ _ServicerContext servicer_context,
+ object loop):
+ """Finishes server method handler with multiple responses.
+
+ This function executes the application handler, and handles response
+ sending, as well as errors. It is shared between unary-stream and
+ stream-stream handlers.
+ """
+ cdef object async_response_generator
+ cdef object response_message
+
+ if inspect.iscoroutinefunction(stream_handler):
+ # Case 1: Coroutine async handler - using reader-writer API
+ # The handler uses reader / writer API, returns None.
+ await stream_handler(
+ request,
+ servicer_context,
+ )
+ else:
+ if inspect.isasyncgenfunction(stream_handler):
+ # Case 2: Async handler - async generator
+ # The handler uses async generator API
+ async_response_generator = stream_handler(
+ request,
+ servicer_context,
+ )
+ else:
+ # Case 3: Sync handler - normal generator
+            # NOTE(lidiz) A streaming handler in the sync stack is either a
+            # generator function or a function that returns a generator.
+ sync_servicer_context = _SyncServicerContext(servicer_context)
+ gen = stream_handler(request, sync_servicer_context)
+ async_response_generator = generator_to_async_generator(gen,
+ loop,
+ rpc_state.server.thread_pool())
+
+ # Consumes messages from the generator
+ async for response_message in async_response_generator:
+ # Raises exception if aborted
+ rpc_state.raise_for_termination()
+
+ await servicer_context.write(response_message)
+
+ # Raises exception if aborted
+ rpc_state.raise_for_termination()
+
+ # Sends the final status of this RPC
+ cdef SendStatusFromServerOperation op = SendStatusFromServerOperation(
+ rpc_state.trailing_metadata,
+ rpc_state.status_code,
+ rpc_state.status_details,
+ _EMPTY_FLAGS,
+ )
+
+ cdef tuple finish_ops = (op,)
+ if not rpc_state.metadata_sent:
+ finish_ops = prepend_send_initial_metadata_op(
+ finish_ops,
+ None
+ )
+ rpc_state.metadata_sent = True
+ rpc_state.status_sent = True
+ await execute_batch(rpc_state, finish_ops, loop)
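
The sync-handler branch above hands the user's plain generator to generator_to_async_generator, which is expected to pull items on the thread pool so the event loop is never blocked by user code. A plausible sketch of what such a bridge does, written under that assumption (the name sync_gen_to_async_gen, the sentinel, and the demo are made up, not the actual helper):

    import asyncio
    from concurrent.futures import ThreadPoolExecutor

    _END = object()  # sentinel marking generator exhaustion

    async def sync_gen_to_async_gen(gen, loop, thread_pool):
        """Yields items from a sync generator without blocking the event loop."""
        def _next():
            try:
                return next(gen)
            except StopIteration:
                return _END

        while True:
            item = await loop.run_in_executor(thread_pool, _next)
            if item is _END:
                return
            yield item

    async def main():
        pool = ThreadPoolExecutor(max_workers=1)
        loop = asyncio.get_running_loop()
        async for item in sync_gen_to_async_gen(iter(range(3)), loop, pool):
            print(item)   # 0, 1, 2
        pool.shutdown()

    asyncio.run(main())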
+
+
+async def _handle_unary_unary_rpc(object method_handler,
+ RPCState rpc_state,
+ object loop):
+ # Receives request message
+ cdef bytes request_raw = await _receive_message(rpc_state, loop)
+ if request_raw is None:
+ # The RPC was cancelled immediately after start on client side.
+ return
+
+ # Deserializes the request message
+ cdef object request_message = deserialize(
+ method_handler.request_deserializer,
+ request_raw,
+ )
+
+    # Creates a dedicated ServicerContext
+ cdef _ServicerContext servicer_context = _ServicerContext(
+ rpc_state,
+ None,
+ None,
+ loop,
+ )
+
+ # Finishes the application handler
+ await _finish_handler_with_unary_response(
+ rpc_state,
+ method_handler.unary_unary,
+ request_message,
+ servicer_context,
+ method_handler.response_serializer,
+ loop
+ )
+
+
+async def _handle_unary_stream_rpc(object method_handler,
+ RPCState rpc_state,
+ object loop):
+ # Receives request message
+ cdef bytes request_raw = await _receive_message(rpc_state, loop)
+ if request_raw is None:
+ return
+
+ # Deserializes the request message
+ cdef object request_message = deserialize(
+ method_handler.request_deserializer,
+ request_raw,
+ )
+
+    # Creates a dedicated ServicerContext
+ cdef _ServicerContext servicer_context = _ServicerContext(
+ rpc_state,
+ method_handler.request_deserializer,
+ method_handler.response_serializer,
+ loop,
+ )
+
+ # Finishes the application handler
+ await _finish_handler_with_stream_responses(
+ rpc_state,
+ method_handler.unary_stream,
+ request_message,
+ servicer_context,
+ loop,
+ )
+
+
+cdef class _MessageReceiver:
+ """Bridge between the async generator API and the reader-writer API."""
+
+ def __cinit__(self, _ServicerContext servicer_context):
+ self._servicer_context = servicer_context
+ self._agen = None
+
+ async def _async_message_receiver(self):
+ """An async generator that receives messages."""
+ cdef object message
+ while True:
+ message = await self._servicer_context.read()
+ if message is not EOF:
+ yield message
+ else:
+ break
+
+ def __aiter__(self):
+ # Prevents never awaited warning if application never used the async generator
+ if self._agen is None:
+ self._agen = self._async_message_receiver()
+ return self._agen
+
+ async def __anext__(self):
+ return await self.__aiter__().__anext__()
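
With the receiver above, stream-unary and stream-stream application handlers consume requests with a plain async-for loop. A self-contained illustration of that consumption pattern (the handler and the fake request iterator are hypothetical):

    import asyncio

    async def fake_request_iterator():
        """Stands in for the _MessageReceiver-backed request iterator."""
        for value in (1, 2, 3):
            yield value

    async def accumulate(request_iterator, context=None):
        """Hypothetical stream-unary handler body: sums the incoming requests."""
        total = 0
        async for request in request_iterator:
            total += request
        return total

    print(asyncio.run(accumulate(fake_request_iterator())))  # prints 6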
+
+
+async def _handle_stream_unary_rpc(object method_handler,
+ RPCState rpc_state,
+ object loop):
+    # Creates a dedicated ServicerContext
+ cdef _ServicerContext servicer_context = _ServicerContext(
+ rpc_state,
+ method_handler.request_deserializer,
+ None,
+ loop,
+ )
+
+ # Prepares the request generator
+ cdef object request_iterator
+ if _is_async_handler(method_handler.stream_unary):
+ request_iterator = _MessageReceiver(servicer_context)
+ else:
+ request_iterator = async_generator_to_generator(
+ _MessageReceiver(servicer_context),
+ loop
+ )
+
+ # Finishes the application handler
+ await _finish_handler_with_unary_response(
+ rpc_state,
+ method_handler.stream_unary,
+ request_iterator,
+ servicer_context,
+ method_handler.response_serializer,
+ loop
+ )
+
+
+async def _handle_stream_stream_rpc(object method_handler,
+ RPCState rpc_state,
+ object loop):
+    # Creates a dedicated ServicerContext
+ cdef _ServicerContext servicer_context = _ServicerContext(
+ rpc_state,
+ method_handler.request_deserializer,
+ method_handler.response_serializer,
+ loop,
+ )
+
+ # Prepares the request generator
+ cdef object request_iterator
+ if _is_async_handler(method_handler.stream_stream):
+ request_iterator = _MessageReceiver(servicer_context)
+ else:
+ request_iterator = async_generator_to_generator(
+ _MessageReceiver(servicer_context),
+ loop
+ )
+
+ # Finishes the application handler
+ await _finish_handler_with_stream_responses(
+ rpc_state,
+ method_handler.stream_stream,
+ request_iterator,
+ servicer_context,
+ loop,
+ )
+
+
+async def _handle_exceptions(RPCState rpc_state, object rpc_coro, object loop):
+ try:
+ try:
+ await rpc_coro
+ except AbortError as e:
+            # Caught an AbortError; check that it is the same one.
+ assert rpc_state.abort_exception is e, 'Abort error has been replaced!'
+ return
+ else:
+ # Check if the abort exception got suppressed
+ if rpc_state.abort_exception is not None:
+ _LOGGER.error(
+ 'Abort error unexpectedly suppressed: %s',
+ traceback.format_exception(rpc_state.abort_exception)
+ )
+ except (KeyboardInterrupt, SystemExit):
+ raise
+ except asyncio.CancelledError:
+ _LOGGER.debug('RPC cancelled for servicer method [%s]', _decode(rpc_state.method()))
+ except _ServerStoppedError:
+ _LOGGER.warning('Aborting method [%s] due to server stop.', _decode(rpc_state.method()))
+ except ExecuteBatchError:
+ # If client closed (aka. cancelled), ignore the failed batch operations.
+ if rpc_state.client_closed:
+ return
+ else:
+ raise
+ except Exception as e:
+ _LOGGER.exception('Unexpected [%s] raised by servicer method [%s]' % (
+ type(e).__name__,
+ _decode(rpc_state.method()),
+ ))
+ if not rpc_state.status_sent and rpc_state.server._status != AIO_SERVER_STATUS_STOPPED:
+ # Allows users to raise other types of exception with specified status code
+ if rpc_state.status_code == StatusCode.ok:
+ status_code = StatusCode.unknown
+ else:
+ status_code = rpc_state.status_code
+
+ rpc_state.status_sent = True
+ try:
+ await _send_error_status_from_server(
+ rpc_state,
+ status_code,
+ 'Unexpected %s: %s' % (type(e), e),
+ rpc_state.trailing_metadata,
+ rpc_state.create_send_initial_metadata_op_if_not_sent(),
+ loop
+ )
+ except ExecuteBatchError:
+ _LOGGER.exception('Failed sending error status from server')
+ traceback.print_exc()
+
+
+cdef _add_callback_handler(object rpc_task, RPCState rpc_state):
+
+ def handle_callbacks(object unused_task):
+ try:
+ for callback in rpc_state.callbacks:
+ # The _ServicerContext object is bound in add_done_callback.
+ callback()
+ except:
+ _LOGGER.exception('Error in callback for method [%s]', _decode(rpc_state.method()))
+
+ rpc_task.add_done_callback(handle_callbacks)
+
+
+async def _handle_cancellation_from_core(object rpc_task,
+ RPCState rpc_state,
+ object loop):
+ cdef ReceiveCloseOnServerOperation op = ReceiveCloseOnServerOperation(_EMPTY_FLAG)
+ cdef tuple ops = (op,)
+
+ # Awaits cancellation from peer.
+ await execute_batch(rpc_state, ops, loop)
+ rpc_state.client_closed = True
+    # Cancels the RPC task only if 1) a cancel signal was received; 2) the task
+    # has not finished; and 3) the server has not sent the final status. Without
+    # condition 3, the logs might inaccurately report that an RPC was both
+    # aborted and cancelled.
+ if op.cancelled() and not rpc_task.done() and not rpc_state.status_sent:
+ # Injects `CancelledError` to halt the RPC coroutine
+ rpc_task.cancel()
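
_handle_cancellation_from_core is the watcher half of each RPC: it blocks on the RECV_CLOSE_ON_SERVER batch and, if the client cancelled while the handler is still running and the final status has not gone out, injects CancelledError into the handler task. A standalone asyncio sketch of that watcher-cancels-worker pattern (the event, the status_sent predicate, and the task names are illustrative):

    import asyncio

    async def watch_cancellation(cancelled_event, worker_task, status_sent):
        """Cancels `worker_task` if the peer signals cancellation first."""
        await cancelled_event.wait()
        if not worker_task.done() and not status_sent():
            worker_task.cancel()

    async def worker():
        try:
            await asyncio.sleep(10)       # simulates a long-running handler
            return 'finished'
        except asyncio.CancelledError:
            print('worker cancelled')
            raise

    async def main():
        cancelled = asyncio.Event()
        task = asyncio.create_task(worker())
        watcher = asyncio.create_task(
            watch_cancellation(cancelled, task, status_sent=lambda: False))
        await asyncio.sleep(0)            # let the worker start running
        cancelled.set()                   # simulate the client cancelling the RPC
        await watcher
        try:
            await task
        except asyncio.CancelledError:
            pass

    asyncio.run(main())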
+
+
+async def _schedule_rpc_coro(object rpc_coro,
+ RPCState rpc_state,
+ object loop):
+ # Schedules the RPC coroutine.
+ cdef object rpc_task = loop.create_task(_handle_exceptions(
+ rpc_state,
+ rpc_coro,
+ loop,
+ ))
+ _add_callback_handler(rpc_task, rpc_state)
+ await _handle_cancellation_from_core(rpc_task, rpc_state, loop)
+
+
+async def _handle_rpc(list generic_handlers, tuple interceptors,
+ RPCState rpc_state, object loop):
+ cdef object method_handler
+ # Finds the method handler (application logic)
+ method_handler = await _find_method_handler(
+ rpc_state.method().decode(),
+ rpc_state.invocation_metadata(),
+ generic_handlers,
+ interceptors,
+ )
+ if method_handler is None:
+ rpc_state.status_sent = True
+ await _send_error_status_from_server(
+ rpc_state,
+ StatusCode.unimplemented,
+ 'Method not found!',
+ _IMMUTABLE_EMPTY_METADATA,
+ rpc_state.create_send_initial_metadata_op_if_not_sent(),
+ loop
+ )
+ return
+
+ # Handles unary-unary case
+ if not method_handler.request_streaming and not method_handler.response_streaming:
+ await _handle_unary_unary_rpc(method_handler,
+ rpc_state,
+ loop)
+ return
+
+ # Handles unary-stream case
+ if not method_handler.request_streaming and method_handler.response_streaming:
+ await _handle_unary_stream_rpc(method_handler,
+ rpc_state,
+ loop)
+ return
+
+ # Handles stream-unary case
+ if method_handler.request_streaming and not method_handler.response_streaming:
+ await _handle_stream_unary_rpc(method_handler,
+ rpc_state,
+ loop)
+ return
+
+ # Handles stream-stream case
+ if method_handler.request_streaming and method_handler.response_streaming:
+ await _handle_stream_stream_rpc(method_handler,
+ rpc_state,
+ loop)
+ return
+
+
+class _RequestCallError(Exception): pass
+
+cdef CallbackFailureHandler REQUEST_CALL_FAILURE_HANDLER = CallbackFailureHandler(
+ 'grpc_server_request_call', None, _RequestCallError)
+
+
+cdef CallbackFailureHandler SERVER_SHUTDOWN_FAILURE_HANDLER = CallbackFailureHandler(
+ 'grpc_server_shutdown_and_notify',
+ None,
+ InternalError)
+
+
+cdef class _ConcurrentRpcLimiter:
+
+ def __cinit__(self, int maximum_concurrent_rpcs, object loop):
+ if maximum_concurrent_rpcs <= 0:
+            raise ValueError("maximum_concurrent_rpcs should be a positive integer")
+ self._maximum_concurrent_rpcs = maximum_concurrent_rpcs
+ self._active_rpcs = 0
+ self._active_rpcs_condition = asyncio.Condition()
+ self._loop = loop
+
+ async def check_before_request_call(self):
+ await self._active_rpcs_condition.acquire()
+ try:
+ predicate = lambda: self._active_rpcs < self._maximum_concurrent_rpcs
+ await self._active_rpcs_condition.wait_for(predicate)
+ self._active_rpcs += 1
+ finally:
+ self._active_rpcs_condition.release()
+
+ async def _decrease_active_rpcs_count_with_lock(self):
+ await self._active_rpcs_condition.acquire()
+ try:
+ self._active_rpcs -= 1
+ self._active_rpcs_condition.notify()
+ finally:
+ self._active_rpcs_condition.release()
+
+ def _decrease_active_rpcs_count(self, unused_future):
+ self._loop.create_task(self._decrease_active_rpcs_count_with_lock())
+
+ def decrease_once_finished(self, object rpc_task):
+ rpc_task.add_done_callback(self._decrease_active_rpcs_count)
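
_ConcurrentRpcLimiter admits a new RPC only while the number of active RPCs is below the configured maximum, and a done callback frees the slot once the RPC task completes. The same behavior can be sketched with asyncio.Semaphore (this variant illustrates the idea; it is not how the class above is implemented):

    import asyncio

    class ConcurrencyLimiter:
        """Sketch of bounding concurrent tasks with asyncio.Semaphore."""

        def __init__(self, maximum_concurrent):
            if maximum_concurrent <= 0:
                raise ValueError('maximum_concurrent must be a positive integer')
            self._semaphore = asyncio.Semaphore(maximum_concurrent)

        async def check_before_request_call(self):
            await self._semaphore.acquire()

        def decrease_once_finished(self, task):
            task.add_done_callback(lambda _: self._semaphore.release())

    async def main():
        limiter = ConcurrencyLimiter(2)

        async def rpc(i):
            await asyncio.sleep(0.01)
            print('rpc', i, 'done')

        for i in range(4):
            await limiter.check_before_request_call()
            task = asyncio.create_task(rpc(i))
            limiter.decrease_once_finished(task)
        await asyncio.sleep(0.1)   # let the remaining tasks finish

    asyncio.run(main())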
+
+
+cdef class AioServer:
+
+ def __init__(self, loop, thread_pool, generic_handlers, interceptors,
+ options, maximum_concurrent_rpcs):
+ init_grpc_aio()
+ # NOTE(lidiz) Core objects won't be deallocated automatically.
+ # If AioServer.shutdown is not called, those objects will leak.
+ # TODO(rbellevi): Support xDS in aio server.
+ self._server = Server(options, False)
+ grpc_server_register_completion_queue(
+ self._server.c_server,
+ global_completion_queue(),
+ NULL
+ )
+
+ self._loop = loop
+ self._status = AIO_SERVER_STATUS_READY
+ self._generic_handlers = []
+ self.add_generic_rpc_handlers(generic_handlers)
+ self._serving_task = None
+
+ self._shutdown_lock = asyncio.Lock()
+ self._shutdown_completed = self._loop.create_future()
+ self._shutdown_callback_wrapper = CallbackWrapper(
+ self._shutdown_completed,
+ self._loop,
+ SERVER_SHUTDOWN_FAILURE_HANDLER)
+ self._crash_exception = None
+
+ if interceptors:
+ self._interceptors = tuple(interceptors)
+ else:
+ self._interceptors = ()
+
+ self._thread_pool = thread_pool
+ if maximum_concurrent_rpcs is not None:
+ self._limiter = _ConcurrentRpcLimiter(maximum_concurrent_rpcs,
+ loop)
+
+ def add_generic_rpc_handlers(self, object generic_rpc_handlers):
+ self._generic_handlers.extend(generic_rpc_handlers)
+
+ def add_insecure_port(self, address):
+ return self._server.add_http2_port(address)
+
+ def add_secure_port(self, address, server_credentials):
+ return self._server.add_http2_port(address,
+ server_credentials._credentials)
+
+ async def _request_call(self):
+ cdef grpc_call_error error
+ cdef RPCState rpc_state = RPCState(self)
+ cdef object future = self._loop.create_future()
+ cdef CallbackWrapper wrapper = CallbackWrapper(
+ future,
+ self._loop,
+ REQUEST_CALL_FAILURE_HANDLER)
+ error = grpc_server_request_call(
+ self._server.c_server, &rpc_state.call, &rpc_state.details,
+ &rpc_state.request_metadata,
+ global_completion_queue(), global_completion_queue(),
+ wrapper.c_functor()
+ )
+ if error != GRPC_CALL_OK:
+ raise InternalError("Error in grpc_server_request_call: %s" % error)
+
+ await future
+ return rpc_state
+
+ async def _server_main_loop(self,
+ object server_started):
+ self._server.start(backup_queue=False)
+ cdef RPCState rpc_state
+ server_started.set_result(True)
+
+ while True:
+ # When shutdown begins, no more new connections.
+ if self._status != AIO_SERVER_STATUS_RUNNING:
+ break
+
+ if self._limiter is not None:
+ await self._limiter.check_before_request_call()
+
+ # Accepts new request from Core
+ rpc_state = await self._request_call()
+
+            # Creates the dedicated RPC coroutine. If we scheduled it right now,
+            # there would be no guarantee that the cancellation-listening
+            # coroutine is ready. So, we control the ordering by scheduling the
+            # coroutine onto the event loop inside the cancellation coroutine.
+ rpc_coro = _handle_rpc(self._generic_handlers,
+ self._interceptors,
+ rpc_state,
+ self._loop)
+
+ # Fires off a task that listens on the cancellation from client.
+ rpc_task = self._loop.create_task(
+ _schedule_rpc_coro(
+ rpc_coro,
+ rpc_state,
+ self._loop
+ )
+ )
+
+ if self._limiter is not None:
+ self._limiter.decrease_once_finished(rpc_task)
+
+ def _serving_task_crash_handler(self, object task):
+        """Shuts down the server immediately if the serving task exited unexpectedly."""
+ if task.cancelled():
+ return
+ if task.exception() is None:
+ return
+ if self._status != AIO_SERVER_STATUS_STOPPING:
+ self._crash_exception = task.exception()
+ _LOGGER.exception(self._crash_exception)
+ self._loop.create_task(self.shutdown(None))
+
+ async def start(self):
+ if self._status == AIO_SERVER_STATUS_RUNNING:
+ return
+ elif self._status != AIO_SERVER_STATUS_READY:
+ raise UsageError('Server not in ready state')
+
+ self._status = AIO_SERVER_STATUS_RUNNING
+ cdef object server_started = self._loop.create_future()
+ self._serving_task = self._loop.create_task(self._server_main_loop(server_started))
+ self._serving_task.add_done_callback(self._serving_task_crash_handler)
+        # Explicitly waits for the server to start up; otherwise, the actual
+        # start time of the server is uncontrollable.
+ await server_started
+
+ async def _start_shutting_down(self):
+        """Prepares the server for shutting down.
+
+ This coroutine function is NOT coroutine-safe.
+ """
+ # The shutdown callback won't be called until there is no live RPC.
+ grpc_server_shutdown_and_notify(
+ self._server.c_server,
+ global_completion_queue(),
+ self._shutdown_callback_wrapper.c_functor())
+
+ # Ensures the serving task (coroutine) exits.
+ try:
+ await self._serving_task
+ except _RequestCallError:
+ pass
+
+ async def shutdown(self, grace):
+        """Gracefully shuts down the Core server.
+
+        Applications should call shutdown only once.
+
+ Args:
+ grace: An optional float indicating the length of grace period in
+ seconds.
+ """
+ if self._status == AIO_SERVER_STATUS_READY or self._status == AIO_SERVER_STATUS_STOPPED:
+ return
+
+ async with self._shutdown_lock:
+ if self._status == AIO_SERVER_STATUS_RUNNING:
+ self._server.is_shutting_down = True
+ self._status = AIO_SERVER_STATUS_STOPPING
+ await self._start_shutting_down()
+
+ if grace is None:
+ # Directly cancels all calls
+ grpc_server_cancel_all_calls(self._server.c_server)
+ await self._shutdown_completed
+ else:
+ try:
+ await asyncio.wait_for(
+ asyncio.shield(self._shutdown_completed),
+ grace,
+ )
+ except asyncio.TimeoutError:
+ # Cancels all ongoing calls by the end of grace period.
+ grpc_server_cancel_all_calls(self._server.c_server)
+ await self._shutdown_completed
+
+ async with self._shutdown_lock:
+ if self._status == AIO_SERVER_STATUS_STOPPING:
+ grpc_server_destroy(self._server.c_server)
+ self._server.c_server = NULL
+ self._server.is_shutdown = True
+ self._status = AIO_SERVER_STATUS_STOPPED
+
+ async def wait_for_termination(self, object timeout):
+ if timeout is None:
+ await self._shutdown_completed
+ else:
+ try:
+ await asyncio.wait_for(
+ asyncio.shield(self._shutdown_completed),
+ timeout,
+ )
+ except asyncio.TimeoutError:
+ if self._crash_exception is not None:
+ raise self._crash_exception
+ return True
+ if self._crash_exception is not None:
+ raise self._crash_exception
+ return False
+
+ def __dealloc__(self):
+        """Deallocation of Core objects is ensured by the Python layer."""
+        # TODO(lidiz) If users create a server and then dealloc it immediately,
+        # there is a potential memory leak of the created Core server.
+ if self._status != AIO_SERVER_STATUS_STOPPED:
+ _LOGGER.debug(
+ '__dealloc__ called on running server %s with status %d',
+ self,
+ self._status
+ )
+ shutdown_grpc_aio()
+
+ cdef thread_pool(self):
+ """Access the thread pool instance."""
+ return self._thread_pool
+
+ def is_running(self):
+ return self._status == AIO_SERVER_STATUS_RUNNING
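
shutdown() above follows the grace-period contract: wait up to `grace` seconds for in-flight RPCs to drain, then force-cancel all calls and wait for the shutdown-completed future, with asyncio.shield keeping the timeout from cancelling that shared future. A standalone sketch of the same wait/shield/force-cancel sequence (function and callback names are illustrative):

    import asyncio

    async def shutdown_with_grace(shutdown_completed, cancel_all_calls, grace):
        """Waits `grace` seconds for graceful completion, then force-cancels."""
        if grace is None:
            cancel_all_calls()
            await shutdown_completed
            return
        try:
            # shield() so the timeout cancels the wait, not the shared future.
            await asyncio.wait_for(asyncio.shield(shutdown_completed), grace)
        except asyncio.TimeoutError:
            cancel_all_calls()
            await shutdown_completed

    async def main():
        loop = asyncio.get_running_loop()
        done = loop.create_future()

        # Simulate Core finishing shutdown shortly after calls are cancelled.
        def cancel_all_calls():
            loop.call_later(0.01, done.set_result, None)

        await shutdown_with_grace(done, cancel_all_calls, grace=0.05)
        print('server shut down')

    asyncio.run(main())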
diff --git a/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/arguments.pxd.pxi b/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/arguments.pxd.pxi
new file mode 100644
index 00000000000..251efe15b39
--- /dev/null
+++ b/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/arguments.pxd.pxi
@@ -0,0 +1,36 @@
+# Copyright 2018 The gRPC Authors
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+cdef tuple _wrap_grpc_arg(grpc_arg arg)
+
+
+cdef grpc_arg _unwrap_grpc_arg(tuple wrapped_arg)
+
+
+cdef class _ChannelArg:
+
+ cdef grpc_arg c_argument
+
+ cdef void c(self, argument, references) except *
+
+
+cdef class _ChannelArgs:
+
+ cdef readonly tuple _arguments
+ cdef list _channel_args
+ cdef readonly list _references
+ cdef grpc_channel_args _c_arguments
+
+ cdef grpc_channel_args *c_args(self) except *
diff --git a/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/arguments.pyx.pxi b/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/arguments.pyx.pxi
new file mode 100644
index 00000000000..9df308cdbcd
--- /dev/null
+++ b/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/arguments.pyx.pxi
@@ -0,0 +1,85 @@
+# Copyright 2018 gRPC authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+cdef class _GrpcArgWrapper:
+
+ cdef grpc_arg arg
+
+
+cdef tuple _wrap_grpc_arg(grpc_arg arg):
+ wrapped = _GrpcArgWrapper()
+ wrapped.arg = arg
+ return ("grpc.python._cygrpc._GrpcArgWrapper", wrapped)
+
+
+cdef grpc_arg _unwrap_grpc_arg(tuple wrapped_arg):
+ cdef _GrpcArgWrapper wrapped = wrapped_arg[1]
+ return wrapped.arg
+
+
+cdef class _ChannelArg:
+
+ cdef void c(self, argument, references) except *:
+ key, value = argument
+ cdef bytes encoded_key = _encode(key)
+ if encoded_key is not key:
+ references.append(encoded_key)
+ self.c_argument.key = encoded_key
+ if isinstance(value, int):
+ self.c_argument.type = GRPC_ARG_INTEGER
+ self.c_argument.value.integer = value
+ elif isinstance(value, (bytes, str, unicode,)):
+ self.c_argument.type = GRPC_ARG_STRING
+ encoded_value = _encode(value)
+ if encoded_value is not value:
+ references.append(encoded_value)
+ self.c_argument.value.string = encoded_value
+ elif isinstance(value, _GrpcArgWrapper):
+ self.c_argument = (<_GrpcArgWrapper>value).arg
+ elif hasattr(value, '__int__'):
+ # Pointer objects must override __int__() to return
+ # the underlying C address (Python ints are word size). The
+ # lifecycle of the pointer is fixed to the lifecycle of the
+ # python object wrapping it.
+ self.c_argument.type = GRPC_ARG_POINTER
+ self.c_argument.value.pointer.vtable = &default_vtable
+ self.c_argument.value.pointer.address = <void*>(<intptr_t>int(value))
+ else:
+ raise TypeError(
+ 'Expected int, bytes, or behavior, got {}'.format(type(value)))
+
+
+cdef class _ChannelArgs:
+
+ def __cinit__(self, arguments):
+ self._arguments = () if arguments is None else tuple(arguments)
+ self._channel_args = []
+ self._references = []
+ self._c_arguments.arguments_length = len(self._arguments)
+ if self._c_arguments.arguments_length != 0:
+ self._c_arguments.arguments = <grpc_arg *>gpr_malloc(
+ self._c_arguments.arguments_length * sizeof(grpc_arg))
+ for index, argument in enumerate(self._arguments):
+ channel_arg = _ChannelArg()
+ channel_arg.c(argument, self._references)
+ self._c_arguments.arguments[index] = channel_arg.c_argument
+ self._channel_args.append(channel_arg)
+
+ cdef grpc_channel_args *c_args(self) except *:
+ return &self._c_arguments
+
+ def __dealloc__(self):
+ if self._c_arguments.arguments != NULL:
+            gpr_free(self._c_arguments.arguments)
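
_ChannelArg.c above maps Python (key, value) pairs onto the C grpc_arg union: integers become GRPC_ARG_INTEGER, strings and bytes become GRPC_ARG_STRING, and objects exposing __int__ are treated as raw pointers. At the public API level these pairs are the familiar channel options; a small client-side illustration exercising the integer and string branches (the target address is a placeholder):

    # Hypothetical client-side usage: each (key, value) pair below is converted
    # by _ChannelArgs/_ChannelArg into a C-level grpc_arg when the channel is
    # created. Integer values take the GRPC_ARG_INTEGER branch, strings the
    # GRPC_ARG_STRING branch.
    import grpc

    options = (
        ('grpc.max_receive_message_length', 16 * 1024 * 1024),  # integer arg
        ('grpc.primary_user_agent', 'example-client/0.1'),      # string arg
    )
    channel = grpc.insecure_channel('localhost:50051', options=options)
    channel.close()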
diff --git a/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/call.pxd.pxi b/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/call.pxd.pxi
new file mode 100644
index 00000000000..8babeb45361
--- /dev/null
+++ b/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/call.pxd.pxi
@@ -0,0 +1,20 @@
+# Copyright 2015 gRPC authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+cdef class Call:
+
+ cdef grpc_call *c_call
+ cdef list references
+
diff --git a/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/call.pyx.pxi b/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/call.pyx.pxi
new file mode 100644
index 00000000000..f68e166b17a
--- /dev/null
+++ b/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/call.pyx.pxi
@@ -0,0 +1,97 @@
+# Copyright 2015 gRPC authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+cdef class Call:
+
+ def __cinit__(self):
+ # Create an *empty* call
+ fork_handlers_and_grpc_init()
+ self.c_call = NULL
+ self.references = []
+
+ def _start_batch(self, operations, tag, retain_self):
+ if not self.is_valid:
+ raise ValueError("invalid call object cannot be used from Python")
+ cdef _BatchOperationTag batch_operation_tag = _BatchOperationTag(
+ tag, operations, self if retain_self else None)
+ batch_operation_tag.prepare()
+ cpython.Py_INCREF(batch_operation_tag)
+ cdef grpc_call_error error
+ with nogil:
+ error = grpc_call_start_batch(
+ self.c_call, batch_operation_tag.c_ops, batch_operation_tag.c_nops,
+ <cpython.PyObject *>batch_operation_tag, NULL)
+ return error
+
+ def start_client_batch(self, operations, tag):
+ # We don't reference this call in the operations tag because
+ # it should be cancelled when it goes out of scope
+ return self._start_batch(operations, tag, False)
+
+ def start_server_batch(self, operations, tag):
+ return self._start_batch(operations, tag, True)
+
+ def cancel(
+ self, grpc_status_code error_code=GRPC_STATUS__DO_NOT_USE,
+ details=None):
+ details = str_to_bytes(details)
+ if not self.is_valid:
+ raise ValueError("invalid call object cannot be used from Python")
+ if (details is None) != (error_code == GRPC_STATUS__DO_NOT_USE):
+ raise ValueError("if error_code is specified, so must details "
+ "(and vice-versa)")
+ cdef grpc_call_error result
+ cdef char *c_details = NULL
+ if error_code != GRPC_STATUS__DO_NOT_USE:
+ self.references.append(details)
+ c_details = details
+ with nogil:
+ result = grpc_call_cancel_with_status(
+ self.c_call, error_code, c_details, NULL)
+ return result
+ else:
+ with nogil:
+ result = grpc_call_cancel(self.c_call, NULL)
+ return result
+
+ def set_credentials(self, CallCredentials call_credentials not None):
+ cdef grpc_call_credentials *c_call_credentials = call_credentials.c()
+ cdef grpc_call_error call_error = grpc_call_set_credentials(
+ self.c_call, c_call_credentials)
+ grpc_call_credentials_release(c_call_credentials)
+ return call_error
+
+ def peer(self):
+ cdef char *peer = NULL
+ with nogil:
+ peer = grpc_call_get_peer(self.c_call)
+ result = <bytes>peer
+ with nogil:
+ gpr_free(peer)
+ return result
+
+ def __dealloc__(self):
+ with nogil:
+ if self.c_call != NULL:
+ grpc_call_unref(self.c_call)
+ grpc_shutdown()
+
+ # The object *should* always be valid from Python. Used for debugging.
+ @property
+ def is_valid(self):
+ return self.c_call != NULL
+
+ def _custom_op_on_c_call(self, int op):
+ return _custom_op_on_c_call(op, self.c_call)
diff --git a/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/channel.pxd.pxi b/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/channel.pxd.pxi
new file mode 100644
index 00000000000..eb27f2df7ad
--- /dev/null
+++ b/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/channel.pxd.pxi
@@ -0,0 +1,74 @@
+# Copyright 2015 gRPC authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+cdef _check_call_error_no_metadata(c_call_error)
+
+
+cdef _check_and_raise_call_error_no_metadata(c_call_error)
+
+
+cdef _check_call_error(c_call_error, metadata)
+
+
+cdef class _CallState:
+
+ cdef grpc_call *c_call
+ cdef set due
+
+
+cdef class _ChannelState:
+
+ cdef object condition
+ cdef grpc_channel *c_channel
+ # A boolean field indicating that the channel is open (if True) or is being
+ # closed (i.e. a call to close is currently executing) or is closed (if
+ # False).
+ # TODO(https://github.com/grpc/grpc/issues/3064): Eliminate "is being closed"
+ # a state in which condition may be acquired by any thread, eliminate this
+ # field and just use the NULLness of c_channel as an indication that the
+ # channel is closed.
+ cdef object open
+ cdef object closed_reason
+
+ # A dict from _BatchOperationTag to _CallState
+ cdef dict integrated_call_states
+ cdef grpc_completion_queue *c_call_completion_queue
+
+ # A set of _CallState
+ cdef set segregated_call_states
+
+ cdef set connectivity_due
+ cdef grpc_completion_queue *c_connectivity_completion_queue
+
+
+cdef class IntegratedCall:
+
+ cdef _ChannelState _channel_state
+ cdef _CallState _call_state
+
+
+cdef class SegregatedCall:
+
+ cdef _ChannelState _channel_state
+ cdef _CallState _call_state
+ cdef grpc_completion_queue *_c_completion_queue
+
+
+cdef class Channel:
+
+ cdef _ChannelState _state
+
+ # TODO(https://github.com/grpc/grpc/issues/15662): Eliminate this.
+ cdef tuple _arguments
diff --git a/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/channel.pyx.pxi b/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/channel.pyx.pxi
new file mode 100644
index 00000000000..d49a4210f7c
--- /dev/null
+++ b/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/channel.pyx.pxi
@@ -0,0 +1,516 @@
+# Copyright 2015 gRPC authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+_INTERNAL_CALL_ERROR_MESSAGE_FORMAT = (
+ 'Internal gRPC call error %d. ' +
+ 'Please report to https://github.com/grpc/grpc/issues')
+
+
+cdef str _call_error_metadata(metadata):
+ return 'metadata was invalid: %s' % metadata
+
+
+cdef str _call_error_no_metadata(c_call_error):
+ return _INTERNAL_CALL_ERROR_MESSAGE_FORMAT % c_call_error
+
+
+cdef str _call_error(c_call_error, metadata):
+ if c_call_error == GRPC_CALL_ERROR_INVALID_METADATA:
+ return _call_error_metadata(metadata)
+ else:
+ return _call_error_no_metadata(c_call_error)
+
+
+cdef _check_call_error_no_metadata(c_call_error):
+ if c_call_error != GRPC_CALL_OK:
+ return _INTERNAL_CALL_ERROR_MESSAGE_FORMAT % c_call_error
+ else:
+ return None
+
+
+cdef _check_and_raise_call_error_no_metadata(c_call_error):
+ error = _check_call_error_no_metadata(c_call_error)
+ if error is not None:
+ raise ValueError(error)
+
+
+cdef _check_call_error(c_call_error, metadata):
+ if c_call_error == GRPC_CALL_ERROR_INVALID_METADATA:
+ return _call_error_metadata(metadata)
+ else:
+ return _check_call_error_no_metadata(c_call_error)
+
+
+cdef void _raise_call_error_no_metadata(c_call_error) except *:
+ raise ValueError(_call_error_no_metadata(c_call_error))
+
+
+cdef void _raise_call_error(c_call_error, metadata) except *:
+ raise ValueError(_call_error(c_call_error, metadata))
+
+
+cdef _destroy_c_completion_queue(grpc_completion_queue *c_completion_queue):
+ grpc_completion_queue_shutdown(c_completion_queue)
+ grpc_completion_queue_destroy(c_completion_queue)
+
+
+cdef class _CallState:
+
+ def __cinit__(self):
+ self.due = set()
+
+
+cdef class _ChannelState:
+
+ def __cinit__(self):
+ self.condition = threading.Condition()
+ self.open = True
+ self.integrated_call_states = {}
+ self.segregated_call_states = set()
+ self.connectivity_due = set()
+ self.closed_reason = None
+
+
+cdef tuple _operate(grpc_call *c_call, object operations, object user_tag):
+ cdef grpc_call_error c_call_error
+ cdef _BatchOperationTag tag = _BatchOperationTag(user_tag, operations, None)
+ tag.prepare()
+ cpython.Py_INCREF(tag)
+ with nogil:
+ c_call_error = grpc_call_start_batch(
+ c_call, tag.c_ops, tag.c_nops, <cpython.PyObject *>tag, NULL)
+ return c_call_error, tag
+
+
+cdef object _operate_from_integrated_call(
+ _ChannelState channel_state, _CallState call_state, object operations,
+ object user_tag):
+ cdef grpc_call_error c_call_error
+ cdef _BatchOperationTag tag
+ with channel_state.condition:
+ if call_state.due:
+ c_call_error, tag = _operate(call_state.c_call, operations, user_tag)
+ if c_call_error == GRPC_CALL_OK:
+ call_state.due.add(tag)
+ channel_state.integrated_call_states[tag] = call_state
+ return True
+ else:
+ _raise_call_error_no_metadata(c_call_error)
+ else:
+ return False
+
+
+cdef object _operate_from_segregated_call(
+ _ChannelState channel_state, _CallState call_state, object operations,
+ object user_tag):
+ cdef grpc_call_error c_call_error
+ cdef _BatchOperationTag tag
+ with channel_state.condition:
+ if call_state.due:
+ c_call_error, tag = _operate(call_state.c_call, operations, user_tag)
+ if c_call_error == GRPC_CALL_OK:
+ call_state.due.add(tag)
+ return True
+ else:
+ _raise_call_error_no_metadata(c_call_error)
+ else:
+ return False
+
+
+cdef _cancel(
+ _ChannelState channel_state, _CallState call_state, grpc_status_code code,
+ str details):
+ cdef grpc_call_error c_call_error
+ with channel_state.condition:
+ if call_state.due:
+ c_call_error = grpc_call_cancel_with_status(
+ call_state.c_call, code, _encode(details), NULL)
+ _check_and_raise_call_error_no_metadata(c_call_error)
+
+
+cdef _next_call_event(
+ _ChannelState channel_state, grpc_completion_queue *c_completion_queue,
+ on_success, on_failure, deadline):
+ """Block on the next event out of the completion queue.
+
+ On success, `on_success` will be invoked with the tag taken from the CQ.
+ In the case of a failure due to an exception raised in a signal handler,
+ `on_failure` will be invoked with no arguments. Note that this situation
+ can only occur on the main thread.
+
+ Args:
+ channel_state: The state for the channel on which the RPC is running.
+ c_completion_queue: The CQ which will be polled.
+ on_success: A callable object to be invoked upon successful receipt of a
+ tag from the CQ.
+ on_failure: A callable object to be invoked in case a Python exception is
+ raised from a signal handler during polling.
+ deadline: The point after which the RPC will time out.
+ """
+ try:
+ tag, event = _latent_event(c_completion_queue, deadline)
+ # NOTE(rbellevi): This broad except enables us to clean up resources before
+ # propagating any exceptions raised by signal handlers to the application.
+ except:
+ if on_failure is not None:
+ on_failure()
+ raise
+ else:
+ with channel_state.condition:
+ on_success(tag)
+ channel_state.condition.notify_all()
+ return event
+
+
+# TODO(https://github.com/grpc/grpc/issues/14569): This could be a lot simpler.
+cdef void _call(
+ _ChannelState channel_state, _CallState call_state,
+ grpc_completion_queue *c_completion_queue, on_success, int flags, method,
+ host, object deadline, CallCredentials credentials,
+ object operationses_and_user_tags, object metadata,
+ object context) except *:
+ """Invokes an RPC.
+
+ Args:
+ channel_state: A _ChannelState with its "open" attribute set to True. RPCs
+ may not be invoked on a closed channel.
+ call_state: An empty _CallState to be altered (specifically assigned a
+ c_call and having its due set populated) if the RPC invocation is
+ successful.
+ c_completion_queue: A grpc_completion_queue to be used for the call's
+ operations.
+    on_success: A behavior to be called if attempting to start operations for
+      the call succeeds. If invoked, it will be called while holding the
+      channel_state condition and will be passed the tags associated with the
+      operations that were successfully started for the call.
+ flags: Flags to be passed to gRPC Core as part of call creation.
+ method: The fully-qualified name of the RPC method being invoked.
+ host: A "host" string to be passed to gRPC Core as part of call creation.
+ deadline: A float for the deadline of the RPC, or None if the RPC is to have
+ no deadline.
+ credentials: A _CallCredentials for the RPC or None.
+    operationses_and_user_tags: A sequence of length-two sequences, the first
+ element of which is a sequence of Operations and the second element of
+ which is an object to be used as a tag. A SendInitialMetadataOperation
+ must be present in the first element of this value.
+ metadata: The metadata for this call.
+ context: Context object for distributed tracing.
+ """
+ cdef grpc_slice method_slice
+ cdef grpc_slice host_slice
+ cdef grpc_slice *host_slice_ptr
+ cdef grpc_call_credentials *c_call_credentials
+ cdef grpc_call_error c_call_error
+ cdef tuple error_and_wrapper_tag
+ cdef _BatchOperationTag wrapper_tag
+ with channel_state.condition:
+ if channel_state.open:
+ method_slice = _slice_from_bytes(method)
+ if host is None:
+ host_slice_ptr = NULL
+ else:
+ host_slice = _slice_from_bytes(host)
+ host_slice_ptr = &host_slice
+ call_state.c_call = grpc_channel_create_call(
+ channel_state.c_channel, NULL, flags,
+ c_completion_queue, method_slice, host_slice_ptr,
+ _timespec_from_time(deadline), NULL)
+ grpc_slice_unref(method_slice)
+ if host_slice_ptr:
+ grpc_slice_unref(host_slice)
+ if context is not None:
+ set_census_context_on_call(call_state, context)
+ if credentials is not None:
+ c_call_credentials = credentials.c()
+ c_call_error = grpc_call_set_credentials(
+ call_state.c_call, c_call_credentials)
+ grpc_call_credentials_release(c_call_credentials)
+ if c_call_error != GRPC_CALL_OK:
+ grpc_call_unref(call_state.c_call)
+ call_state.c_call = NULL
+ _raise_call_error_no_metadata(c_call_error)
+ started_tags = set()
+ for operations, user_tag in operationses_and_user_tags:
+ c_call_error, tag = _operate(call_state.c_call, operations, user_tag)
+ if c_call_error == GRPC_CALL_OK:
+ started_tags.add(tag)
+ else:
+ grpc_call_cancel(call_state.c_call, NULL)
+ grpc_call_unref(call_state.c_call)
+ call_state.c_call = NULL
+ _raise_call_error(c_call_error, metadata)
+ else:
+ call_state.due.update(started_tags)
+ on_success(started_tags)
+ else:
+ raise ValueError('Cannot invoke RPC: %s' % channel_state.closed_reason)
+
+
+cdef void _process_integrated_call_tag(
+ _ChannelState state, _BatchOperationTag tag) except *:
+ cdef _CallState call_state = state.integrated_call_states.pop(tag)
+ call_state.due.remove(tag)
+ if not call_state.due:
+ grpc_call_unref(call_state.c_call)
+ call_state.c_call = NULL
+
+
+cdef class IntegratedCall:
+
+ def __cinit__(self, _ChannelState channel_state, _CallState call_state):
+ self._channel_state = channel_state
+ self._call_state = call_state
+
+ def operate(self, operations, tag):
+ return _operate_from_integrated_call(
+ self._channel_state, self._call_state, operations, tag)
+
+ def cancel(self, code, details):
+ _cancel(self._channel_state, self._call_state, code, details)
+
+
+cdef IntegratedCall _integrated_call(
+ _ChannelState state, int flags, method, host, object deadline,
+ object metadata, CallCredentials credentials, operationses_and_user_tags,
+ object context):
+ call_state = _CallState()
+
+ def on_success(started_tags):
+ for started_tag in started_tags:
+ state.integrated_call_states[started_tag] = call_state
+
+ _call(
+ state, call_state, state.c_call_completion_queue, on_success, flags,
+ method, host, deadline, credentials, operationses_and_user_tags, metadata, context)
+
+ return IntegratedCall(state, call_state)
+
+
+cdef object _process_segregated_call_tag(
+ _ChannelState state, _CallState call_state,
+ grpc_completion_queue *c_completion_queue, _BatchOperationTag tag):
+ call_state.due.remove(tag)
+ if not call_state.due:
+ grpc_call_unref(call_state.c_call)
+ call_state.c_call = NULL
+ state.segregated_call_states.remove(call_state)
+ _destroy_c_completion_queue(c_completion_queue)
+ return True
+ else:
+ return False
+
+
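+# A segregated call owns a dedicated completion queue and is driven by the
+# thread making the call via next_event(), whereas an integrated call (above)
+# shares the channel's call completion queue, which is drained by the
+# channel's spin thread through Channel.next_call_event().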
+cdef class SegregatedCall:
+
+ def __cinit__(self, _ChannelState channel_state, _CallState call_state):
+ self._channel_state = channel_state
+ self._call_state = call_state
+
+ def operate(self, operations, tag):
+ return _operate_from_segregated_call(
+ self._channel_state, self._call_state, operations, tag)
+
+ def cancel(self, code, details):
+ _cancel(self._channel_state, self._call_state, code, details)
+
+ def next_event(self):
+ def on_success(tag):
+ _process_segregated_call_tag(
+ self._channel_state, self._call_state, self._c_completion_queue, tag)
+ def on_failure():
+ self._call_state.due.clear()
+ grpc_call_unref(self._call_state.c_call)
+ self._call_state.c_call = NULL
+ self._channel_state.segregated_call_states.remove(self._call_state)
+ _destroy_c_completion_queue(self._c_completion_queue)
+ return _next_call_event(
+ self._channel_state, self._c_completion_queue, on_success, on_failure, None)
+
+
+cdef SegregatedCall _segregated_call(
+ _ChannelState state, int flags, method, host, object deadline,
+ object metadata, CallCredentials credentials, operationses_and_user_tags,
+ object context):
+ cdef _CallState call_state = _CallState()
+ cdef SegregatedCall segregated_call
+ cdef grpc_completion_queue *c_completion_queue
+
+ def on_success(started_tags):
+ state.segregated_call_states.add(call_state)
+
+ with state.condition:
+ if state.open:
+ c_completion_queue = (grpc_completion_queue_create_for_next(NULL))
+ else:
+ raise ValueError('Cannot invoke RPC on closed channel!')
+
+ try:
+ _call(
+ state, call_state, c_completion_queue, on_success, flags, method, host,
+ deadline, credentials, operationses_and_user_tags, metadata,
+ context)
+ except:
+ _destroy_c_completion_queue(c_completion_queue)
+ raise
+
+ segregated_call = SegregatedCall(state, call_state)
+ segregated_call._c_completion_queue = c_completion_queue
+ return segregated_call
+
+
+cdef object _watch_connectivity_state(
+ _ChannelState state, grpc_connectivity_state last_observed_state,
+ object deadline):
+ cdef _ConnectivityTag tag = _ConnectivityTag(object())
+ with state.condition:
+ if state.open:
+ cpython.Py_INCREF(tag)
+ grpc_channel_watch_connectivity_state(
+ state.c_channel, last_observed_state, _timespec_from_time(deadline),
+ state.c_connectivity_completion_queue, <cpython.PyObject *>tag)
+ state.connectivity_due.add(tag)
+ else:
+ raise ValueError('Cannot monitor channel state: %s' % state.closed_reason)
+ completed_tag, event = _latent_event(
+ state.c_connectivity_completion_queue, None)
+ with state.condition:
+ state.connectivity_due.remove(completed_tag)
+ state.condition.notify_all()
+ return event
+
+
+cdef _close(Channel channel, grpc_status_code code, object details,
+ drain_calls):
+ cdef _ChannelState state = channel._state
+ cdef _CallState call_state
+ encoded_details = _encode(details)
+ with state.condition:
+ if state.open:
+ state.open = False
+ state.closed_reason = details
+ for call_state in set(state.integrated_call_states.values()):
+ grpc_call_cancel_with_status(
+ call_state.c_call, code, encoded_details, NULL)
+ for call_state in state.segregated_call_states:
+ grpc_call_cancel_with_status(
+ call_state.c_call, code, encoded_details, NULL)
+ # TODO(https://github.com/grpc/grpc/issues/3064): Cancel connectivity
+ # watching.
+
+ if drain_calls:
+ while not _calls_drained(state):
+ event = channel.next_call_event()
+ if event.completion_type == CompletionType.queue_timeout:
+ continue
+ event.tag(event)
+ else:
+ while state.integrated_call_states:
+ state.condition.wait()
+ while state.connectivity_due:
+ state.condition.wait()
+
+ _destroy_c_completion_queue(state.c_call_completion_queue)
+ _destroy_c_completion_queue(state.c_connectivity_completion_queue)
+ grpc_channel_destroy(state.c_channel)
+ state.c_channel = NULL
+ grpc_shutdown()
+ state.condition.notify_all()
+ else:
+ # Another call to close already completed in the past or is currently
+ # being executed in another thread.
+ while state.c_channel != NULL:
+ state.condition.wait()
+
+
+cdef _calls_drained(_ChannelState state):
+ return not (state.integrated_call_states or state.segregated_call_states or
+ state.connectivity_due)
+
+cdef class Channel:
+
+ def __cinit__(
+ self, bytes target, object arguments,
+ ChannelCredentials channel_credentials):
+ arguments = () if arguments is None else tuple(arguments)
+ fork_handlers_and_grpc_init()
+ self._state = _ChannelState()
+ self._state.c_call_completion_queue = (
+ grpc_completion_queue_create_for_next(NULL))
+ self._state.c_connectivity_completion_queue = (
+ grpc_completion_queue_create_for_next(NULL))
+ self._arguments = arguments
+ cdef _ChannelArgs channel_args = _ChannelArgs(arguments)
+ c_channel_credentials = (
+ channel_credentials.c() if channel_credentials is not None
+ else grpc_insecure_credentials_create())
+ self._state.c_channel = grpc_channel_create(
+ <char *>target, c_channel_credentials, channel_args.c_args())
+ grpc_channel_credentials_release(c_channel_credentials)
+
+ def target(self):
+ cdef char *c_target
+ with self._state.condition:
+ c_target = grpc_channel_get_target(self._state.c_channel)
+ target = <bytes>c_target
+ gpr_free(c_target)
+ return target
+
+ def integrated_call(
+ self, int flags, method, host, object deadline, object metadata,
+ CallCredentials credentials, operationses_and_tags,
+ object context = None):
+ return _integrated_call(
+ self._state, flags, method, host, deadline, metadata, credentials,
+ operationses_and_tags, context)
+
+ def next_call_event(self):
+ def on_success(tag):
+ if tag is not None:
+ _process_integrated_call_tag(self._state, tag)
+ if is_fork_support_enabled():
+ queue_deadline = time.time() + 1.0
+ else:
+ queue_deadline = None
+ # NOTE(gnossen): It is acceptable for on_failure to be None here because
+ # failure conditions can only ever happen on the main thread and this
+ # method is only ever invoked on the channel spin thread.
+ return _next_call_event(self._state, self._state.c_call_completion_queue,
+ on_success, None, queue_deadline)
+
+ def segregated_call(
+ self, int flags, method, host, object deadline, object metadata,
+ CallCredentials credentials, operationses_and_tags,
+ object context = None):
+ return _segregated_call(
+ self._state, flags, method, host, deadline, metadata, credentials,
+ operationses_and_tags, context)
+
+ def check_connectivity_state(self, bint try_to_connect):
+ with self._state.condition:
+ if self._state.open:
+ return grpc_channel_check_connectivity_state(
+ self._state.c_channel, try_to_connect)
+ else:
+ raise ValueError('Cannot invoke RPC: %s' % self._state.closed_reason)
+
+ def watch_connectivity_state(
+ self, grpc_connectivity_state last_observed_state, object deadline):
+ return _watch_connectivity_state(self._state, last_observed_state, deadline)
+
+ def close(self, code, details):
+ _close(self, code, details, False)
+
+ def close_on_fork(self, code, details):
+ _close(self, code, details, True)
diff --git a/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/channelz.pyx.pxi b/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/channelz.pyx.pxi
new file mode 100644
index 00000000000..36c8cd121c7
--- /dev/null
+++ b/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/channelz.pyx.pxi
@@ -0,0 +1,71 @@
+# Copyright 2018 The gRPC Authors
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
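+# Each query below returns the raw bytes of a JSON-serialized channelz
+# response proto produced by C-core; callers are expected to parse it
+# themselves (the grpc_channelz package, for example, uses
+# google.protobuf.json_format for this).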
+def channelz_get_top_channels(start_channel_id):
+ cdef char *c_returned_str = grpc_channelz_get_top_channels(
+ start_channel_id,
+ )
+ if c_returned_str == NULL:
+ raise ValueError('Failed to get top channels, please ensure your' \
+ ' start_channel_id==%s is valid' % start_channel_id)
+ return c_returned_str
+
+def channelz_get_servers(start_server_id):
+ cdef char *c_returned_str = grpc_channelz_get_servers(start_server_id)
+ if c_returned_str == NULL:
+ raise ValueError('Failed to get servers, please ensure your' \
+ ' start_server_id==%s is valid' % start_server_id)
+ return c_returned_str
+
+def channelz_get_server(server_id):
+ cdef char *c_returned_str = grpc_channelz_get_server(server_id)
+ if c_returned_str == NULL:
+ raise ValueError('Failed to get the server, please ensure your' \
+ ' server_id==%s is valid' % server_id)
+ return c_returned_str
+
+def channelz_get_server_sockets(server_id, start_socket_id, max_results):
+ cdef char *c_returned_str = grpc_channelz_get_server_sockets(
+ server_id,
+ start_socket_id,
+ max_results,
+ )
+ if c_returned_str == NULL:
+ raise ValueError('Failed to get server sockets, please ensure your' \
+ ' server_id==%s and start_socket_id==%s and' \
+ ' max_results==%s is valid' %
+ (server_id, start_socket_id, max_results))
+ return c_returned_str
+
+def channelz_get_channel(channel_id):
+ cdef char *c_returned_str = grpc_channelz_get_channel(channel_id)
+ if c_returned_str == NULL:
+ raise ValueError('Failed to get the channel, please ensure your' \
+ ' channel_id==%s is valid' % (channel_id))
+ return c_returned_str
+
+def channelz_get_subchannel(subchannel_id):
+ cdef char *c_returned_str = grpc_channelz_get_subchannel(subchannel_id)
+ if c_returned_str == NULL:
+ raise ValueError('Failed to get the subchannel, please ensure your' \
+ ' subchannel_id==%s is valid' % (subchannel_id))
+ return c_returned_str
+
+def channelz_get_socket(socket_id):
+ cdef char *c_returned_str = grpc_channelz_get_socket(socket_id)
+ if c_returned_str == NULL:
+ raise ValueError('Failed to get the socket, please ensure your' \
+ ' socket_id==%s is valid' % (socket_id))
+ return c_returned_str
diff --git a/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/completion_queue.pxd.pxi b/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/completion_queue.pxd.pxi
new file mode 100644
index 00000000000..ec13e60f9d3
--- /dev/null
+++ b/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/completion_queue.pxd.pxi
@@ -0,0 +1,32 @@
+# Copyright 2015 gRPC authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+cdef int g_interrupt_check_period_ms
+
+cdef grpc_event _next(grpc_completion_queue *c_completion_queue, deadline) except *
+
+
+cdef _interpret_event(grpc_event c_event)
+
+cdef class _LatentEventArg:
+ cdef grpc_completion_queue *c_completion_queue
+ cdef object deadline
+
+cdef class CompletionQueue:
+
+ cdef grpc_completion_queue *c_completion_queue
+ cdef bint is_shutting_down
+ cdef bint is_shutdown
+
+ cdef _interpret_event(self, grpc_event c_event)
diff --git a/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/completion_queue.pyx.pxi b/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/completion_queue.pyx.pxi
new file mode 100644
index 00000000000..2e4e0107739
--- /dev/null
+++ b/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/completion_queue.pyx.pxi
@@ -0,0 +1,139 @@
+# Copyright 2015 gRPC authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+g_interrupt_check_period_ms = 200
+
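+# _next() polls the completion queue in slices of g_interrupt_check_period_ms
+# so that Python signal handlers (e.g. KeyboardInterrupt) still get a chance
+# to run while a thread is otherwise blocked inside C-core.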
+cdef grpc_event _next(grpc_completion_queue *c_completion_queue, deadline) except *:
+ global g_interrupt_check_period_ms
+ cdef gpr_timespec c_increment
+ cdef gpr_timespec c_timeout
+ cdef gpr_timespec c_deadline
+ c_increment = gpr_time_from_millis(g_interrupt_check_period_ms, GPR_TIMESPAN)
+ if deadline is None:
+ c_deadline = gpr_inf_future(GPR_CLOCK_REALTIME)
+ else:
+ c_deadline = _timespec_from_time(deadline)
+
+ while True:
+ with nogil:
+ c_timeout = gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), c_increment)
+ if gpr_time_cmp(c_timeout, c_deadline) > 0:
+ c_timeout = c_deadline
+
+ c_event = grpc_completion_queue_next(c_completion_queue, c_timeout, NULL)
+
+ if (c_event.type != GRPC_QUEUE_TIMEOUT or
+ gpr_time_cmp(c_timeout, c_deadline) == 0):
+ break
+
+ # Handle any signals
+ cpython.PyErr_CheckSignals()
+ return c_event
+
+cdef _interpret_event(grpc_event c_event):
+ cdef _Tag tag
+ if c_event.type == GRPC_QUEUE_TIMEOUT:
+ # TODO(ericgribkoff) Do not coopt ConnectivityEvent here.
+ return None, ConnectivityEvent(GRPC_QUEUE_TIMEOUT, False, None)
+ elif c_event.type == GRPC_QUEUE_SHUTDOWN:
+ # NOTE(nathaniel): For now we coopt ConnectivityEvent here.
+ return None, ConnectivityEvent(GRPC_QUEUE_SHUTDOWN, False, None)
+ else:
+ tag = <_Tag>c_event.tag
+ # We receive event tags only after they've been inc-ref'd elsewhere in
+ # the code.
+ cpython.Py_DECREF(tag)
+ return tag, tag.event(c_event)
+
+cdef _internal_latent_event(_LatentEventArg latent_event_arg):
+ cdef grpc_event c_event = _next(latent_event_arg.c_completion_queue, latent_event_arg.deadline)
+ return _interpret_event(c_event)
+
+cdef _latent_event(grpc_completion_queue *c_completion_queue, object deadline):
+ global g_gevent_activated
+
+ latent_event_arg = _LatentEventArg()
+ latent_event_arg.c_completion_queue = c_completion_queue
+ latent_event_arg.deadline = deadline
+
+ if g_gevent_activated:
+ # For gevent, completion_queue_next is run in a native thread pool.
+ global g_gevent_threadpool
+
+ result = g_gevent_threadpool.apply(_internal_latent_event, (latent_event_arg,))
+ return result
+ else:
+ return _internal_latent_event(latent_event_arg)
+
+cdef class CompletionQueue:
+
+ def __cinit__(self, shutdown_cq=False):
+ cdef grpc_completion_queue_attributes c_attrs
+ fork_handlers_and_grpc_init()
+ if shutdown_cq:
+ c_attrs.version = 1
+ c_attrs.cq_completion_type = GRPC_CQ_NEXT
+ c_attrs.cq_polling_type = GRPC_CQ_NON_LISTENING
+ c_attrs.cq_shutdown_cb = NULL
+ self.c_completion_queue = grpc_completion_queue_create(
+ grpc_completion_queue_factory_lookup(&c_attrs), &c_attrs, NULL);
+ else:
+ self.c_completion_queue = grpc_completion_queue_create_for_next(NULL)
+ self.is_shutting_down = False
+ self.is_shutdown = False
+
+ cdef _interpret_event(self, grpc_event c_event):
+ unused_tag, event = _interpret_event(c_event)
+ if event.completion_type == GRPC_QUEUE_SHUTDOWN:
+ self.is_shutdown = True
+ return event
+
+ def _internal_poll(self, deadline):
+ return self._interpret_event(_next(self.c_completion_queue, deadline))
+
+ # We name this 'poll' to avoid problems with CPython's expectations for
+ # 'special' methods (like next and __next__).
+ def poll(self, deadline=None):
+ global g_gevent_activated
+ if g_gevent_activated:
+ return g_gevent_threadpool.apply(CompletionQueue._internal_poll, (self, deadline))
+ else:
+ return self._internal_poll(deadline)
+
+ def shutdown(self):
+ with nogil:
+ grpc_completion_queue_shutdown(self.c_completion_queue)
+ self.is_shutting_down = True
+
+ def clear(self):
+ if not self.is_shutting_down:
+ raise ValueError('queue must be shutting down to be cleared')
+    while self.poll().completion_type != GRPC_QUEUE_SHUTDOWN:
+ pass
+
+ def __dealloc__(self):
+ cdef gpr_timespec c_deadline
+ c_deadline = gpr_inf_future(GPR_CLOCK_REALTIME)
+ if self.c_completion_queue != NULL:
+ # Ensure shutdown
+ if not self.is_shutting_down:
+ grpc_completion_queue_shutdown(self.c_completion_queue)
+ # Pump the queue (All outstanding calls should have been cancelled)
+ while not self.is_shutdown:
+ event = grpc_completion_queue_next(
+ self.c_completion_queue, c_deadline, NULL)
+ self._interpret_event(event)
+ grpc_completion_queue_destroy(self.c_completion_queue)
+ grpc_shutdown()
diff --git a/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/credentials.pxd.pxi b/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/credentials.pxd.pxi
new file mode 100644
index 00000000000..827f6f17ca3
--- /dev/null
+++ b/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/credentials.pxd.pxi
@@ -0,0 +1,117 @@
+# Copyright 2015 gRPC authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+cdef class CallCredentials:
+
+ cdef grpc_call_credentials *c(self) except *
+
+ # TODO(https://github.com/grpc/grpc/issues/12531): remove.
+ cdef grpc_call_credentials *c_credentials
+
+
+cdef int _get_metadata(
+ void *state, grpc_auth_metadata_context context,
+ grpc_credentials_plugin_metadata_cb cb, void *user_data,
+ grpc_metadata creds_md[GRPC_METADATA_CREDENTIALS_PLUGIN_SYNC_MAX],
+ size_t *num_creds_md, grpc_status_code *status,
+ const char **error_details) except * with gil
+
+cdef void _destroy(void *state) except * with gil
+
+
+cdef class MetadataPluginCallCredentials(CallCredentials):
+
+ cdef readonly object _metadata_plugin
+ cdef readonly bytes _name
+
+ cdef grpc_call_credentials *c(self) except *
+
+
+cdef grpc_call_credentials *_composition(call_credentialses)
+
+
+cdef class CompositeCallCredentials(CallCredentials):
+
+ cdef readonly tuple _call_credentialses
+
+ cdef grpc_call_credentials *c(self) except *
+
+
+cdef class ChannelCredentials:
+
+ cdef grpc_channel_credentials *c(self) except *
+
+
+cdef class SSLSessionCacheLRU:
+
+ cdef grpc_ssl_session_cache *_cache
+
+
+cdef class SSLChannelCredentials(ChannelCredentials):
+
+ cdef readonly object _pem_root_certificates
+ cdef readonly object _private_key
+ cdef readonly object _certificate_chain
+
+ cdef grpc_channel_credentials *c(self) except *
+
+
+cdef class CompositeChannelCredentials(ChannelCredentials):
+
+ cdef readonly tuple _call_credentialses
+ cdef readonly ChannelCredentials _channel_credentials
+
+ cdef grpc_channel_credentials *c(self) except *
+
+
+cdef class XDSChannelCredentials(ChannelCredentials):
+
+ cdef readonly ChannelCredentials _fallback_credentials
+
+ cdef grpc_channel_credentials *c(self) except *
+
+
+cdef class ServerCertificateConfig:
+
+ cdef grpc_ssl_server_certificate_config *c_cert_config
+ cdef const char *c_pem_root_certs
+ cdef grpc_ssl_pem_key_cert_pair *c_ssl_pem_key_cert_pairs
+ cdef size_t c_ssl_pem_key_cert_pairs_count
+ cdef list references
+
+
+cdef class ServerCredentials:
+
+ cdef grpc_server_credentials *c_credentials
+ cdef grpc_ssl_pem_key_cert_pair *c_ssl_pem_key_cert_pairs
+ cdef size_t c_ssl_pem_key_cert_pairs_count
+ cdef list references
+  # The cert-config-related state is used only if these credentials are
+  # created with a cert config/fetcher.
+ cdef object initial_cert_config
+ cdef object cert_config_fetcher
+ # whether C-core has asked for the initial_cert_config
+ cdef bint initial_cert_config_fetched
+
+
+cdef class LocalChannelCredentials(ChannelCredentials):
+
+ cdef grpc_local_connect_type _local_connect_type
+
+
+cdef class ALTSChannelCredentials(ChannelCredentials):
+ cdef grpc_alts_credentials_options *c_options
+
+ cdef grpc_channel_credentials *c(self) except *
diff --git a/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/credentials.pyx.pxi b/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/credentials.pyx.pxi
new file mode 100644
index 00000000000..27b56aa378b
--- /dev/null
+++ b/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/credentials.pyx.pxi
@@ -0,0 +1,443 @@
+# Copyright 2015 gRPC authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+def _spawn_callback_in_thread(cb_func, args):
+ t = ForkManagedThread(target=cb_func, args=args)
+ t.setDaemon(True)
+ t.start()
+
+async_callback_func = _spawn_callback_in_thread
+
+def set_async_callback_func(callback_func):
+ global async_callback_func
+ async_callback_func = callback_func
+
+def _spawn_callback_async(callback, args):
+ async_callback_func(callback, args)
+
+
+cdef class CallCredentials:
+
+ cdef grpc_call_credentials *c(self) except *:
+ raise NotImplementedError()
+
+
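+# C-core invokes this plugin hook to obtain per-call metadata. The Python
+# metadata plugin (passed in as `state`) is run asynchronously via
+# _spawn_callback_async; its `callback` argument marshals the resulting
+# metadata (or an error) back to C-core through `cb`. Returning 0 tells
+# C-core that the result will be delivered asynchronously.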
+cdef int _get_metadata(void *state,
+ grpc_auth_metadata_context context,
+ grpc_credentials_plugin_metadata_cb cb,
+ void *user_data,
+ grpc_metadata creds_md[GRPC_METADATA_CREDENTIALS_PLUGIN_SYNC_MAX],
+ size_t *num_creds_md,
+ grpc_status_code *status,
+ const char **error_details) except * with gil:
+ cdef size_t metadata_count
+ cdef grpc_metadata *c_metadata
+ def callback(metadata, grpc_status_code status, bytes error_details):
+ cdef char* c_error_details = NULL
+ if error_details is not None:
+ c_error_details = <char*> error_details
+ if status == StatusCode.ok:
+ _store_c_metadata(metadata, &c_metadata, &metadata_count)
+ with nogil:
+ cb(user_data, c_metadata, metadata_count, status, NULL)
+ _release_c_metadata(c_metadata, metadata_count)
+ else:
+ with nogil:
+ cb(user_data, NULL, 0, status, c_error_details)
+ args = context.service_url, context.method_name, callback,
+ plugin = <object>state
+ if plugin._stored_ctx is not None:
+ plugin._stored_ctx.copy().run(_spawn_callback_async, plugin, args)
+ else:
+ _spawn_callback_async(<object>state, args)
+ return 0 # Asynchronous return
+
+
+cdef void _destroy(void *state) except * with gil:
+ cpython.Py_DECREF(<object>state)
+ grpc_shutdown()
+
+
+cdef class MetadataPluginCallCredentials(CallCredentials):
+
+ def __cinit__(self, metadata_plugin, name):
+ self._metadata_plugin = metadata_plugin
+ self._name = name
+
+ cdef grpc_call_credentials *c(self) except *:
+ cdef grpc_metadata_credentials_plugin c_metadata_plugin
+ c_metadata_plugin.get_metadata = _get_metadata
+ c_metadata_plugin.destroy = _destroy
+ c_metadata_plugin.state = <void *>self._metadata_plugin
+ c_metadata_plugin.type = self._name
+ cpython.Py_INCREF(self._metadata_plugin)
+ fork_handlers_and_grpc_init()
+ # TODO(yihuazhang): Expose min_security_level via the Python API so that
+ # applications can decide what minimum security level their plugins require.
+ return grpc_metadata_credentials_create_from_plugin(c_metadata_plugin, GRPC_PRIVACY_AND_INTEGRITY, NULL)
+
+
+cdef grpc_call_credentials *_composition(call_credentialses):
+ call_credentials_iterator = iter(call_credentialses)
+ cdef CallCredentials composition = next(call_credentials_iterator)
+ cdef grpc_call_credentials *c_composition = composition.c()
+ cdef CallCredentials additional_call_credentials
+ cdef grpc_call_credentials *c_additional_call_credentials
+ cdef grpc_call_credentials *c_next_composition
+ for additional_call_credentials in call_credentials_iterator:
+ c_additional_call_credentials = additional_call_credentials.c()
+ c_next_composition = grpc_composite_call_credentials_create(
+ c_composition, c_additional_call_credentials, NULL)
+ grpc_call_credentials_release(c_composition)
+ grpc_call_credentials_release(c_additional_call_credentials)
+ c_composition = c_next_composition
+ return c_composition
+
+
+cdef class CompositeCallCredentials(CallCredentials):
+
+ def __cinit__(self, call_credentialses):
+ self._call_credentialses = call_credentialses
+
+ cdef grpc_call_credentials *c(self) except *:
+ return _composition(self._call_credentialses)
+
+
+cdef class ChannelCredentials:
+
+ cdef grpc_channel_credentials *c(self) except *:
+ raise NotImplementedError()
+
+
+cdef class SSLSessionCacheLRU:
+
+ def __cinit__(self, capacity):
+ fork_handlers_and_grpc_init()
+ self._cache = grpc_ssl_session_cache_create_lru(capacity)
+
+ def __int__(self):
+ return <uintptr_t>self._cache
+
+ def __dealloc__(self):
+ if self._cache != NULL:
+ grpc_ssl_session_cache_destroy(self._cache)
+ grpc_shutdown()
+
+
+cdef class SSLChannelCredentials(ChannelCredentials):
+
+ def __cinit__(self, pem_root_certificates, private_key, certificate_chain):
+ if pem_root_certificates is not None and not isinstance(pem_root_certificates, bytes):
+ raise TypeError('expected certificate to be bytes, got %s' % (type(pem_root_certificates)))
+ self._pem_root_certificates = pem_root_certificates
+ self._private_key = private_key
+ self._certificate_chain = certificate_chain
+
+ cdef grpc_channel_credentials *c(self) except *:
+ cdef const char *c_pem_root_certificates
+ cdef grpc_ssl_pem_key_cert_pair c_pem_key_certificate_pair
+ if self._pem_root_certificates is None:
+ c_pem_root_certificates = NULL
+ else:
+ c_pem_root_certificates = self._pem_root_certificates
+ if self._private_key is None and self._certificate_chain is None:
+ return grpc_ssl_credentials_create(
+ c_pem_root_certificates, NULL, NULL, NULL)
+ else:
+ if self._private_key:
+ c_pem_key_certificate_pair.private_key = self._private_key
+ else:
+ c_pem_key_certificate_pair.private_key = NULL
+ if self._certificate_chain:
+ c_pem_key_certificate_pair.certificate_chain = self._certificate_chain
+ else:
+ c_pem_key_certificate_pair.certificate_chain = NULL
+ return grpc_ssl_credentials_create(
+ c_pem_root_certificates, &c_pem_key_certificate_pair, NULL, NULL)
+
+
+cdef class CompositeChannelCredentials(ChannelCredentials):
+
+ def __cinit__(self, call_credentialses, channel_credentials):
+ self._call_credentialses = call_credentialses
+ self._channel_credentials = channel_credentials
+
+ cdef grpc_channel_credentials *c(self) except *:
+ cdef grpc_channel_credentials *c_channel_credentials
+ c_channel_credentials = self._channel_credentials.c()
+ cdef grpc_call_credentials *c_call_credentials_composition = _composition(
+ self._call_credentialses)
+    cdef grpc_channel_credentials *c_composition
+    c_composition = grpc_composite_channel_credentials_create(
+ c_channel_credentials, c_call_credentials_composition, NULL)
+ grpc_channel_credentials_release(c_channel_credentials)
+ grpc_call_credentials_release(c_call_credentials_composition)
+ return c_composition
+
+
+cdef class XDSChannelCredentials(ChannelCredentials):
+
+ def __cinit__(self, fallback_credentials):
+ self._fallback_credentials = fallback_credentials
+
+ cdef grpc_channel_credentials *c(self) except *:
+ cdef grpc_channel_credentials *c_fallback_creds = self._fallback_credentials.c()
+ cdef grpc_channel_credentials *xds_creds = grpc_xds_credentials_create(c_fallback_creds)
+ grpc_channel_credentials_release(c_fallback_creds)
+ return xds_creds
+
+
+cdef class ServerCertificateConfig:
+
+ def __cinit__(self):
+ fork_handlers_and_grpc_init()
+ self.c_cert_config = NULL
+ self.c_pem_root_certs = NULL
+ self.c_ssl_pem_key_cert_pairs = NULL
+ self.references = []
+
+ def __dealloc__(self):
+ grpc_ssl_server_certificate_config_destroy(self.c_cert_config)
+ gpr_free(self.c_ssl_pem_key_cert_pairs)
+ grpc_shutdown()
+
+
+cdef class ServerCredentials:
+
+ def __cinit__(self):
+ fork_handlers_and_grpc_init()
+ self.c_credentials = NULL
+ self.references = []
+ self.initial_cert_config = None
+ self.cert_config_fetcher = None
+ self.initial_cert_config_fetched = False
+
+ def __dealloc__(self):
+ if self.c_credentials != NULL:
+ grpc_server_credentials_release(self.c_credentials)
+ grpc_shutdown()
+
+cdef const char* _get_c_pem_root_certs(pem_root_certs):
+ if pem_root_certs is None:
+ return NULL
+ else:
+ return pem_root_certs
+
+cdef grpc_ssl_pem_key_cert_pair* _create_c_ssl_pem_key_cert_pairs(pem_key_cert_pairs):
+  # Return a gpr_malloc'ed array of grpc_ssl_pem_key_cert_pair built from a
+  # list of SslPemKeyCertPair objects.
+ for pair in pem_key_cert_pairs:
+ if not isinstance(pair, SslPemKeyCertPair):
+ raise TypeError("expected pem_key_cert_pairs to be sequence of "
+ "SslPemKeyCertPair")
+ cdef size_t c_ssl_pem_key_cert_pairs_count = len(pem_key_cert_pairs)
+ cdef grpc_ssl_pem_key_cert_pair* c_ssl_pem_key_cert_pairs = NULL
+ with nogil:
+ c_ssl_pem_key_cert_pairs = (
+ <grpc_ssl_pem_key_cert_pair *>gpr_malloc(
+ sizeof(grpc_ssl_pem_key_cert_pair) * c_ssl_pem_key_cert_pairs_count))
+ for i in range(c_ssl_pem_key_cert_pairs_count):
+ c_ssl_pem_key_cert_pairs[i] = (
+ (<SslPemKeyCertPair>pem_key_cert_pairs[i]).c_pair)
+ return c_ssl_pem_key_cert_pairs
+
+def server_credentials_ssl(pem_root_certs, pem_key_cert_pairs,
+ bint force_client_auth):
+ pem_root_certs = str_to_bytes(pem_root_certs)
+ pem_key_cert_pairs = list(pem_key_cert_pairs)
+ cdef ServerCredentials credentials = ServerCredentials()
+ credentials.references.append(pem_root_certs)
+ credentials.references.append(pem_key_cert_pairs)
+ cdef const char * c_pem_root_certs = _get_c_pem_root_certs(pem_root_certs)
+ credentials.c_ssl_pem_key_cert_pairs_count = len(pem_key_cert_pairs)
+ credentials.c_ssl_pem_key_cert_pairs = _create_c_ssl_pem_key_cert_pairs(pem_key_cert_pairs)
+ cdef grpc_ssl_server_certificate_config *c_cert_config = NULL
+ c_cert_config = grpc_ssl_server_certificate_config_create(
+ c_pem_root_certs, credentials.c_ssl_pem_key_cert_pairs,
+ credentials.c_ssl_pem_key_cert_pairs_count)
+ cdef grpc_ssl_server_credentials_options* c_options = NULL
+ # C-core assumes ownership of c_cert_config
+ c_options = grpc_ssl_server_credentials_create_options_using_config(
+ GRPC_SSL_REQUEST_AND_REQUIRE_CLIENT_CERTIFICATE_AND_VERIFY
+ if force_client_auth else
+ GRPC_SSL_DONT_REQUEST_CLIENT_CERTIFICATE,
+ c_cert_config)
+ # C-core assumes ownership of c_options
+ credentials.c_credentials = grpc_ssl_server_credentials_create_with_options(c_options)
+ return credentials
+
+def server_certificate_config_ssl(pem_root_certs, pem_key_cert_pairs):
+ pem_root_certs = str_to_bytes(pem_root_certs)
+ pem_key_cert_pairs = list(pem_key_cert_pairs)
+ cdef ServerCertificateConfig cert_config = ServerCertificateConfig()
+ cert_config.references.append(pem_root_certs)
+ cert_config.references.append(pem_key_cert_pairs)
+ cert_config.c_pem_root_certs = _get_c_pem_root_certs(pem_root_certs)
+ cert_config.c_ssl_pem_key_cert_pairs_count = len(pem_key_cert_pairs)
+ cert_config.c_ssl_pem_key_cert_pairs = _create_c_ssl_pem_key_cert_pairs(pem_key_cert_pairs)
+ cert_config.c_cert_config = grpc_ssl_server_certificate_config_create(
+ cert_config.c_pem_root_certs, cert_config.c_ssl_pem_key_cert_pairs,
+ cert_config.c_ssl_pem_key_cert_pairs_count)
+ return cert_config
+
+def server_credentials_ssl_dynamic_cert_config(initial_cert_config,
+ cert_config_fetcher,
+ bint force_client_auth):
+ if not isinstance(initial_cert_config, grpc.ServerCertificateConfiguration):
+ raise TypeError(
+ 'initial_cert_config must be a grpc.ServerCertificateConfiguration')
+ if not callable(cert_config_fetcher):
+ raise TypeError('cert_config_fetcher must be callable')
+ cdef ServerCredentials credentials = ServerCredentials()
+ credentials.initial_cert_config = initial_cert_config
+ credentials.cert_config_fetcher = cert_config_fetcher
+ cdef grpc_ssl_server_credentials_options* c_options = NULL
+ c_options = grpc_ssl_server_credentials_create_options_using_config_fetcher(
+ GRPC_SSL_REQUEST_AND_REQUIRE_CLIENT_CERTIFICATE_AND_VERIFY
+ if force_client_auth else
+ GRPC_SSL_DONT_REQUEST_CLIENT_CERTIFICATE,
+ _server_cert_config_fetcher_wrapper,
+ <void*>credentials)
+ # C-core assumes ownership of c_options
+ credentials.c_credentials = grpc_ssl_server_credentials_create_with_options(c_options)
+ return credentials
+
+cdef grpc_ssl_certificate_config_reload_status _server_cert_config_fetcher_wrapper(
+ void* user_data, grpc_ssl_server_certificate_config **config) with gil:
+ # This is a credentials.ServerCertificateConfig
+ cdef ServerCertificateConfig cert_config = None
+ if not user_data:
+ raise ValueError('internal error: user_data must be specified')
+ credentials = <ServerCredentials>user_data
+ if not credentials.initial_cert_config_fetched:
+ # C-core is asking for the initial cert config
+ credentials.initial_cert_config_fetched = True
+ cert_config = credentials.initial_cert_config._certificate_configuration
+ else:
+ user_cb = credentials.cert_config_fetcher
+ try:
+ cert_config_wrapper = user_cb()
+ except Exception:
+ _LOGGER.exception('Error fetching certificate config')
+ return GRPC_SSL_CERTIFICATE_CONFIG_RELOAD_FAIL
+ if cert_config_wrapper is None:
+ return GRPC_SSL_CERTIFICATE_CONFIG_RELOAD_UNCHANGED
+ elif not isinstance(
+ cert_config_wrapper, grpc.ServerCertificateConfiguration):
+ _LOGGER.error(
+ 'Error fetching certificate configuration: certificate '
+ 'configuration must be of type grpc.ServerCertificateConfiguration, '
+ 'not %s' % type(cert_config_wrapper).__name__)
+ return GRPC_SSL_CERTIFICATE_CONFIG_RELOAD_FAIL
+ else:
+ cert_config = cert_config_wrapper._certificate_configuration
+ config[0] = <grpc_ssl_server_certificate_config*>cert_config.c_cert_config
+ # our caller will assume ownership of memory, so we have to recreate
+ # a copy of c_cert_config here
+ cert_config.c_cert_config = grpc_ssl_server_certificate_config_create(
+ cert_config.c_pem_root_certs, cert_config.c_ssl_pem_key_cert_pairs,
+ cert_config.c_ssl_pem_key_cert_pairs_count)
+ return GRPC_SSL_CERTIFICATE_CONFIG_RELOAD_NEW
+
+
+class LocalConnectionType:
+ uds = UDS
+ local_tcp = LOCAL_TCP
+
+cdef class LocalChannelCredentials(ChannelCredentials):
+
+ def __cinit__(self, grpc_local_connect_type local_connect_type):
+ self._local_connect_type = local_connect_type
+
+ cdef grpc_channel_credentials *c(self) except *:
+ cdef grpc_local_connect_type local_connect_type
+ local_connect_type = self._local_connect_type
+ return grpc_local_credentials_create(local_connect_type)
+
+def channel_credentials_local(grpc_local_connect_type local_connect_type):
+ return LocalChannelCredentials(local_connect_type)
+
+cdef class InsecureChannelCredentials(ChannelCredentials):
+
+ cdef grpc_channel_credentials *c(self) except *:
+ return grpc_insecure_credentials_create()
+
+def channel_credentials_insecure():
+ return InsecureChannelCredentials()
+
+def server_credentials_local(grpc_local_connect_type local_connect_type):
+ cdef ServerCredentials credentials = ServerCredentials()
+ credentials.c_credentials = grpc_local_server_credentials_create(local_connect_type)
+ return credentials
+
+def xds_server_credentials(ServerCredentials fallback_credentials):
+ cdef ServerCredentials credentials = ServerCredentials()
+ credentials.c_credentials = grpc_xds_server_credentials_create(fallback_credentials.c_credentials)
+ # NOTE: We do not need to call grpc_server_credentials_release on the
+ # fallback credentials here because this will be done by the __dealloc__
+ # method of its Cython wrapper.
+ return credentials
+
+def insecure_server_credentials():
+ cdef ServerCredentials credentials = ServerCredentials()
+ credentials.c_credentials = grpc_insecure_server_credentials_create()
+ return credentials
+
+cdef class ALTSChannelCredentials(ChannelCredentials):
+
+ def __cinit__(self, list service_accounts):
+ self.c_options = grpc_alts_credentials_client_options_create()
+ cdef str account
+ for account in service_accounts:
+ grpc_alts_credentials_client_options_add_target_service_account(
+ self.c_options, str_to_bytes(account))
+
+ def __dealloc__(self):
+ if self.c_options != NULL:
+ grpc_alts_credentials_options_destroy(self.c_options)
+
+ cdef grpc_channel_credentials *c(self) except *:
+ return grpc_alts_credentials_create(self.c_options)
+
+
+def channel_credentials_alts(list service_accounts):
+ return ALTSChannelCredentials(service_accounts)
+
+
+def server_credentials_alts():
+ cdef ServerCredentials credentials = ServerCredentials()
+ cdef grpc_alts_credentials_options* c_options = grpc_alts_credentials_server_options_create()
+ credentials.c_credentials = grpc_alts_server_credentials_create(c_options)
+  # Options can be destroyed because a deep copy was performed.
+ grpc_alts_credentials_options_destroy(c_options)
+ return credentials
+
+
+cdef class ComputeEngineChannelCredentials(ChannelCredentials):
+ cdef grpc_channel_credentials* _c_creds
+ cdef grpc_call_credentials* _call_creds
+
+ def __cinit__(self, CallCredentials call_creds):
+ self._c_creds = NULL
+ self._call_creds = call_creds.c()
+ if self._call_creds == NULL:
+ raise ValueError("Call credentials may not be NULL.")
+
+ cdef grpc_channel_credentials *c(self) except *:
+ self._c_creds = grpc_google_default_credentials_create(self._call_creds)
+ return self._c_creds
+
+
+def channel_credentials_compute_engine(call_creds):
+ return ComputeEngineChannelCredentials(call_creds)
diff --git a/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/csds.pyx.pxi b/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/csds.pyx.pxi
new file mode 100644
index 00000000000..c33eb76e47f
--- /dev/null
+++ b/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/csds.pyx.pxi
@@ -0,0 +1,21 @@
+# Copyright 2021 The gRPC Authors
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+def dump_xds_configs():
+ cdef grpc_slice client_config_in_slice
+ with nogil:
+ client_config_in_slice = grpc_dump_xds_configs()
+ cdef bytes result = _slice_bytes(client_config_in_slice)
+ return result
diff --git a/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/event.pxd.pxi b/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/event.pxd.pxi
new file mode 100644
index 00000000000..0f173c6bd21
--- /dev/null
+++ b/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/event.pxd.pxi
@@ -0,0 +1,47 @@
+# Copyright 2017 gRPC authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+cdef class BaseEvent:
+ pass
+
+cdef class ConnectivityEvent(BaseEvent):
+
+ cdef readonly grpc_completion_type completion_type
+ cdef readonly bint success
+ cdef readonly object tag
+
+
+cdef class RequestCallEvent(BaseEvent):
+
+ cdef readonly grpc_completion_type completion_type
+ cdef readonly bint success
+ cdef readonly object tag
+ cdef readonly Call call
+ cdef readonly CallDetails call_details
+ cdef readonly tuple invocation_metadata
+
+
+cdef class BatchOperationEvent(BaseEvent):
+
+ cdef readonly grpc_completion_type completion_type
+ cdef readonly bint success
+ cdef readonly object tag
+ cdef readonly object batch_operations
+
+
+cdef class ServerShutdownEvent(BaseEvent):
+
+ cdef readonly grpc_completion_type completion_type
+ cdef readonly bint success
+ cdef readonly object tag
diff --git a/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/event.pyx.pxi b/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/event.pyx.pxi
new file mode 100644
index 00000000000..49cd0392553
--- /dev/null
+++ b/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/event.pyx.pxi
@@ -0,0 +1,54 @@
+# Copyright 2017 gRPC authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+cdef class ConnectivityEvent(BaseEvent):
+
+ def __cinit__(
+ self, grpc_completion_type completion_type, bint success, object tag):
+ self.completion_type = completion_type
+ self.success = success
+ self.tag = tag
+
+
+cdef class RequestCallEvent(BaseEvent):
+
+ def __cinit__(
+ self, grpc_completion_type completion_type, bint success, object tag,
+ Call call, CallDetails call_details, tuple invocation_metadata):
+ self.completion_type = completion_type
+ self.success = success
+ self.tag = tag
+ self.call = call
+ self.call_details = call_details
+ self.invocation_metadata = invocation_metadata
+
+
+cdef class BatchOperationEvent(BaseEvent):
+
+ def __cinit__(
+ self, grpc_completion_type completion_type, bint success, object tag,
+ object batch_operations):
+ self.completion_type = completion_type
+ self.success = success
+ self.tag = tag
+ self.batch_operations = batch_operations
+
+
+cdef class ServerShutdownEvent(BaseEvent):
+
+ def __cinit__(
+ self, grpc_completion_type completion_type, bint success, object tag):
+ self.completion_type = completion_type
+ self.success = success
+ self.tag = tag
diff --git a/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/fork_posix.pxd.pxi b/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/fork_posix.pxd.pxi
new file mode 100644
index 00000000000..a925bdd2e69
--- /dev/null
+++ b/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/fork_posix.pxd.pxi
@@ -0,0 +1,29 @@
+# Copyright 2018 gRPC authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+cdef extern from "pthread.h" nogil:
+ int pthread_atfork(
+ void (*prepare)() nogil,
+ void (*parent)() nogil,
+ void (*child)() nogil)
+
+
+cdef void __prefork() nogil
+
+
+cdef void __postfork_parent() nogil
+
+
+cdef void __postfork_child() nogil
\ No newline at end of file
diff --git a/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/fork_posix.pyx.pxi b/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/fork_posix.pyx.pxi
new file mode 100644
index 00000000000..53657e8b1a9
--- /dev/null
+++ b/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/fork_posix.pyx.pxi
@@ -0,0 +1,208 @@
+# Copyright 2018 gRPC authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+_AWAIT_THREADS_TIMEOUT_SECONDS = 5
+
+_TRUE_VALUES = ['yes', 'Yes', 'YES', 'true', 'True', 'TRUE', '1']
+
+# This flag enables experimental support within gRPC Python for applications
+# that will fork() without exec(). When enabled, gRPC Python will attempt to
+# pause all of its internally created threads before the fork syscall proceeds.
+#
+# For this to be successful, the application must not have multiple threads of
+# its own calling into gRPC when fork is invoked. Any callbacks from gRPC
+# Python-spawned threads into user code (e.g., callbacks for asynchronous RPCs)
+# must not block and should execute quickly.
+#
+# This flag is not supported on Windows.
+# This flag is also not supported when a non-native IO manager is in use.
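+#
+# For example (an illustrative command; substitute your own entry point for
+# client.py):
+#
+#   GRPC_ENABLE_FORK_SUPPORT=1 python client.py
+#
+# opts that process into this experimental fork support.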
+_GRPC_ENABLE_FORK_SUPPORT = (
+ os.environ.get('GRPC_ENABLE_FORK_SUPPORT', '0')
+ .lower() in _TRUE_VALUES)
+
+_fork_handler_failed = False
+
+cdef void __prefork() nogil:
+ with gil:
+ global _fork_handler_failed
+ _fork_handler_failed = False
+ with _fork_state.fork_in_progress_condition:
+ _fork_state.fork_in_progress = True
+ if not _fork_state.active_thread_count.await_zero_threads(
+ _AWAIT_THREADS_TIMEOUT_SECONDS):
+ _LOGGER.error(
+ 'Failed to shutdown gRPC Python threads prior to fork. '
+ 'Behavior after fork will be undefined.')
+ _fork_handler_failed = True
+
+
+cdef void __postfork_parent() nogil:
+ with gil:
+ with _fork_state.fork_in_progress_condition:
+ _fork_state.fork_in_progress = False
+ _fork_state.fork_in_progress_condition.notify_all()
+
+
+cdef void __postfork_child() nogil:
+ with gil:
+ try:
+ if _fork_handler_failed:
+ return
+ # Thread could be holding the fork_in_progress_condition inside of
+ # block_if_fork_in_progress() when fork occurs. Reset the lock here.
+ _fork_state.fork_in_progress_condition = threading.Condition()
+ # A thread in return_from_user_request_generator() may hold this lock
+ # when fork occurs.
+ _fork_state.active_thread_count = _ActiveThreadCount()
+ for state_to_reset in _fork_state.postfork_states_to_reset:
+ state_to_reset.reset_postfork_child()
+ _fork_state.postfork_states_to_reset = []
+ _fork_state.fork_epoch += 1
+ for channel in _fork_state.channels:
+ channel._close_on_fork()
+ with _fork_state.fork_in_progress_condition:
+ _fork_state.fork_in_progress = False
+ except:
+ _LOGGER.error('Exiting child due to raised exception')
+ _LOGGER.error(sys.exc_info()[0])
+ os._exit(os.EX_USAGE)
+
+ if grpc_is_initialized() > 0:
+ with gil:
+ _LOGGER.error('Failed to shutdown gRPC Core after fork()')
+ os._exit(os.EX_USAGE)
+
+
+def fork_handlers_and_grpc_init():
+ grpc_init()
+ if _GRPC_ENABLE_FORK_SUPPORT:
+ with _fork_state.fork_handler_registered_lock:
+ if not _fork_state.fork_handler_registered:
+ pthread_atfork(&__prefork, &__postfork_parent, &__postfork_child)
+ _fork_state.fork_handler_registered = True
+
+
+
+
+class ForkManagedThread(object):
+ def __init__(self, target, args=()):
+ if _GRPC_ENABLE_FORK_SUPPORT:
+ def managed_target(*args):
+ try:
+ target(*args)
+ finally:
+ _fork_state.active_thread_count.decrement()
+ self._thread = threading.Thread(target=_run_with_context(managed_target), args=args)
+ else:
+ self._thread = threading.Thread(target=_run_with_context(target), args=args)
+
+ def setDaemon(self, daemonic):
+ self._thread.daemon = daemonic
+
+ def start(self):
+ if _GRPC_ENABLE_FORK_SUPPORT:
+ _fork_state.active_thread_count.increment()
+ self._thread.start()
+
+ def join(self):
+ self._thread.join()
+
+
+def block_if_fork_in_progress(postfork_state_to_reset=None):
+ if _GRPC_ENABLE_FORK_SUPPORT:
+ with _fork_state.fork_in_progress_condition:
+ if not _fork_state.fork_in_progress:
+ return
+ if postfork_state_to_reset is not None:
+ _fork_state.postfork_states_to_reset.append(postfork_state_to_reset)
+ _fork_state.active_thread_count.decrement()
+ _fork_state.fork_in_progress_condition.wait()
+ _fork_state.active_thread_count.increment()
+
+
+def enter_user_request_generator():
+ if _GRPC_ENABLE_FORK_SUPPORT:
+ _fork_state.active_thread_count.decrement()
+
+
+def return_from_user_request_generator():
+ if _GRPC_ENABLE_FORK_SUPPORT:
+ _fork_state.active_thread_count.increment()
+ block_if_fork_in_progress()
+
+
+def get_fork_epoch():
+ return _fork_state.fork_epoch
+
+
+def is_fork_support_enabled():
+ return _GRPC_ENABLE_FORK_SUPPORT
+
+
+def fork_register_channel(channel):
+ if _GRPC_ENABLE_FORK_SUPPORT:
+ _fork_state.channels.add(channel)
+
+
+def fork_unregister_channel(channel):
+ if _GRPC_ENABLE_FORK_SUPPORT:
+ _fork_state.channels.discard(channel)
+
+
+class _ActiveThreadCount(object):
+ def __init__(self):
+ self._num_active_threads = 0
+ self._condition = threading.Condition()
+
+ def increment(self):
+ with self._condition:
+ self._num_active_threads += 1
+
+ def decrement(self):
+ with self._condition:
+ self._num_active_threads -= 1
+ if self._num_active_threads == 0:
+ self._condition.notify_all()
+
+ def await_zero_threads(self, timeout_secs):
+ end_time = time.time() + timeout_secs
+ wait_time = timeout_secs
+ with self._condition:
+ while True:
+ if self._num_active_threads > 0:
+ self._condition.wait(wait_time)
+ if self._num_active_threads == 0:
+ return True
+ # Thread count may have increased before this re-obtains the
+ # lock after a notify(). Wait again until timeout_secs has
+ # elapsed.
+ wait_time = end_time - time.time()
+ if wait_time <= 0:
+ return False
+
+
+class _ForkState(object):
+ def __init__(self):
+ self.fork_in_progress_condition = threading.Condition()
+ self.fork_in_progress = False
+ self.postfork_states_to_reset = []
+ self.fork_handler_registered_lock = threading.Lock()
+ self.fork_handler_registered = False
+ self.active_thread_count = _ActiveThreadCount()
+ self.fork_epoch = 0
+ self.channels = set()
+
+
+_fork_state = _ForkState()
diff --git a/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/fork_windows.pyx.pxi b/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/fork_windows.pyx.pxi
new file mode 100644
index 00000000000..67aaf4d033d
--- /dev/null
+++ b/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/fork_windows.pyx.pxi
@@ -0,0 +1,61 @@
+# Copyright 2018 gRPC authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+# No-op implementations for Windows.
+
+def fork_handlers_and_grpc_init():
+ grpc_init()
+
+
+class ForkManagedThread(object):
+ def __init__(self, target, args=()):
+ self._thread = threading.Thread(target=_run_with_context(target), args=args)
+
+ def setDaemon(self, daemonic):
+ self._thread.daemon = daemonic
+
+ def start(self):
+ self._thread.start()
+
+ def join(self):
+ self._thread.join()
+
+
+def block_if_fork_in_progress(postfork_state_to_reset=None):
+ pass
+
+
+def enter_user_request_generator():
+ pass
+
+
+def return_from_user_request_generator():
+ pass
+
+
+def get_fork_epoch():
+ return 0
+
+
+def is_fork_support_enabled():
+ return False
+
+
+def fork_register_channel(channel):
+ pass
+
+
+def fork_unregister_channel(channel):
+ pass
diff --git a/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/grpc.pxi b/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/grpc.pxi
new file mode 100644
index 00000000000..6e04e0cbfd4
--- /dev/null
+++ b/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/grpc.pxi
@@ -0,0 +1,735 @@
+# Copyright 2015 gRPC authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+cimport libc.time
+
+ctypedef ssize_t intptr_t
+ctypedef size_t uintptr_t
+ctypedef signed char int8_t
+ctypedef signed short int16_t
+ctypedef signed int int32_t
+ctypedef signed long long int64_t
+ctypedef unsigned char uint8_t
+ctypedef unsigned short uint16_t
+ctypedef unsigned int uint32_t
+ctypedef unsigned long long uint64_t
+
+# C++ Utilities
+
+# NOTE(lidiz) Unfortunately, we can't use "cimport" here because Cython
+# links it with exception handling. It introduces new dependencies.
+cdef extern from "<queue>" namespace "std" nogil:
+ cdef cppclass queue[T]:
+ queue()
+ bint empty()
+ T& front()
+ T& back()
+ void pop()
+ void push(T&)
+ size_t size()
+
+
+cdef extern from "<mutex>" namespace "std" nogil:
+ cdef cppclass mutex:
+ mutex()
+ void lock()
+ void unlock()
+
+ cdef cppclass unique_lock[Mutex]:
+ unique_lock(Mutex&)
+
+cdef extern from "<condition_variable>" namespace "std" nogil:
+ cdef cppclass condition_variable:
+ condition_variable()
+ void notify_all()
+ void wait(unique_lock[mutex]&)
+
+# gRPC Core Declarations
+
+cdef extern from "grpc/support/alloc.h":
+
+ void *gpr_malloc(size_t size) nogil
+ void *gpr_zalloc(size_t size) nogil
+ void gpr_free(void *ptr) nogil
+ void *gpr_realloc(void *p, size_t size) nogil
+
+
+cdef extern from "grpc/byte_buffer_reader.h":
+
+ struct grpc_byte_buffer_reader:
+ # We don't care about the internals
+ pass
+
+
+cdef extern from "grpc/impl/codegen/grpc_types.h":
+ ctypedef struct grpc_completion_queue_functor:
+ void (*functor_run)(grpc_completion_queue_functor*, int);
+
+
+cdef extern from "grpc/grpc.h":
+
+ ctypedef struct grpc_slice:
+ # don't worry about writing out the members of grpc_slice; we never access
+ # them directly.
+ pass
+
+ grpc_slice grpc_slice_ref(grpc_slice s) nogil
+ void grpc_slice_unref(grpc_slice s) nogil
+ grpc_slice grpc_empty_slice() nogil
+ grpc_slice grpc_slice_new(void *p, size_t len, void (*destroy)(void *)) nogil
+ grpc_slice grpc_slice_new_with_len(
+ void *p, size_t len, void (*destroy)(void *, size_t)) nogil
+ grpc_slice grpc_slice_malloc(size_t length) nogil
+ grpc_slice grpc_slice_from_copied_string(const char *source) nogil
+ grpc_slice grpc_slice_from_copied_buffer(const char *source, size_t len) nogil
+ grpc_slice grpc_slice_copy(grpc_slice s) nogil
+
+ # Declare functions for function-like macros (because Cython)...
+ void *grpc_slice_start_ptr "GRPC_SLICE_START_PTR" (grpc_slice s) nogil
+ size_t grpc_slice_length "GRPC_SLICE_LENGTH" (grpc_slice s) nogil
+
+ const int GPR_MS_PER_SEC
+ const int GPR_US_PER_SEC
+ const int GPR_NS_PER_SEC
+
+ ctypedef enum gpr_clock_type:
+ GPR_CLOCK_MONOTONIC
+ GPR_CLOCK_REALTIME
+ GPR_CLOCK_PRECISE
+ GPR_TIMESPAN
+
+ ctypedef struct gpr_timespec:
+ int64_t seconds "tv_sec"
+ int32_t nanoseconds "tv_nsec"
+ gpr_clock_type clock_type
+
+ gpr_timespec gpr_time_0(gpr_clock_type type) nogil
+ gpr_timespec gpr_inf_future(gpr_clock_type type) nogil
+ gpr_timespec gpr_inf_past(gpr_clock_type type) nogil
+
+ gpr_timespec gpr_now(gpr_clock_type clock) nogil
+
+ gpr_timespec gpr_convert_clock_type(gpr_timespec t,
+ gpr_clock_type target_clock) nogil
+
+ gpr_timespec gpr_time_from_millis(int64_t ms, gpr_clock_type type) nogil
+ gpr_timespec gpr_time_from_nanos(int64_t ns, gpr_clock_type type) nogil
+ double gpr_timespec_to_micros(gpr_timespec t) nogil
+
+ gpr_timespec gpr_time_add(gpr_timespec a, gpr_timespec b) nogil
+
+ int gpr_time_cmp(gpr_timespec a, gpr_timespec b) nogil
+
+ ctypedef struct grpc_byte_buffer:
+ # We don't care about the internals.
+ pass
+
+ grpc_byte_buffer *grpc_raw_byte_buffer_create(grpc_slice *slices,
+ size_t nslices) nogil
+ size_t grpc_byte_buffer_length(grpc_byte_buffer *bb) nogil
+ void grpc_byte_buffer_destroy(grpc_byte_buffer *byte_buffer) nogil
+
+ int grpc_byte_buffer_reader_init(grpc_byte_buffer_reader *reader,
+ grpc_byte_buffer *buffer) nogil
+ int grpc_byte_buffer_reader_next(grpc_byte_buffer_reader *reader,
+ grpc_slice *slice) nogil
+ void grpc_byte_buffer_reader_destroy(grpc_byte_buffer_reader *reader) nogil
+
+ ctypedef enum grpc_status_code:
+ GRPC_STATUS_OK
+ GRPC_STATUS_CANCELLED
+ GRPC_STATUS_UNKNOWN
+ GRPC_STATUS_INVALID_ARGUMENT
+ GRPC_STATUS_DEADLINE_EXCEEDED
+ GRPC_STATUS_NOT_FOUND
+ GRPC_STATUS_ALREADY_EXISTS
+ GRPC_STATUS_PERMISSION_DENIED
+ GRPC_STATUS_UNAUTHENTICATED
+ GRPC_STATUS_RESOURCE_EXHAUSTED
+ GRPC_STATUS_FAILED_PRECONDITION
+ GRPC_STATUS_ABORTED
+ GRPC_STATUS_OUT_OF_RANGE
+ GRPC_STATUS_UNIMPLEMENTED
+ GRPC_STATUS_INTERNAL
+ GRPC_STATUS_UNAVAILABLE
+ GRPC_STATUS_DATA_LOSS
+ GRPC_STATUS__DO_NOT_USE
+
+ const char *GRPC_ARG_ENABLE_CENSUS
+ const char *GRPC_ARG_MAX_CONCURRENT_STREAMS
+ const char *GRPC_ARG_MAX_RECEIVE_MESSAGE_LENGTH
+ const char *GRPC_ARG_MAX_SEND_MESSAGE_LENGTH
+ const char *GRPC_ARG_HTTP2_INITIAL_SEQUENCE_NUMBER
+ const char *GRPC_ARG_DEFAULT_AUTHORITY
+ const char *GRPC_ARG_PRIMARY_USER_AGENT_STRING
+ const char *GRPC_ARG_SECONDARY_USER_AGENT_STRING
+ const char *GRPC_SSL_TARGET_NAME_OVERRIDE_ARG
+ const char *GRPC_SSL_SESSION_CACHE_ARG
+ const char *_GRPC_COMPRESSION_CHANNEL_DEFAULT_ALGORITHM \
+ "GRPC_COMPRESSION_CHANNEL_DEFAULT_ALGORITHM"
+ const char *GRPC_COMPRESSION_CHANNEL_DEFAULT_LEVEL
+ const char *GRPC_COMPRESSION_CHANNEL_ENABLED_ALGORITHMS_BITSET
+
+ const int GRPC_WRITE_BUFFER_HINT
+ const int GRPC_WRITE_NO_COMPRESS
+ const int GRPC_WRITE_USED_MASK
+
+ const int GRPC_INITIAL_METADATA_WAIT_FOR_READY
+ const int GRPC_INITIAL_METADATA_WAIT_FOR_READY_EXPLICITLY_SET
+ const int GRPC_INITIAL_METADATA_USED_MASK
+
+ const int GRPC_MAX_COMPLETION_QUEUE_PLUCKERS
+
+ ctypedef struct grpc_completion_queue:
+ # We don't care about the internals (and in fact don't know them)
+ pass
+
+ ctypedef struct grpc_channel:
+ # We don't care about the internals (and in fact don't know them)
+ pass
+
+ ctypedef struct grpc_server:
+ # We don't care about the internals (and in fact don't know them)
+ pass
+
+ ctypedef struct grpc_call:
+ # We don't care about the internals (and in fact don't know them)
+ pass
+
+ ctypedef enum grpc_arg_type:
+ GRPC_ARG_STRING
+ GRPC_ARG_INTEGER
+ GRPC_ARG_POINTER
+
+ ctypedef struct grpc_arg_pointer_vtable:
+ void *(*copy)(void *)
+ void (*destroy)(void *)
+ int (*cmp)(void *, void *)
+
+ ctypedef struct grpc_arg_value_pointer:
+ void *address "p"
+ grpc_arg_pointer_vtable *vtable
+
+ union grpc_arg_value:
+ char *string
+ int integer
+ grpc_arg_value_pointer pointer
+
+ ctypedef struct grpc_arg:
+ grpc_arg_type type
+ char *key
+ grpc_arg_value value
+
+ ctypedef struct grpc_channel_args:
+ size_t arguments_length "num_args"
+ grpc_arg *arguments "args"
+
+ ctypedef enum grpc_stream_compression_level:
+ GRPC_STREAM_COMPRESS_LEVEL_NONE
+ GRPC_STREAM_COMPRESS_LEVEL_LOW
+ GRPC_STREAM_COMPRESS_LEVEL_MED
+ GRPC_STREAM_COMPRESS_LEVEL_HIGH
+
+ ctypedef enum grpc_call_error:
+ GRPC_CALL_OK
+ GRPC_CALL_ERROR
+ GRPC_CALL_ERROR_NOT_ON_SERVER
+ GRPC_CALL_ERROR_NOT_ON_CLIENT
+ GRPC_CALL_ERROR_ALREADY_ACCEPTED
+ GRPC_CALL_ERROR_ALREADY_INVOKED
+ GRPC_CALL_ERROR_NOT_INVOKED
+ GRPC_CALL_ERROR_ALREADY_FINISHED
+ GRPC_CALL_ERROR_TOO_MANY_OPERATIONS
+ GRPC_CALL_ERROR_INVALID_FLAGS
+ GRPC_CALL_ERROR_INVALID_METADATA
+
+ ctypedef enum grpc_cq_completion_type:
+ GRPC_CQ_NEXT
+ GRPC_CQ_PLUCK
+
+ ctypedef enum grpc_cq_polling_type:
+ GRPC_CQ_DEFAULT_POLLING
+ GRPC_CQ_NON_LISTENING
+ GRPC_CQ_NON_POLLING
+
+ ctypedef struct grpc_completion_queue_attributes:
+ int version
+ grpc_cq_completion_type cq_completion_type
+ grpc_cq_polling_type cq_polling_type
+ void* cq_shutdown_cb
+
+ ctypedef enum grpc_connectivity_state:
+ GRPC_CHANNEL_IDLE
+ GRPC_CHANNEL_CONNECTING
+ GRPC_CHANNEL_READY
+ GRPC_CHANNEL_TRANSIENT_FAILURE
+ GRPC_CHANNEL_SHUTDOWN
+
+ ctypedef struct grpc_metadata:
+ grpc_slice key
+ grpc_slice value
+ # ignore the 'internal_data.obfuscated' fields.
+
+ ctypedef enum grpc_completion_type:
+ GRPC_QUEUE_SHUTDOWN
+ GRPC_QUEUE_TIMEOUT
+ GRPC_OP_COMPLETE
+
+ ctypedef struct grpc_event:
+ grpc_completion_type type
+ int success
+ void *tag
+
+ ctypedef struct grpc_metadata_array:
+ size_t count
+ size_t capacity
+ grpc_metadata *metadata
+
+ void grpc_metadata_array_init(grpc_metadata_array *array) nogil
+ void grpc_metadata_array_destroy(grpc_metadata_array *array) nogil
+
+ ctypedef struct grpc_call_details:
+ grpc_slice method
+ grpc_slice host
+ gpr_timespec deadline
+
+ void grpc_call_details_init(grpc_call_details *details) nogil
+ void grpc_call_details_destroy(grpc_call_details *details) nogil
+
+ ctypedef enum grpc_op_type:
+ GRPC_OP_SEND_INITIAL_METADATA
+ GRPC_OP_SEND_MESSAGE
+ GRPC_OP_SEND_CLOSE_FROM_CLIENT
+ GRPC_OP_SEND_STATUS_FROM_SERVER
+ GRPC_OP_RECV_INITIAL_METADATA
+ GRPC_OP_RECV_MESSAGE
+ GRPC_OP_RECV_STATUS_ON_CLIENT
+ GRPC_OP_RECV_CLOSE_ON_SERVER
+
+ ctypedef struct grpc_op_send_initial_metadata_maybe_compression_level:
+ uint8_t is_set
+ grpc_compression_level level
+
+ ctypedef struct grpc_op_data_send_initial_metadata:
+ size_t count
+ grpc_metadata *metadata
+ grpc_op_send_initial_metadata_maybe_compression_level maybe_compression_level
+
+ ctypedef struct grpc_op_data_send_status_from_server:
+ size_t trailing_metadata_count
+ grpc_metadata *trailing_metadata
+ grpc_status_code status
+ grpc_slice *status_details
+
+ ctypedef struct grpc_op_data_recv_status_on_client:
+ grpc_metadata_array *trailing_metadata
+ grpc_status_code *status
+ grpc_slice *status_details
+ char** error_string
+
+ ctypedef struct grpc_op_data_recv_close_on_server:
+ int *cancelled
+
+ ctypedef struct grpc_op_data_send_message:
+ grpc_byte_buffer *send_message
+
+ ctypedef struct grpc_op_data_receive_message:
+ grpc_byte_buffer **receive_message "recv_message"
+
+ ctypedef struct grpc_op_data_receive_initial_metadata:
+ grpc_metadata_array *receive_initial_metadata "recv_initial_metadata"
+
+ union grpc_op_data:
+ grpc_op_data_send_initial_metadata send_initial_metadata
+ grpc_op_data_send_message send_message
+ grpc_op_data_send_status_from_server send_status_from_server
+ grpc_op_data_receive_initial_metadata receive_initial_metadata "recv_initial_metadata"
+ grpc_op_data_receive_message receive_message "recv_message"
+ grpc_op_data_recv_status_on_client receive_status_on_client "recv_status_on_client"
+ grpc_op_data_recv_close_on_server receive_close_on_server "recv_close_on_server"
+
+ ctypedef struct grpc_op:
+ grpc_op_type type "op"
+ uint32_t flags
+ void * reserved
+ grpc_op_data data
+
+ void grpc_dont_init_openssl() nogil
+ void grpc_init() nogil
+ void grpc_shutdown() nogil
+ void grpc_shutdown_blocking() nogil
+ int grpc_is_initialized() nogil
+
+ ctypedef struct grpc_completion_queue_factory:
+ pass
+
+ grpc_completion_queue_factory *grpc_completion_queue_factory_lookup(
+ const grpc_completion_queue_attributes* attributes) nogil
+ grpc_completion_queue *grpc_completion_queue_create(
+ const grpc_completion_queue_factory* factory,
+ const grpc_completion_queue_attributes* attr, void* reserved) nogil
+ grpc_completion_queue *grpc_completion_queue_create_for_next(void *reserved) nogil
+
+ grpc_event grpc_completion_queue_next(grpc_completion_queue *cq,
+ gpr_timespec deadline,
+ void *reserved) nogil
+ grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cq, void *tag,
+ gpr_timespec deadline,
+ void *reserved) nogil
+ void grpc_completion_queue_shutdown(grpc_completion_queue *cq) nogil
+ void grpc_completion_queue_destroy(grpc_completion_queue *cq) nogil
+
+ grpc_completion_queue *grpc_completion_queue_create_for_callback(
+ grpc_completion_queue_functor* shutdown_callback,
+ void *reserved) nogil
+
+ grpc_call_error grpc_call_start_batch(
+ grpc_call *call, const grpc_op *ops, size_t nops, void *tag,
+ void *reserved) nogil
+ const char* grpc_call_error_to_string(grpc_call_error error) nogil
+ grpc_call_error grpc_call_cancel(grpc_call *call, void *reserved) nogil
+ grpc_call_error grpc_call_cancel_with_status(grpc_call *call,
+ grpc_status_code status,
+ const char *description,
+ void *reserved) nogil
+ char *grpc_call_get_peer(grpc_call *call) nogil
+ void grpc_call_unref(grpc_call *call) nogil
+
+ grpc_call *grpc_channel_create_call(
+ grpc_channel *channel, grpc_call *parent_call, uint32_t propagation_mask,
+ grpc_completion_queue *completion_queue, grpc_slice method,
+ const grpc_slice *host, gpr_timespec deadline, void *reserved) nogil
+ grpc_connectivity_state grpc_channel_check_connectivity_state(
+ grpc_channel *channel, int try_to_connect) nogil
+ void grpc_channel_watch_connectivity_state(
+ grpc_channel *channel, grpc_connectivity_state last_observed_state,
+ gpr_timespec deadline, grpc_completion_queue *cq, void *tag) nogil
+ char *grpc_channel_get_target(grpc_channel *channel) nogil
+ void grpc_channel_destroy(grpc_channel *channel) nogil
+
+ grpc_server *grpc_server_create(
+ const grpc_channel_args *args, void *reserved) nogil
+ grpc_call_error grpc_server_request_call(
+ grpc_server *server, grpc_call **call, grpc_call_details *details,
+ grpc_metadata_array *request_metadata, grpc_completion_queue
+ *cq_bound_to_call, grpc_completion_queue *cq_for_notification, void
+ *tag_new) nogil
+ void grpc_server_register_completion_queue(grpc_server *server,
+ grpc_completion_queue *cq,
+ void *reserved) nogil
+
+ ctypedef struct grpc_server_config_fetcher:
+ pass
+
+ void grpc_server_set_config_fetcher(
+ grpc_server* server, grpc_server_config_fetcher* config_fetcher) nogil
+
+ ctypedef struct grpc_server_xds_status_notifier:
+ void (*on_serving_status_update)(void* user_data, const char* uri,
+ grpc_status_code code,
+ const char* error_message)
+ void* user_data;
+
+ grpc_server_config_fetcher* grpc_server_config_fetcher_xds_create(
+ grpc_server_xds_status_notifier notifier,
+ const grpc_channel_args* args) nogil
+
+
+ void grpc_server_start(grpc_server *server) nogil
+ void grpc_server_shutdown_and_notify(
+ grpc_server *server, grpc_completion_queue *cq, void *tag) nogil
+ void grpc_server_cancel_all_calls(grpc_server *server) nogil
+ void grpc_server_destroy(grpc_server *server) nogil
+
+ char* grpc_channelz_get_top_channels(intptr_t start_channel_id)
+ char* grpc_channelz_get_servers(intptr_t start_server_id)
+ char* grpc_channelz_get_server(intptr_t server_id)
+ char* grpc_channelz_get_server_sockets(intptr_t server_id,
+ intptr_t start_socket_id,
+ intptr_t max_results)
+ char* grpc_channelz_get_channel(intptr_t channel_id)
+ char* grpc_channelz_get_subchannel(intptr_t subchannel_id)
+ char* grpc_channelz_get_socket(intptr_t socket_id)
+
+ grpc_slice grpc_dump_xds_configs() nogil
+
+
+cdef extern from "grpc/grpc_security.h":
+
+ # Declare this as an enum; this is the only way to make it a const in
+ # Cython.
+ enum: GRPC_METADATA_CREDENTIALS_PLUGIN_SYNC_MAX
+
+ ctypedef enum grpc_ssl_roots_override_result:
+ GRPC_SSL_ROOTS_OVERRIDE_OK
+ GRPC_SSL_ROOTS_OVERRIDE_FAILED_PERMANENTLY
+ GRPC_SSL_ROOTS_OVERRIDE_FAILED
+
+ ctypedef enum grpc_ssl_client_certificate_request_type:
+ GRPC_SSL_DONT_REQUEST_CLIENT_CERTIFICATE,
+ GRPC_SSL_REQUEST_CLIENT_CERTIFICATE_BUT_DONT_VERIFY
+ GRPC_SSL_REQUEST_CLIENT_CERTIFICATE_AND_VERIFY
+ GRPC_SSL_REQUEST_AND_REQUIRE_CLIENT_CERTIFICATE_BUT_DONT_VERIFY
+ GRPC_SSL_REQUEST_AND_REQUIRE_CLIENT_CERTIFICATE_AND_VERIFY
+
+ ctypedef enum grpc_security_level:
+ GRPC_SECURITY_MIN
+ GRPC_SECURITY_NONE = GRPC_SECURITY_MIN
+ GRPC_INTEGRITY_ONLY
+ GRPC_PRIVACY_AND_INTEGRITY
+ GRPC_SECURITY_MAX = GRPC_PRIVACY_AND_INTEGRITY
+
+ ctypedef enum grpc_ssl_certificate_config_reload_status:
+ GRPC_SSL_CERTIFICATE_CONFIG_RELOAD_UNCHANGED
+ GRPC_SSL_CERTIFICATE_CONFIG_RELOAD_NEW
+ GRPC_SSL_CERTIFICATE_CONFIG_RELOAD_FAIL
+
+ ctypedef struct grpc_ssl_server_certificate_config:
+ # We don't care about the internals
+ pass
+
+ ctypedef struct grpc_ssl_server_credentials_options:
+ # We don't care about the internals
+ pass
+
+ grpc_ssl_server_certificate_config * grpc_ssl_server_certificate_config_create(
+ const char *pem_root_certs,
+ const grpc_ssl_pem_key_cert_pair *pem_key_cert_pairs,
+ size_t num_key_cert_pairs)
+
+ void grpc_ssl_server_certificate_config_destroy(grpc_ssl_server_certificate_config *config)
+
+ ctypedef grpc_ssl_certificate_config_reload_status (*grpc_ssl_server_certificate_config_callback)(
+ void *user_data,
+ grpc_ssl_server_certificate_config **config)
+
+ grpc_ssl_server_credentials_options *grpc_ssl_server_credentials_create_options_using_config(
+ grpc_ssl_client_certificate_request_type client_certificate_request,
+ grpc_ssl_server_certificate_config *certificate_config)
+
+ grpc_ssl_server_credentials_options* grpc_ssl_server_credentials_create_options_using_config_fetcher(
+ grpc_ssl_client_certificate_request_type client_certificate_request,
+ grpc_ssl_server_certificate_config_callback cb,
+ void *user_data)
+
+ grpc_server_credentials *grpc_ssl_server_credentials_create_with_options(
+ grpc_ssl_server_credentials_options *options)
+
+ ctypedef struct grpc_ssl_pem_key_cert_pair:
+ const char *private_key
+ const char *certificate_chain "cert_chain"
+
+ ctypedef struct grpc_channel_credentials:
+ # We don't care about the internals (and in fact don't know them)
+ pass
+
+ ctypedef struct grpc_call_credentials:
+ # We don't care about the internals (and in fact don't know them)
+ pass
+
+ ctypedef struct grpc_ssl_session_cache:
+ # We don't care about the internals (and in fact don't know them)
+ pass
+
+ ctypedef struct verify_peer_options:
+ # We don't care about the internals (and in fact don't know them)
+ pass
+
+ ctypedef void (*grpc_ssl_roots_override_callback)(char **pem_root_certs)
+
+ grpc_ssl_session_cache *grpc_ssl_session_cache_create_lru(size_t capacity)
+ void grpc_ssl_session_cache_destroy(grpc_ssl_session_cache* cache)
+
+ void grpc_set_ssl_roots_override_callback(
+ grpc_ssl_roots_override_callback cb) nogil
+
+ grpc_channel_credentials *grpc_google_default_credentials_create(grpc_call_credentials* call_credentials) nogil
+ grpc_channel_credentials *grpc_ssl_credentials_create(
+ const char *pem_root_certs, grpc_ssl_pem_key_cert_pair *pem_key_cert_pair,
+ verify_peer_options *verify_options, void *reserved) nogil
+ grpc_channel_credentials *grpc_composite_channel_credentials_create(
+ grpc_channel_credentials *creds1, grpc_call_credentials *creds2,
+ void *reserved) nogil
+ void grpc_channel_credentials_release(grpc_channel_credentials *creds) nogil
+
+ grpc_channel_credentials *grpc_xds_credentials_create(
+ grpc_channel_credentials *fallback_creds) nogil
+
+ grpc_channel_credentials *grpc_insecure_credentials_create() nogil
+
+ grpc_server_credentials *grpc_xds_server_credentials_create(
+ grpc_server_credentials *fallback_creds) nogil
+
+ grpc_server_credentials *grpc_insecure_server_credentials_create() nogil
+
+ grpc_call_credentials *grpc_composite_call_credentials_create(
+ grpc_call_credentials *creds1, grpc_call_credentials *creds2,
+ void *reserved) nogil
+ grpc_call_credentials *grpc_google_compute_engine_credentials_create(
+ void *reserved) nogil
+ grpc_call_credentials *grpc_service_account_jwt_access_credentials_create(
+ const char *json_key,
+ gpr_timespec token_lifetime, void *reserved) nogil
+ grpc_call_credentials *grpc_google_refresh_token_credentials_create(
+ const char *json_refresh_token, void *reserved) nogil
+ grpc_call_credentials *grpc_google_iam_credentials_create(
+ const char *authorization_token, const char *authority_selector,
+ void *reserved) nogil
+ void grpc_call_credentials_release(grpc_call_credentials *creds) nogil
+
+ grpc_channel *grpc_channel_create(
+ const char *target, grpc_channel_credentials *creds,
+ const grpc_channel_args *args) nogil
+
+ ctypedef struct grpc_server_credentials:
+ # We don't care about the internals (and in fact don't know them)
+ pass
+
+ void grpc_server_credentials_release(grpc_server_credentials *creds) nogil
+
+ int grpc_server_add_http2_port(grpc_server *server, const char *addr,
+ grpc_server_credentials *creds) nogil
+
+ grpc_call_error grpc_call_set_credentials(grpc_call *call,
+ grpc_call_credentials *creds) nogil
+
+ ctypedef struct grpc_auth_context:
+ # We don't care about the internals (and in fact don't know them)
+ pass
+
+ ctypedef struct grpc_auth_metadata_context:
+ const char *service_url
+ const char *method_name
+ const grpc_auth_context *channel_auth_context
+
+ ctypedef void (*grpc_credentials_plugin_metadata_cb)(
+ void *user_data, const grpc_metadata *creds_md, size_t num_creds_md,
+ grpc_status_code status, const char *error_details) nogil
+
+ ctypedef struct grpc_metadata_credentials_plugin:
+ int (*get_metadata)(
+ void *state, grpc_auth_metadata_context context,
+ grpc_credentials_plugin_metadata_cb cb, void *user_data,
+ grpc_metadata creds_md[GRPC_METADATA_CREDENTIALS_PLUGIN_SYNC_MAX],
+ size_t *num_creds_md, grpc_status_code *status,
+ const char **error_details) except *
+ void (*destroy)(void *state) except *
+ void *state
+ const char *type
+
+ grpc_call_credentials *grpc_metadata_credentials_create_from_plugin(
+ grpc_metadata_credentials_plugin plugin, grpc_security_level min_security_level, void *reserved) nogil
+
+ ctypedef struct grpc_auth_property_iterator:
+ pass
+
+ ctypedef struct grpc_auth_property:
+ char *name
+ char *value
+ size_t value_length
+
+ grpc_auth_property *grpc_auth_property_iterator_next(
+ grpc_auth_property_iterator *it)
+
+ grpc_auth_property_iterator grpc_auth_context_property_iterator(
+ const grpc_auth_context *ctx)
+
+ grpc_auth_property_iterator grpc_auth_context_peer_identity(
+ const grpc_auth_context *ctx)
+
+ char *grpc_auth_context_peer_identity_property_name(
+ const grpc_auth_context *ctx)
+
+ grpc_auth_property_iterator grpc_auth_context_find_properties_by_name(
+ const grpc_auth_context *ctx, const char *name)
+
+ int grpc_auth_context_peer_is_authenticated(
+ const grpc_auth_context *ctx)
+
+ grpc_auth_context *grpc_call_auth_context(grpc_call *call)
+
+ void grpc_auth_context_release(grpc_auth_context *context)
+
+ grpc_channel_credentials *grpc_local_credentials_create(
+ grpc_local_connect_type type)
+ grpc_server_credentials *grpc_local_server_credentials_create(
+ grpc_local_connect_type type)
+
+ ctypedef struct grpc_alts_credentials_options:
+ # We don't care about the internals (and in fact don't know them)
+ pass
+
+ grpc_channel_credentials *grpc_alts_credentials_create(
+ const grpc_alts_credentials_options *options)
+ grpc_server_credentials *grpc_alts_server_credentials_create(
+ const grpc_alts_credentials_options *options)
+
+ grpc_alts_credentials_options* grpc_alts_credentials_client_options_create()
+ grpc_alts_credentials_options* grpc_alts_credentials_server_options_create()
+ void grpc_alts_credentials_options_destroy(grpc_alts_credentials_options *options)
+ void grpc_alts_credentials_client_options_add_target_service_account(grpc_alts_credentials_options *options, const char *service_account)
+
+
+
+cdef extern from "grpc/compression.h":
+
+ ctypedef enum grpc_compression_algorithm:
+ GRPC_COMPRESS_NONE
+ GRPC_COMPRESS_DEFLATE
+ GRPC_COMPRESS_GZIP
+ GRPC_COMPRESS_STREAM_GZIP
+ GRPC_COMPRESS_ALGORITHMS_COUNT
+
+ ctypedef enum grpc_compression_level:
+ GRPC_COMPRESS_LEVEL_NONE
+ GRPC_COMPRESS_LEVEL_LOW
+ GRPC_COMPRESS_LEVEL_MED
+ GRPC_COMPRESS_LEVEL_HIGH
+ GRPC_COMPRESS_LEVEL_COUNT
+
+ ctypedef struct grpc_compression_options:
+ uint32_t enabled_algorithms_bitset
+
+ int grpc_compression_algorithm_parse(
+ grpc_slice value, grpc_compression_algorithm *algorithm) nogil
+ int grpc_compression_algorithm_name(grpc_compression_algorithm algorithm,
+ const char **name) nogil
+ grpc_compression_algorithm grpc_compression_algorithm_for_level(
+ grpc_compression_level level, uint32_t accepted_encodings) nogil
+ void grpc_compression_options_init(grpc_compression_options *opts) nogil
+ void grpc_compression_options_enable_algorithm(
+ grpc_compression_options *opts,
+ grpc_compression_algorithm algorithm) nogil
+ void grpc_compression_options_disable_algorithm(
+ grpc_compression_options *opts,
+ grpc_compression_algorithm algorithm) nogil
+ int grpc_compression_options_is_algorithm_enabled(
+ const grpc_compression_options *opts,
+ grpc_compression_algorithm algorithm) nogil
+
+cdef extern from "grpc/impl/codegen/compression_types.h":
+
+ const char *_GRPC_COMPRESSION_REQUEST_ALGORITHM_MD_KEY \
+ "GRPC_COMPRESSION_REQUEST_ALGORITHM_MD_KEY"
+
+
+cdef extern from "grpc/grpc_security_constants.h":
+ ctypedef enum grpc_local_connect_type:
+ UDS
+ LOCAL_TCP
+
+cdef extern from "src/core/lib/config/config_vars.h" namespace "grpc_core":
+ cdef cppclass ConfigVars:
+ @staticmethod
+ void Reset()
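
The gpr_timespec declarations above carry deadlines as a tv_sec/tv_nsec pair plus a clock type. As a rough, illustrative sketch (plain Python, not cygrpc code), the split that a wrapper performs when turning a relative timeout in seconds into such a pair looks like this; GPR_NS_PER_SEC mirrors the constant declared above:

GPR_NS_PER_SEC = 1_000_000_000  # mirrors the constant declared in grpc.h

def relative_timeout_to_timespec(timeout_secs):
    # Split a float timeout into the (seconds, nanoseconds) pair that
    # gpr_time_from_nanos()/gpr_time_add() would consume on the C side.
    total_nanos = int(timeout_secs * GPR_NS_PER_SEC)
    return total_nanos // GPR_NS_PER_SEC, total_nanos % GPR_NS_PER_SEC

print(relative_timeout_to_timespec(1.5))   # (1, 500000000)
print(relative_timeout_to_timespec(0.25))  # (0, 250000000)
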
diff --git a/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/grpc_gevent.pxd.pxi b/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/grpc_gevent.pxd.pxi
new file mode 100644
index 00000000000..baa9fb54a3e
--- /dev/null
+++ b/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/grpc_gevent.pxd.pxi
@@ -0,0 +1,21 @@
+# Copyright 2017 gRPC authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# distutils: language=c++
+
+g_gevent_threadpool = None
+g_gevent_activated = False
+
+cpdef void gevent_increment_channel_count()
+
+cpdef void gevent_decrement_channel_count()
diff --git a/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/grpc_gevent.pyx.pxi b/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/grpc_gevent.pyx.pxi
new file mode 100644
index 00000000000..41d27df5948
--- /dev/null
+++ b/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/grpc_gevent.pyx.pxi
@@ -0,0 +1,137 @@
+# Copyright 2018 gRPC authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# distutils: language=c++
+
+from libc cimport string
+from cython.operator cimport dereference
+
+from cpython cimport Py_INCREF, Py_DECREF
+
+import atexit
+import errno
+import sys
+
+gevent_hub = None
+g_gevent_pool = None
+g_gevent_threadpool = None
+g_gevent_activated = False
+
+
+cdef queue[void*] g_greenlets_to_run
+cdef condition_variable g_greenlets_cv
+cdef mutex g_greenlets_mu
+cdef bint g_shutdown_greenlets_to_run_queue = False
+cdef int g_channel_count = 0
+
+
+cdef _submit_to_greenlet_queue(object cb, tuple args):
+ cdef tuple to_call = (cb,) + args
+ cdef unique_lock[mutex]* lk
+ Py_INCREF(to_call)
+ with nogil:
+ lk = new unique_lock[mutex](g_greenlets_mu)
+ g_greenlets_to_run.push(<void*>(to_call))
+ del lk
+ g_greenlets_cv.notify_all()
+
+
+cpdef void gevent_increment_channel_count():
+ global g_channel_count
+ cdef int old_channel_count
+ with nogil:
+ lk = new unique_lock[mutex](g_greenlets_mu)
+ old_channel_count = g_channel_count
+ g_channel_count += 1
+ del lk
+ if old_channel_count == 0:
+ run_spawn_greenlets()
+
+
+cpdef void gevent_decrement_channel_count():
+ global g_channel_count
+ with nogil:
+ lk = new unique_lock[mutex](g_greenlets_mu)
+ g_channel_count -= 1
+ if g_channel_count == 0:
+ g_greenlets_cv.notify_all()
+ del lk
+
+
+cdef object await_next_greenlet():
+ cdef unique_lock[mutex]* lk
+ with nogil:
+ # Cython doesn't allow us to do proper stack allocations, so we can't take
+ # advantage of RAII.
+ lk = new unique_lock[mutex](g_greenlets_mu)
+ while not g_shutdown_greenlets_to_run_queue and g_channel_count != 0:
+ if not g_greenlets_to_run.empty():
+ break
+ g_greenlets_cv.wait(dereference(lk))
+ if g_channel_count == 0:
+ del lk
+ return None
+ if g_shutdown_greenlets_to_run_queue:
+ del lk
+ return None
+ cdef object to_call = <object>g_greenlets_to_run.front()
+ Py_DECREF(to_call)
+ g_greenlets_to_run.pop()
+ del lk
+ return to_call
+
+def spawn_greenlets():
+ while True:
+ to_call = g_gevent_threadpool.apply(await_next_greenlet, ())
+ if to_call is None:
+ break
+ fn = to_call[0]
+ args = to_call[1:]
+ fn(*args)
+
+def run_spawn_greenlets():
+ g_gevent_pool.spawn(spawn_greenlets)
+
+def shutdown_await_next_greenlet():
+ global g_shutdown_greenlets_to_run_queue
+ cdef unique_lock[mutex]* lk
+ with nogil:
+ lk = new unique_lock[mutex](g_greenlets_mu)
+ g_shutdown_greenlets_to_run_queue = True
+ del lk
+ g_greenlets_cv.notify_all()
+
+def init_grpc_gevent():
+ # Lazily import gevent
+ global gevent_hub
+ global g_gevent_threadpool
+ global g_gevent_activated
+ global g_interrupt_check_period_ms
+ global g_gevent_pool
+
+ import gevent
+ import gevent.pool
+
+ gevent_hub = gevent.hub
+ g_gevent_threadpool = gevent_hub.get_hub().threadpool
+
+ g_gevent_activated = True
+ g_interrupt_check_period_ms = 2000
+
+ g_gevent_pool = gevent.pool.Group()
+
+
+ set_async_callback_func(_submit_to_greenlet_queue)
+
+ # TODO: Document how this all works.
+ atexit.register(shutdown_await_next_greenlet)
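
The handoff above works as a producer/consumer bridge: core threads push callback tuples onto g_greenlets_to_run via _submit_to_greenlet_queue, a native thread borrowed from gevent's threadpool blocks inside await_next_greenlet, and spawn_greenlets runs each callback on a greenlet until shutdown_await_next_greenlet posts the shutdown flag. A rough pure-Python analogue of the same bridge, using queue.Queue in place of the std::queue/mutex/condition_variable trio and a plain thread in place of the gevent threadpool (names here are illustrative, not cygrpc API):

import queue
import threading

_pending = queue.Queue()   # stands in for g_greenlets_to_run plus its mutex/cv
_SHUTDOWN = object()       # stands in for g_shutdown_greenlets_to_run_queue

def submit(cb, *args):
    # Analogue of _submit_to_greenlet_queue(): called from "core" threads.
    _pending.put((cb,) + args)

def await_next():
    # Analogue of await_next_greenlet(): blocks until work or shutdown arrives.
    item = _pending.get()
    return None if item is _SHUTDOWN else item

def consume():
    # Analogue of spawn_greenlets(): here each callback runs inline; in the
    # real code it runs on a gevent greenlet.
    while True:
        to_call = await_next()
        if to_call is None:
            break
        fn, args = to_call[0], to_call[1:]
        fn(*args)

def shutdown():
    # Analogue of shutdown_await_next_greenlet().
    _pending.put(_SHUTDOWN)

consumer = threading.Thread(target=consume)
consumer.start()
submit(print, "callback ran off the core thread")
shutdown()
consumer.join()
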
diff --git a/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/grpc_string.pyx.pxi b/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/grpc_string.pyx.pxi
new file mode 100644
index 00000000000..5c1e0679a97
--- /dev/null
+++ b/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/grpc_string.pyx.pxi
@@ -0,0 +1,51 @@
+# Copyright 2016 gRPC authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+# This function will ASCII-encode unicode string inputs if necessary.
+# In Python 3, unicode strings are the default str type.
+cdef bytes str_to_bytes(object s):
+ if s is None or isinstance(s, bytes):
+ return s
+ elif isinstance(s, unicode):
+ return s.encode('ascii')
+ else:
+ raise TypeError('Expected bytes, str, or unicode, not {}'.format(type(s)))
+
+
+# TODO(https://github.com/grpc/grpc/issues/13782): It would be nice for us if
+# the type of metadata that we accept were exactly the same as the type of
+# metadata that we deliver to our users (so "str" for this function's
+# parameter rather than "object"), but would it be nice for our users? Right
+# now we haven't yet heard from enough users to know one way or another.
+cdef bytes _encode(object string_or_none):
+ if string_or_none is None:
+ return b''
+ elif isinstance(string_or_none, (bytes,)):
+ return <bytes>string_or_none
+ elif isinstance(string_or_none, (unicode,)):
+ return string_or_none.encode('utf8')
+ else:
+ raise TypeError('Expected str, not {}'.format(type(string_or_none)))
+
+
+cdef str _decode(bytes bytestring):
+ if isinstance(bytestring, (str,)):
+ return <str>bytestring
+ else:
+ try:
+ return bytestring.decode('utf8')
+ except UnicodeDecodeError:
+ _LOGGER.exception('Invalid encoding on %s', bytestring)
+ return bytestring.decode('latin1')
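
Restated in plain Python, the rules above are: bytes pass through untouched, text is UTF-8 encoded on the way in, and on the way out UTF-8 is attempted with a Latin-1 fallback so that a bad payload is never dropped. A standalone sketch of the same rules (these helpers are illustrative, not the cygrpc functions themselves):

def encode(value):
    # Mirrors _encode(): None -> b'', bytes pass through, text is UTF-8 encoded.
    if value is None:
        return b''
    if isinstance(value, bytes):
        return value
    if isinstance(value, str):
        return value.encode('utf8')
    raise TypeError('Expected str, not {}'.format(type(value)))

def decode(raw):
    # Mirrors _decode(): try UTF-8 first, fall back to Latin-1 so decoding
    # never fails outright.
    if isinstance(raw, str):
        return raw
    try:
        return raw.decode('utf8')
    except UnicodeDecodeError:
        return raw.decode('latin1')

assert encode(None) == b''
assert encode('abc') == b'abc'
assert decode(b'\xff') == '\xff'   # invalid UTF-8, recovered via Latin-1
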
diff --git a/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/metadata.pxd.pxi b/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/metadata.pxd.pxi
new file mode 100644
index 00000000000..fc72ac1576b
--- /dev/null
+++ b/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/metadata.pxd.pxi
@@ -0,0 +1,26 @@
+# Copyright 2017 gRPC authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+cdef void _store_c_metadata(
+ metadata, grpc_metadata **c_metadata, size_t *c_count) except *
+
+
+cdef void _release_c_metadata(grpc_metadata *c_metadata, int count) except *
+
+
+cdef tuple _metadatum(grpc_slice key_slice, grpc_slice value_slice)
+
+
+cdef tuple _metadata(grpc_metadata_array *c_metadata_array)
diff --git a/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/metadata.pyx.pxi b/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/metadata.pyx.pxi
new file mode 100644
index 00000000000..b2dd1e33808
--- /dev/null
+++ b/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/metadata.pyx.pxi
@@ -0,0 +1,73 @@
+# Copyright 2017 gRPC authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import collections
+
+
+class InitialMetadataFlags:
+ used_mask = GRPC_INITIAL_METADATA_USED_MASK
+ wait_for_ready = GRPC_INITIAL_METADATA_WAIT_FOR_READY
+ wait_for_ready_explicitly_set = GRPC_INITIAL_METADATA_WAIT_FOR_READY_EXPLICITLY_SET
+
+
+_Metadatum = collections.namedtuple('_Metadatum', ('key', 'value',))
+
+
+cdef void _store_c_metadata(
+ metadata, grpc_metadata **c_metadata, size_t *c_count) except *:
+ if metadata is None:
+ c_count[0] = 0
+ c_metadata[0] = NULL
+ else:
+ metadatum_count = len(metadata)
+ if metadatum_count == 0:
+ c_count[0] = 0
+ c_metadata[0] = NULL
+ else:
+ c_count[0] = metadatum_count
+ c_metadata[0] = <grpc_metadata *>gpr_malloc(
+ metadatum_count * sizeof(grpc_metadata))
+ for index, (key, value) in enumerate(metadata):
+ encoded_key = _encode(key)
+ encoded_value = value if encoded_key[-4:] == b'-bin' else _encode(value)
+ if not isinstance(encoded_value, bytes):
+ raise TypeError('Binary metadata key="%s" expected bytes, got %s' % (
+ key,
+ type(encoded_value)
+ ))
+ c_metadata[0][index].key = _slice_from_bytes(encoded_key)
+ c_metadata[0][index].value = _slice_from_bytes(encoded_value)
+
+
+cdef void _release_c_metadata(grpc_metadata *c_metadata, int count) except *:
+ if 0 < count:
+ for index in range(count):
+ grpc_slice_unref(c_metadata[index].key)
+ grpc_slice_unref(c_metadata[index].value)
+ gpr_free(c_metadata)
+
+
+cdef tuple _metadatum(grpc_slice key_slice, grpc_slice value_slice):
+ cdef bytes key = _slice_bytes(key_slice)
+ cdef bytes value = _slice_bytes(value_slice)
+ return <tuple>_Metadatum(
+ _decode(key), value if key[-4:] == b'-bin' else _decode(value))
+
+
+cdef tuple _metadata(grpc_metadata_array *c_metadata_array):
+ return tuple(
+ _metadatum(
+ c_metadata_array.metadata[index].key,
+ c_metadata_array.metadata[index].value)
+ for index in range(c_metadata_array.count))
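
The convention implemented above is the gRPC '-bin' rule: values whose metadata key ends in '-bin' are opaque bytes and must stay bytes, while every other value is text and is encoded or decoded. A small self-contained restatement (to_wire and from_wire are illustrative names, not cygrpc API):

def to_wire(key, value):
    # Mirrors _store_c_metadata(): keys ending in '-bin' keep their bytes value,
    # everything else is encoded as UTF-8 text.
    encoded_key = key.encode('utf8') if isinstance(key, str) else key
    if encoded_key.endswith(b'-bin'):
        if not isinstance(value, bytes):
            raise TypeError('Binary metadata key=%r expected bytes' % key)
        return encoded_key, value
    encoded_value = value.encode('utf8') if isinstance(value, str) else value
    return encoded_key, encoded_value

def from_wire(key, value):
    # Mirrors _metadatum(): keys are always decoded, '-bin' values stay bytes.
    decoded_key = key.decode('utf8')
    if key.endswith(b'-bin'):
        return decoded_key, value
    return decoded_key, value.decode('utf8')

assert to_wire('grpc-trace-bin', b'\x00\x01') == (b'grpc-trace-bin', b'\x00\x01')
assert from_wire(b'user-agent', b'grpc-python') == ('user-agent', 'grpc-python')
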
diff --git a/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/operation.pxd.pxi b/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/operation.pxd.pxi
new file mode 100644
index 00000000000..c9df32dadf5
--- /dev/null
+++ b/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/operation.pxd.pxi
@@ -0,0 +1,111 @@
+# Copyright 2017 gRPC authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+cdef class Operation:
+
+ cdef void c(self) except *
+ cdef void un_c(self) except *
+
+ # TODO(https://github.com/grpc/grpc/issues/7950): Eliminate this!
+ cdef grpc_op c_op
+
+
+cdef class SendInitialMetadataOperation(Operation):
+
+ cdef readonly object _initial_metadata
+ cdef readonly int _flags
+ cdef grpc_metadata *_c_initial_metadata
+ cdef size_t _c_initial_metadata_count
+
+ cdef void c(self) except *
+ cdef void un_c(self) except *
+
+
+cdef class SendMessageOperation(Operation):
+
+ cdef readonly bytes _message
+ cdef readonly int _flags
+ cdef grpc_byte_buffer *_c_message_byte_buffer
+
+ cdef void c(self) except *
+ cdef void un_c(self) except *
+
+
+cdef class SendCloseFromClientOperation(Operation):
+
+ cdef readonly int _flags
+
+ cdef void c(self) except *
+ cdef void un_c(self) except *
+
+
+cdef class SendStatusFromServerOperation(Operation):
+
+ cdef readonly object _trailing_metadata
+ cdef readonly object _code
+ cdef readonly object _details
+ cdef readonly int _flags
+ cdef grpc_metadata *_c_trailing_metadata
+ cdef size_t _c_trailing_metadata_count
+ cdef grpc_slice _c_details
+
+ cdef void c(self) except *
+ cdef void un_c(self) except *
+
+
+cdef class ReceiveInitialMetadataOperation(Operation):
+
+ cdef readonly int _flags
+ cdef tuple _initial_metadata
+ cdef grpc_metadata_array _c_initial_metadata
+
+ cdef void c(self) except *
+ cdef void un_c(self) except *
+
+
+cdef class ReceiveMessageOperation(Operation):
+
+ cdef readonly int _flags
+ cdef grpc_byte_buffer *_c_message_byte_buffer
+ cdef bytes _message
+
+ cdef void c(self) except *
+ cdef void un_c(self) except *
+
+
+cdef class ReceiveStatusOnClientOperation(Operation):
+
+ cdef readonly int _flags
+ cdef grpc_metadata_array _c_trailing_metadata
+ cdef grpc_status_code _c_code
+ cdef grpc_slice _c_details
+ cdef const char* _c_error_string
+ cdef tuple _trailing_metadata
+ cdef object _code
+ cdef str _details
+ cdef str _error_string
+
+ cdef void c(self) except *
+ cdef void un_c(self) except *
+
+
+cdef class ReceiveCloseOnServerOperation(Operation):
+
+ cdef readonly int _flags
+ cdef object _cancelled
+ cdef int _c_cancelled
+
+ cdef void c(self) except *
+ cdef void un_c(self) except *
diff --git a/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/operation.pyx.pxi b/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/operation.pyx.pxi
new file mode 100644
index 00000000000..3f3fd75407c
--- /dev/null
+++ b/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/operation.pyx.pxi
@@ -0,0 +1,250 @@
+# Copyright 2017 gRPC authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+cdef class Operation:
+
+ cdef void c(self) except *:
+ raise NotImplementedError()
+
+ cdef void un_c(self) except *:
+ raise NotImplementedError()
+
+
+cdef class SendInitialMetadataOperation(Operation):
+
+ def __cinit__(self, initial_metadata, flags):
+ self._initial_metadata = initial_metadata
+ self._flags = flags
+
+ def type(self):
+ return GRPC_OP_SEND_INITIAL_METADATA
+
+ cdef void c(self) except *:
+ self.c_op.type = GRPC_OP_SEND_INITIAL_METADATA
+ self.c_op.flags = self._flags
+ _store_c_metadata(
+ self._initial_metadata, &self._c_initial_metadata,
+ &self._c_initial_metadata_count)
+ self.c_op.data.send_initial_metadata.metadata = self._c_initial_metadata
+ self.c_op.data.send_initial_metadata.count = self._c_initial_metadata_count
+ self.c_op.data.send_initial_metadata.maybe_compression_level.is_set = 0
+
+ cdef void un_c(self) except *:
+ _release_c_metadata(
+ self._c_initial_metadata, self._c_initial_metadata_count)
+
+
+cdef class SendMessageOperation(Operation):
+
+ def __cinit__(self, bytes message, int flags):
+ if message is None:
+ self._message = b''
+ else:
+ self._message = message
+ self._flags = flags
+
+ def type(self):
+ return GRPC_OP_SEND_MESSAGE
+
+ cdef void c(self) except *:
+ self.c_op.type = GRPC_OP_SEND_MESSAGE
+ self.c_op.flags = self._flags
+ cdef grpc_slice message_slice = grpc_slice_from_copied_buffer(
+ self._message, len(self._message))
+ self._c_message_byte_buffer = grpc_raw_byte_buffer_create(
+ &message_slice, 1)
+ grpc_slice_unref(message_slice)
+ self.c_op.data.send_message.send_message = self._c_message_byte_buffer
+
+ cdef void un_c(self) except *:
+ grpc_byte_buffer_destroy(self._c_message_byte_buffer)
+
+
+cdef class SendCloseFromClientOperation(Operation):
+
+ def __cinit__(self, int flags):
+ self._flags = flags
+
+ def type(self):
+ return GRPC_OP_SEND_CLOSE_FROM_CLIENT
+
+ cdef void c(self) except *:
+ self.c_op.type = GRPC_OP_SEND_CLOSE_FROM_CLIENT
+ self.c_op.flags = self._flags
+
+ cdef void un_c(self) except *:
+ pass
+
+
+cdef class SendStatusFromServerOperation(Operation):
+
+ def __cinit__(self, trailing_metadata, code, object details, int flags):
+ self._trailing_metadata = trailing_metadata
+ self._code = code
+ self._details = details
+ self._flags = flags
+
+ def type(self):
+ return GRPC_OP_SEND_STATUS_FROM_SERVER
+
+ cdef void c(self) except *:
+ self.c_op.type = GRPC_OP_SEND_STATUS_FROM_SERVER
+ self.c_op.flags = self._flags
+ _store_c_metadata(
+ self._trailing_metadata, &self._c_trailing_metadata,
+ &self._c_trailing_metadata_count)
+ self.c_op.data.send_status_from_server.trailing_metadata = (
+ self._c_trailing_metadata)
+ self.c_op.data.send_status_from_server.trailing_metadata_count = (
+ self._c_trailing_metadata_count)
+ self.c_op.data.send_status_from_server.status = self._code
+ self._c_details = _slice_from_bytes(_encode(self._details))
+ self.c_op.data.send_status_from_server.status_details = &self._c_details
+
+ cdef void un_c(self) except *:
+ grpc_slice_unref(self._c_details)
+ _release_c_metadata(
+ self._c_trailing_metadata, self._c_trailing_metadata_count)
+
+
+cdef class ReceiveInitialMetadataOperation(Operation):
+
+ def __cinit__(self, flags):
+ self._flags = flags
+
+ def type(self):
+ return GRPC_OP_RECV_INITIAL_METADATA
+
+ cdef void c(self) except *:
+ self.c_op.type = GRPC_OP_RECV_INITIAL_METADATA
+ self.c_op.flags = self._flags
+ grpc_metadata_array_init(&self._c_initial_metadata)
+ self.c_op.data.receive_initial_metadata.receive_initial_metadata = (
+ &self._c_initial_metadata)
+
+ cdef void un_c(self) except *:
+ self._initial_metadata = _metadata(&self._c_initial_metadata)
+ grpc_metadata_array_destroy(&self._c_initial_metadata)
+
+ def initial_metadata(self):
+ return self._initial_metadata
+
+
+cdef class ReceiveMessageOperation(Operation):
+
+ def __cinit__(self, flags):
+ self._flags = flags
+
+ def type(self):
+ return GRPC_OP_RECV_MESSAGE
+
+ cdef void c(self) except *:
+ self.c_op.type = GRPC_OP_RECV_MESSAGE
+ self.c_op.flags = self._flags
+ self.c_op.data.receive_message.receive_message = (
+ &self._c_message_byte_buffer)
+
+ cdef void un_c(self) except *:
+ cdef grpc_byte_buffer_reader message_reader
+ cdef bint message_reader_status
+ cdef grpc_slice message_slice
+ cdef size_t message_slice_length
+ cdef void *message_slice_pointer
+ if self._c_message_byte_buffer != NULL:
+ message_reader_status = grpc_byte_buffer_reader_init(
+ &message_reader, self._c_message_byte_buffer)
+ if message_reader_status:
+ message = bytearray()
+ while grpc_byte_buffer_reader_next(&message_reader, &message_slice):
+ message_slice_pointer = grpc_slice_start_ptr(message_slice)
+ message_slice_length = grpc_slice_length(message_slice)
+ message += (<char *>message_slice_pointer)[:message_slice_length]
+ grpc_slice_unref(message_slice)
+ grpc_byte_buffer_reader_destroy(&message_reader)
+ self._message = bytes(message)
+ else:
+ self._message = None
+ grpc_byte_buffer_destroy(self._c_message_byte_buffer)
+ else:
+ self._message = None
+
+ def message(self):
+ return self._message
+
+
+cdef class ReceiveStatusOnClientOperation(Operation):
+
+ def __cinit__(self, flags):
+ self._flags = flags
+
+ def type(self):
+ return GRPC_OP_RECV_STATUS_ON_CLIENT
+
+ cdef void c(self) except *:
+ self.c_op.type = GRPC_OP_RECV_STATUS_ON_CLIENT
+ self.c_op.flags = self._flags
+ grpc_metadata_array_init(&self._c_trailing_metadata)
+ self.c_op.data.receive_status_on_client.trailing_metadata = (
+ &self._c_trailing_metadata)
+ self.c_op.data.receive_status_on_client.status = (
+ &self._c_code)
+ self.c_op.data.receive_status_on_client.status_details = (
+ &self._c_details)
+ self.c_op.data.receive_status_on_client.error_string = (
+ &self._c_error_string)
+
+ cdef void un_c(self) except *:
+ self._trailing_metadata = _metadata(&self._c_trailing_metadata)
+ grpc_metadata_array_destroy(&self._c_trailing_metadata)
+ self._code = self._c_code
+ self._details = _decode(_slice_bytes(self._c_details))
+ grpc_slice_unref(self._c_details)
+ if self._c_error_string != NULL:
+ self._error_string = _decode(self._c_error_string)
+ gpr_free(<void*>self._c_error_string)
+ else:
+ self._error_string = ""
+
+ def trailing_metadata(self):
+ return self._trailing_metadata
+
+ def code(self):
+ return self._code
+
+ def details(self):
+ return self._details
+
+ def error_string(self):
+ return self._error_string
+
+
+cdef class ReceiveCloseOnServerOperation(Operation):
+
+ def __cinit__(self, flags):
+ self._flags = flags
+
+ def type(self):
+ return GRPC_OP_RECV_CLOSE_ON_SERVER
+
+ cdef void c(self) except *:
+ self.c_op.type = GRPC_OP_RECV_CLOSE_ON_SERVER
+ self.c_op.flags = self._flags
+ self.c_op.data.receive_close_on_server.cancelled = &self._c_cancelled
+
+ cdef void un_c(self) except *:
+ self._cancelled = bool(self._c_cancelled)
+
+ def cancelled(self):
+ return self._cancelled
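
Every Operation above follows the same two-phase protocol: c() fills in the underlying grpc_op and allocates any C resources before the batch starts, and un_c() harvests results and releases those resources after the batch completes. A schematic pure-Python rendering of that lifecycle (all names below are illustrative; the toy loopback transport stands in for gRPC core):

class SketchOperation:
    def prepare(self):      # plays the role of c()
        raise NotImplementedError()

    def finalize(self):     # plays the role of un_c()
        raise NotImplementedError()

class SketchSendMessage(SketchOperation):
    def __init__(self, message):
        self._message = message
        self._buffer = None

    def prepare(self):
        self._buffer = bytes(self._message)   # stands in for the byte buffer

    def finalize(self):
        self._buffer = None                   # stands in for buffer destroy

class SketchReceiveMessage(SketchOperation):
    def __init__(self):
        self._message = None

    def prepare(self):
        self._inbox = []                      # stands in for recv_message target

    def finalize(self):
        self._message = b''.join(self._inbox)

    def message(self):
        return self._message

def run_batch(operations, transport):
    # Mirrors the ordering enforced by the cygrpc call machinery: prepare every
    # op, let the core run the batch, then finalize every op.
    for op in operations:
        op.prepare()
    transport(operations)
    for op in operations:
        op.finalize()

def loopback(operations):
    # Toy "core": copies sent bytes into the receive op.
    sent = [op._buffer for op in operations if isinstance(op, SketchSendMessage)]
    for op in operations:
        if isinstance(op, SketchReceiveMessage):
            op._inbox.extend(sent)

recv = SketchReceiveMessage()
run_batch([SketchSendMessage(b'ping'), recv], loopback)
assert recv.message() == b'ping'
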
diff --git a/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/propagation_bits.pxd.pxi b/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/propagation_bits.pxd.pxi
new file mode 100644
index 00000000000..3182aa54de5
--- /dev/null
+++ b/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/propagation_bits.pxd.pxi
@@ -0,0 +1,20 @@
+# Copyright 2018 The gRPC Authors
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+cdef extern from "grpc/impl/propagation_bits.h":
+ cdef int _GRPC_PROPAGATE_DEADLINE "GRPC_PROPAGATE_DEADLINE"
+ cdef int _GRPC_PROPAGATE_CENSUS_STATS_CONTEXT "GRPC_PROPAGATE_CENSUS_STATS_CONTEXT"
+ cdef int _GRPC_PROPAGATE_CENSUS_TRACING_CONTEXT "GRPC_PROPAGATE_CENSUS_TRACING_CONTEXT"
+ cdef int _GRPC_PROPAGATE_CANCELLATION "GRPC_PROPAGATE_CANCELLATION"
+ cdef int _GRPC_PROPAGATE_DEFAULTS "GRPC_PROPAGATE_DEFAULTS"
diff --git a/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/propagation_bits.pyx.pxi b/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/propagation_bits.pyx.pxi
new file mode 100644
index 00000000000..2dcc76a2db2
--- /dev/null
+++ b/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/propagation_bits.pyx.pxi
@@ -0,0 +1,20 @@
+# Copyright 2018 The gRPC Authors
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+class PropagationConstants:
+ GRPC_PROPAGATE_DEADLINE = _GRPC_PROPAGATE_DEADLINE
+ GRPC_PROPAGATE_CENSUS_STATS_CONTEXT = _GRPC_PROPAGATE_CENSUS_STATS_CONTEXT
+ GRPC_PROPAGATE_CENSUS_TRACING_CONTEXT = _GRPC_PROPAGATE_CENSUS_TRACING_CONTEXT
+ GRPC_PROPAGATE_CANCELLATION = _GRPC_PROPAGATE_CANCELLATION
+ GRPC_PROPAGATE_DEFAULTS = _GRPC_PROPAGATE_DEFAULTS
diff --git a/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/records.pxd.pxi b/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/records.pxd.pxi
new file mode 100644
index 00000000000..35e1bdb0aeb
--- /dev/null
+++ b/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/records.pxd.pxi
@@ -0,0 +1,34 @@
+# Copyright 2015 gRPC authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+cdef bytes _slice_bytes(grpc_slice slice)
+cdef grpc_slice _copy_slice(grpc_slice slice) nogil
+cdef grpc_slice _slice_from_bytes(bytes value) nogil
+
+
+cdef class CallDetails:
+
+ cdef grpc_call_details c_details
+
+
+cdef class SslPemKeyCertPair:
+
+ cdef grpc_ssl_pem_key_cert_pair c_pair
+ cdef readonly object private_key, certificate_chain
+
+
+cdef class CompressionOptions:
+
+ cdef grpc_compression_options c_options
diff --git a/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/records.pyx.pxi b/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/records.pyx.pxi
new file mode 100644
index 00000000000..281cf8613f0
--- /dev/null
+++ b/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/records.pyx.pxi
@@ -0,0 +1,201 @@
+# Copyright 2015 gRPC authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+cdef bytes _slice_bytes(grpc_slice slice):
+ cdef void *start = grpc_slice_start_ptr(slice)
+ cdef size_t length = grpc_slice_length(slice)
+ return (<const char *>start)[:length]
+
+cdef grpc_slice _copy_slice(grpc_slice slice) nogil:
+ cdef void *start = grpc_slice_start_ptr(slice)
+ cdef size_t length = grpc_slice_length(slice)
+ return grpc_slice_from_copied_buffer(<const char *>start, length)
+
+cdef grpc_slice _slice_from_bytes(bytes value) nogil:
+ cdef const char *value_ptr
+ cdef size_t length
+ with gil:
+ value_ptr = <const char *>value
+ length = len(value)
+ return grpc_slice_from_copied_buffer(value_ptr, length)
+
+
+class ConnectivityState:
+ idle = GRPC_CHANNEL_IDLE
+ connecting = GRPC_CHANNEL_CONNECTING
+ ready = GRPC_CHANNEL_READY
+ transient_failure = GRPC_CHANNEL_TRANSIENT_FAILURE
+ shutdown = GRPC_CHANNEL_SHUTDOWN
+
+
+class ChannelArgKey:
+ enable_census = GRPC_ARG_ENABLE_CENSUS
+ max_concurrent_streams = GRPC_ARG_MAX_CONCURRENT_STREAMS
+ max_receive_message_length = GRPC_ARG_MAX_RECEIVE_MESSAGE_LENGTH
+ max_send_message_length = GRPC_ARG_MAX_SEND_MESSAGE_LENGTH
+ http2_initial_sequence_number = GRPC_ARG_HTTP2_INITIAL_SEQUENCE_NUMBER
+ default_authority = GRPC_ARG_DEFAULT_AUTHORITY
+ primary_user_agent_string = GRPC_ARG_PRIMARY_USER_AGENT_STRING
+ secondary_user_agent_string = GRPC_ARG_SECONDARY_USER_AGENT_STRING
+ ssl_session_cache = GRPC_SSL_SESSION_CACHE_ARG
+ ssl_target_name_override = GRPC_SSL_TARGET_NAME_OVERRIDE_ARG
+
+
+class WriteFlag:
+ buffer_hint = GRPC_WRITE_BUFFER_HINT
+ no_compress = GRPC_WRITE_NO_COMPRESS
+
+
+class StatusCode:
+ ok = GRPC_STATUS_OK
+ cancelled = GRPC_STATUS_CANCELLED
+ unknown = GRPC_STATUS_UNKNOWN
+ invalid_argument = GRPC_STATUS_INVALID_ARGUMENT
+ deadline_exceeded = GRPC_STATUS_DEADLINE_EXCEEDED
+ not_found = GRPC_STATUS_NOT_FOUND
+ already_exists = GRPC_STATUS_ALREADY_EXISTS
+ permission_denied = GRPC_STATUS_PERMISSION_DENIED
+ unauthenticated = GRPC_STATUS_UNAUTHENTICATED
+ resource_exhausted = GRPC_STATUS_RESOURCE_EXHAUSTED
+ failed_precondition = GRPC_STATUS_FAILED_PRECONDITION
+ aborted = GRPC_STATUS_ABORTED
+ out_of_range = GRPC_STATUS_OUT_OF_RANGE
+ unimplemented = GRPC_STATUS_UNIMPLEMENTED
+ internal = GRPC_STATUS_INTERNAL
+ unavailable = GRPC_STATUS_UNAVAILABLE
+ data_loss = GRPC_STATUS_DATA_LOSS
+
+
+class CallError:
+ ok = GRPC_CALL_OK
+ error = GRPC_CALL_ERROR
+ not_on_server = GRPC_CALL_ERROR_NOT_ON_SERVER
+ not_on_client = GRPC_CALL_ERROR_NOT_ON_CLIENT
+ already_accepted = GRPC_CALL_ERROR_ALREADY_ACCEPTED
+ already_invoked = GRPC_CALL_ERROR_ALREADY_INVOKED
+ not_invoked = GRPC_CALL_ERROR_NOT_INVOKED
+ already_finished = GRPC_CALL_ERROR_ALREADY_FINISHED
+ too_many_operations = GRPC_CALL_ERROR_TOO_MANY_OPERATIONS
+ invalid_flags = GRPC_CALL_ERROR_INVALID_FLAGS
+ invalid_metadata = GRPC_CALL_ERROR_INVALID_METADATA
+
+
+class CompletionType:
+ queue_shutdown = GRPC_QUEUE_SHUTDOWN
+ queue_timeout = GRPC_QUEUE_TIMEOUT
+ operation_complete = GRPC_OP_COMPLETE
+
+
+class OperationType:
+ send_initial_metadata = GRPC_OP_SEND_INITIAL_METADATA
+ send_message = GRPC_OP_SEND_MESSAGE
+ send_close_from_client = GRPC_OP_SEND_CLOSE_FROM_CLIENT
+ send_status_from_server = GRPC_OP_SEND_STATUS_FROM_SERVER
+ receive_initial_metadata = GRPC_OP_RECV_INITIAL_METADATA
+ receive_message = GRPC_OP_RECV_MESSAGE
+ receive_status_on_client = GRPC_OP_RECV_STATUS_ON_CLIENT
+ receive_close_on_server = GRPC_OP_RECV_CLOSE_ON_SERVER
+
+GRPC_COMPRESSION_CHANNEL_DEFAULT_ALGORITHM = (
+ _GRPC_COMPRESSION_CHANNEL_DEFAULT_ALGORITHM)
+
+GRPC_COMPRESSION_REQUEST_ALGORITHM_MD_KEY = (
+ _GRPC_COMPRESSION_REQUEST_ALGORITHM_MD_KEY)
+
+class CompressionAlgorithm:
+ none = GRPC_COMPRESS_NONE
+ deflate = GRPC_COMPRESS_DEFLATE
+ gzip = GRPC_COMPRESS_GZIP
+
+
+class CompressionLevel:
+ none = GRPC_COMPRESS_LEVEL_NONE
+ low = GRPC_COMPRESS_LEVEL_LOW
+ medium = GRPC_COMPRESS_LEVEL_MED
+ high = GRPC_COMPRESS_LEVEL_HIGH
+
+
+cdef class CallDetails:
+
+ def __cinit__(self):
+ fork_handlers_and_grpc_init()
+ with nogil:
+ grpc_call_details_init(&self.c_details)
+
+ def __dealloc__(self):
+ with nogil:
+ grpc_call_details_destroy(&self.c_details)
+ grpc_shutdown()
+
+ @property
+ def method(self):
+ return _slice_bytes(self.c_details.method)
+
+ @property
+ def host(self):
+ return _slice_bytes(self.c_details.host)
+
+ @property
+ def deadline(self):
+ return _time_from_timespec(self.c_details.deadline)
+
+
+cdef class SslPemKeyCertPair:
+
+ def __cinit__(self, bytes private_key, bytes certificate_chain):
+ self.private_key = private_key
+ self.certificate_chain = certificate_chain
+ self.c_pair.private_key = self.private_key
+ self.c_pair.certificate_chain = self.certificate_chain
+
+
+cdef class CompressionOptions:
+
+ def __cinit__(self):
+ with nogil:
+ grpc_compression_options_init(&self.c_options)
+
+ def enable_algorithm(self, grpc_compression_algorithm algorithm):
+ with nogil:
+ grpc_compression_options_enable_algorithm(&self.c_options, algorithm)
+
+ def disable_algorithm(self, grpc_compression_algorithm algorithm):
+ with nogil:
+ grpc_compression_options_disable_algorithm(&self.c_options, algorithm)
+
+ def is_algorithm_enabled(self, grpc_compression_algorithm algorithm):
+ cdef int result
+ with nogil:
+ result = grpc_compression_options_is_algorithm_enabled(
+ &self.c_options, algorithm)
+ return result
+
+ def to_channel_arg(self):
+ return (
+ GRPC_COMPRESSION_CHANNEL_ENABLED_ALGORITHMS_BITSET,
+ self.c_options.enabled_algorithms_bitset,
+ )
+
+
+def compression_algorithm_name(grpc_compression_algorithm algorithm):
+ cdef const char* name
+ with nogil:
+ grpc_compression_algorithm_name(algorithm, &name)
+ # Let Cython do the right thing with string casting
+ return name
+
+
+def reset_grpc_config_vars():
+ ConfigVars.Reset()
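
For orientation, here is a minimal usage sketch of the enum-style wrappers and CompressionOptions defined above, assuming the built extension is importable as grpc._cython.cygrpc (as in grpcio); it is illustrative only and not part of the diff.

    from grpc._cython import cygrpc

    # The enum-style classes simply mirror the C constants (plain ints at runtime).
    print(cygrpc.StatusCode.ok, cygrpc.StatusCode.unavailable)
    print(cygrpc.compression_algorithm_name(cygrpc.CompressionAlgorithm.gzip))

    # CompressionOptions folds its enabled-algorithms bitset into a channel arg tuple.
    options = cygrpc.CompressionOptions()
    options.disable_algorithm(cygrpc.CompressionAlgorithm.deflate)
    print(options.to_channel_arg())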
diff --git a/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/security.pxd.pxi b/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/security.pxd.pxi
new file mode 100644
index 00000000000..e6e79536bbe
--- /dev/null
+++ b/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/security.pxd.pxi
@@ -0,0 +1,17 @@
+# Copyright 2016 gRPC authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+cdef grpc_ssl_roots_override_result ssl_roots_override_callback(
+ char **pem_root_certs) nogil
diff --git a/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/security.pyx.pxi b/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/security.pyx.pxi
new file mode 100644
index 00000000000..9cc3fd5a211
--- /dev/null
+++ b/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/security.pyx.pxi
@@ -0,0 +1,85 @@
+# Copyright 2016 gRPC authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from libc.string cimport memcpy
+
+cdef grpc_ssl_roots_override_result ssl_roots_override_callback(
+ char **pem_root_certs) nogil:
+ with gil:
+ temporary_pem_root_certs = ''
+ pem_root_certs[0] = <char *>gpr_malloc(len(temporary_pem_root_certs) + 1)
+ memcpy(
+ pem_root_certs[0], <char *>temporary_pem_root_certs,
+ len(temporary_pem_root_certs))
+ pem_root_certs[0][len(temporary_pem_root_certs)] = '\0'
+
+ return GRPC_SSL_ROOTS_OVERRIDE_OK
+
+
+def peer_identities(Call call):
+ cdef grpc_auth_context* auth_context
+ cdef grpc_auth_property_iterator properties
+ cdef const grpc_auth_property* property
+
+ auth_context = grpc_call_auth_context(call.c_call)
+ if auth_context == NULL:
+ return None
+ properties = grpc_auth_context_peer_identity(auth_context)
+ identities = []
+ while True:
+ property = grpc_auth_property_iterator_next(&properties)
+ if property == NULL:
+ break
+ if property.value != NULL:
+ identities.append(<bytes>(property.value))
+ grpc_auth_context_release(auth_context)
+ return identities if identities else None
+
+def peer_identity_key(Call call):
+ cdef grpc_auth_context* auth_context
+ cdef const char* c_key
+ auth_context = grpc_call_auth_context(call.c_call)
+ if auth_context == NULL:
+ return None
+ c_key = grpc_auth_context_peer_identity_property_name(auth_context)
+ if c_key == NULL:
+ key = None
+ else:
+ key = <bytes> grpc_auth_context_peer_identity_property_name(auth_context)
+ grpc_auth_context_release(auth_context)
+ return key
+
+def auth_context(Call call):
+ cdef grpc_auth_context* auth_context
+ cdef grpc_auth_property_iterator properties
+ cdef const grpc_auth_property* property
+
+ auth_context = grpc_call_auth_context(call.c_call)
+ if auth_context == NULL:
+ return {}
+ properties = grpc_auth_context_property_iterator(auth_context)
+ py_auth_context = {}
+ while True:
+ property = grpc_auth_property_iterator_next(&properties)
+ if property == NULL:
+ break
+ if property.name != NULL and property.value != NULL:
+ key = <bytes> property.name
+ if key in py_auth_context:
+ py_auth_context[key].append(<bytes>(property.value))
+ else:
+ py_auth_context[key] = [<bytes> property.value]
+ grpc_auth_context_release(auth_context)
+ return py_auth_context
+
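
As a hedged usage note (not part of the diff): these helpers back the peer-identity accessors of the public grpc API. A minimal sketch, assuming only a grpc.ServicerContext instance named context as passed to an arbitrary handler:

    def log_peer_auth(context):
        # context is the grpc.ServicerContext handed to a servicer method;
        # these calls are ultimately served by the functions defined above.
        auth = context.auth_context()            # dict: bytes key -> list of bytes values
        identities = context.peer_identities()   # None on insecure connections
        key = context.peer_identity_key()        # None when no peer identity exists
        print(key, identities, auth)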
diff --git a/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/server.pxd.pxi b/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/server.pxd.pxi
new file mode 100644
index 00000000000..b89ed99d97b
--- /dev/null
+++ b/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/server.pxd.pxi
@@ -0,0 +1,29 @@
+# Copyright 2015 gRPC authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+cdef class Server:
+
+ cdef grpc_server *c_server
+
+ cdef bint is_started # start has been called
+ cdef bint is_shutting_down # shutdown has been called
+ cdef bint is_shutdown # notification of complete shutdown received
+ # used at dealloc when the user forgets to call shutdown
+ cdef CompletionQueue backup_shutdown_queue
+ # TODO(https://github.com/grpc/grpc/issues/15662): Elide this.
+ cdef list references
+ cdef list registered_completion_queues
+
+ cdef _c_shutdown(self, CompletionQueue queue, tag)
+ cdef notify_shutdown_complete(self)
diff --git a/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/server.pyx.pxi b/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/server.pyx.pxi
new file mode 100644
index 00000000000..29dabec61d9
--- /dev/null
+++ b/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/server.pyx.pxi
@@ -0,0 +1,165 @@
+# Copyright 2015 gRPC authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+cdef class Server:
+
+ def __cinit__(self, object arguments, bint xds):
+ fork_handlers_and_grpc_init()
+ self.references = []
+ self.registered_completion_queues = []
+ self.is_started = False
+ self.is_shutting_down = False
+ self.is_shutdown = False
+ self.c_server = NULL
+ cdef _ChannelArgs channel_args = _ChannelArgs(arguments)
+ self.c_server = grpc_server_create(channel_args.c_args(), NULL)
+ cdef grpc_server_xds_status_notifier notifier
+ notifier.on_serving_status_update = NULL
+ notifier.user_data = NULL
+ if xds:
+ grpc_server_set_config_fetcher(self.c_server,
+ grpc_server_config_fetcher_xds_create(notifier, channel_args.c_args()))
+ self.references.append(arguments)
+
+ def request_call(
+ self, CompletionQueue call_queue not None,
+ CompletionQueue server_queue not None, tag):
+ if not self.is_started or self.is_shutting_down:
+ raise ValueError("server must be started and not shutting down")
+ if server_queue not in self.registered_completion_queues:
+ raise ValueError("server_queue must be a registered completion queue")
+ cdef _RequestCallTag request_call_tag = _RequestCallTag(tag)
+ request_call_tag.prepare()
+ cpython.Py_INCREF(request_call_tag)
+ return grpc_server_request_call(
+ self.c_server, &request_call_tag.call.c_call,
+ &request_call_tag.call_details.c_details,
+ &request_call_tag.c_invocation_metadata,
+ call_queue.c_completion_queue, server_queue.c_completion_queue,
+ <cpython.PyObject *>request_call_tag)
+
+ def register_completion_queue(
+ self, CompletionQueue queue not None):
+ if self.is_started:
+ raise ValueError("cannot register completion queues after start")
+ with nogil:
+ grpc_server_register_completion_queue(
+ self.c_server, queue.c_completion_queue, NULL)
+ self.registered_completion_queues.append(queue)
+
+ def start(self, backup_queue=True):
+ """Start the Cython gRPC Server.
+
+ Args:
+ backup_queue: a bool indicating whether to spawn a backup completion
+ queue. Without one, if no other completion queue is bound to the server,
+ the shutdown of the server becomes unobservable.
+ """
+ if self.is_started:
+ raise ValueError("the server has already started")
+ if backup_queue:
+ self.backup_shutdown_queue = CompletionQueue(shutdown_cq=True)
+ self.register_completion_queue(self.backup_shutdown_queue)
+ self.is_started = True
+ with nogil:
+ grpc_server_start(self.c_server)
+ if backup_queue:
+ # Ensure the core has gotten a chance to do the start-up work
+ self.backup_shutdown_queue.poll(deadline=time.time())
+
+ def add_http2_port(self, bytes address,
+ ServerCredentials server_credentials=None):
+ address = str_to_bytes(address)
+ self.references.append(address)
+ cdef int result
+ cdef char *address_c_string = address
+ if server_credentials is not None:
+ self.references.append(server_credentials)
+ with nogil:
+ result = grpc_server_add_http2_port(
+ self.c_server, address_c_string, server_credentials.c_credentials)
+ else:
+ with nogil:
+ creds = grpc_insecure_server_credentials_create()
+ result = grpc_server_add_http2_port(self.c_server,
+ address_c_string, creds)
+ grpc_server_credentials_release(creds)
+ return result
+
+ cdef _c_shutdown(self, CompletionQueue queue, tag):
+ self.is_shutting_down = True
+ cdef _ServerShutdownTag server_shutdown_tag = _ServerShutdownTag(tag, self)
+ cpython.Py_INCREF(server_shutdown_tag)
+ with nogil:
+ grpc_server_shutdown_and_notify(
+ self.c_server, queue.c_completion_queue,
+ <cpython.PyObject *>server_shutdown_tag)
+
+ def shutdown(self, CompletionQueue queue not None, tag):
+ if queue.is_shutting_down:
+ raise ValueError("queue must be live")
+ elif not self.is_started:
+ raise ValueError("the server hasn't started yet")
+ elif self.is_shutting_down:
+ return
+ elif queue not in self.registered_completion_queues:
+ raise ValueError("expected registered completion queue")
+ else:
+ self._c_shutdown(queue, tag)
+
+ cdef notify_shutdown_complete(self):
+ # called only after our server shutdown tag has emerged from a completion
+ # queue.
+ self.is_shutdown = True
+
+ def cancel_all_calls(self):
+ if not self.is_shutting_down:
+ raise UsageError("the server must be shutting down to cancel all calls")
+ elif self.is_shutdown:
+ return
+ else:
+ with nogil:
+ grpc_server_cancel_all_calls(self.c_server)
+
+ # TODO(https://github.com/grpc/grpc/issues/17515) Determine what, if any,
+ # portion of this is safe to call from __dealloc__, and potentially remove
+ # backup_shutdown_queue.
+ def destroy(self):
+ if self.c_server != NULL:
+ if not self.is_started:
+ pass
+ elif self.is_shutdown:
+ pass
+ elif not self.is_shutting_down:
+ if self.backup_shutdown_queue is None:
+ raise InternalError('Server shutdown failed: no completion queue.')
+ else:
+ # the user didn't call shutdown - use our backup queue
+ self._c_shutdown(self.backup_shutdown_queue, None)
+ # and now we wait
+ while not self.is_shutdown:
+ self.backup_shutdown_queue.poll()
+ else:
+ # We're in the process of shutting down but have not yet shut down; we can
+ # do little except repeatedly release the GIL and wait
+ while not self.is_shutdown:
+ time.sleep(0)
+ with nogil:
+ grpc_server_destroy(self.c_server)
+ self.c_server = NULL
+
+ def __dealloc__(self):
+ if self.c_server == NULL:
+ grpc_shutdown()
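
A rough lifecycle sketch for the Cython-level Server above (illustrative only; the address and port are hypothetical, channel arguments and RPC handling are elided, and errors are not handled):

    from grpc._cython import cygrpc

    server = cygrpc.Server([], False)              # no channel args, xds disabled
    queue = cygrpc.CompletionQueue()
    server.register_completion_queue(queue)
    server.add_http2_port(b"localhost:50051")      # insecure-credentials branch
    server.start()
    # ... issue request_call() and poll() on the queue to serve RPCs ...
    server.shutdown(queue, None)
    queue.poll()                                   # yields the ServerShutdownEvent
    server.destroy()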
diff --git a/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/tag.pxd.pxi b/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/tag.pxd.pxi
new file mode 100644
index 00000000000..7af169fa3f9
--- /dev/null
+++ b/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/tag.pxd.pxi
@@ -0,0 +1,58 @@
+# Copyright 2017 gRPC authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+cdef class _Tag:
+
+ cdef BaseEvent event(self, grpc_event c_event)
+
+
+cdef class _ConnectivityTag(_Tag):
+
+ cdef readonly object _user_tag
+
+ cdef ConnectivityEvent event(self, grpc_event c_event)
+
+
+cdef class _RequestCallTag(_Tag):
+
+ cdef readonly object _user_tag
+ cdef Call call
+ cdef CallDetails call_details
+ cdef grpc_metadata_array c_invocation_metadata
+
+ cdef void prepare(self) except *
+ cdef RequestCallEvent event(self, grpc_event c_event)
+
+
+cdef class _BatchOperationTag(_Tag):
+
+ cdef object _user_tag
+ cdef readonly object _operations
+ cdef readonly object _retained_call
+ cdef grpc_op *c_ops
+ cdef size_t c_nops
+
+ cdef void prepare(self) except *
+ cdef BatchOperationEvent event(self, grpc_event c_event)
+
+
+cdef class _ServerShutdownTag(_Tag):
+
+ cdef readonly object _user_tag
+ # This allows CompletionQueue to notify the Python Server object that the
+ # underlying gRPC core server has shut down
+ cdef readonly Server _shutting_down_server
+
+ cdef ServerShutdownEvent event(self, grpc_event c_event)
diff --git a/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/tag.pyx.pxi b/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/tag.pyx.pxi
new file mode 100644
index 00000000000..7e62ff63897
--- /dev/null
+++ b/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/tag.pyx.pxi
@@ -0,0 +1,88 @@
+# Copyright 2017 gRPC authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+cdef class _Tag:
+
+ cdef BaseEvent event(self, grpc_event c_event):
+ raise NotImplementedError()
+
+
+cdef class _ConnectivityTag(_Tag):
+
+ def __cinit__(self, user_tag):
+ self._user_tag = user_tag
+
+ cdef ConnectivityEvent event(self, grpc_event c_event):
+ return ConnectivityEvent(c_event.type, c_event.success, self._user_tag)
+
+
+cdef class _RequestCallTag(_Tag):
+
+ def __cinit__(self, user_tag):
+ self._user_tag = user_tag
+ self.call = None
+ self.call_details = None
+
+ cdef void prepare(self) except *:
+ self.call = Call()
+ self.call_details = CallDetails()
+ grpc_metadata_array_init(&self.c_invocation_metadata)
+
+ cdef RequestCallEvent event(self, grpc_event c_event):
+ cdef tuple invocation_metadata = _metadata(&self.c_invocation_metadata)
+ grpc_metadata_array_destroy(&self.c_invocation_metadata)
+ return RequestCallEvent(
+ c_event.type, c_event.success, self._user_tag, self.call,
+ self.call_details, invocation_metadata)
+
+
+cdef class _BatchOperationTag:
+
+ def __cinit__(self, user_tag, operations, call):
+ self._user_tag = user_tag
+ self._operations = operations
+ self._retained_call = call
+
+ cdef void prepare(self) except *:
+ cdef Operation operation
+ self.c_nops = 0 if self._operations is None else len(self._operations)
+ if 0 < self.c_nops:
+ self.c_ops = <grpc_op *>gpr_malloc(sizeof(grpc_op) * self.c_nops)
+ for index, operation in enumerate(self._operations):
+ operation.c()
+ self.c_ops[index] = operation.c_op
+
+ cdef BatchOperationEvent event(self, grpc_event c_event):
+ cdef Operation operation
+ if 0 < self.c_nops:
+ for operation in self._operations:
+ operation.un_c()
+ gpr_free(self.c_ops)
+ return BatchOperationEvent(
+ c_event.type, c_event.success, self._user_tag, self._operations)
+ else:
+ return BatchOperationEvent(
+ c_event.type, c_event.success, self._user_tag, ())
+
+
+cdef class _ServerShutdownTag(_Tag):
+
+ def __cinit__(self, user_tag, shutting_down_server):
+ self._user_tag = user_tag
+ self._shutting_down_server = shutting_down_server
+
+ cdef ServerShutdownEvent event(self, grpc_event c_event):
+ self._shutting_down_server.notify_shutdown_complete()
+ return ServerShutdownEvent(c_event.type, c_event.success, self._user_tag)
diff --git a/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/thread.pyx.pxi b/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/thread.pyx.pxi
new file mode 100644
index 00000000000..be4cb8b9a8e
--- /dev/null
+++ b/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/thread.pyx.pxi
@@ -0,0 +1,59 @@
+# Copyright 2020 The gRPC authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+def _contextvars_supported():
+ """Determines if the contextvars module is supported.
+
+ We use a 'try it and see if it works' approach here rather than predicting
+ based on interpreter version in order to support older interpreters that
+ may have a backported module based on, e.g., `threading.local`.
+
+ Returns:
+ A bool indicating whether `contextvars` are supported in the current
+ environment.
+ """
+ try:
+ import contextvars
+ return True
+ except ImportError:
+ return False
+
+
+def _run_with_context(target):
+ """Runs a callable with contextvars propagated.
+
+ If contextvars are supported, the calling thread's context will be copied
+ and propagated. If they are not supported, this function is equivalent
+ to the identity function.
+
+ Args:
+ target: A callable object to wrap.
+ Returns:
+ A callable object with the same signature as `target` but with
+ contextvars propagated.
+ """
+
+
+if _contextvars_supported():
+ import contextvars
+ def _run_with_context(target):
+ ctx = contextvars.copy_context()
+ def _run(*args):
+ ctx.run(target, *args)
+ return _run
+else:
+ def _run_with_context(target):
+ def _run(*args):
+ target(*args)
+ return _run
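
For clarity, a self-contained pure-Python sketch mirroring the propagation logic above; it reimplements _run_with_context rather than importing the extension, and the ContextVar name is made up for illustration.

    import contextvars
    import threading

    request_id = contextvars.ContextVar("request_id", default=None)

    def _run_with_context(target):
        ctx = contextvars.copy_context()
        def _run(*args):
            ctx.run(target, *args)
        return _run

    def handler():
        # Sees the caller's value because the copied context is used by ctx.run().
        print("request_id in worker thread:", request_id.get())

    request_id.set("abc-123")
    worker = threading.Thread(target=_run_with_context(handler))
    worker.start()
    worker.join()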
diff --git a/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/time.pxd.pxi b/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/time.pxd.pxi
new file mode 100644
index 00000000000..c46e8a98b04
--- /dev/null
+++ b/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/time.pxd.pxi
@@ -0,0 +1,19 @@
+# Copyright 2018 gRPC authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+cdef gpr_timespec _timespec_from_time(object time) except *
+
+
+cdef double _time_from_timespec(gpr_timespec timespec) except *
diff --git a/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/time.pyx.pxi b/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/time.pyx.pxi
new file mode 100644
index 00000000000..6d181bb1d60
--- /dev/null
+++ b/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/time.pyx.pxi
@@ -0,0 +1,29 @@
+# Copyright 2018 gRPC authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+cdef gpr_timespec _timespec_from_time(object time) except *:
+ if time is None:
+ return gpr_inf_future(GPR_CLOCK_REALTIME)
+ else:
+ return gpr_time_from_nanos(
+ <int64_t>(<double>time * GPR_NS_PER_SEC),
+ GPR_CLOCK_REALTIME,
+ )
+
+
+cdef double _time_from_timespec(gpr_timespec timespec) except *:
+ cdef gpr_timespec real_timespec = gpr_convert_clock_type(
+ timespec, GPR_CLOCK_REALTIME)
+ return gpr_timespec_to_micros(real_timespec) / GPR_US_PER_SEC
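
A pure-Python illustration of the deadline convention encoded above, under the assumption that deadlines are POSIX timestamps in seconds, with None meaning "no deadline" (gpr_inf_future):

    import time

    GPR_NS_PER_SEC = 1_000_000_000

    def deadline_to_nanos(deadline):
        # Mirrors _timespec_from_time: None maps to "infinitely far in the
        # future", a float maps to whole nanoseconds on the realtime clock.
        if deadline is None:
            return None                      # stand-in for gpr_inf_future
        return int(deadline * GPR_NS_PER_SEC)

    print(deadline_to_nanos(None))
    print(deadline_to_nanos(time.time() + 5.0))   # a 5-second deadline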
diff --git a/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/vtable.pxd.pxi b/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/vtable.pxd.pxi
new file mode 100644
index 00000000000..c96e5cb6696
--- /dev/null
+++ b/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/vtable.pxd.pxi
@@ -0,0 +1,23 @@
+# Copyright 2019 gRPC authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+cdef void* _copy_pointer(void* pointer)
+
+cdef void _destroy_pointer(void* pointer)
+
+cdef int _compare_pointer(void* first_pointer, void* second_pointer)
+
+
+cdef grpc_arg_pointer_vtable default_vtable
diff --git a/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/vtable.pyx.pxi b/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/vtable.pyx.pxi
new file mode 100644
index 00000000000..da4b81bd97e
--- /dev/null
+++ b/contrib/python/grpcio/py3/grpc/_cython/_cygrpc/vtable.pyx.pxi
@@ -0,0 +1,36 @@
+# Copyright 2019 gRPC authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# TODO(https://github.com/grpc/grpc/issues/15662): Reform this.
+cdef void* _copy_pointer(void* pointer):
+ return pointer
+
+
+# TODO(https://github.com/grpc/grpc/issues/15662): Reform this.
+cdef void _destroy_pointer(void* pointer):
+ pass
+
+
+cdef int _compare_pointer(void* first_pointer, void* second_pointer):
+ if first_pointer < second_pointer:
+ return -1
+ elif first_pointer > second_pointer:
+ return 1
+ else:
+ return 0
+
+cdef grpc_arg_pointer_vtable default_vtable
+default_vtable.copy = &_copy_pointer
+default_vtable.destroy = &_destroy_pointer
+default_vtable.cmp = &_compare_pointer
diff --git a/contrib/python/grpcio/py3/grpc/_cython/cygrpc.pxd b/contrib/python/grpcio/py3/grpc/_cython/cygrpc.pxd
new file mode 100644
index 00000000000..ed04119143a
--- /dev/null
+++ b/contrib/python/grpcio/py3/grpc/_cython/cygrpc.pxd
@@ -0,0 +1,50 @@
+# Copyright 2015 gRPC authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# distutils: language=c++
+
+cimport cpython
+
+include "_cygrpc/grpc.pxi"
+
+include "_cygrpc/arguments.pxd.pxi"
+include "_cygrpc/call.pxd.pxi"
+include "_cygrpc/channel.pxd.pxi"
+include "_cygrpc/credentials.pxd.pxi"
+include "_cygrpc/completion_queue.pxd.pxi"
+include "_cygrpc/event.pxd.pxi"
+include "_cygrpc/metadata.pxd.pxi"
+include "_cygrpc/operation.pxd.pxi"
+include "_cygrpc/propagation_bits.pxd.pxi"
+include "_cygrpc/records.pxd.pxi"
+include "_cygrpc/security.pxd.pxi"
+include "_cygrpc/server.pxd.pxi"
+include "_cygrpc/tag.pxd.pxi"
+include "_cygrpc/time.pxd.pxi"
+include "_cygrpc/vtable.pxd.pxi"
+include "_cygrpc/_hooks.pxd.pxi"
+
+
+include "_cygrpc/grpc_gevent.pxd.pxi"
+
+IF UNAME_SYSNAME != "Windows":
+ include "_cygrpc/fork_posix.pxd.pxi"
+
+# Following pxi files are part of the Aio module
+include "_cygrpc/aio/completion_queue.pxd.pxi"
+include "_cygrpc/aio/rpc_status.pxd.pxi"
+include "_cygrpc/aio/grpc_aio.pxd.pxi"
+include "_cygrpc/aio/callback_common.pxd.pxi"
+include "_cygrpc/aio/call.pxd.pxi"
+include "_cygrpc/aio/channel.pxd.pxi"
+include "_cygrpc/aio/server.pxd.pxi"
diff --git a/contrib/python/grpcio/py3/grpc/_cython/cygrpc.pyx b/contrib/python/grpcio/py3/grpc/_cython/cygrpc.pyx
new file mode 100644
index 00000000000..c7925676c34
--- /dev/null
+++ b/contrib/python/grpcio/py3/grpc/_cython/cygrpc.pyx
@@ -0,0 +1,94 @@
+# Copyright 2015 gRPC authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# distutils: language=c++
+
+cimport cpython
+
+import logging
+import os
+import sys
+import threading
+import time
+
+import grpc
+
+try:
+ import asyncio
+except ImportError:
+ # TODO(https://github.com/grpc/grpc/issues/19728) Improve how Aio Cython is
+ # distributed without breaking incompatible Python versions. For now, if
+ # the asyncio package is not available we just skip it.
+ pass
+
+# The only copy of Python logger for the Cython extension
+_LOGGER = logging.getLogger(__name__)
+
+# TODO(atash): figure out why the coverage tool gets confused about the Cython
+# coverage plugin when the following files don't have a '.pxi' suffix.
+include "_cygrpc/grpc_string.pyx.pxi"
+include "_cygrpc/arguments.pyx.pxi"
+include "_cygrpc/call.pyx.pxi"
+include "_cygrpc/channel.pyx.pxi"
+include "_cygrpc/channelz.pyx.pxi"
+include "_cygrpc/csds.pyx.pxi"
+include "_cygrpc/credentials.pyx.pxi"
+include "_cygrpc/completion_queue.pyx.pxi"
+include "_cygrpc/event.pyx.pxi"
+include "_cygrpc/metadata.pyx.pxi"
+include "_cygrpc/operation.pyx.pxi"
+include "_cygrpc/propagation_bits.pyx.pxi"
+include "_cygrpc/records.pyx.pxi"
+include "_cygrpc/security.pyx.pxi"
+include "_cygrpc/server.pyx.pxi"
+include "_cygrpc/tag.pyx.pxi"
+include "_cygrpc/time.pyx.pxi"
+include "_cygrpc/vtable.pyx.pxi"
+include "_cygrpc/_hooks.pyx.pxi"
+
+include "_cygrpc/grpc_gevent.pyx.pxi"
+
+include "_cygrpc/thread.pyx.pxi"
+
+IF UNAME_SYSNAME == "Windows":
+ include "_cygrpc/fork_windows.pyx.pxi"
+ELSE:
+ include "_cygrpc/fork_posix.pyx.pxi"
+
+# Following pxi files are part of the Aio module
+include "_cygrpc/aio/common.pyx.pxi"
+include "_cygrpc/aio/rpc_status.pyx.pxi"
+include "_cygrpc/aio/completion_queue.pyx.pxi"
+include "_cygrpc/aio/callback_common.pyx.pxi"
+include "_cygrpc/aio/grpc_aio.pyx.pxi"
+include "_cygrpc/aio/call.pyx.pxi"
+include "_cygrpc/aio/channel.pyx.pxi"
+include "_cygrpc/aio/server.pyx.pxi"
+
+
+#
+# initialize gRPC
+#
+cdef extern from "Python.h":
+
+ int PyEval_InitThreads()
+
+cdef _initialize():
+ # We have Python callbacks called by c-core threads; this ensures the GIL
+ # is initialized.
+ PyEval_InitThreads()
+ import ssl
+ grpc_dont_init_openssl()
+ # Load Arcadia certs in ComputePemRootCerts and do not override here.
+
+_initialize()