aboutsummaryrefslogtreecommitdiffstats
path: root/contrib/tools/python3/src/Modules/_xxsubinterpretersmodule.c
diff options
context:
space:
mode:
authorDevtools Arcadia <arcadia-devtools@yandex-team.ru>2022-02-07 18:08:42 +0300
committerDevtools Arcadia <arcadia-devtools@mous.vla.yp-c.yandex.net>2022-02-07 18:08:42 +0300
commit1110808a9d39d4b808aef724c861a2e1a38d2a69 (patch)
treee26c9fed0de5d9873cce7e00bc214573dc2195b7 /contrib/tools/python3/src/Modules/_xxsubinterpretersmodule.c
downloadydb-1110808a9d39d4b808aef724c861a2e1a38d2a69.tar.gz
intermediate changes
ref:cde9a383711a11544ce7e107a78147fb96cc4029
Diffstat (limited to 'contrib/tools/python3/src/Modules/_xxsubinterpretersmodule.c')
-rw-r--r--contrib/tools/python3/src/Modules/_xxsubinterpretersmodule.c2656
1 files changed, 2656 insertions, 0 deletions
diff --git a/contrib/tools/python3/src/Modules/_xxsubinterpretersmodule.c b/contrib/tools/python3/src/Modules/_xxsubinterpretersmodule.c
new file mode 100644
index 0000000000..314059d108
--- /dev/null
+++ b/contrib/tools/python3/src/Modules/_xxsubinterpretersmodule.c
@@ -0,0 +1,2656 @@
+
+/* interpreters module */
+/* low-level access to interpreter primitives */
+
+#include "Python.h"
+#include "frameobject.h"
+#include "interpreteridobject.h"
+
+
+static char *
+_copy_raw_string(PyObject *strobj)
+{
+ const char *str = PyUnicode_AsUTF8(strobj);
+ if (str == NULL) {
+ return NULL;
+ }
+ char *copied = PyMem_Malloc(strlen(str)+1);
+ if (copied == NULL) {
+ PyErr_NoMemory();
+ return NULL;
+ }
+ strcpy(copied, str);
+ return copied;
+}
+
+static PyInterpreterState *
+_get_current(void)
+{
+ // PyInterpreterState_Get() aborts if lookup fails, so don't need
+ // to check the result for NULL.
+ return PyInterpreterState_Get();
+}
+
+
+/* data-sharing-specific code ***********************************************/
+
+struct _sharednsitem {
+ char *name;
+ _PyCrossInterpreterData data;
+};
+
+static void _sharednsitem_clear(struct _sharednsitem *); // forward
+
+static int
+_sharednsitem_init(struct _sharednsitem *item, PyObject *key, PyObject *value)
+{
+ item->name = _copy_raw_string(key);
+ if (item->name == NULL) {
+ return -1;
+ }
+ if (_PyObject_GetCrossInterpreterData(value, &item->data) != 0) {
+ _sharednsitem_clear(item);
+ return -1;
+ }
+ return 0;
+}
+
+static void
+_sharednsitem_clear(struct _sharednsitem *item)
+{
+ if (item->name != NULL) {
+ PyMem_Free(item->name);
+ item->name = NULL;
+ }
+ _PyCrossInterpreterData_Release(&item->data);
+}
+
+static int
+_sharednsitem_apply(struct _sharednsitem *item, PyObject *ns)
+{
+ PyObject *name = PyUnicode_FromString(item->name);
+ if (name == NULL) {
+ return -1;
+ }
+ PyObject *value = _PyCrossInterpreterData_NewObject(&item->data);
+ if (value == NULL) {
+ Py_DECREF(name);
+ return -1;
+ }
+ int res = PyDict_SetItem(ns, name, value);
+ Py_DECREF(name);
+ Py_DECREF(value);
+ return res;
+}
+
+typedef struct _sharedns {
+ Py_ssize_t len;
+ struct _sharednsitem* items;
+} _sharedns;
+
+static _sharedns *
+_sharedns_new(Py_ssize_t len)
+{
+ _sharedns *shared = PyMem_NEW(_sharedns, 1);
+ if (shared == NULL) {
+ PyErr_NoMemory();
+ return NULL;
+ }
+ shared->len = len;
+ shared->items = PyMem_NEW(struct _sharednsitem, len);
+ if (shared->items == NULL) {
+ PyErr_NoMemory();
+ PyMem_Free(shared);
+ return NULL;
+ }
+ return shared;
+}
+
+static void
+_sharedns_free(_sharedns *shared)
+{
+ for (Py_ssize_t i=0; i < shared->len; i++) {
+ _sharednsitem_clear(&shared->items[i]);
+ }
+ PyMem_Free(shared->items);
+ PyMem_Free(shared);
+}
+
+static _sharedns *
+_get_shared_ns(PyObject *shareable)
+{
+ if (shareable == NULL || shareable == Py_None) {
+ return NULL;
+ }
+ Py_ssize_t len = PyDict_Size(shareable);
+ if (len == 0) {
+ return NULL;
+ }
+
+ _sharedns *shared = _sharedns_new(len);
+ if (shared == NULL) {
+ return NULL;
+ }
+ Py_ssize_t pos = 0;
+ for (Py_ssize_t i=0; i < len; i++) {
+ PyObject *key, *value;
+ if (PyDict_Next(shareable, &pos, &key, &value) == 0) {
+ break;
+ }
+ if (_sharednsitem_init(&shared->items[i], key, value) != 0) {
+ break;
+ }
+ }
+ if (PyErr_Occurred()) {
+ _sharedns_free(shared);
+ return NULL;
+ }
+ return shared;
+}
+
+static int
+_sharedns_apply(_sharedns *shared, PyObject *ns)
+{
+ for (Py_ssize_t i=0; i < shared->len; i++) {
+ if (_sharednsitem_apply(&shared->items[i], ns) != 0) {
+ return -1;
+ }
+ }
+ return 0;
+}
+
+// Ultimately we'd like to preserve enough information about the
+// exception and traceback that we could re-constitute (or at least
+// simulate, a la traceback.TracebackException), and even chain, a copy
+// of the exception in the calling interpreter.
+
+typedef struct _sharedexception {
+ char *name;
+ char *msg;
+} _sharedexception;
+
+static _sharedexception *
+_sharedexception_new(void)
+{
+ _sharedexception *err = PyMem_NEW(_sharedexception, 1);
+ if (err == NULL) {
+ PyErr_NoMemory();
+ return NULL;
+ }
+ err->name = NULL;
+ err->msg = NULL;
+ return err;
+}
+
+static void
+_sharedexception_clear(_sharedexception *exc)
+{
+ if (exc->name != NULL) {
+ PyMem_Free(exc->name);
+ }
+ if (exc->msg != NULL) {
+ PyMem_Free(exc->msg);
+ }
+}
+
+static void
+_sharedexception_free(_sharedexception *exc)
+{
+ _sharedexception_clear(exc);
+ PyMem_Free(exc);
+}
+
+static _sharedexception *
+_sharedexception_bind(PyObject *exctype, PyObject *exc, PyObject *tb)
+{
+ assert(exctype != NULL);
+ char *failure = NULL;
+
+ _sharedexception *err = _sharedexception_new();
+ if (err == NULL) {
+ goto finally;
+ }
+
+ PyObject *name = PyUnicode_FromFormat("%S", exctype);
+ if (name == NULL) {
+ failure = "unable to format exception type name";
+ goto finally;
+ }
+ err->name = _copy_raw_string(name);
+ Py_DECREF(name);
+ if (err->name == NULL) {
+ if (PyErr_ExceptionMatches(PyExc_MemoryError)) {
+ failure = "out of memory copying exception type name";
+ } else {
+ failure = "unable to encode and copy exception type name";
+ }
+ goto finally;
+ }
+
+ if (exc != NULL) {
+ PyObject *msg = PyUnicode_FromFormat("%S", exc);
+ if (msg == NULL) {
+ failure = "unable to format exception message";
+ goto finally;
+ }
+ err->msg = _copy_raw_string(msg);
+ Py_DECREF(msg);
+ if (err->msg == NULL) {
+ if (PyErr_ExceptionMatches(PyExc_MemoryError)) {
+ failure = "out of memory copying exception message";
+ } else {
+ failure = "unable to encode and copy exception message";
+ }
+ goto finally;
+ }
+ }
+
+finally:
+ if (failure != NULL) {
+ PyErr_Clear();
+ if (err->name != NULL) {
+ PyMem_Free(err->name);
+ err->name = NULL;
+ }
+ err->msg = failure;
+ }
+ return err;
+}
+
+static void
+_sharedexception_apply(_sharedexception *exc, PyObject *wrapperclass)
+{
+ if (exc->name != NULL) {
+ if (exc->msg != NULL) {
+ PyErr_Format(wrapperclass, "%s: %s", exc->name, exc->msg);
+ }
+ else {
+ PyErr_SetString(wrapperclass, exc->name);
+ }
+ }
+ else if (exc->msg != NULL) {
+ PyErr_SetString(wrapperclass, exc->msg);
+ }
+ else {
+ PyErr_SetNone(wrapperclass);
+ }
+}
+
+
+/* channel-specific code ****************************************************/
+
+#define CHANNEL_SEND 1
+#define CHANNEL_BOTH 0
+#define CHANNEL_RECV -1
+
+static PyObject *ChannelError;
+static PyObject *ChannelNotFoundError;
+static PyObject *ChannelClosedError;
+static PyObject *ChannelEmptyError;
+static PyObject *ChannelNotEmptyError;
+
+static int
+channel_exceptions_init(PyObject *ns)
+{
+ // XXX Move the exceptions into per-module memory?
+
+ // A channel-related operation failed.
+ ChannelError = PyErr_NewException("_xxsubinterpreters.ChannelError",
+ PyExc_RuntimeError, NULL);
+ if (ChannelError == NULL) {
+ return -1;
+ }
+ if (PyDict_SetItemString(ns, "ChannelError", ChannelError) != 0) {
+ return -1;
+ }
+
+ // An operation tried to use a channel that doesn't exist.
+ ChannelNotFoundError = PyErr_NewException(
+ "_xxsubinterpreters.ChannelNotFoundError", ChannelError, NULL);
+ if (ChannelNotFoundError == NULL) {
+ return -1;
+ }
+ if (PyDict_SetItemString(ns, "ChannelNotFoundError", ChannelNotFoundError) != 0) {
+ return -1;
+ }
+
+ // An operation tried to use a closed channel.
+ ChannelClosedError = PyErr_NewException(
+ "_xxsubinterpreters.ChannelClosedError", ChannelError, NULL);
+ if (ChannelClosedError == NULL) {
+ return -1;
+ }
+ if (PyDict_SetItemString(ns, "ChannelClosedError", ChannelClosedError) != 0) {
+ return -1;
+ }
+
+ // An operation tried to pop from an empty channel.
+ ChannelEmptyError = PyErr_NewException(
+ "_xxsubinterpreters.ChannelEmptyError", ChannelError, NULL);
+ if (ChannelEmptyError == NULL) {
+ return -1;
+ }
+ if (PyDict_SetItemString(ns, "ChannelEmptyError", ChannelEmptyError) != 0) {
+ return -1;
+ }
+
+ // An operation tried to close a non-empty channel.
+ ChannelNotEmptyError = PyErr_NewException(
+ "_xxsubinterpreters.ChannelNotEmptyError", ChannelError, NULL);
+ if (ChannelNotEmptyError == NULL) {
+ return -1;
+ }
+ if (PyDict_SetItemString(ns, "ChannelNotEmptyError", ChannelNotEmptyError) != 0) {
+ return -1;
+ }
+
+ return 0;
+}
+
+/* the channel queue */
+
+struct _channelitem;
+
+typedef struct _channelitem {
+ _PyCrossInterpreterData *data;
+ struct _channelitem *next;
+} _channelitem;
+
+static _channelitem *
+_channelitem_new(void)
+{
+ _channelitem *item = PyMem_NEW(_channelitem, 1);
+ if (item == NULL) {
+ PyErr_NoMemory();
+ return NULL;
+ }
+ item->data = NULL;
+ item->next = NULL;
+ return item;
+}
+
+static void
+_channelitem_clear(_channelitem *item)
+{
+ if (item->data != NULL) {
+ _PyCrossInterpreterData_Release(item->data);
+ PyMem_Free(item->data);
+ item->data = NULL;
+ }
+ item->next = NULL;
+}
+
+static void
+_channelitem_free(_channelitem *item)
+{
+ _channelitem_clear(item);
+ PyMem_Free(item);
+}
+
+static void
+_channelitem_free_all(_channelitem *item)
+{
+ while (item != NULL) {
+ _channelitem *last = item;
+ item = item->next;
+ _channelitem_free(last);
+ }
+}
+
+static _PyCrossInterpreterData *
+_channelitem_popped(_channelitem *item)
+{
+ _PyCrossInterpreterData *data = item->data;
+ item->data = NULL;
+ _channelitem_free(item);
+ return data;
+}
+
+typedef struct _channelqueue {
+ int64_t count;
+ _channelitem *first;
+ _channelitem *last;
+} _channelqueue;
+
+static _channelqueue *
+_channelqueue_new(void)
+{
+ _channelqueue *queue = PyMem_NEW(_channelqueue, 1);
+ if (queue == NULL) {
+ PyErr_NoMemory();
+ return NULL;
+ }
+ queue->count = 0;
+ queue->first = NULL;
+ queue->last = NULL;
+ return queue;
+}
+
+static void
+_channelqueue_clear(_channelqueue *queue)
+{
+ _channelitem_free_all(queue->first);
+ queue->count = 0;
+ queue->first = NULL;
+ queue->last = NULL;
+}
+
+static void
+_channelqueue_free(_channelqueue *queue)
+{
+ _channelqueue_clear(queue);
+ PyMem_Free(queue);
+}
+
+static int
+_channelqueue_put(_channelqueue *queue, _PyCrossInterpreterData *data)
+{
+ _channelitem *item = _channelitem_new();
+ if (item == NULL) {
+ return -1;
+ }
+ item->data = data;
+
+ queue->count += 1;
+ if (queue->first == NULL) {
+ queue->first = item;
+ }
+ else {
+ queue->last->next = item;
+ }
+ queue->last = item;
+ return 0;
+}
+
+static _PyCrossInterpreterData *
+_channelqueue_get(_channelqueue *queue)
+{
+ _channelitem *item = queue->first;
+ if (item == NULL) {
+ return NULL;
+ }
+ queue->first = item->next;
+ if (queue->last == item) {
+ queue->last = NULL;
+ }
+ queue->count -= 1;
+
+ return _channelitem_popped(item);
+}
+
+/* channel-interpreter associations */
+
+struct _channelend;
+
+typedef struct _channelend {
+ struct _channelend *next;
+ int64_t interp;
+ int open;
+} _channelend;
+
+static _channelend *
+_channelend_new(int64_t interp)
+{
+ _channelend *end = PyMem_NEW(_channelend, 1);
+ if (end == NULL) {
+ PyErr_NoMemory();
+ return NULL;
+ }
+ end->next = NULL;
+ end->interp = interp;
+ end->open = 1;
+ return end;
+}
+
+static void
+_channelend_free(_channelend *end)
+{
+ PyMem_Free(end);
+}
+
+static void
+_channelend_free_all(_channelend *end)
+{
+ while (end != NULL) {
+ _channelend *last = end;
+ end = end->next;
+ _channelend_free(last);
+ }
+}
+
+static _channelend *
+_channelend_find(_channelend *first, int64_t interp, _channelend **pprev)
+{
+ _channelend *prev = NULL;
+ _channelend *end = first;
+ while (end != NULL) {
+ if (end->interp == interp) {
+ break;
+ }
+ prev = end;
+ end = end->next;
+ }
+ if (pprev != NULL) {
+ *pprev = prev;
+ }
+ return end;
+}
+
+typedef struct _channelassociations {
+ // Note that the list entries are never removed for interpreter
+ // for which the channel is closed. This should not be a problem in
+ // practice. Also, a channel isn't automatically closed when an
+ // interpreter is destroyed.
+ int64_t numsendopen;
+ int64_t numrecvopen;
+ _channelend *send;
+ _channelend *recv;
+} _channelends;
+
+static _channelends *
+_channelends_new(void)
+{
+ _channelends *ends = PyMem_NEW(_channelends, 1);
+ if (ends== NULL) {
+ return NULL;
+ }
+ ends->numsendopen = 0;
+ ends->numrecvopen = 0;
+ ends->send = NULL;
+ ends->recv = NULL;
+ return ends;
+}
+
+static void
+_channelends_clear(_channelends *ends)
+{
+ _channelend_free_all(ends->send);
+ ends->send = NULL;
+ ends->numsendopen = 0;
+
+ _channelend_free_all(ends->recv);
+ ends->recv = NULL;
+ ends->numrecvopen = 0;
+}
+
+static void
+_channelends_free(_channelends *ends)
+{
+ _channelends_clear(ends);
+ PyMem_Free(ends);
+}
+
+static _channelend *
+_channelends_add(_channelends *ends, _channelend *prev, int64_t interp,
+ int send)
+{
+ _channelend *end = _channelend_new(interp);
+ if (end == NULL) {
+ return NULL;
+ }
+
+ if (prev == NULL) {
+ if (send) {
+ ends->send = end;
+ }
+ else {
+ ends->recv = end;
+ }
+ }
+ else {
+ prev->next = end;
+ }
+ if (send) {
+ ends->numsendopen += 1;
+ }
+ else {
+ ends->numrecvopen += 1;
+ }
+ return end;
+}
+
+static int
+_channelends_associate(_channelends *ends, int64_t interp, int send)
+{
+ _channelend *prev;
+ _channelend *end = _channelend_find(send ? ends->send : ends->recv,
+ interp, &prev);
+ if (end != NULL) {
+ if (!end->open) {
+ PyErr_SetString(ChannelClosedError, "channel already closed");
+ return -1;
+ }
+ // already associated
+ return 0;
+ }
+ if (_channelends_add(ends, prev, interp, send) == NULL) {
+ return -1;
+ }
+ return 0;
+}
+
+static int
+_channelends_is_open(_channelends *ends)
+{
+ if (ends->numsendopen != 0 || ends->numrecvopen != 0) {
+ return 1;
+ }
+ if (ends->send == NULL && ends->recv == NULL) {
+ return 1;
+ }
+ return 0;
+}
+
+static void
+_channelends_close_end(_channelends *ends, _channelend *end, int send)
+{
+ end->open = 0;
+ if (send) {
+ ends->numsendopen -= 1;
+ }
+ else {
+ ends->numrecvopen -= 1;
+ }
+}
+
+static int
+_channelends_close_interpreter(_channelends *ends, int64_t interp, int which)
+{
+ _channelend *prev;
+ _channelend *end;
+ if (which >= 0) { // send/both
+ end = _channelend_find(ends->send, interp, &prev);
+ if (end == NULL) {
+ // never associated so add it
+ end = _channelends_add(ends, prev, interp, 1);
+ if (end == NULL) {
+ return -1;
+ }
+ }
+ _channelends_close_end(ends, end, 1);
+ }
+ if (which <= 0) { // recv/both
+ end = _channelend_find(ends->recv, interp, &prev);
+ if (end == NULL) {
+ // never associated so add it
+ end = _channelends_add(ends, prev, interp, 0);
+ if (end == NULL) {
+ return -1;
+ }
+ }
+ _channelends_close_end(ends, end, 0);
+ }
+ return 0;
+}
+
+static void
+_channelends_close_all(_channelends *ends, int which, int force)
+{
+ // XXX Handle the ends.
+ // XXX Handle force is True.
+
+ // Ensure all the "send"-associated interpreters are closed.
+ _channelend *end;
+ for (end = ends->send; end != NULL; end = end->next) {
+ _channelends_close_end(ends, end, 1);
+ }
+
+ // Ensure all the "recv"-associated interpreters are closed.
+ for (end = ends->recv; end != NULL; end = end->next) {
+ _channelends_close_end(ends, end, 0);
+ }
+}
+
+/* channels */
+
+struct _channel;
+struct _channel_closing;
+static void _channel_clear_closing(struct _channel *);
+static void _channel_finish_closing(struct _channel *);
+
+typedef struct _channel {
+ PyThread_type_lock mutex;
+ _channelqueue *queue;
+ _channelends *ends;
+ int open;
+ struct _channel_closing *closing;
+} _PyChannelState;
+
+static _PyChannelState *
+_channel_new(void)
+{
+ _PyChannelState *chan = PyMem_NEW(_PyChannelState, 1);
+ if (chan == NULL) {
+ return NULL;
+ }
+ chan->mutex = PyThread_allocate_lock();
+ if (chan->mutex == NULL) {
+ PyMem_Free(chan);
+ PyErr_SetString(ChannelError,
+ "can't initialize mutex for new channel");
+ return NULL;
+ }
+ chan->queue = _channelqueue_new();
+ if (chan->queue == NULL) {
+ PyMem_Free(chan);
+ return NULL;
+ }
+ chan->ends = _channelends_new();
+ if (chan->ends == NULL) {
+ _channelqueue_free(chan->queue);
+ PyMem_Free(chan);
+ return NULL;
+ }
+ chan->open = 1;
+ chan->closing = NULL;
+ return chan;
+}
+
+static void
+_channel_free(_PyChannelState *chan)
+{
+ _channel_clear_closing(chan);
+ PyThread_acquire_lock(chan->mutex, WAIT_LOCK);
+ _channelqueue_free(chan->queue);
+ _channelends_free(chan->ends);
+ PyThread_release_lock(chan->mutex);
+
+ PyThread_free_lock(chan->mutex);
+ PyMem_Free(chan);
+}
+
+static int
+_channel_add(_PyChannelState *chan, int64_t interp,
+ _PyCrossInterpreterData *data)
+{
+ int res = -1;
+ PyThread_acquire_lock(chan->mutex, WAIT_LOCK);
+
+ if (!chan->open) {
+ PyErr_SetString(ChannelClosedError, "channel closed");
+ goto done;
+ }
+ if (_channelends_associate(chan->ends, interp, 1) != 0) {
+ goto done;
+ }
+
+ if (_channelqueue_put(chan->queue, data) != 0) {
+ goto done;
+ }
+
+ res = 0;
+done:
+ PyThread_release_lock(chan->mutex);
+ return res;
+}
+
+static _PyCrossInterpreterData *
+_channel_next(_PyChannelState *chan, int64_t interp)
+{
+ _PyCrossInterpreterData *data = NULL;
+ PyThread_acquire_lock(chan->mutex, WAIT_LOCK);
+
+ if (!chan->open) {
+ PyErr_SetString(ChannelClosedError, "channel closed");
+ goto done;
+ }
+ if (_channelends_associate(chan->ends, interp, 0) != 0) {
+ goto done;
+ }
+
+ data = _channelqueue_get(chan->queue);
+ if (data == NULL && !PyErr_Occurred() && chan->closing != NULL) {
+ chan->open = 0;
+ }
+
+done:
+ PyThread_release_lock(chan->mutex);
+ if (chan->queue->count == 0) {
+ _channel_finish_closing(chan);
+ }
+ return data;
+}
+
+static int
+_channel_close_interpreter(_PyChannelState *chan, int64_t interp, int end)
+{
+ PyThread_acquire_lock(chan->mutex, WAIT_LOCK);
+
+ int res = -1;
+ if (!chan->open) {
+ PyErr_SetString(ChannelClosedError, "channel already closed");
+ goto done;
+ }
+
+ if (_channelends_close_interpreter(chan->ends, interp, end) != 0) {
+ goto done;
+ }
+ chan->open = _channelends_is_open(chan->ends);
+
+ res = 0;
+done:
+ PyThread_release_lock(chan->mutex);
+ return res;
+}
+
+static int
+_channel_close_all(_PyChannelState *chan, int end, int force)
+{
+ int res = -1;
+ PyThread_acquire_lock(chan->mutex, WAIT_LOCK);
+
+ if (!chan->open) {
+ PyErr_SetString(ChannelClosedError, "channel already closed");
+ goto done;
+ }
+
+ if (!force && chan->queue->count > 0) {
+ PyErr_SetString(ChannelNotEmptyError,
+ "may not be closed if not empty (try force=True)");
+ goto done;
+ }
+
+ chan->open = 0;
+
+ // We *could* also just leave these in place, since we've marked
+ // the channel as closed already.
+ _channelends_close_all(chan->ends, end, force);
+
+ res = 0;
+done:
+ PyThread_release_lock(chan->mutex);
+ return res;
+}
+
+/* the set of channels */
+
+struct _channelref;
+
+typedef struct _channelref {
+ int64_t id;
+ _PyChannelState *chan;
+ struct _channelref *next;
+ Py_ssize_t objcount;
+} _channelref;
+
+static _channelref *
+_channelref_new(int64_t id, _PyChannelState *chan)
+{
+ _channelref *ref = PyMem_NEW(_channelref, 1);
+ if (ref == NULL) {
+ return NULL;
+ }
+ ref->id = id;
+ ref->chan = chan;
+ ref->next = NULL;
+ ref->objcount = 0;
+ return ref;
+}
+
+//static void
+//_channelref_clear(_channelref *ref)
+//{
+// ref->id = -1;
+// ref->chan = NULL;
+// ref->next = NULL;
+// ref->objcount = 0;
+//}
+
+static void
+_channelref_free(_channelref *ref)
+{
+ if (ref->chan != NULL) {
+ _channel_clear_closing(ref->chan);
+ }
+ //_channelref_clear(ref);
+ PyMem_Free(ref);
+}
+
+static _channelref *
+_channelref_find(_channelref *first, int64_t id, _channelref **pprev)
+{
+ _channelref *prev = NULL;
+ _channelref *ref = first;
+ while (ref != NULL) {
+ if (ref->id == id) {
+ break;
+ }
+ prev = ref;
+ ref = ref->next;
+ }
+ if (pprev != NULL) {
+ *pprev = prev;
+ }
+ return ref;
+}
+
+typedef struct _channels {
+ PyThread_type_lock mutex;
+ _channelref *head;
+ int64_t numopen;
+ int64_t next_id;
+} _channels;
+
+static int
+_channels_init(_channels *channels)
+{
+ if (channels->mutex == NULL) {
+ channels->mutex = PyThread_allocate_lock();
+ if (channels->mutex == NULL) {
+ PyErr_SetString(ChannelError,
+ "can't initialize mutex for channel management");
+ return -1;
+ }
+ }
+ channels->head = NULL;
+ channels->numopen = 0;
+ channels->next_id = 0;
+ return 0;
+}
+
+static int64_t
+_channels_next_id(_channels *channels) // needs lock
+{
+ int64_t id = channels->next_id;
+ if (id < 0) {
+ /* overflow */
+ PyErr_SetString(ChannelError,
+ "failed to get a channel ID");
+ return -1;
+ }
+ channels->next_id += 1;
+ return id;
+}
+
+static _PyChannelState *
+_channels_lookup(_channels *channels, int64_t id, PyThread_type_lock *pmutex)
+{
+ _PyChannelState *chan = NULL;
+ PyThread_acquire_lock(channels->mutex, WAIT_LOCK);
+ if (pmutex != NULL) {
+ *pmutex = NULL;
+ }
+
+ _channelref *ref = _channelref_find(channels->head, id, NULL);
+ if (ref == NULL) {
+ PyErr_Format(ChannelNotFoundError, "channel %" PRId64 " not found", id);
+ goto done;
+ }
+ if (ref->chan == NULL || !ref->chan->open) {
+ PyErr_Format(ChannelClosedError, "channel %" PRId64 " closed", id);
+ goto done;
+ }
+
+ if (pmutex != NULL) {
+ // The mutex will be closed by the caller.
+ *pmutex = channels->mutex;
+ }
+
+ chan = ref->chan;
+done:
+ if (pmutex == NULL || *pmutex == NULL) {
+ PyThread_release_lock(channels->mutex);
+ }
+ return chan;
+}
+
+static int64_t
+_channels_add(_channels *channels, _PyChannelState *chan)
+{
+ int64_t cid = -1;
+ PyThread_acquire_lock(channels->mutex, WAIT_LOCK);
+
+ // Create a new ref.
+ int64_t id = _channels_next_id(channels);
+ if (id < 0) {
+ goto done;
+ }
+ _channelref *ref = _channelref_new(id, chan);
+ if (ref == NULL) {
+ goto done;
+ }
+
+ // Add it to the list.
+ // We assume that the channel is a new one (not already in the list).
+ ref->next = channels->head;
+ channels->head = ref;
+ channels->numopen += 1;
+
+ cid = id;
+done:
+ PyThread_release_lock(channels->mutex);
+ return cid;
+}
+
+/* forward */
+static int _channel_set_closing(struct _channelref *, PyThread_type_lock);
+
+static int
+_channels_close(_channels *channels, int64_t cid, _PyChannelState **pchan,
+ int end, int force)
+{
+ int res = -1;
+ PyThread_acquire_lock(channels->mutex, WAIT_LOCK);
+ if (pchan != NULL) {
+ *pchan = NULL;
+ }
+
+ _channelref *ref = _channelref_find(channels->head, cid, NULL);
+ if (ref == NULL) {
+ PyErr_Format(ChannelNotFoundError, "channel %" PRId64 " not found", cid);
+ goto done;
+ }
+
+ if (ref->chan == NULL) {
+ PyErr_Format(ChannelClosedError, "channel %" PRId64 " closed", cid);
+ goto done;
+ }
+ else if (!force && end == CHANNEL_SEND && ref->chan->closing != NULL) {
+ PyErr_Format(ChannelClosedError, "channel %" PRId64 " closed", cid);
+ goto done;
+ }
+ else {
+ if (_channel_close_all(ref->chan, end, force) != 0) {
+ if (end == CHANNEL_SEND &&
+ PyErr_ExceptionMatches(ChannelNotEmptyError)) {
+ if (ref->chan->closing != NULL) {
+ PyErr_Format(ChannelClosedError,
+ "channel %" PRId64 " closed", cid);
+ goto done;
+ }
+ // Mark the channel as closing and return. The channel
+ // will be cleaned up in _channel_next().
+ PyErr_Clear();
+ if (_channel_set_closing(ref, channels->mutex) != 0) {
+ goto done;
+ }
+ if (pchan != NULL) {
+ *pchan = ref->chan;
+ }
+ res = 0;
+ }
+ goto done;
+ }
+ if (pchan != NULL) {
+ *pchan = ref->chan;
+ }
+ else {
+ _channel_free(ref->chan);
+ }
+ ref->chan = NULL;
+ }
+
+ res = 0;
+done:
+ PyThread_release_lock(channels->mutex);
+ return res;
+}
+
+static void
+_channels_remove_ref(_channels *channels, _channelref *ref, _channelref *prev,
+ _PyChannelState **pchan)
+{
+ if (ref == channels->head) {
+ channels->head = ref->next;
+ }
+ else {
+ prev->next = ref->next;
+ }
+ channels->numopen -= 1;
+
+ if (pchan != NULL) {
+ *pchan = ref->chan;
+ }
+ _channelref_free(ref);
+}
+
+static int
+_channels_remove(_channels *channels, int64_t id, _PyChannelState **pchan)
+{
+ int res = -1;
+ PyThread_acquire_lock(channels->mutex, WAIT_LOCK);
+
+ if (pchan != NULL) {
+ *pchan = NULL;
+ }
+
+ _channelref *prev = NULL;
+ _channelref *ref = _channelref_find(channels->head, id, &prev);
+ if (ref == NULL) {
+ PyErr_Format(ChannelNotFoundError, "channel %" PRId64 " not found", id);
+ goto done;
+ }
+
+ _channels_remove_ref(channels, ref, prev, pchan);
+
+ res = 0;
+done:
+ PyThread_release_lock(channels->mutex);
+ return res;
+}
+
+static int
+_channels_add_id_object(_channels *channels, int64_t id)
+{
+ int res = -1;
+ PyThread_acquire_lock(channels->mutex, WAIT_LOCK);
+
+ _channelref *ref = _channelref_find(channels->head, id, NULL);
+ if (ref == NULL) {
+ PyErr_Format(ChannelNotFoundError, "channel %" PRId64 " not found", id);
+ goto done;
+ }
+ ref->objcount += 1;
+
+ res = 0;
+done:
+ PyThread_release_lock(channels->mutex);
+ return res;
+}
+
+static void
+_channels_drop_id_object(_channels *channels, int64_t id)
+{
+ PyThread_acquire_lock(channels->mutex, WAIT_LOCK);
+
+ _channelref *prev = NULL;
+ _channelref *ref = _channelref_find(channels->head, id, &prev);
+ if (ref == NULL) {
+ // Already destroyed.
+ goto done;
+ }
+ ref->objcount -= 1;
+
+ // Destroy if no longer used.
+ if (ref->objcount == 0) {
+ _PyChannelState *chan = NULL;
+ _channels_remove_ref(channels, ref, prev, &chan);
+ if (chan != NULL) {
+ _channel_free(chan);
+ }
+ }
+
+done:
+ PyThread_release_lock(channels->mutex);
+}
+
+static int64_t *
+_channels_list_all(_channels *channels, int64_t *count)
+{
+ int64_t *cids = NULL;
+ PyThread_acquire_lock(channels->mutex, WAIT_LOCK);
+ int64_t *ids = PyMem_NEW(int64_t, (Py_ssize_t)(channels->numopen));
+ if (ids == NULL) {
+ goto done;
+ }
+ _channelref *ref = channels->head;
+ for (int64_t i=0; ref != NULL; ref = ref->next, i++) {
+ ids[i] = ref->id;
+ }
+ *count = channels->numopen;
+
+ cids = ids;
+done:
+ PyThread_release_lock(channels->mutex);
+ return cids;
+}
+
+/* support for closing non-empty channels */
+
+struct _channel_closing {
+ struct _channelref *ref;
+};
+
+static int
+_channel_set_closing(struct _channelref *ref, PyThread_type_lock mutex) {
+ struct _channel *chan = ref->chan;
+ if (chan == NULL) {
+ // already closed
+ return 0;
+ }
+ int res = -1;
+ PyThread_acquire_lock(chan->mutex, WAIT_LOCK);
+ if (chan->closing != NULL) {
+ PyErr_SetString(ChannelClosedError, "channel closed");
+ goto done;
+ }
+ chan->closing = PyMem_NEW(struct _channel_closing, 1);
+ if (chan->closing == NULL) {
+ goto done;
+ }
+ chan->closing->ref = ref;
+
+ res = 0;
+done:
+ PyThread_release_lock(chan->mutex);
+ return res;
+}
+
+static void
+_channel_clear_closing(struct _channel *chan) {
+ PyThread_acquire_lock(chan->mutex, WAIT_LOCK);
+ if (chan->closing != NULL) {
+ PyMem_Free(chan->closing);
+ chan->closing = NULL;
+ }
+ PyThread_release_lock(chan->mutex);
+}
+
+static void
+_channel_finish_closing(struct _channel *chan) {
+ struct _channel_closing *closing = chan->closing;
+ if (closing == NULL) {
+ return;
+ }
+ _channelref *ref = closing->ref;
+ _channel_clear_closing(chan);
+ // Do the things that would have been done in _channels_close().
+ ref->chan = NULL;
+ _channel_free(chan);
+}
+
+/* "high"-level channel-related functions */
+
+static int64_t
+_channel_create(_channels *channels)
+{
+ _PyChannelState *chan = _channel_new();
+ if (chan == NULL) {
+ return -1;
+ }
+ int64_t id = _channels_add(channels, chan);
+ if (id < 0) {
+ _channel_free(chan);
+ return -1;
+ }
+ return id;
+}
+
+static int
+_channel_destroy(_channels *channels, int64_t id)
+{
+ _PyChannelState *chan = NULL;
+ if (_channels_remove(channels, id, &chan) != 0) {
+ return -1;
+ }
+ if (chan != NULL) {
+ _channel_free(chan);
+ }
+ return 0;
+}
+
+static int
+_channel_send(_channels *channels, int64_t id, PyObject *obj)
+{
+ PyInterpreterState *interp = _get_current();
+ if (interp == NULL) {
+ return -1;
+ }
+
+ // Look up the channel.
+ PyThread_type_lock mutex = NULL;
+ _PyChannelState *chan = _channels_lookup(channels, id, &mutex);
+ if (chan == NULL) {
+ return -1;
+ }
+ // Past this point we are responsible for releasing the mutex.
+
+ if (chan->closing != NULL) {
+ PyErr_Format(ChannelClosedError, "channel %" PRId64 " closed", id);
+ PyThread_release_lock(mutex);
+ return -1;
+ }
+
+ // Convert the object to cross-interpreter data.
+ _PyCrossInterpreterData *data = PyMem_NEW(_PyCrossInterpreterData, 1);
+ if (data == NULL) {
+ PyThread_release_lock(mutex);
+ return -1;
+ }
+ if (_PyObject_GetCrossInterpreterData(obj, data) != 0) {
+ PyThread_release_lock(mutex);
+ PyMem_Free(data);
+ return -1;
+ }
+
+ // Add the data to the channel.
+ int res = _channel_add(chan, PyInterpreterState_GetID(interp), data);
+ PyThread_release_lock(mutex);
+ if (res != 0) {
+ _PyCrossInterpreterData_Release(data);
+ PyMem_Free(data);
+ return -1;
+ }
+
+ return 0;
+}
+
+static PyObject *
+_channel_recv(_channels *channels, int64_t id)
+{
+ PyInterpreterState *interp = _get_current();
+ if (interp == NULL) {
+ return NULL;
+ }
+
+ // Look up the channel.
+ PyThread_type_lock mutex = NULL;
+ _PyChannelState *chan = _channels_lookup(channels, id, &mutex);
+ if (chan == NULL) {
+ return NULL;
+ }
+ // Past this point we are responsible for releasing the mutex.
+
+ // Pop off the next item from the channel.
+ _PyCrossInterpreterData *data = _channel_next(chan, PyInterpreterState_GetID(interp));
+ PyThread_release_lock(mutex);
+ if (data == NULL) {
+ return NULL;
+ }
+
+ // Convert the data back to an object.
+ PyObject *obj = _PyCrossInterpreterData_NewObject(data);
+ _PyCrossInterpreterData_Release(data);
+ PyMem_Free(data);
+ if (obj == NULL) {
+ return NULL;
+ }
+
+ return obj;
+}
+
+static int
+_channel_drop(_channels *channels, int64_t id, int send, int recv)
+{
+ PyInterpreterState *interp = _get_current();
+ if (interp == NULL) {
+ return -1;
+ }
+
+ // Look up the channel.
+ PyThread_type_lock mutex = NULL;
+ _PyChannelState *chan = _channels_lookup(channels, id, &mutex);
+ if (chan == NULL) {
+ return -1;
+ }
+ // Past this point we are responsible for releasing the mutex.
+
+ // Close one or both of the two ends.
+ int res = _channel_close_interpreter(chan, PyInterpreterState_GetID(interp), send-recv);
+ PyThread_release_lock(mutex);
+ return res;
+}
+
+static int
+_channel_close(_channels *channels, int64_t id, int end, int force)
+{
+ return _channels_close(channels, id, NULL, end, force);
+}
+
+static int
+_channel_is_associated(_channels *channels, int64_t cid, int64_t interp,
+ int send)
+{
+ _PyChannelState *chan = _channels_lookup(channels, cid, NULL);
+ if (chan == NULL) {
+ return -1;
+ } else if (send && chan->closing != NULL) {
+ PyErr_Format(ChannelClosedError, "channel %" PRId64 " closed", cid);
+ return -1;
+ }
+
+ _channelend *end = _channelend_find(send ? chan->ends->send : chan->ends->recv,
+ interp, NULL);
+
+ return (end != NULL && end->open);
+}
+
+/* ChannelID class */
+
+static PyTypeObject ChannelIDtype;
+
+typedef struct channelid {
+ PyObject_HEAD
+ int64_t id;
+ int end;
+ int resolve;
+ _channels *channels;
+} channelid;
+
+static int
+channel_id_converter(PyObject *arg, void *ptr)
+{
+ int64_t cid;
+ if (PyObject_TypeCheck(arg, &ChannelIDtype)) {
+ cid = ((channelid *)arg)->id;
+ }
+ else if (PyIndex_Check(arg)) {
+ cid = PyLong_AsLongLong(arg);
+ if (cid == -1 && PyErr_Occurred()) {
+ return 0;
+ }
+ if (cid < 0) {
+ PyErr_Format(PyExc_ValueError,
+ "channel ID must be a non-negative int, got %R", arg);
+ return 0;
+ }
+ }
+ else {
+ PyErr_Format(PyExc_TypeError,
+ "channel ID must be an int, got %.100s",
+ Py_TYPE(arg)->tp_name);
+ return 0;
+ }
+ *(int64_t *)ptr = cid;
+ return 1;
+}
+
+static channelid *
+newchannelid(PyTypeObject *cls, int64_t cid, int end, _channels *channels,
+ int force, int resolve)
+{
+ channelid *self = PyObject_New(channelid, cls);
+ if (self == NULL) {
+ return NULL;
+ }
+ self->id = cid;
+ self->end = end;
+ self->resolve = resolve;
+ self->channels = channels;
+
+ if (_channels_add_id_object(channels, cid) != 0) {
+ if (force && PyErr_ExceptionMatches(ChannelNotFoundError)) {
+ PyErr_Clear();
+ }
+ else {
+ Py_DECREF((PyObject *)self);
+ return NULL;
+ }
+ }
+
+ return self;
+}
+
+static _channels * _global_channels(void);
+
+static PyObject *
+channelid_new(PyTypeObject *cls, PyObject *args, PyObject *kwds)
+{
+ static char *kwlist[] = {"id", "send", "recv", "force", "_resolve", NULL};
+ int64_t cid;
+ int send = -1;
+ int recv = -1;
+ int force = 0;
+ int resolve = 0;
+ if (!PyArg_ParseTupleAndKeywords(args, kwds,
+ "O&|$pppp:ChannelID.__new__", kwlist,
+ channel_id_converter, &cid, &send, &recv, &force, &resolve))
+ return NULL;
+
+ // Handle "send" and "recv".
+ if (send == 0 && recv == 0) {
+ PyErr_SetString(PyExc_ValueError,
+ "'send' and 'recv' cannot both be False");
+ return NULL;
+ }
+
+ int end = 0;
+ if (send == 1) {
+ if (recv == 0 || recv == -1) {
+ end = CHANNEL_SEND;
+ }
+ }
+ else if (recv == 1) {
+ end = CHANNEL_RECV;
+ }
+
+ return (PyObject *)newchannelid(cls, cid, end, _global_channels(),
+ force, resolve);
+}
+
+static void
+channelid_dealloc(PyObject *v)
+{
+ int64_t cid = ((channelid *)v)->id;
+ _channels *channels = ((channelid *)v)->channels;
+ Py_TYPE(v)->tp_free(v);
+
+ _channels_drop_id_object(channels, cid);
+}
+
+static PyObject *
+channelid_repr(PyObject *self)
+{
+ PyTypeObject *type = Py_TYPE(self);
+ const char *name = _PyType_Name(type);
+
+ channelid *cid = (channelid *)self;
+ const char *fmt;
+ if (cid->end == CHANNEL_SEND) {
+ fmt = "%s(%" PRId64 ", send=True)";
+ }
+ else if (cid->end == CHANNEL_RECV) {
+ fmt = "%s(%" PRId64 ", recv=True)";
+ }
+ else {
+ fmt = "%s(%" PRId64 ")";
+ }
+ return PyUnicode_FromFormat(fmt, name, cid->id);
+}
+
+static PyObject *
+channelid_str(PyObject *self)
+{
+ channelid *cid = (channelid *)self;
+ return PyUnicode_FromFormat("%" PRId64 "", cid->id);
+}
+
+static PyObject *
+channelid_int(PyObject *self)
+{
+ channelid *cid = (channelid *)self;
+ return PyLong_FromLongLong(cid->id);
+}
+
+static PyNumberMethods channelid_as_number = {
+ 0, /* nb_add */
+ 0, /* nb_subtract */
+ 0, /* nb_multiply */
+ 0, /* nb_remainder */
+ 0, /* nb_divmod */
+ 0, /* nb_power */
+ 0, /* nb_negative */
+ 0, /* nb_positive */
+ 0, /* nb_absolute */
+ 0, /* nb_bool */
+ 0, /* nb_invert */
+ 0, /* nb_lshift */
+ 0, /* nb_rshift */
+ 0, /* nb_and */
+ 0, /* nb_xor */
+ 0, /* nb_or */
+ (unaryfunc)channelid_int, /* nb_int */
+ 0, /* nb_reserved */
+ 0, /* nb_float */
+
+ 0, /* nb_inplace_add */
+ 0, /* nb_inplace_subtract */
+ 0, /* nb_inplace_multiply */
+ 0, /* nb_inplace_remainder */
+ 0, /* nb_inplace_power */
+ 0, /* nb_inplace_lshift */
+ 0, /* nb_inplace_rshift */
+ 0, /* nb_inplace_and */
+ 0, /* nb_inplace_xor */
+ 0, /* nb_inplace_or */
+
+ 0, /* nb_floor_divide */
+ 0, /* nb_true_divide */
+ 0, /* nb_inplace_floor_divide */
+ 0, /* nb_inplace_true_divide */
+
+ (unaryfunc)channelid_int, /* nb_index */
+};
+
+static Py_hash_t
+channelid_hash(PyObject *self)
+{
+ channelid *cid = (channelid *)self;
+ PyObject *id = PyLong_FromLongLong(cid->id);
+ if (id == NULL) {
+ return -1;
+ }
+ Py_hash_t hash = PyObject_Hash(id);
+ Py_DECREF(id);
+ return hash;
+}
+
+static PyObject *
+channelid_richcompare(PyObject *self, PyObject *other, int op)
+{
+ if (op != Py_EQ && op != Py_NE) {
+ Py_RETURN_NOTIMPLEMENTED;
+ }
+
+ if (!PyObject_TypeCheck(self, &ChannelIDtype)) {
+ Py_RETURN_NOTIMPLEMENTED;
+ }
+
+ channelid *cid = (channelid *)self;
+ int equal;
+ if (PyObject_TypeCheck(other, &ChannelIDtype)) {
+ channelid *othercid = (channelid *)other;
+ equal = (cid->end == othercid->end) && (cid->id == othercid->id);
+ }
+ else if (PyLong_Check(other)) {
+ /* Fast path */
+ int overflow;
+ long long othercid = PyLong_AsLongLongAndOverflow(other, &overflow);
+ if (othercid == -1 && PyErr_Occurred()) {
+ return NULL;
+ }
+ equal = !overflow && (othercid >= 0) && (cid->id == othercid);
+ }
+ else if (PyNumber_Check(other)) {
+ PyObject *pyid = PyLong_FromLongLong(cid->id);
+ if (pyid == NULL) {
+ return NULL;
+ }
+ PyObject *res = PyObject_RichCompare(pyid, other, op);
+ Py_DECREF(pyid);
+ return res;
+ }
+ else {
+ Py_RETURN_NOTIMPLEMENTED;
+ }
+
+ if ((op == Py_EQ && equal) || (op == Py_NE && !equal)) {
+ Py_RETURN_TRUE;
+ }
+ Py_RETURN_FALSE;
+}
+
+static PyObject *
+_channel_from_cid(PyObject *cid, int end)
+{
+ PyObject *highlevel = PyImport_ImportModule("interpreters");
+ if (highlevel == NULL) {
+ PyErr_Clear();
+ highlevel = PyImport_ImportModule("test.support.interpreters");
+ if (highlevel == NULL) {
+ return NULL;
+ }
+ }
+ const char *clsname = (end == CHANNEL_RECV) ? "RecvChannel" :
+ "SendChannel";
+ PyObject *cls = PyObject_GetAttrString(highlevel, clsname);
+ Py_DECREF(highlevel);
+ if (cls == NULL) {
+ return NULL;
+ }
+ PyObject *chan = PyObject_CallFunctionObjArgs(cls, cid, NULL);
+ Py_DECREF(cls);
+ if (chan == NULL) {
+ return NULL;
+ }
+ return chan;
+}
+
+struct _channelid_xid {
+ int64_t id;
+ int end;
+ int resolve;
+};
+
+static PyObject *
+_channelid_from_xid(_PyCrossInterpreterData *data)
+{
+ struct _channelid_xid *xid = (struct _channelid_xid *)data->data;
+ // Note that we do not preserve the "resolve" flag.
+ PyObject *cid = (PyObject *)newchannelid(&ChannelIDtype, xid->id, xid->end,
+ _global_channels(), 0, 0);
+ if (xid->end == 0) {
+ return cid;
+ }
+ if (!xid->resolve) {
+ return cid;
+ }
+
+ /* Try returning a high-level channel end but fall back to the ID. */
+ PyObject *chan = _channel_from_cid(cid, xid->end);
+ if (chan == NULL) {
+ PyErr_Clear();
+ return cid;
+ }
+ Py_DECREF(cid);
+ return chan;
+}
+
+static int
+_channelid_shared(PyObject *obj, _PyCrossInterpreterData *data)
+{
+ struct _channelid_xid *xid = PyMem_NEW(struct _channelid_xid, 1);
+ if (xid == NULL) {
+ return -1;
+ }
+ xid->id = ((channelid *)obj)->id;
+ xid->end = ((channelid *)obj)->end;
+ xid->resolve = ((channelid *)obj)->resolve;
+
+ data->data = xid;
+ Py_INCREF(obj);
+ data->obj = obj;
+ data->new_object = _channelid_from_xid;
+ data->free = PyMem_Free;
+ return 0;
+}
+
+static PyObject *
+channelid_end(PyObject *self, void *end)
+{
+ int force = 1;
+ channelid *cid = (channelid *)self;
+ if (end != NULL) {
+ return (PyObject *)newchannelid(Py_TYPE(self), cid->id, *(int *)end,
+ cid->channels, force, cid->resolve);
+ }
+
+ if (cid->end == CHANNEL_SEND) {
+ return PyUnicode_InternFromString("send");
+ }
+ if (cid->end == CHANNEL_RECV) {
+ return PyUnicode_InternFromString("recv");
+ }
+ return PyUnicode_InternFromString("both");
+}
+
+static int _channelid_end_send = CHANNEL_SEND;
+static int _channelid_end_recv = CHANNEL_RECV;
+
+static PyGetSetDef channelid_getsets[] = {
+ {"end", (getter)channelid_end, NULL,
+ PyDoc_STR("'send', 'recv', or 'both'")},
+ {"send", (getter)channelid_end, NULL,
+ PyDoc_STR("the 'send' end of the channel"), &_channelid_end_send},
+ {"recv", (getter)channelid_end, NULL,
+ PyDoc_STR("the 'recv' end of the channel"), &_channelid_end_recv},
+ {NULL}
+};
+
+PyDoc_STRVAR(channelid_doc,
+"A channel ID identifies a channel and may be used as an int.");
+
+static PyTypeObject ChannelIDtype = {
+ PyVarObject_HEAD_INIT(&PyType_Type, 0)
+ "_xxsubinterpreters.ChannelID", /* tp_name */
+ sizeof(channelid), /* tp_basicsize */
+ 0, /* tp_itemsize */
+ (destructor)channelid_dealloc, /* tp_dealloc */
+ 0, /* tp_vectorcall_offset */
+ 0, /* tp_getattr */
+ 0, /* tp_setattr */
+ 0, /* tp_as_async */
+ (reprfunc)channelid_repr, /* tp_repr */
+ &channelid_as_number, /* tp_as_number */
+ 0, /* tp_as_sequence */
+ 0, /* tp_as_mapping */
+ channelid_hash, /* tp_hash */
+ 0, /* tp_call */
+ (reprfunc)channelid_str, /* tp_str */
+ 0, /* tp_getattro */
+ 0, /* tp_setattro */
+ 0, /* tp_as_buffer */
+ Py_TPFLAGS_DEFAULT | Py_TPFLAGS_BASETYPE, /* tp_flags */
+ channelid_doc, /* tp_doc */
+ 0, /* tp_traverse */
+ 0, /* tp_clear */
+ channelid_richcompare, /* tp_richcompare */
+ 0, /* tp_weaklistoffset */
+ 0, /* tp_iter */
+ 0, /* tp_iternext */
+ 0, /* tp_methods */
+ 0, /* tp_members */
+ channelid_getsets, /* tp_getset */
+ 0, /* tp_base */
+ 0, /* tp_dict */
+ 0, /* tp_descr_get */
+ 0, /* tp_descr_set */
+ 0, /* tp_dictoffset */
+ 0, /* tp_init */
+ 0, /* tp_alloc */
+ // Note that we do not set tp_new to channelid_new. Instead we
+ // set it to NULL, meaning it cannot be instantiated from Python
+ // code. We do this because there is a strong relationship between
+ // channel IDs and the channel lifecycle, so this limitation avoids
+ // related complications.
+ NULL, /* tp_new */
+};
+
+
+/* interpreter-specific code ************************************************/
+
+static PyObject * RunFailedError = NULL;
+
+static int
+interp_exceptions_init(PyObject *ns)
+{
+ // XXX Move the exceptions into per-module memory?
+
+ if (RunFailedError == NULL) {
+ // An uncaught exception came out of interp_run_string().
+ RunFailedError = PyErr_NewException("_xxsubinterpreters.RunFailedError",
+ PyExc_RuntimeError, NULL);
+ if (RunFailedError == NULL) {
+ return -1;
+ }
+ if (PyDict_SetItemString(ns, "RunFailedError", RunFailedError) != 0) {
+ return -1;
+ }
+ }
+
+ return 0;
+}
+
+static int
+_is_running(PyInterpreterState *interp)
+{
+ PyThreadState *tstate = PyInterpreterState_ThreadHead(interp);
+ if (PyThreadState_Next(tstate) != NULL) {
+ PyErr_SetString(PyExc_RuntimeError,
+ "interpreter has more than one thread");
+ return -1;
+ }
+
+ assert(!PyErr_Occurred());
+ PyFrameObject *frame = PyThreadState_GetFrame(tstate);
+ if (frame == NULL) {
+ return 0;
+ }
+
+ int executing = (int)(frame->f_executing);
+ Py_DECREF(frame);
+
+ return executing;
+}
+
+static int
+_ensure_not_running(PyInterpreterState *interp)
+{
+ int is_running = _is_running(interp);
+ if (is_running < 0) {
+ return -1;
+ }
+ if (is_running) {
+ PyErr_Format(PyExc_RuntimeError, "interpreter already running");
+ return -1;
+ }
+ return 0;
+}
+
+static int
+_run_script(PyInterpreterState *interp, const char *codestr,
+ _sharedns *shared, _sharedexception **exc)
+{
+ PyObject *exctype = NULL;
+ PyObject *excval = NULL;
+ PyObject *tb = NULL;
+
+ PyObject *main_mod = _PyInterpreterState_GetMainModule(interp);
+ if (main_mod == NULL) {
+ goto error;
+ }
+ PyObject *ns = PyModule_GetDict(main_mod); // borrowed
+ Py_DECREF(main_mod);
+ if (ns == NULL) {
+ goto error;
+ }
+ Py_INCREF(ns);
+
+ // Apply the cross-interpreter data.
+ if (shared != NULL) {
+ if (_sharedns_apply(shared, ns) != 0) {
+ Py_DECREF(ns);
+ goto error;
+ }
+ }
+
+ // Run the string (see PyRun_SimpleStringFlags).
+ PyObject *result = PyRun_StringFlags(codestr, Py_file_input, ns, ns, NULL);
+ Py_DECREF(ns);
+ if (result == NULL) {
+ goto error;
+ }
+ else {
+ Py_DECREF(result); // We throw away the result.
+ }
+
+ *exc = NULL;
+ return 0;
+
+error:
+ PyErr_Fetch(&exctype, &excval, &tb);
+
+ _sharedexception *sharedexc = _sharedexception_bind(exctype, excval, tb);
+ Py_XDECREF(exctype);
+ Py_XDECREF(excval);
+ Py_XDECREF(tb);
+ if (sharedexc == NULL) {
+ fprintf(stderr, "RunFailedError: script raised an uncaught exception");
+ PyErr_Clear();
+ sharedexc = NULL;
+ }
+ else {
+ assert(!PyErr_Occurred());
+ }
+ *exc = sharedexc;
+ return -1;
+}
+
+static int
+_run_script_in_interpreter(PyInterpreterState *interp, const char *codestr,
+ PyObject *shareables)
+{
+ if (_ensure_not_running(interp) < 0) {
+ return -1;
+ }
+
+ _sharedns *shared = _get_shared_ns(shareables);
+ if (shared == NULL && PyErr_Occurred()) {
+ return -1;
+ }
+
+ // Switch to interpreter.
+ PyThreadState *save_tstate = NULL;
+ if (interp != PyInterpreterState_Get()) {
+ // XXX Using the "head" thread isn't strictly correct.
+ PyThreadState *tstate = PyInterpreterState_ThreadHead(interp);
+ // XXX Possible GILState issues?
+ save_tstate = PyThreadState_Swap(tstate);
+ }
+
+ // Run the script.
+ _sharedexception *exc = NULL;
+ int result = _run_script(interp, codestr, shared, &exc);
+
+ // Switch back.
+ if (save_tstate != NULL) {
+ PyThreadState_Swap(save_tstate);
+ }
+
+ // Propagate any exception out to the caller.
+ if (exc != NULL) {
+ _sharedexception_apply(exc, RunFailedError);
+ _sharedexception_free(exc);
+ }
+ else if (result != 0) {
+ // We were unable to allocate a shared exception.
+ PyErr_NoMemory();
+ }
+
+ if (shared != NULL) {
+ _sharedns_free(shared);
+ }
+
+ return result;
+}
+
+
+/* module level code ********************************************************/
+
+/* globals is the process-global state for the module. It holds all
+ the data that we need to share between interpreters, so it cannot
+ hold PyObject values. */
+static struct globals {
+ _channels channels;
+} _globals = {{0}};
+
+static int
+_init_globals(void)
+{
+ if (_channels_init(&_globals.channels) != 0) {
+ return -1;
+ }
+ return 0;
+}
+
+static _channels *
+_global_channels(void) {
+ return &_globals.channels;
+}
+
+static PyObject *
+interp_create(PyObject *self, PyObject *args, PyObject *kwds)
+{
+
+ static char *kwlist[] = {"isolated", NULL};
+ int isolated = 1;
+ if (!PyArg_ParseTupleAndKeywords(args, kwds, "|$i:create", kwlist,
+ &isolated)) {
+ return NULL;
+ }
+
+ // Create and initialize the new interpreter.
+ PyThreadState *save_tstate = PyThreadState_Get();
+ // XXX Possible GILState issues?
+ PyThreadState *tstate = _Py_NewInterpreter(isolated);
+ PyThreadState_Swap(save_tstate);
+ if (tstate == NULL) {
+ /* Since no new thread state was created, there is no exception to
+ propagate; raise a fresh one after swapping in the old thread
+ state. */
+ PyErr_SetString(PyExc_RuntimeError, "interpreter creation failed");
+ return NULL;
+ }
+ PyInterpreterState *interp = PyThreadState_GetInterpreter(tstate);
+ PyObject *idobj = _PyInterpreterState_GetIDObject(interp);
+ if (idobj == NULL) {
+ // XXX Possible GILState issues?
+ save_tstate = PyThreadState_Swap(tstate);
+ Py_EndInterpreter(tstate);
+ PyThreadState_Swap(save_tstate);
+ return NULL;
+ }
+ _PyInterpreterState_RequireIDRef(interp, 1);
+ return idobj;
+}
+
+PyDoc_STRVAR(create_doc,
+"create() -> ID\n\
+\n\
+Create a new interpreter and return a unique generated ID.");
+
+
+static PyObject *
+interp_destroy(PyObject *self, PyObject *args, PyObject *kwds)
+{
+ static char *kwlist[] = {"id", NULL};
+ PyObject *id;
+ // XXX Use "L" for id?
+ if (!PyArg_ParseTupleAndKeywords(args, kwds,
+ "O:destroy", kwlist, &id)) {
+ return NULL;
+ }
+
+ // Look up the interpreter.
+ PyInterpreterState *interp = _PyInterpreterID_LookUp(id);
+ if (interp == NULL) {
+ return NULL;
+ }
+
+ // Ensure we don't try to destroy the current interpreter.
+ PyInterpreterState *current = _get_current();
+ if (current == NULL) {
+ return NULL;
+ }
+ if (interp == current) {
+ PyErr_SetString(PyExc_RuntimeError,
+ "cannot destroy the current interpreter");
+ return NULL;
+ }
+
+ // Ensure the interpreter isn't running.
+ /* XXX We *could* support destroying a running interpreter but
+ aren't going to worry about it for now. */
+ if (_ensure_not_running(interp) < 0) {
+ return NULL;
+ }
+
+ // Destroy the interpreter.
+ PyThreadState *tstate = PyInterpreterState_ThreadHead(interp);
+ // XXX Possible GILState issues?
+ PyThreadState *save_tstate = PyThreadState_Swap(tstate);
+ Py_EndInterpreter(tstate);
+ PyThreadState_Swap(save_tstate);
+
+ Py_RETURN_NONE;
+}
+
+PyDoc_STRVAR(destroy_doc,
+"destroy(id)\n\
+\n\
+Destroy the identified interpreter.\n\
+\n\
+Attempting to destroy the current interpreter results in a RuntimeError.\n\
+So does an unrecognized ID.");
+
+
+static PyObject *
+interp_list_all(PyObject *self, PyObject *Py_UNUSED(ignored))
+{
+ PyObject *ids, *id;
+ PyInterpreterState *interp;
+
+ ids = PyList_New(0);
+ if (ids == NULL) {
+ return NULL;
+ }
+
+ interp = PyInterpreterState_Head();
+ while (interp != NULL) {
+ id = _PyInterpreterState_GetIDObject(interp);
+ if (id == NULL) {
+ Py_DECREF(ids);
+ return NULL;
+ }
+ // insert at front of list
+ int res = PyList_Insert(ids, 0, id);
+ Py_DECREF(id);
+ if (res < 0) {
+ Py_DECREF(ids);
+ return NULL;
+ }
+
+ interp = PyInterpreterState_Next(interp);
+ }
+
+ return ids;
+}
+
+PyDoc_STRVAR(list_all_doc,
+"list_all() -> [ID]\n\
+\n\
+Return a list containing the ID of every existing interpreter.");
+
+
+static PyObject *
+interp_get_current(PyObject *self, PyObject *Py_UNUSED(ignored))
+{
+ PyInterpreterState *interp =_get_current();
+ if (interp == NULL) {
+ return NULL;
+ }
+ return _PyInterpreterState_GetIDObject(interp);
+}
+
+PyDoc_STRVAR(get_current_doc,
+"get_current() -> ID\n\
+\n\
+Return the ID of current interpreter.");
+
+
+static PyObject *
+interp_get_main(PyObject *self, PyObject *Py_UNUSED(ignored))
+{
+ // Currently, 0 is always the main interpreter.
+ int64_t id = 0;
+ return _PyInterpreterID_New(id);
+}
+
+PyDoc_STRVAR(get_main_doc,
+"get_main() -> ID\n\
+\n\
+Return the ID of main interpreter.");
+
+
+static PyObject *
+interp_run_string(PyObject *self, PyObject *args, PyObject *kwds)
+{
+ static char *kwlist[] = {"id", "script", "shared", NULL};
+ PyObject *id, *code;
+ PyObject *shared = NULL;
+ if (!PyArg_ParseTupleAndKeywords(args, kwds,
+ "OU|O:run_string", kwlist,
+ &id, &code, &shared)) {
+ return NULL;
+ }
+
+ // Look up the interpreter.
+ PyInterpreterState *interp = _PyInterpreterID_LookUp(id);
+ if (interp == NULL) {
+ return NULL;
+ }
+
+ // Extract code.
+ Py_ssize_t size;
+ const char *codestr = PyUnicode_AsUTF8AndSize(code, &size);
+ if (codestr == NULL) {
+ return NULL;
+ }
+ if (strlen(codestr) != (size_t)size) {
+ PyErr_SetString(PyExc_ValueError,
+ "source code string cannot contain null bytes");
+ return NULL;
+ }
+
+ // Run the code in the interpreter.
+ if (_run_script_in_interpreter(interp, codestr, shared) != 0) {
+ return NULL;
+ }
+ Py_RETURN_NONE;
+}
+
+PyDoc_STRVAR(run_string_doc,
+"run_string(id, script, shared)\n\
+\n\
+Execute the provided string in the identified interpreter.\n\
+\n\
+See PyRun_SimpleStrings.");
+
+
+static PyObject *
+object_is_shareable(PyObject *self, PyObject *args, PyObject *kwds)
+{
+ static char *kwlist[] = {"obj", NULL};
+ PyObject *obj;
+ if (!PyArg_ParseTupleAndKeywords(args, kwds,
+ "O:is_shareable", kwlist, &obj)) {
+ return NULL;
+ }
+
+ if (_PyObject_CheckCrossInterpreterData(obj) == 0) {
+ Py_RETURN_TRUE;
+ }
+ PyErr_Clear();
+ Py_RETURN_FALSE;
+}
+
+PyDoc_STRVAR(is_shareable_doc,
+"is_shareable(obj) -> bool\n\
+\n\
+Return True if the object's data may be shared between interpreters and\n\
+False otherwise.");
+
+
+static PyObject *
+interp_is_running(PyObject *self, PyObject *args, PyObject *kwds)
+{
+ static char *kwlist[] = {"id", NULL};
+ PyObject *id;
+ if (!PyArg_ParseTupleAndKeywords(args, kwds,
+ "O:is_running", kwlist, &id)) {
+ return NULL;
+ }
+
+ PyInterpreterState *interp = _PyInterpreterID_LookUp(id);
+ if (interp == NULL) {
+ return NULL;
+ }
+ int is_running = _is_running(interp);
+ if (is_running < 0) {
+ return NULL;
+ }
+ if (is_running) {
+ Py_RETURN_TRUE;
+ }
+ Py_RETURN_FALSE;
+}
+
+PyDoc_STRVAR(is_running_doc,
+"is_running(id) -> bool\n\
+\n\
+Return whether or not the identified interpreter is running.");
+
+static PyObject *
+channel_create(PyObject *self, PyObject *Py_UNUSED(ignored))
+{
+ int64_t cid = _channel_create(&_globals.channels);
+ if (cid < 0) {
+ return NULL;
+ }
+ PyObject *id = (PyObject *)newchannelid(&ChannelIDtype, cid, 0,
+ &_globals.channels, 0, 0);
+ if (id == NULL) {
+ if (_channel_destroy(&_globals.channels, cid) != 0) {
+ // XXX issue a warning?
+ }
+ return NULL;
+ }
+ assert(((channelid *)id)->channels != NULL);
+ return id;
+}
+
+PyDoc_STRVAR(channel_create_doc,
+"channel_create() -> cid\n\
+\n\
+Create a new cross-interpreter channel and return a unique generated ID.");
+
+static PyObject *
+channel_destroy(PyObject *self, PyObject *args, PyObject *kwds)
+{
+ static char *kwlist[] = {"cid", NULL};
+ int64_t cid;
+ if (!PyArg_ParseTupleAndKeywords(args, kwds, "O&:channel_destroy", kwlist,
+ channel_id_converter, &cid)) {
+ return NULL;
+ }
+
+ if (_channel_destroy(&_globals.channels, cid) != 0) {
+ return NULL;
+ }
+ Py_RETURN_NONE;
+}
+
+PyDoc_STRVAR(channel_destroy_doc,
+"channel_destroy(cid)\n\
+\n\
+Close and finalize the channel. Afterward attempts to use the channel\n\
+will behave as though it never existed.");
+
+static PyObject *
+channel_list_all(PyObject *self, PyObject *Py_UNUSED(ignored))
+{
+ int64_t count = 0;
+ int64_t *cids = _channels_list_all(&_globals.channels, &count);
+ if (cids == NULL) {
+ if (count == 0) {
+ return PyList_New(0);
+ }
+ return NULL;
+ }
+ PyObject *ids = PyList_New((Py_ssize_t)count);
+ if (ids == NULL) {
+ goto finally;
+ }
+ int64_t *cur = cids;
+ for (int64_t i=0; i < count; cur++, i++) {
+ PyObject *id = (PyObject *)newchannelid(&ChannelIDtype, *cur, 0,
+ &_globals.channels, 0, 0);
+ if (id == NULL) {
+ Py_DECREF(ids);
+ ids = NULL;
+ break;
+ }
+ PyList_SET_ITEM(ids, i, id);
+ }
+
+finally:
+ PyMem_Free(cids);
+ return ids;
+}
+
+PyDoc_STRVAR(channel_list_all_doc,
+"channel_list_all() -> [cid]\n\
+\n\
+Return the list of all IDs for active channels.");
+
+static PyObject *
+channel_list_interpreters(PyObject *self, PyObject *args, PyObject *kwds)
+{
+ static char *kwlist[] = {"cid", "send", NULL};
+ int64_t cid; /* Channel ID */
+ int send = 0; /* Send or receive end? */
+ int64_t id;
+ PyObject *ids, *id_obj;
+ PyInterpreterState *interp;
+
+ if (!PyArg_ParseTupleAndKeywords(
+ args, kwds, "O&$p:channel_list_interpreters",
+ kwlist, channel_id_converter, &cid, &send)) {
+ return NULL;
+ }
+
+ ids = PyList_New(0);
+ if (ids == NULL) {
+ goto except;
+ }
+
+ interp = PyInterpreterState_Head();
+ while (interp != NULL) {
+ id = PyInterpreterState_GetID(interp);
+ assert(id >= 0);
+ int res = _channel_is_associated(&_globals.channels, cid, id, send);
+ if (res < 0) {
+ goto except;
+ }
+ if (res) {
+ id_obj = _PyInterpreterState_GetIDObject(interp);
+ if (id_obj == NULL) {
+ goto except;
+ }
+ res = PyList_Insert(ids, 0, id_obj);
+ Py_DECREF(id_obj);
+ if (res < 0) {
+ goto except;
+ }
+ }
+ interp = PyInterpreterState_Next(interp);
+ }
+
+ goto finally;
+
+except:
+ Py_XDECREF(ids);
+ ids = NULL;
+
+finally:
+ return ids;
+}
+
+PyDoc_STRVAR(channel_list_interpreters_doc,
+"channel_list_interpreters(cid, *, send) -> [id]\n\
+\n\
+Return the list of all interpreter IDs associated with an end of the channel.\n\
+\n\
+The 'send' argument should be a boolean indicating whether to use the send or\n\
+receive end.");
+
+
+static PyObject *
+channel_send(PyObject *self, PyObject *args, PyObject *kwds)
+{
+ static char *kwlist[] = {"cid", "obj", NULL};
+ int64_t cid;
+ PyObject *obj;
+ if (!PyArg_ParseTupleAndKeywords(args, kwds, "O&O:channel_send", kwlist,
+ channel_id_converter, &cid, &obj)) {
+ return NULL;
+ }
+
+ if (_channel_send(&_globals.channels, cid, obj) != 0) {
+ return NULL;
+ }
+ Py_RETURN_NONE;
+}
+
+PyDoc_STRVAR(channel_send_doc,
+"channel_send(cid, obj)\n\
+\n\
+Add the object's data to the channel's queue.");
+
+static PyObject *
+channel_recv(PyObject *self, PyObject *args, PyObject *kwds)
+{
+ static char *kwlist[] = {"cid", "default", NULL};
+ int64_t cid;
+ PyObject *dflt = NULL;
+ if (!PyArg_ParseTupleAndKeywords(args, kwds, "O&|O:channel_recv", kwlist,
+ channel_id_converter, &cid, &dflt)) {
+ return NULL;
+ }
+ Py_XINCREF(dflt);
+
+ PyObject *obj = _channel_recv(&_globals.channels, cid);
+ if (obj != NULL) {
+ Py_XDECREF(dflt);
+ return obj;
+ } else if (PyErr_Occurred()) {
+ Py_XDECREF(dflt);
+ return NULL;
+ } else if (dflt != NULL) {
+ return dflt;
+ } else {
+ PyErr_Format(ChannelEmptyError, "channel %" PRId64 " is empty", cid);
+ return NULL;
+ }
+}
+
+PyDoc_STRVAR(channel_recv_doc,
+"channel_recv(cid, [default]) -> obj\n\
+\n\
+Return a new object from the data at the front of the channel's queue.\n\
+\n\
+If there is nothing to receive then raise ChannelEmptyError, unless\n\
+a default value is provided. In that case return it.");
+
+static PyObject *
+channel_close(PyObject *self, PyObject *args, PyObject *kwds)
+{
+ static char *kwlist[] = {"cid", "send", "recv", "force", NULL};
+ int64_t cid;
+ int send = 0;
+ int recv = 0;
+ int force = 0;
+ if (!PyArg_ParseTupleAndKeywords(args, kwds,
+ "O&|$ppp:channel_close", kwlist,
+ channel_id_converter, &cid, &send, &recv, &force)) {
+ return NULL;
+ }
+
+ if (_channel_close(&_globals.channels, cid, send-recv, force) != 0) {
+ return NULL;
+ }
+ Py_RETURN_NONE;
+}
+
+PyDoc_STRVAR(channel_close_doc,
+"channel_close(cid, *, send=None, recv=None, force=False)\n\
+\n\
+Close the channel for all interpreters.\n\
+\n\
+If the channel is empty then the keyword args are ignored and both\n\
+ends are immediately closed. Otherwise, if 'force' is True then\n\
+all queued items are released and both ends are immediately\n\
+closed.\n\
+\n\
+If the channel is not empty *and* 'force' is False then following\n\
+happens:\n\
+\n\
+ * recv is True (regardless of send):\n\
+ - raise ChannelNotEmptyError\n\
+ * recv is None and send is None:\n\
+ - raise ChannelNotEmptyError\n\
+ * send is True and recv is not True:\n\
+ - fully close the 'send' end\n\
+ - close the 'recv' end to interpreters not already receiving\n\
+ - fully close it once empty\n\
+\n\
+Closing an already closed channel results in a ChannelClosedError.\n\
+\n\
+Once the channel's ID has no more ref counts in any interpreter\n\
+the channel will be destroyed.");
+
+static PyObject *
+channel_release(PyObject *self, PyObject *args, PyObject *kwds)
+{
+ // Note that only the current interpreter is affected.
+ static char *kwlist[] = {"cid", "send", "recv", "force", NULL};
+ int64_t cid;
+ int send = 0;
+ int recv = 0;
+ int force = 0;
+ if (!PyArg_ParseTupleAndKeywords(args, kwds,
+ "O&|$ppp:channel_release", kwlist,
+ channel_id_converter, &cid, &send, &recv, &force)) {
+ return NULL;
+ }
+ if (send == 0 && recv == 0) {
+ send = 1;
+ recv = 1;
+ }
+
+ // XXX Handle force is True.
+ // XXX Fix implicit release.
+
+ if (_channel_drop(&_globals.channels, cid, send, recv) != 0) {
+ return NULL;
+ }
+ Py_RETURN_NONE;
+}
+
+PyDoc_STRVAR(channel_release_doc,
+"channel_release(cid, *, send=None, recv=None, force=True)\n\
+\n\
+Close the channel for the current interpreter. 'send' and 'recv'\n\
+(bool) may be used to indicate the ends to close. By default both\n\
+ends are closed. Closing an already closed end is a noop.");
+
+static PyObject *
+channel__channel_id(PyObject *self, PyObject *args, PyObject *kwds)
+{
+ return channelid_new(&ChannelIDtype, args, kwds);
+}
+
+static PyMethodDef module_functions[] = {
+ {"create", (PyCFunction)(void(*)(void))interp_create,
+ METH_VARARGS | METH_KEYWORDS, create_doc},
+ {"destroy", (PyCFunction)(void(*)(void))interp_destroy,
+ METH_VARARGS | METH_KEYWORDS, destroy_doc},
+ {"list_all", interp_list_all,
+ METH_NOARGS, list_all_doc},
+ {"get_current", interp_get_current,
+ METH_NOARGS, get_current_doc},
+ {"get_main", interp_get_main,
+ METH_NOARGS, get_main_doc},
+ {"is_running", (PyCFunction)(void(*)(void))interp_is_running,
+ METH_VARARGS | METH_KEYWORDS, is_running_doc},
+ {"run_string", (PyCFunction)(void(*)(void))interp_run_string,
+ METH_VARARGS | METH_KEYWORDS, run_string_doc},
+
+ {"is_shareable", (PyCFunction)(void(*)(void))object_is_shareable,
+ METH_VARARGS | METH_KEYWORDS, is_shareable_doc},
+
+ {"channel_create", channel_create,
+ METH_NOARGS, channel_create_doc},
+ {"channel_destroy", (PyCFunction)(void(*)(void))channel_destroy,
+ METH_VARARGS | METH_KEYWORDS, channel_destroy_doc},
+ {"channel_list_all", channel_list_all,
+ METH_NOARGS, channel_list_all_doc},
+ {"channel_list_interpreters", (PyCFunction)(void(*)(void))channel_list_interpreters,
+ METH_VARARGS | METH_KEYWORDS, channel_list_interpreters_doc},
+ {"channel_send", (PyCFunction)(void(*)(void))channel_send,
+ METH_VARARGS | METH_KEYWORDS, channel_send_doc},
+ {"channel_recv", (PyCFunction)(void(*)(void))channel_recv,
+ METH_VARARGS | METH_KEYWORDS, channel_recv_doc},
+ {"channel_close", (PyCFunction)(void(*)(void))channel_close,
+ METH_VARARGS | METH_KEYWORDS, channel_close_doc},
+ {"channel_release", (PyCFunction)(void(*)(void))channel_release,
+ METH_VARARGS | METH_KEYWORDS, channel_release_doc},
+ {"_channel_id", (PyCFunction)(void(*)(void))channel__channel_id,
+ METH_VARARGS | METH_KEYWORDS, NULL},
+
+ {NULL, NULL} /* sentinel */
+};
+
+
+/* initialization function */
+
+PyDoc_STRVAR(module_doc,
+"This module provides primitive operations to manage Python interpreters.\n\
+The 'interpreters' module provides a more convenient interface.");
+
+static struct PyModuleDef interpretersmodule = {
+ PyModuleDef_HEAD_INIT,
+ "_xxsubinterpreters", /* m_name */
+ module_doc, /* m_doc */
+ -1, /* m_size */
+ module_functions, /* m_methods */
+ NULL, /* m_slots */
+ NULL, /* m_traverse */
+ NULL, /* m_clear */
+ NULL /* m_free */
+};
+
+
+PyMODINIT_FUNC
+PyInit__xxsubinterpreters(void)
+{
+ if (_init_globals() != 0) {
+ return NULL;
+ }
+
+ /* Initialize types */
+ if (PyType_Ready(&ChannelIDtype) != 0) {
+ return NULL;
+ }
+
+ /* Create the module */
+ PyObject *module = PyModule_Create(&interpretersmodule);
+ if (module == NULL) {
+ return NULL;
+ }
+
+ /* Add exception types */
+ PyObject *ns = PyModule_GetDict(module); // borrowed
+ if (interp_exceptions_init(ns) != 0) {
+ return NULL;
+ }
+ if (channel_exceptions_init(ns) != 0) {
+ return NULL;
+ }
+
+ /* Add other types */
+ Py_INCREF(&ChannelIDtype);
+ if (PyDict_SetItemString(ns, "ChannelID", (PyObject *)&ChannelIDtype) != 0) {
+ return NULL;
+ }
+ Py_INCREF(&_PyInterpreterID_Type);
+ if (PyDict_SetItemString(ns, "InterpreterID", (PyObject *)&_PyInterpreterID_Type) != 0) {
+ return NULL;
+ }
+
+ if (_PyCrossInterpreterData_RegisterClass(&ChannelIDtype, _channelid_shared)) {
+ return NULL;
+ }
+
+ return module;
+}