blob: 41d27df59481891c8335b0749e2e6f7bc2ae8888 (
plain) (
blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
|
# Copyright 2018 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# distutils: language=c++
from libc cimport string
from cython.operator cimport dereference
from cpython cimport Py_INCREF, Py_DECREF
import atexit
import errno
import sys
gevent_hub = None
g_gevent_pool = None
g_gevent_threadpool = None
g_gevent_activated = False
cdef queue[void*] g_greenlets_to_run
cdef condition_variable g_greenlets_cv
cdef mutex g_greenlets_mu
cdef bint g_shutdown_greenlets_to_run_queue = False
cdef int g_channel_count = 0
cdef _submit_to_greenlet_queue(object cb, tuple args):
cdef tuple to_call = (cb,) + args
cdef unique_lock[mutex]* lk
Py_INCREF(to_call)
with nogil:
lk = new unique_lock[mutex](g_greenlets_mu)
g_greenlets_to_run.push(<void*>(to_call))
del lk
g_greenlets_cv.notify_all()
cpdef void gevent_increment_channel_count():
global g_channel_count
cdef int old_channel_count
with nogil:
lk = new unique_lock[mutex](g_greenlets_mu)
old_channel_count = g_channel_count
g_channel_count += 1
del lk
if old_channel_count == 0:
run_spawn_greenlets()
cpdef void gevent_decrement_channel_count():
global g_channel_count
with nogil:
lk = new unique_lock[mutex](g_greenlets_mu)
g_channel_count -= 1
if g_channel_count == 0:
g_greenlets_cv.notify_all()
del lk
cdef object await_next_greenlet():
cdef unique_lock[mutex]* lk
with nogil:
# Cython doesn't allow us to do proper stack allocations, so we can't take
# advantage of RAII.
lk = new unique_lock[mutex](g_greenlets_mu)
while not g_shutdown_greenlets_to_run_queue and g_channel_count != 0:
if not g_greenlets_to_run.empty():
break
g_greenlets_cv.wait(dereference(lk))
if g_channel_count == 0:
del lk
return None
if g_shutdown_greenlets_to_run_queue:
del lk
return None
cdef object to_call = <object>g_greenlets_to_run.front()
Py_DECREF(to_call)
g_greenlets_to_run.pop()
del lk
return to_call
def spawn_greenlets():
while True:
to_call = g_gevent_threadpool.apply(await_next_greenlet, ())
if to_call is None:
break
fn = to_call[0]
args = to_call[1:]
fn(*args)
def run_spawn_greenlets():
g_gevent_pool.spawn(spawn_greenlets)
def shutdown_await_next_greenlet():
global g_shutdown_greenlets_to_run_queue
cdef unique_lock[mutex]* lk
with nogil:
lk = new unique_lock[mutex](g_greenlets_mu)
g_shutdown_greenlets_to_run_queue = True
del lk
g_greenlets_cv.notify_all()
def init_grpc_gevent():
# Lazily import gevent
global gevent_hub
global g_gevent_threadpool
global g_gevent_activated
global g_interrupt_check_period_ms
global g_gevent_pool
import gevent
import gevent.pool
gevent_hub = gevent.hub
g_gevent_threadpool = gevent_hub.get_hub().threadpool
g_gevent_activated = True
g_interrupt_check_period_ms = 2000
g_gevent_pool = gevent.pool.Group()
set_async_callback_func(_submit_to_greenlet_queue)
# TODO: Document how this all works.
atexit.register(shutdown_await_next_greenlet)
|