aboutsummaryrefslogtreecommitdiffstats
path: root/contrib/python/grpcio/py2/grpc/_cython/_cygrpc/grpc_gevent.pyx.pxi
blob: 41d27df59481891c8335b0749e2e6f7bc2ae8888 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
# Copyright 2018 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# distutils: language=c++

from libc cimport string
from cython.operator cimport dereference

from cpython cimport Py_INCREF, Py_DECREF

import atexit
import errno
import sys

# gevent handles. All four keep these defaults until init_grpc_gevent()
# assigns them.
gevent_hub = None
g_gevent_pool = None
g_gevent_threadpool = None
g_gevent_activated = False


# FIFO of pending callbacks. Each entry is a raw pointer to a Python tuple of
# the form (callable, *args); entries are pushed by
# _submit_to_greenlet_queue() and popped by await_next_greenlet().
cdef queue[void*] g_greenlets_to_run
# Notified when an entry is pushed, when g_channel_count drops to zero, and
# when shutdown is requested; await_next_greenlet() waits on it.
cdef condition_variable g_greenlets_cv
# Held while touching the queue, the shutdown flag and the channel count.
cdef mutex g_greenlets_mu
# Set to True by shutdown_await_next_greenlet(); makes await_next_greenlet()
# stop waiting and return None.
cdef bint g_shutdown_greenlets_to_run_queue = False
# Adjusted by gevent_increment_channel_count() and
# gevent_decrement_channel_count(); await_next_greenlet() returns None once
# it is zero.
cdef int g_channel_count = 0


# Queues `cb(*args)` for later execution by spawn_greenlets().
#
# Args:
#   cb: The callable to invoke.
#   args: Positional arguments to pass to `cb`.
cdef _submit_to_greenlet_queue(object cb, tuple args):
  # Pack the callable and its arguments into one tuple so a single pointer can
  # be stored in the C++ queue.
  cdef tuple to_call = (cb,) + args
  cdef unique_lock[mutex]* lk
  # The queue only stores a raw void*, so take an explicit reference to keep
  # the tuple alive until await_next_greenlet() pops it.
  Py_INCREF(to_call)
  with nogil:
    lk = new unique_lock[mutex](g_greenlets_mu)
    g_greenlets_to_run.push(<void*>(to_call))
    # Deleting the unique_lock releases g_greenlets_mu before waiters are
    # woken.
    del lk
    g_greenlets_cv.notify_all()


# Increments g_channel_count under g_greenlets_mu and, when the count was zero
# beforehand, starts the greenlet that drains the callback queue via
# run_spawn_greenlets().
cpdef void gevent_increment_channel_count():
  global g_channel_count
  cdef int old_channel_count
  # Declared explicitly, like the other functions in this file that lock
  # g_greenlets_mu, instead of relying on implicit type inference.
  cdef unique_lock[mutex]* lk
  with nogil:
    lk = new unique_lock[mutex](g_greenlets_mu)
    # Capture the previous value while the lock is held so the "first
    # channel" decision below is based on a consistent read.
    old_channel_count = g_channel_count
    g_channel_count += 1
    del lk
  # The spawn call needs the GIL, so it runs after the nogil section.
  if old_channel_count == 0:
    run_spawn_greenlets()


# Decrements g_channel_count under g_greenlets_mu. When the count reaches
# zero, wakes every waiter on g_greenlets_cv so that await_next_greenlet() can
# observe the zero count and return.
cpdef void gevent_decrement_channel_count():
  global g_channel_count
  # Declared explicitly, like the other functions in this file that lock
  # g_greenlets_mu, instead of relying on implicit type inference.
  cdef unique_lock[mutex]* lk
  with nogil:
    lk = new unique_lock[mutex](g_greenlets_mu)
    g_channel_count -= 1
    if g_channel_count == 0:
      g_greenlets_cv.notify_all()
    del lk


