# Copyright 2018 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# distutils: language=c++

from libc cimport string
from cython.operator cimport dereference

from cpython cimport Py_INCREF, Py_DECREF

import atexit
import errno
import sys

# Python-level gevent state. All of these stay at their defaults until
# init_grpc_gevent() is called.
# The gevent.hub module.
gevent_hub = None
# gevent.pool.Group in which the spawn_greenlets loop is spawned.
g_gevent_pool = None
# Threadpool of the gevent hub, used to run the blocking queue wait.
g_gevent_threadpool = None
# Set to True by init_grpc_gevent().
g_gevent_activated = False


# Pending work items. Each entry is a raw pointer to a Python tuple of the
# form (callable, *args) on which an extra reference is held while queued.
cdef queue[void*] g_greenlets_to_run
# Notified when work is queued, when the channel count drops to zero and at
# shutdown.
cdef condition_variable g_greenlets_cv
# Locked around accesses to the queue, the channel count and the shutdown
# flag.
cdef mutex g_greenlets_mu
# Set to True by shutdown_await_next_greenlet() to make waiters return.
cdef bint g_shutdown_greenlets_to_run_queue = False
# Adjusted by gevent_increment_channel_count() and
# gevent_decrement_channel_count().
cdef int g_channel_count = 0


cdef _submit_to_greenlet_queue(object cb, tuple args):
  """Queue ``cb(*args)`` for execution by the spawn_greenlets loop.

  The callable and its arguments are packed into one tuple, a raw pointer to
  which is pushed onto g_greenlets_to_run. All waiters on g_greenlets_cv are
  then notified.

  Args:
    cb: The callable to run.
    args: Positional arguments to pass to ``cb``.
  """
  cdef unique_lock[mutex]* guard
  cdef tuple work_item = (cb,) + args
  # The queue only stores a bare void*, so take a reference on its behalf.
  # await_next_greenlet() releases it when the item is dequeued.
  Py_INCREF(work_item)
  with nogil:
    # The lock is heap-allocated and released explicitly with `del`.
    guard = new unique_lock[mutex](g_greenlets_mu)
    g_greenlets_to_run.push(<void*>(work_item))
    del guard
    # Notify only after the mutex has been released.
    g_greenlets_cv.notify_all()


cpdef void gevent_increment_channel_count():
  """Increment the channel count, starting the spawn loop on 0 -> 1.

  The count is updated while holding g_greenlets_mu. If the count was zero
  before the increment, run_spawn_greenlets() is called so that queued
  callbacks start being serviced.
  """
  global g_channel_count
  # Declared explicitly, like the other users of g_greenlets_mu in this file,
  # instead of relying on Cython's type inference.
  cdef unique_lock[mutex]* lk
  cdef int old_channel_count
  with nogil:
    lk = new unique_lock[mutex](g_greenlets_mu)
    old_channel_count = g_channel_count
    g_channel_count += 1
    del lk
  # Spawning needs the GIL, so it happens outside the nogil block and after
  # the mutex has been released.
  if old_channel_count == 0:
    run_spawn_greenlets()


cpdef void gevent_decrement_channel_count():
  """Decrement the channel count, waking waiters when it reaches zero.

  The count is updated while holding g_greenlets_mu. When it drops to zero,
  every waiter on g_greenlets_cv is notified; await_next_greenlet() returns
  None once it observes a zero count.
  """
  global g_channel_count
  # Declared explicitly, like the other users of g_greenlets_mu in this file,
  # instead of relying on Cython's type inference.
  cdef unique_lock[mutex]* lk
  with nogil:
    lk = new unique_lock[mutex](g_greenlets_mu)
    g_channel_count -= 1
    if g_channel_count == 0:
      g_greenlets_cv.notify_all()
    del lk


cdef object await_next_greenlet():
  """Block until a work item is queued, then dequeue and return it.

  Returns:
    The ``(callable, *args)`` tuple at the front of g_greenlets_to_run, or
    None if the channel count is zero or the shutdown flag has been set.
  """
  cdef unique_lock[mutex]* guard
  cdef object work_item
  with nogil:
    # Cython doesn't allow proper stack allocations, so RAII is unavailable:
    # the lock lives on the heap and is released with `del` on every path.
    guard = new unique_lock[mutex](g_greenlets_mu)
    # Wait while nothing is queued, shutdown has not been requested and at
    # least one channel is still open.
    while (not g_shutdown_greenlets_to_run_queue and g_channel_count != 0
           and g_greenlets_to_run.empty()):
      g_greenlets_cv.wait(dereference(guard))
  # The mutex is still held here, so the shared state can be read safely.
  if g_channel_count == 0 or g_shutdown_greenlets_to_run_queue:
    del guard
    return None
  work_item = <object>g_greenlets_to_run.front()
  # Release the reference taken when the item was queued; `work_item` holds
  # its own reference.
  Py_DECREF(work_item)
  g_greenlets_to_run.pop()
  del guard
  return work_item

def spawn_greenlets():
  """Run queued callbacks until await_next_greenlet() returns None.

  Each wait for the next work item is delegated to the gevent threadpool via
  ``apply``. Every item is a tuple whose first element is the callable and
  whose remaining elements are its positional arguments.
  """
  while True:
    work_item = g_gevent_threadpool.apply(await_next_greenlet, ())
    if work_item is None:
      return
    callback = work_item[0]
    callback(*work_item[1:])

def run_spawn_greenlets():
  """Spawn the spawn_greenlets loop in g_gevent_pool."""
  g_gevent_pool.spawn(spawn_greenlets)

def shutdown_await_next_greenlet():
  """Set the shutdown flag and wake every waiter on g_greenlets_cv.

  Registered with ``atexit`` by init_grpc_gevent(). Once the flag is set,
  await_next_greenlet() returns None.
  """
  global g_shutdown_greenlets_to_run_queue
  cdef unique_lock[mutex]* guard
  with nogil:
    guard = new unique_lock[mutex](g_greenlets_mu)
    g_shutdown_greenlets_to_run_queue = True
  # Release the mutex first, then notify the waiters.
  del guard
  g_greenlets_cv.notify_all()

def init_grpc_gevent():
  """Activate gevent support.

  Imports gevent, records the hub module, the hub's threadpool and a new
  greenlet group in module globals, marks gevent as activated, sets the
  interrupt check period to 2000 ms, installs _submit_to_greenlet_queue as
  the async callback function and registers shutdown_await_next_greenlet to
  run at interpreter exit.
  """
  global gevent_hub, g_gevent_threadpool, g_gevent_pool
  global g_gevent_activated, g_interrupt_check_period_ms

  # Lazily import gevent.
  import gevent
  import gevent.pool

  gevent_hub = gevent.hub
  g_gevent_threadpool = gevent.hub.get_hub().threadpool

  g_gevent_activated = True
  g_interrupt_check_period_ms = 2000

  g_gevent_pool = gevent.pool.Group()

  # Route async callbacks through the greenlet work queue.
  set_async_callback_func(_submit_to_greenlet_queue)

  # TODO: Document how this all works.
  # Wake any thread blocked in await_next_greenlet() at interpreter exit.
  atexit.register(shutdown_await_next_greenlet)