1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
|
# Copyright 2019 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
class _WatchConnectivityFailed(Exception):
"""Dedicated exception class for watch connectivity failed.
It might be failed due to deadline exceeded.
"""
cdef CallbackFailureHandler _WATCH_CONNECTIVITY_FAILURE_HANDLER = CallbackFailureHandler(
'watch_connectivity_state',
'Timed out',
_WatchConnectivityFailed)
cdef class AioChannel:
def __cinit__(self, bytes target, tuple options, ChannelCredentials credentials, object loop):
init_grpc_aio()
if options is None:
options = ()
cdef _ChannelArgs channel_args = _ChannelArgs(options)
self._target = target
self.loop = loop
self._status = AIO_CHANNEL_STATUS_READY
if credentials is None:
self._is_secure = False
creds = grpc_insecure_credentials_create();
self.channel = grpc_channel_create(<char *>target,
creds,
channel_args.c_args())
grpc_channel_credentials_release(creds)
else:
self._is_secure = True
creds = <grpc_channel_credentials *> credentials.c()
self.channel = grpc_channel_create(<char *>target,
creds,
channel_args.c_args())
grpc_channel_credentials_release(creds)
def __dealloc__(self):
shutdown_grpc_aio()
def __repr__(self):
class_name = self.__class__.__name__
id_ = id(self)
return f"<{class_name} {id_}>"
def check_connectivity_state(self, bint try_to_connect):
"""A Cython wrapper for Core's check connectivity state API."""
if self._status == AIO_CHANNEL_STATUS_DESTROYED:
return ConnectivityState.shutdown
else:
return grpc_channel_check_connectivity_state(
self.channel,
try_to_connect,
)
async def watch_connectivity_state(self,
grpc_connectivity_state last_observed_state,
object deadline):
"""Watch for one connectivity state change.
Keeps mirroring the behavior from Core, so we can easily switch to
other design of API if necessary.
"""
if self._status in (AIO_CHANNEL_STATUS_DESTROYED, AIO_CHANNEL_STATUS_CLOSING):
raise UsageError('Channel is closed.')
cdef gpr_timespec c_deadline = _timespec_from_time(deadline)
cdef object future = self.loop.create_future()
cdef CallbackWrapper wrapper = CallbackWrapper(
future,
self.loop,
_WATCH_CONNECTIVITY_FAILURE_HANDLER)
grpc_channel_watch_connectivity_state(
self.channel,
last_observed_state,
c_deadline,
global_completion_queue(),
wrapper.c_functor())
try:
await future
except _WatchConnectivityFailed:
return False
else:
return True
def closing(self):
self._status = AIO_CHANNEL_STATUS_CLOSING
def close(self):
self._status = AIO_CHANNEL_STATUS_DESTROYED
grpc_channel_destroy(self.channel)
def closed(self):
return self._status in (AIO_CHANNEL_STATUS_CLOSING, AIO_CHANNEL_STATUS_DESTROYED)
def call(self,
bytes method,
object deadline,
object python_call_credentials,
object wait_for_ready):
"""Assembles a Cython Call object.
Returns:
An _AioCall object.
"""
if self.closed():
raise UsageError('Channel is closed.')
cdef CallCredentials cython_call_credentials
if python_call_credentials is not None:
if not self._is_secure:
raise UsageError("Call credentials are only valid on secure channels")
cython_call_credentials = python_call_credentials._credentials
else:
cython_call_credentials = None
return _AioCall(self, deadline, method, cython_call_credentials, wait_for_ready)
|