# Blocks until there is a queued callback to run, the channel count is zero,
# or shutdown has been requested.
#
# Returns:
#   The next queued tuple (callable, *args), or None when g_channel_count is
#   zero or g_shutdown_greenlets_to_run_queue is set.
cdef object await_next_greenlet():
  cdef unique_lock[mutex]* lk
  with nogil:
    # Cython doesn't allow us to do proper stack allocations, so we can't take
    # advantage of RAII.
    lk = new unique_lock[mutex](g_greenlets_mu)
    # Keep waiting only while nothing is queued, shutdown has not been
    # requested and the channel count is non-zero.
    while not g_shutdown_greenlets_to_run_queue and g_channel_count != 0:
      if not g_greenlets_to_run.empty():
        break
      g_greenlets_cv.wait(dereference(lk))
  # From here on the GIL is held again; g_greenlets_mu stays locked until
  # `lk` is deleted on each exit path below.
  if g_channel_count == 0:
    del lk
    return None
  if g_shutdown_greenlets_to_run_queue:
    del lk
    return None
  cdef object to_call = <object>g_greenlets_to_run.front()
  # Balances the Py_INCREF done in _submit_to_greenlet_queue(); the local
  # `to_call` variable keeps the tuple alive.
  Py_DECREF(to_call)
  g_greenlets_to_run.pop()
  del lk
  return to_call

def spawn_greenlets():
  """Run queued callbacks until await_next_greenlet() reports no more work.

  Each iteration asks g_gevent_threadpool to execute await_next_greenlet();
  presumably this keeps the blocking wait off the gevent loop (confirm against
  the gevent thread pool documentation). A None result ends the loop; any
  other result is a tuple whose first item is the callable and whose remaining
  items are its positional arguments.
  """
  while True:
    pending = g_gevent_threadpool.apply(await_next_greenlet, ())
    if pending is None:
      return
    callback, call_args = pending[0], pending[1:]
    callback(*call_args)

def run_spawn_greenlets():
  """Spawn spawn_greenlets() in g_gevent_pool.

  Called by gevent_increment_channel_count() when the channel count goes from
  zero to one.
  """
  g_gevent_pool.spawn(spawn_greenlets)

def shutdown_await_next_greenlet():
  """Request shutdown of the callback queue and wake all waiters.

  Sets g_shutdown_greenlets_to_run_queue under g_greenlets_mu and then
  notifies g_greenlets_cv, so a thread blocked in await_next_greenlet() wakes
  up and returns None. Registered with atexit by init_grpc_gevent().
  """
  global g_shutdown_greenlets_to_run_queue
  cdef unique_lock[mutex]* lk
  with nogil:
    lk = new unique_lock[mutex](g_greenlets_mu)
    g_shutdown_greenlets_to_run_queue = True
  # Release the mutex before notifying the waiters.
  del lk
  g_greenlets_cv.notify_all()

def init_grpc_gevent():
  """Activate the gevent integration.

  Imports gevent on demand and stores the gevent hub module, the hub's thread
  pool and a new gevent.pool.Group in module globals. It then marks gevent as
  activated, sets g_interrupt_check_period_ms to 2000, installs
  _submit_to_greenlet_queue through set_async_callback_func and registers
  shutdown_await_next_greenlet to run at interpreter exit.

  How the pieces fit together, as far as this file shows: callbacks handed to
  _submit_to_greenlet_queue are pushed onto a C++ queue; spawn_greenlets(),
  started when the first channel is counted, pulls them off through
  await_next_greenlet() and calls them.
  """
  global gevent_hub, g_gevent_pool, g_gevent_threadpool
  global g_gevent_activated, g_interrupt_check_period_ms

  # gevent is imported here rather than at module level so that it is only
  # needed when this function is called.
  import gevent
  import gevent.pool

  gevent_hub = gevent.hub
  g_gevent_threadpool = gevent_hub.get_hub().threadpool

  g_gevent_activated = True
  g_interrupt_check_period_ms = 2000

  g_gevent_pool = gevent.pool.Group()

  set_async_callback_func(_submit_to_greenlet_queue)

  # Ensure a thread blocked in await_next_greenlet() is released at exit.
  atexit.register(shutdown_await_next_greenlet)