diff options
author | Devtools Arcadia <arcadia-devtools@yandex-team.ru> | 2022-02-07 18:08:42 +0300 |
---|---|---|
committer | Devtools Arcadia <arcadia-devtools@mous.vla.yp-c.yandex.net> | 2022-02-07 18:08:42 +0300 |
commit | 1110808a9d39d4b808aef724c861a2e1a38d2a69 (patch) | |
tree | e26c9fed0de5d9873cce7e00bc214573dc2195b7 /contrib/tools/python3/src/Modules/_xxsubinterpretersmodule.c | |
download | ydb-1110808a9d39d4b808aef724c861a2e1a38d2a69.tar.gz |
intermediate changes
ref:cde9a383711a11544ce7e107a78147fb96cc4029
Diffstat (limited to 'contrib/tools/python3/src/Modules/_xxsubinterpretersmodule.c')
-rw-r--r-- | contrib/tools/python3/src/Modules/_xxsubinterpretersmodule.c | 2656 |
1 files changed, 2656 insertions, 0 deletions
diff --git a/contrib/tools/python3/src/Modules/_xxsubinterpretersmodule.c b/contrib/tools/python3/src/Modules/_xxsubinterpretersmodule.c new file mode 100644 index 0000000000..314059d108 --- /dev/null +++ b/contrib/tools/python3/src/Modules/_xxsubinterpretersmodule.c @@ -0,0 +1,2656 @@ + +/* interpreters module */ +/* low-level access to interpreter primitives */ + +#include "Python.h" +#include "frameobject.h" +#include "interpreteridobject.h" + + +static char * +_copy_raw_string(PyObject *strobj) +{ + const char *str = PyUnicode_AsUTF8(strobj); + if (str == NULL) { + return NULL; + } + char *copied = PyMem_Malloc(strlen(str)+1); + if (copied == NULL) { + PyErr_NoMemory(); + return NULL; + } + strcpy(copied, str); + return copied; +} + +static PyInterpreterState * +_get_current(void) +{ + // PyInterpreterState_Get() aborts if lookup fails, so don't need + // to check the result for NULL. + return PyInterpreterState_Get(); +} + + +/* data-sharing-specific code ***********************************************/ + +struct _sharednsitem { + char *name; + _PyCrossInterpreterData data; +}; + +static void _sharednsitem_clear(struct _sharednsitem *); // forward + +static int +_sharednsitem_init(struct _sharednsitem *item, PyObject *key, PyObject *value) +{ + item->name = _copy_raw_string(key); + if (item->name == NULL) { + return -1; + } + if (_PyObject_GetCrossInterpreterData(value, &item->data) != 0) { + _sharednsitem_clear(item); + return -1; + } + return 0; +} + +static void +_sharednsitem_clear(struct _sharednsitem *item) +{ + if (item->name != NULL) { + PyMem_Free(item->name); + item->name = NULL; + } + _PyCrossInterpreterData_Release(&item->data); +} + +static int +_sharednsitem_apply(struct _sharednsitem *item, PyObject *ns) +{ + PyObject *name = PyUnicode_FromString(item->name); + if (name == NULL) { + return -1; + } + PyObject *value = _PyCrossInterpreterData_NewObject(&item->data); + if (value == NULL) { + Py_DECREF(name); + return -1; + } + int res = PyDict_SetItem(ns, name, value); + Py_DECREF(name); + Py_DECREF(value); + return res; +} + +typedef struct _sharedns { + Py_ssize_t len; + struct _sharednsitem* items; +} _sharedns; + +static _sharedns * +_sharedns_new(Py_ssize_t len) +{ + _sharedns *shared = PyMem_NEW(_sharedns, 1); + if (shared == NULL) { + PyErr_NoMemory(); + return NULL; + } + shared->len = len; + shared->items = PyMem_NEW(struct _sharednsitem, len); + if (shared->items == NULL) { + PyErr_NoMemory(); + PyMem_Free(shared); + return NULL; + } + return shared; +} + +static void +_sharedns_free(_sharedns *shared) +{ + for (Py_ssize_t i=0; i < shared->len; i++) { + _sharednsitem_clear(&shared->items[i]); + } + PyMem_Free(shared->items); + PyMem_Free(shared); +} + +static _sharedns * +_get_shared_ns(PyObject *shareable) +{ + if (shareable == NULL || shareable == Py_None) { + return NULL; + } + Py_ssize_t len = PyDict_Size(shareable); + if (len == 0) { + return NULL; + } + + _sharedns *shared = _sharedns_new(len); + if (shared == NULL) { + return NULL; + } + Py_ssize_t pos = 0; + for (Py_ssize_t i=0; i < len; i++) { + PyObject *key, *value; + if (PyDict_Next(shareable, &pos, &key, &value) == 0) { + break; + } + if (_sharednsitem_init(&shared->items[i], key, value) != 0) { + break; + } + } + if (PyErr_Occurred()) { + _sharedns_free(shared); + return NULL; + } + return shared; +} + +static int +_sharedns_apply(_sharedns *shared, PyObject *ns) +{ + for (Py_ssize_t i=0; i < shared->len; i++) { + if (_sharednsitem_apply(&shared->items[i], ns) != 0) { + return -1; + } + } + return 0; +} + +// Ultimately we'd like to preserve enough information about the +// exception and traceback that we could re-constitute (or at least +// simulate, a la traceback.TracebackException), and even chain, a copy +// of the exception in the calling interpreter. + +typedef struct _sharedexception { + char *name; + char *msg; +} _sharedexception; + +static _sharedexception * +_sharedexception_new(void) +{ + _sharedexception *err = PyMem_NEW(_sharedexception, 1); + if (err == NULL) { + PyErr_NoMemory(); + return NULL; + } + err->name = NULL; + err->msg = NULL; + return err; +} + +static void +_sharedexception_clear(_sharedexception *exc) +{ + if (exc->name != NULL) { + PyMem_Free(exc->name); + } + if (exc->msg != NULL) { + PyMem_Free(exc->msg); + } +} + +static void +_sharedexception_free(_sharedexception *exc) +{ + _sharedexception_clear(exc); + PyMem_Free(exc); +} + +static _sharedexception * +_sharedexception_bind(PyObject *exctype, PyObject *exc, PyObject *tb) +{ + assert(exctype != NULL); + char *failure = NULL; + + _sharedexception *err = _sharedexception_new(); + if (err == NULL) { + goto finally; + } + + PyObject *name = PyUnicode_FromFormat("%S", exctype); + if (name == NULL) { + failure = "unable to format exception type name"; + goto finally; + } + err->name = _copy_raw_string(name); + Py_DECREF(name); + if (err->name == NULL) { + if (PyErr_ExceptionMatches(PyExc_MemoryError)) { + failure = "out of memory copying exception type name"; + } else { + failure = "unable to encode and copy exception type name"; + } + goto finally; + } + + if (exc != NULL) { + PyObject *msg = PyUnicode_FromFormat("%S", exc); + if (msg == NULL) { + failure = "unable to format exception message"; + goto finally; + } + err->msg = _copy_raw_string(msg); + Py_DECREF(msg); + if (err->msg == NULL) { + if (PyErr_ExceptionMatches(PyExc_MemoryError)) { + failure = "out of memory copying exception message"; + } else { + failure = "unable to encode and copy exception message"; + } + goto finally; + } + } + +finally: + if (failure != NULL) { + PyErr_Clear(); + if (err->name != NULL) { + PyMem_Free(err->name); + err->name = NULL; + } + err->msg = failure; + } + return err; +} + +static void +_sharedexception_apply(_sharedexception *exc, PyObject *wrapperclass) +{ + if (exc->name != NULL) { + if (exc->msg != NULL) { + PyErr_Format(wrapperclass, "%s: %s", exc->name, exc->msg); + } + else { + PyErr_SetString(wrapperclass, exc->name); + } + } + else if (exc->msg != NULL) { + PyErr_SetString(wrapperclass, exc->msg); + } + else { + PyErr_SetNone(wrapperclass); + } +} + + +/* channel-specific code ****************************************************/ + +#define CHANNEL_SEND 1 +#define CHANNEL_BOTH 0 +#define CHANNEL_RECV -1 + +static PyObject *ChannelError; +static PyObject *ChannelNotFoundError; +static PyObject *ChannelClosedError; +static PyObject *ChannelEmptyError; +static PyObject *ChannelNotEmptyError; + +static int +channel_exceptions_init(PyObject *ns) +{ + // XXX Move the exceptions into per-module memory? + + // A channel-related operation failed. + ChannelError = PyErr_NewException("_xxsubinterpreters.ChannelError", + PyExc_RuntimeError, NULL); + if (ChannelError == NULL) { + return -1; + } + if (PyDict_SetItemString(ns, "ChannelError", ChannelError) != 0) { + return -1; + } + + // An operation tried to use a channel that doesn't exist. + ChannelNotFoundError = PyErr_NewException( + "_xxsubinterpreters.ChannelNotFoundError", ChannelError, NULL); + if (ChannelNotFoundError == NULL) { + return -1; + } + if (PyDict_SetItemString(ns, "ChannelNotFoundError", ChannelNotFoundError) != 0) { + return -1; + } + + // An operation tried to use a closed channel. + ChannelClosedError = PyErr_NewException( + "_xxsubinterpreters.ChannelClosedError", ChannelError, NULL); + if (ChannelClosedError == NULL) { + return -1; + } + if (PyDict_SetItemString(ns, "ChannelClosedError", ChannelClosedError) != 0) { + return -1; + } + + // An operation tried to pop from an empty channel. + ChannelEmptyError = PyErr_NewException( + "_xxsubinterpreters.ChannelEmptyError", ChannelError, NULL); + if (ChannelEmptyError == NULL) { + return -1; + } + if (PyDict_SetItemString(ns, "ChannelEmptyError", ChannelEmptyError) != 0) { + return -1; + } + + // An operation tried to close a non-empty channel. + ChannelNotEmptyError = PyErr_NewException( + "_xxsubinterpreters.ChannelNotEmptyError", ChannelError, NULL); + if (ChannelNotEmptyError == NULL) { + return -1; + } + if (PyDict_SetItemString(ns, "ChannelNotEmptyError", ChannelNotEmptyError) != 0) { + return -1; + } + + return 0; +} + +/* the channel queue */ + +struct _channelitem; + +typedef struct _channelitem { + _PyCrossInterpreterData *data; + struct _channelitem *next; +} _channelitem; + +static _channelitem * +_channelitem_new(void) +{ + _channelitem *item = PyMem_NEW(_channelitem, 1); + if (item == NULL) { + PyErr_NoMemory(); + return NULL; + } + item->data = NULL; + item->next = NULL; + return item; +} + +static void +_channelitem_clear(_channelitem *item) +{ + if (item->data != NULL) { + _PyCrossInterpreterData_Release(item->data); + PyMem_Free(item->data); + item->data = NULL; + } + item->next = NULL; +} + +static void +_channelitem_free(_channelitem *item) +{ + _channelitem_clear(item); + PyMem_Free(item); +} + +static void +_channelitem_free_all(_channelitem *item) +{ + while (item != NULL) { + _channelitem *last = item; + item = item->next; + _channelitem_free(last); + } +} + +static _PyCrossInterpreterData * +_channelitem_popped(_channelitem *item) +{ + _PyCrossInterpreterData *data = item->data; + item->data = NULL; + _channelitem_free(item); + return data; +} + +typedef struct _channelqueue { + int64_t count; + _channelitem *first; + _channelitem *last; +} _channelqueue; + +static _channelqueue * +_channelqueue_new(void) +{ + _channelqueue *queue = PyMem_NEW(_channelqueue, 1); + if (queue == NULL) { + PyErr_NoMemory(); + return NULL; + } + queue->count = 0; + queue->first = NULL; + queue->last = NULL; + return queue; +} + +static void +_channelqueue_clear(_channelqueue *queue) +{ + _channelitem_free_all(queue->first); + queue->count = 0; + queue->first = NULL; + queue->last = NULL; +} + +static void +_channelqueue_free(_channelqueue *queue) +{ + _channelqueue_clear(queue); + PyMem_Free(queue); +} + +static int +_channelqueue_put(_channelqueue *queue, _PyCrossInterpreterData *data) +{ + _channelitem *item = _channelitem_new(); + if (item == NULL) { + return -1; + } + item->data = data; + + queue->count += 1; + if (queue->first == NULL) { + queue->first = item; + } + else { + queue->last->next = item; + } + queue->last = item; + return 0; +} + +static _PyCrossInterpreterData * +_channelqueue_get(_channelqueue *queue) +{ + _channelitem *item = queue->first; + if (item == NULL) { + return NULL; + } + queue->first = item->next; + if (queue->last == item) { + queue->last = NULL; + } + queue->count -= 1; + + return _channelitem_popped(item); +} + +/* channel-interpreter associations */ + +struct _channelend; + +typedef struct _channelend { + struct _channelend *next; + int64_t interp; + int open; +} _channelend; + +static _channelend * +_channelend_new(int64_t interp) +{ + _channelend *end = PyMem_NEW(_channelend, 1); + if (end == NULL) { + PyErr_NoMemory(); + return NULL; + } + end->next = NULL; + end->interp = interp; + end->open = 1; + return end; +} + +static void +_channelend_free(_channelend *end) +{ + PyMem_Free(end); +} + +static void +_channelend_free_all(_channelend *end) +{ + while (end != NULL) { + _channelend *last = end; + end = end->next; + _channelend_free(last); + } +} + +static _channelend * +_channelend_find(_channelend *first, int64_t interp, _channelend **pprev) +{ + _channelend *prev = NULL; + _channelend *end = first; + while (end != NULL) { + if (end->interp == interp) { + break; + } + prev = end; + end = end->next; + } + if (pprev != NULL) { + *pprev = prev; + } + return end; +} + +typedef struct _channelassociations { + // Note that the list entries are never removed for interpreter + // for which the channel is closed. This should not be a problem in + // practice. Also, a channel isn't automatically closed when an + // interpreter is destroyed. + int64_t numsendopen; + int64_t numrecvopen; + _channelend *send; + _channelend *recv; +} _channelends; + +static _channelends * +_channelends_new(void) +{ + _channelends *ends = PyMem_NEW(_channelends, 1); + if (ends== NULL) { + return NULL; + } + ends->numsendopen = 0; + ends->numrecvopen = 0; + ends->send = NULL; + ends->recv = NULL; + return ends; +} + +static void +_channelends_clear(_channelends *ends) +{ + _channelend_free_all(ends->send); + ends->send = NULL; + ends->numsendopen = 0; + + _channelend_free_all(ends->recv); + ends->recv = NULL; + ends->numrecvopen = 0; +} + +static void +_channelends_free(_channelends *ends) +{ + _channelends_clear(ends); + PyMem_Free(ends); +} + +static _channelend * +_channelends_add(_channelends *ends, _channelend *prev, int64_t interp, + int send) +{ + _channelend *end = _channelend_new(interp); + if (end == NULL) { + return NULL; + } + + if (prev == NULL) { + if (send) { + ends->send = end; + } + else { + ends->recv = end; + } + } + else { + prev->next = end; + } + if (send) { + ends->numsendopen += 1; + } + else { + ends->numrecvopen += 1; + } + return end; +} + +static int +_channelends_associate(_channelends *ends, int64_t interp, int send) +{ + _channelend *prev; + _channelend *end = _channelend_find(send ? ends->send : ends->recv, + interp, &prev); + if (end != NULL) { + if (!end->open) { + PyErr_SetString(ChannelClosedError, "channel already closed"); + return -1; + } + // already associated + return 0; + } + if (_channelends_add(ends, prev, interp, send) == NULL) { + return -1; + } + return 0; +} + +static int +_channelends_is_open(_channelends *ends) +{ + if (ends->numsendopen != 0 || ends->numrecvopen != 0) { + return 1; + } + if (ends->send == NULL && ends->recv == NULL) { + return 1; + } + return 0; +} + +static void +_channelends_close_end(_channelends *ends, _channelend *end, int send) +{ + end->open = 0; + if (send) { + ends->numsendopen -= 1; + } + else { + ends->numrecvopen -= 1; + } +} + +static int +_channelends_close_interpreter(_channelends *ends, int64_t interp, int which) +{ + _channelend *prev; + _channelend *end; + if (which >= 0) { // send/both + end = _channelend_find(ends->send, interp, &prev); + if (end == NULL) { + // never associated so add it + end = _channelends_add(ends, prev, interp, 1); + if (end == NULL) { + return -1; + } + } + _channelends_close_end(ends, end, 1); + } + if (which <= 0) { // recv/both + end = _channelend_find(ends->recv, interp, &prev); + if (end == NULL) { + // never associated so add it + end = _channelends_add(ends, prev, interp, 0); + if (end == NULL) { + return -1; + } + } + _channelends_close_end(ends, end, 0); + } + return 0; +} + +static void +_channelends_close_all(_channelends *ends, int which, int force) +{ + // XXX Handle the ends. + // XXX Handle force is True. + + // Ensure all the "send"-associated interpreters are closed. + _channelend *end; + for (end = ends->send; end != NULL; end = end->next) { + _channelends_close_end(ends, end, 1); + } + + // Ensure all the "recv"-associated interpreters are closed. + for (end = ends->recv; end != NULL; end = end->next) { + _channelends_close_end(ends, end, 0); + } +} + +/* channels */ + +struct _channel; +struct _channel_closing; +static void _channel_clear_closing(struct _channel *); +static void _channel_finish_closing(struct _channel *); + +typedef struct _channel { + PyThread_type_lock mutex; + _channelqueue *queue; + _channelends *ends; + int open; + struct _channel_closing *closing; +} _PyChannelState; + +static _PyChannelState * +_channel_new(void) +{ + _PyChannelState *chan = PyMem_NEW(_PyChannelState, 1); + if (chan == NULL) { + return NULL; + } + chan->mutex = PyThread_allocate_lock(); + if (chan->mutex == NULL) { + PyMem_Free(chan); + PyErr_SetString(ChannelError, + "can't initialize mutex for new channel"); + return NULL; + } + chan->queue = _channelqueue_new(); + if (chan->queue == NULL) { + PyMem_Free(chan); + return NULL; + } + chan->ends = _channelends_new(); + if (chan->ends == NULL) { + _channelqueue_free(chan->queue); + PyMem_Free(chan); + return NULL; + } + chan->open = 1; + chan->closing = NULL; + return chan; +} + +static void +_channel_free(_PyChannelState *chan) +{ + _channel_clear_closing(chan); + PyThread_acquire_lock(chan->mutex, WAIT_LOCK); + _channelqueue_free(chan->queue); + _channelends_free(chan->ends); + PyThread_release_lock(chan->mutex); + + PyThread_free_lock(chan->mutex); + PyMem_Free(chan); +} + +static int +_channel_add(_PyChannelState *chan, int64_t interp, + _PyCrossInterpreterData *data) +{ + int res = -1; + PyThread_acquire_lock(chan->mutex, WAIT_LOCK); + + if (!chan->open) { + PyErr_SetString(ChannelClosedError, "channel closed"); + goto done; + } + if (_channelends_associate(chan->ends, interp, 1) != 0) { + goto done; + } + + if (_channelqueue_put(chan->queue, data) != 0) { + goto done; + } + + res = 0; +done: + PyThread_release_lock(chan->mutex); + return res; +} + +static _PyCrossInterpreterData * +_channel_next(_PyChannelState *chan, int64_t interp) +{ + _PyCrossInterpreterData *data = NULL; + PyThread_acquire_lock(chan->mutex, WAIT_LOCK); + + if (!chan->open) { + PyErr_SetString(ChannelClosedError, "channel closed"); + goto done; + } + if (_channelends_associate(chan->ends, interp, 0) != 0) { + goto done; + } + + data = _channelqueue_get(chan->queue); + if (data == NULL && !PyErr_Occurred() && chan->closing != NULL) { + chan->open = 0; + } + +done: + PyThread_release_lock(chan->mutex); + if (chan->queue->count == 0) { + _channel_finish_closing(chan); + } + return data; +} + +static int +_channel_close_interpreter(_PyChannelState *chan, int64_t interp, int end) +{ + PyThread_acquire_lock(chan->mutex, WAIT_LOCK); + + int res = -1; + if (!chan->open) { + PyErr_SetString(ChannelClosedError, "channel already closed"); + goto done; + } + + if (_channelends_close_interpreter(chan->ends, interp, end) != 0) { + goto done; + } + chan->open = _channelends_is_open(chan->ends); + + res = 0; +done: + PyThread_release_lock(chan->mutex); + return res; +} + +static int +_channel_close_all(_PyChannelState *chan, int end, int force) +{ + int res = -1; + PyThread_acquire_lock(chan->mutex, WAIT_LOCK); + + if (!chan->open) { + PyErr_SetString(ChannelClosedError, "channel already closed"); + goto done; + } + + if (!force && chan->queue->count > 0) { + PyErr_SetString(ChannelNotEmptyError, + "may not be closed if not empty (try force=True)"); + goto done; + } + + chan->open = 0; + + // We *could* also just leave these in place, since we've marked + // the channel as closed already. + _channelends_close_all(chan->ends, end, force); + + res = 0; +done: + PyThread_release_lock(chan->mutex); + return res; +} + +/* the set of channels */ + +struct _channelref; + +typedef struct _channelref { + int64_t id; + _PyChannelState *chan; + struct _channelref *next; + Py_ssize_t objcount; +} _channelref; + +static _channelref * +_channelref_new(int64_t id, _PyChannelState *chan) +{ + _channelref *ref = PyMem_NEW(_channelref, 1); + if (ref == NULL) { + return NULL; + } + ref->id = id; + ref->chan = chan; + ref->next = NULL; + ref->objcount = 0; + return ref; +} + +//static void +//_channelref_clear(_channelref *ref) +//{ +// ref->id = -1; +// ref->chan = NULL; +// ref->next = NULL; +// ref->objcount = 0; +//} + +static void +_channelref_free(_channelref *ref) +{ + if (ref->chan != NULL) { + _channel_clear_closing(ref->chan); + } + //_channelref_clear(ref); + PyMem_Free(ref); +} + +static _channelref * +_channelref_find(_channelref *first, int64_t id, _channelref **pprev) +{ + _channelref *prev = NULL; + _channelref *ref = first; + while (ref != NULL) { + if (ref->id == id) { + break; + } + prev = ref; + ref = ref->next; + } + if (pprev != NULL) { + *pprev = prev; + } + return ref; +} + +typedef struct _channels { + PyThread_type_lock mutex; + _channelref *head; + int64_t numopen; + int64_t next_id; +} _channels; + +static int +_channels_init(_channels *channels) +{ + if (channels->mutex == NULL) { + channels->mutex = PyThread_allocate_lock(); + if (channels->mutex == NULL) { + PyErr_SetString(ChannelError, + "can't initialize mutex for channel management"); + return -1; + } + } + channels->head = NULL; + channels->numopen = 0; + channels->next_id = 0; + return 0; +} + +static int64_t +_channels_next_id(_channels *channels) // needs lock +{ + int64_t id = channels->next_id; + if (id < 0) { + /* overflow */ + PyErr_SetString(ChannelError, + "failed to get a channel ID"); + return -1; + } + channels->next_id += 1; + return id; +} + +static _PyChannelState * +_channels_lookup(_channels *channels, int64_t id, PyThread_type_lock *pmutex) +{ + _PyChannelState *chan = NULL; + PyThread_acquire_lock(channels->mutex, WAIT_LOCK); + if (pmutex != NULL) { + *pmutex = NULL; + } + + _channelref *ref = _channelref_find(channels->head, id, NULL); + if (ref == NULL) { + PyErr_Format(ChannelNotFoundError, "channel %" PRId64 " not found", id); + goto done; + } + if (ref->chan == NULL || !ref->chan->open) { + PyErr_Format(ChannelClosedError, "channel %" PRId64 " closed", id); + goto done; + } + + if (pmutex != NULL) { + // The mutex will be closed by the caller. + *pmutex = channels->mutex; + } + + chan = ref->chan; +done: + if (pmutex == NULL || *pmutex == NULL) { + PyThread_release_lock(channels->mutex); + } + return chan; +} + +static int64_t +_channels_add(_channels *channels, _PyChannelState *chan) +{ + int64_t cid = -1; + PyThread_acquire_lock(channels->mutex, WAIT_LOCK); + + // Create a new ref. + int64_t id = _channels_next_id(channels); + if (id < 0) { + goto done; + } + _channelref *ref = _channelref_new(id, chan); + if (ref == NULL) { + goto done; + } + + // Add it to the list. + // We assume that the channel is a new one (not already in the list). + ref->next = channels->head; + channels->head = ref; + channels->numopen += 1; + + cid = id; +done: + PyThread_release_lock(channels->mutex); + return cid; +} + +/* forward */ +static int _channel_set_closing(struct _channelref *, PyThread_type_lock); + +static int +_channels_close(_channels *channels, int64_t cid, _PyChannelState **pchan, + int end, int force) +{ + int res = -1; + PyThread_acquire_lock(channels->mutex, WAIT_LOCK); + if (pchan != NULL) { + *pchan = NULL; + } + + _channelref *ref = _channelref_find(channels->head, cid, NULL); + if (ref == NULL) { + PyErr_Format(ChannelNotFoundError, "channel %" PRId64 " not found", cid); + goto done; + } + + if (ref->chan == NULL) { + PyErr_Format(ChannelClosedError, "channel %" PRId64 " closed", cid); + goto done; + } + else if (!force && end == CHANNEL_SEND && ref->chan->closing != NULL) { + PyErr_Format(ChannelClosedError, "channel %" PRId64 " closed", cid); + goto done; + } + else { + if (_channel_close_all(ref->chan, end, force) != 0) { + if (end == CHANNEL_SEND && + PyErr_ExceptionMatches(ChannelNotEmptyError)) { + if (ref->chan->closing != NULL) { + PyErr_Format(ChannelClosedError, + "channel %" PRId64 " closed", cid); + goto done; + } + // Mark the channel as closing and return. The channel + // will be cleaned up in _channel_next(). + PyErr_Clear(); + if (_channel_set_closing(ref, channels->mutex) != 0) { + goto done; + } + if (pchan != NULL) { + *pchan = ref->chan; + } + res = 0; + } + goto done; + } + if (pchan != NULL) { + *pchan = ref->chan; + } + else { + _channel_free(ref->chan); + } + ref->chan = NULL; + } + + res = 0; +done: + PyThread_release_lock(channels->mutex); + return res; +} + +static void +_channels_remove_ref(_channels *channels, _channelref *ref, _channelref *prev, + _PyChannelState **pchan) +{ + if (ref == channels->head) { + channels->head = ref->next; + } + else { + prev->next = ref->next; + } + channels->numopen -= 1; + + if (pchan != NULL) { + *pchan = ref->chan; + } + _channelref_free(ref); +} + +static int +_channels_remove(_channels *channels, int64_t id, _PyChannelState **pchan) +{ + int res = -1; + PyThread_acquire_lock(channels->mutex, WAIT_LOCK); + + if (pchan != NULL) { + *pchan = NULL; + } + + _channelref *prev = NULL; + _channelref *ref = _channelref_find(channels->head, id, &prev); + if (ref == NULL) { + PyErr_Format(ChannelNotFoundError, "channel %" PRId64 " not found", id); + goto done; + } + + _channels_remove_ref(channels, ref, prev, pchan); + + res = 0; +done: + PyThread_release_lock(channels->mutex); + return res; +} + +static int +_channels_add_id_object(_channels *channels, int64_t id) +{ + int res = -1; + PyThread_acquire_lock(channels->mutex, WAIT_LOCK); + + _channelref *ref = _channelref_find(channels->head, id, NULL); + if (ref == NULL) { + PyErr_Format(ChannelNotFoundError, "channel %" PRId64 " not found", id); + goto done; + } + ref->objcount += 1; + + res = 0; +done: + PyThread_release_lock(channels->mutex); + return res; +} + +static void +_channels_drop_id_object(_channels *channels, int64_t id) +{ + PyThread_acquire_lock(channels->mutex, WAIT_LOCK); + + _channelref *prev = NULL; + _channelref *ref = _channelref_find(channels->head, id, &prev); + if (ref == NULL) { + // Already destroyed. + goto done; + } + ref->objcount -= 1; + + // Destroy if no longer used. + if (ref->objcount == 0) { + _PyChannelState *chan = NULL; + _channels_remove_ref(channels, ref, prev, &chan); + if (chan != NULL) { + _channel_free(chan); + } + } + +done: + PyThread_release_lock(channels->mutex); +} + +static int64_t * +_channels_list_all(_channels *channels, int64_t *count) +{ + int64_t *cids = NULL; + PyThread_acquire_lock(channels->mutex, WAIT_LOCK); + int64_t *ids = PyMem_NEW(int64_t, (Py_ssize_t)(channels->numopen)); + if (ids == NULL) { + goto done; + } + _channelref *ref = channels->head; + for (int64_t i=0; ref != NULL; ref = ref->next, i++) { + ids[i] = ref->id; + } + *count = channels->numopen; + + cids = ids; +done: + PyThread_release_lock(channels->mutex); + return cids; +} + +/* support for closing non-empty channels */ + +struct _channel_closing { + struct _channelref *ref; +}; + +static int +_channel_set_closing(struct _channelref *ref, PyThread_type_lock mutex) { + struct _channel *chan = ref->chan; + if (chan == NULL) { + // already closed + return 0; + } + int res = -1; + PyThread_acquire_lock(chan->mutex, WAIT_LOCK); + if (chan->closing != NULL) { + PyErr_SetString(ChannelClosedError, "channel closed"); + goto done; + } + chan->closing = PyMem_NEW(struct _channel_closing, 1); + if (chan->closing == NULL) { + goto done; + } + chan->closing->ref = ref; + + res = 0; +done: + PyThread_release_lock(chan->mutex); + return res; +} + +static void +_channel_clear_closing(struct _channel *chan) { + PyThread_acquire_lock(chan->mutex, WAIT_LOCK); + if (chan->closing != NULL) { + PyMem_Free(chan->closing); + chan->closing = NULL; + } + PyThread_release_lock(chan->mutex); +} + +static void +_channel_finish_closing(struct _channel *chan) { + struct _channel_closing *closing = chan->closing; + if (closing == NULL) { + return; + } + _channelref *ref = closing->ref; + _channel_clear_closing(chan); + // Do the things that would have been done in _channels_close(). + ref->chan = NULL; + _channel_free(chan); +} + +/* "high"-level channel-related functions */ + +static int64_t +_channel_create(_channels *channels) +{ + _PyChannelState *chan = _channel_new(); + if (chan == NULL) { + return -1; + } + int64_t id = _channels_add(channels, chan); + if (id < 0) { + _channel_free(chan); + return -1; + } + return id; +} + +static int +_channel_destroy(_channels *channels, int64_t id) +{ + _PyChannelState *chan = NULL; + if (_channels_remove(channels, id, &chan) != 0) { + return -1; + } + if (chan != NULL) { + _channel_free(chan); + } + return 0; +} + +static int +_channel_send(_channels *channels, int64_t id, PyObject *obj) +{ + PyInterpreterState *interp = _get_current(); + if (interp == NULL) { + return -1; + } + + // Look up the channel. + PyThread_type_lock mutex = NULL; + _PyChannelState *chan = _channels_lookup(channels, id, &mutex); + if (chan == NULL) { + return -1; + } + // Past this point we are responsible for releasing the mutex. + + if (chan->closing != NULL) { + PyErr_Format(ChannelClosedError, "channel %" PRId64 " closed", id); + PyThread_release_lock(mutex); + return -1; + } + + // Convert the object to cross-interpreter data. + _PyCrossInterpreterData *data = PyMem_NEW(_PyCrossInterpreterData, 1); + if (data == NULL) { + PyThread_release_lock(mutex); + return -1; + } + if (_PyObject_GetCrossInterpreterData(obj, data) != 0) { + PyThread_release_lock(mutex); + PyMem_Free(data); + return -1; + } + + // Add the data to the channel. + int res = _channel_add(chan, PyInterpreterState_GetID(interp), data); + PyThread_release_lock(mutex); + if (res != 0) { + _PyCrossInterpreterData_Release(data); + PyMem_Free(data); + return -1; + } + + return 0; +} + +static PyObject * +_channel_recv(_channels *channels, int64_t id) +{ + PyInterpreterState *interp = _get_current(); + if (interp == NULL) { + return NULL; + } + + // Look up the channel. + PyThread_type_lock mutex = NULL; + _PyChannelState *chan = _channels_lookup(channels, id, &mutex); + if (chan == NULL) { + return NULL; + } + // Past this point we are responsible for releasing the mutex. + + // Pop off the next item from the channel. + _PyCrossInterpreterData *data = _channel_next(chan, PyInterpreterState_GetID(interp)); + PyThread_release_lock(mutex); + if (data == NULL) { + return NULL; + } + + // Convert the data back to an object. + PyObject *obj = _PyCrossInterpreterData_NewObject(data); + _PyCrossInterpreterData_Release(data); + PyMem_Free(data); + if (obj == NULL) { + return NULL; + } + + return obj; +} + +static int +_channel_drop(_channels *channels, int64_t id, int send, int recv) +{ + PyInterpreterState *interp = _get_current(); + if (interp == NULL) { + return -1; + } + + // Look up the channel. + PyThread_type_lock mutex = NULL; + _PyChannelState *chan = _channels_lookup(channels, id, &mutex); + if (chan == NULL) { + return -1; + } + // Past this point we are responsible for releasing the mutex. + + // Close one or both of the two ends. + int res = _channel_close_interpreter(chan, PyInterpreterState_GetID(interp), send-recv); + PyThread_release_lock(mutex); + return res; +} + +static int +_channel_close(_channels *channels, int64_t id, int end, int force) +{ + return _channels_close(channels, id, NULL, end, force); +} + +static int +_channel_is_associated(_channels *channels, int64_t cid, int64_t interp, + int send) +{ + _PyChannelState *chan = _channels_lookup(channels, cid, NULL); + if (chan == NULL) { + return -1; + } else if (send && chan->closing != NULL) { + PyErr_Format(ChannelClosedError, "channel %" PRId64 " closed", cid); + return -1; + } + + _channelend *end = _channelend_find(send ? chan->ends->send : chan->ends->recv, + interp, NULL); + + return (end != NULL && end->open); +} + +/* ChannelID class */ + +static PyTypeObject ChannelIDtype; + +typedef struct channelid { + PyObject_HEAD + int64_t id; + int end; + int resolve; + _channels *channels; +} channelid; + +static int +channel_id_converter(PyObject *arg, void *ptr) +{ + int64_t cid; + if (PyObject_TypeCheck(arg, &ChannelIDtype)) { + cid = ((channelid *)arg)->id; + } + else if (PyIndex_Check(arg)) { + cid = PyLong_AsLongLong(arg); + if (cid == -1 && PyErr_Occurred()) { + return 0; + } + if (cid < 0) { + PyErr_Format(PyExc_ValueError, + "channel ID must be a non-negative int, got %R", arg); + return 0; + } + } + else { + PyErr_Format(PyExc_TypeError, + "channel ID must be an int, got %.100s", + Py_TYPE(arg)->tp_name); + return 0; + } + *(int64_t *)ptr = cid; + return 1; +} + +static channelid * +newchannelid(PyTypeObject *cls, int64_t cid, int end, _channels *channels, + int force, int resolve) +{ + channelid *self = PyObject_New(channelid, cls); + if (self == NULL) { + return NULL; + } + self->id = cid; + self->end = end; + self->resolve = resolve; + self->channels = channels; + + if (_channels_add_id_object(channels, cid) != 0) { + if (force && PyErr_ExceptionMatches(ChannelNotFoundError)) { + PyErr_Clear(); + } + else { + Py_DECREF((PyObject *)self); + return NULL; + } + } + + return self; +} + +static _channels * _global_channels(void); + +static PyObject * +channelid_new(PyTypeObject *cls, PyObject *args, PyObject *kwds) +{ + static char *kwlist[] = {"id", "send", "recv", "force", "_resolve", NULL}; + int64_t cid; + int send = -1; + int recv = -1; + int force = 0; + int resolve = 0; + if (!PyArg_ParseTupleAndKeywords(args, kwds, + "O&|$pppp:ChannelID.__new__", kwlist, + channel_id_converter, &cid, &send, &recv, &force, &resolve)) + return NULL; + + // Handle "send" and "recv". + if (send == 0 && recv == 0) { + PyErr_SetString(PyExc_ValueError, + "'send' and 'recv' cannot both be False"); + return NULL; + } + + int end = 0; + if (send == 1) { + if (recv == 0 || recv == -1) { + end = CHANNEL_SEND; + } + } + else if (recv == 1) { + end = CHANNEL_RECV; + } + + return (PyObject *)newchannelid(cls, cid, end, _global_channels(), + force, resolve); +} + +static void +channelid_dealloc(PyObject *v) +{ + int64_t cid = ((channelid *)v)->id; + _channels *channels = ((channelid *)v)->channels; + Py_TYPE(v)->tp_free(v); + + _channels_drop_id_object(channels, cid); +} + +static PyObject * +channelid_repr(PyObject *self) +{ + PyTypeObject *type = Py_TYPE(self); + const char *name = _PyType_Name(type); + + channelid *cid = (channelid *)self; + const char *fmt; + if (cid->end == CHANNEL_SEND) { + fmt = "%s(%" PRId64 ", send=True)"; + } + else if (cid->end == CHANNEL_RECV) { + fmt = "%s(%" PRId64 ", recv=True)"; + } + else { + fmt = "%s(%" PRId64 ")"; + } + return PyUnicode_FromFormat(fmt, name, cid->id); +} + +static PyObject * +channelid_str(PyObject *self) +{ + channelid *cid = (channelid *)self; + return PyUnicode_FromFormat("%" PRId64 "", cid->id); +} + +static PyObject * +channelid_int(PyObject *self) +{ + channelid *cid = (channelid *)self; + return PyLong_FromLongLong(cid->id); +} + +static PyNumberMethods channelid_as_number = { + 0, /* nb_add */ + 0, /* nb_subtract */ + 0, /* nb_multiply */ + 0, /* nb_remainder */ + 0, /* nb_divmod */ + 0, /* nb_power */ + 0, /* nb_negative */ + 0, /* nb_positive */ + 0, /* nb_absolute */ + 0, /* nb_bool */ + 0, /* nb_invert */ + 0, /* nb_lshift */ + 0, /* nb_rshift */ + 0, /* nb_and */ + 0, /* nb_xor */ + 0, /* nb_or */ + (unaryfunc)channelid_int, /* nb_int */ + 0, /* nb_reserved */ + 0, /* nb_float */ + + 0, /* nb_inplace_add */ + 0, /* nb_inplace_subtract */ + 0, /* nb_inplace_multiply */ + 0, /* nb_inplace_remainder */ + 0, /* nb_inplace_power */ + 0, /* nb_inplace_lshift */ + 0, /* nb_inplace_rshift */ + 0, /* nb_inplace_and */ + 0, /* nb_inplace_xor */ + 0, /* nb_inplace_or */ + + 0, /* nb_floor_divide */ + 0, /* nb_true_divide */ + 0, /* nb_inplace_floor_divide */ + 0, /* nb_inplace_true_divide */ + + (unaryfunc)channelid_int, /* nb_index */ +}; + +static Py_hash_t +channelid_hash(PyObject *self) +{ + channelid *cid = (channelid *)self; + PyObject *id = PyLong_FromLongLong(cid->id); + if (id == NULL) { + return -1; + } + Py_hash_t hash = PyObject_Hash(id); + Py_DECREF(id); + return hash; +} + +static PyObject * +channelid_richcompare(PyObject *self, PyObject *other, int op) +{ + if (op != Py_EQ && op != Py_NE) { + Py_RETURN_NOTIMPLEMENTED; + } + + if (!PyObject_TypeCheck(self, &ChannelIDtype)) { + Py_RETURN_NOTIMPLEMENTED; + } + + channelid *cid = (channelid *)self; + int equal; + if (PyObject_TypeCheck(other, &ChannelIDtype)) { + channelid *othercid = (channelid *)other; + equal = (cid->end == othercid->end) && (cid->id == othercid->id); + } + else if (PyLong_Check(other)) { + /* Fast path */ + int overflow; + long long othercid = PyLong_AsLongLongAndOverflow(other, &overflow); + if (othercid == -1 && PyErr_Occurred()) { + return NULL; + } + equal = !overflow && (othercid >= 0) && (cid->id == othercid); + } + else if (PyNumber_Check(other)) { + PyObject *pyid = PyLong_FromLongLong(cid->id); + if (pyid == NULL) { + return NULL; + } + PyObject *res = PyObject_RichCompare(pyid, other, op); + Py_DECREF(pyid); + return res; + } + else { + Py_RETURN_NOTIMPLEMENTED; + } + + if ((op == Py_EQ && equal) || (op == Py_NE && !equal)) { + Py_RETURN_TRUE; + } + Py_RETURN_FALSE; +} + +static PyObject * +_channel_from_cid(PyObject *cid, int end) +{ + PyObject *highlevel = PyImport_ImportModule("interpreters"); + if (highlevel == NULL) { + PyErr_Clear(); + highlevel = PyImport_ImportModule("test.support.interpreters"); + if (highlevel == NULL) { + return NULL; + } + } + const char *clsname = (end == CHANNEL_RECV) ? "RecvChannel" : + "SendChannel"; + PyObject *cls = PyObject_GetAttrString(highlevel, clsname); + Py_DECREF(highlevel); + if (cls == NULL) { + return NULL; + } + PyObject *chan = PyObject_CallFunctionObjArgs(cls, cid, NULL); + Py_DECREF(cls); + if (chan == NULL) { + return NULL; + } + return chan; +} + +struct _channelid_xid { + int64_t id; + int end; + int resolve; +}; + +static PyObject * +_channelid_from_xid(_PyCrossInterpreterData *data) +{ + struct _channelid_xid *xid = (struct _channelid_xid *)data->data; + // Note that we do not preserve the "resolve" flag. + PyObject *cid = (PyObject *)newchannelid(&ChannelIDtype, xid->id, xid->end, + _global_channels(), 0, 0); + if (xid->end == 0) { + return cid; + } + if (!xid->resolve) { + return cid; + } + + /* Try returning a high-level channel end but fall back to the ID. */ + PyObject *chan = _channel_from_cid(cid, xid->end); + if (chan == NULL) { + PyErr_Clear(); + return cid; + } + Py_DECREF(cid); + return chan; +} + +static int +_channelid_shared(PyObject *obj, _PyCrossInterpreterData *data) +{ + struct _channelid_xid *xid = PyMem_NEW(struct _channelid_xid, 1); + if (xid == NULL) { + return -1; + } + xid->id = ((channelid *)obj)->id; + xid->end = ((channelid *)obj)->end; + xid->resolve = ((channelid *)obj)->resolve; + + data->data = xid; + Py_INCREF(obj); + data->obj = obj; + data->new_object = _channelid_from_xid; + data->free = PyMem_Free; + return 0; +} + +static PyObject * +channelid_end(PyObject *self, void *end) +{ + int force = 1; + channelid *cid = (channelid *)self; + if (end != NULL) { + return (PyObject *)newchannelid(Py_TYPE(self), cid->id, *(int *)end, + cid->channels, force, cid->resolve); + } + + if (cid->end == CHANNEL_SEND) { + return PyUnicode_InternFromString("send"); + } + if (cid->end == CHANNEL_RECV) { + return PyUnicode_InternFromString("recv"); + } + return PyUnicode_InternFromString("both"); +} + +static int _channelid_end_send = CHANNEL_SEND; +static int _channelid_end_recv = CHANNEL_RECV; + +static PyGetSetDef channelid_getsets[] = { + {"end", (getter)channelid_end, NULL, + PyDoc_STR("'send', 'recv', or 'both'")}, + {"send", (getter)channelid_end, NULL, + PyDoc_STR("the 'send' end of the channel"), &_channelid_end_send}, + {"recv", (getter)channelid_end, NULL, + PyDoc_STR("the 'recv' end of the channel"), &_channelid_end_recv}, + {NULL} +}; + +PyDoc_STRVAR(channelid_doc, +"A channel ID identifies a channel and may be used as an int."); + +static PyTypeObject ChannelIDtype = { + PyVarObject_HEAD_INIT(&PyType_Type, 0) + "_xxsubinterpreters.ChannelID", /* tp_name */ + sizeof(channelid), /* tp_basicsize */ + 0, /* tp_itemsize */ + (destructor)channelid_dealloc, /* tp_dealloc */ + 0, /* tp_vectorcall_offset */ + 0, /* tp_getattr */ + 0, /* tp_setattr */ + 0, /* tp_as_async */ + (reprfunc)channelid_repr, /* tp_repr */ + &channelid_as_number, /* tp_as_number */ + 0, /* tp_as_sequence */ + 0, /* tp_as_mapping */ + channelid_hash, /* tp_hash */ + 0, /* tp_call */ + (reprfunc)channelid_str, /* tp_str */ + 0, /* tp_getattro */ + 0, /* tp_setattro */ + 0, /* tp_as_buffer */ + Py_TPFLAGS_DEFAULT | Py_TPFLAGS_BASETYPE, /* tp_flags */ + channelid_doc, /* tp_doc */ + 0, /* tp_traverse */ + 0, /* tp_clear */ + channelid_richcompare, /* tp_richcompare */ + 0, /* tp_weaklistoffset */ + 0, /* tp_iter */ + 0, /* tp_iternext */ + 0, /* tp_methods */ + 0, /* tp_members */ + channelid_getsets, /* tp_getset */ + 0, /* tp_base */ + 0, /* tp_dict */ + 0, /* tp_descr_get */ + 0, /* tp_descr_set */ + 0, /* tp_dictoffset */ + 0, /* tp_init */ + 0, /* tp_alloc */ + // Note that we do not set tp_new to channelid_new. Instead we + // set it to NULL, meaning it cannot be instantiated from Python + // code. We do this because there is a strong relationship between + // channel IDs and the channel lifecycle, so this limitation avoids + // related complications. + NULL, /* tp_new */ +}; + + +/* interpreter-specific code ************************************************/ + +static PyObject * RunFailedError = NULL; + +static int +interp_exceptions_init(PyObject *ns) +{ + // XXX Move the exceptions into per-module memory? + + if (RunFailedError == NULL) { + // An uncaught exception came out of interp_run_string(). + RunFailedError = PyErr_NewException("_xxsubinterpreters.RunFailedError", + PyExc_RuntimeError, NULL); + if (RunFailedError == NULL) { + return -1; + } + if (PyDict_SetItemString(ns, "RunFailedError", RunFailedError) != 0) { + return -1; + } + } + + return 0; +} + +static int +_is_running(PyInterpreterState *interp) +{ + PyThreadState *tstate = PyInterpreterState_ThreadHead(interp); + if (PyThreadState_Next(tstate) != NULL) { + PyErr_SetString(PyExc_RuntimeError, + "interpreter has more than one thread"); + return -1; + } + + assert(!PyErr_Occurred()); + PyFrameObject *frame = PyThreadState_GetFrame(tstate); + if (frame == NULL) { + return 0; + } + + int executing = (int)(frame->f_executing); + Py_DECREF(frame); + + return executing; +} + +static int +_ensure_not_running(PyInterpreterState *interp) +{ + int is_running = _is_running(interp); + if (is_running < 0) { + return -1; + } + if (is_running) { + PyErr_Format(PyExc_RuntimeError, "interpreter already running"); + return -1; + } + return 0; +} + +static int +_run_script(PyInterpreterState *interp, const char *codestr, + _sharedns *shared, _sharedexception **exc) +{ + PyObject *exctype = NULL; + PyObject *excval = NULL; + PyObject *tb = NULL; + + PyObject *main_mod = _PyInterpreterState_GetMainModule(interp); + if (main_mod == NULL) { + goto error; + } + PyObject *ns = PyModule_GetDict(main_mod); // borrowed + Py_DECREF(main_mod); + if (ns == NULL) { + goto error; + } + Py_INCREF(ns); + + // Apply the cross-interpreter data. + if (shared != NULL) { + if (_sharedns_apply(shared, ns) != 0) { + Py_DECREF(ns); + goto error; + } + } + + // Run the string (see PyRun_SimpleStringFlags). + PyObject *result = PyRun_StringFlags(codestr, Py_file_input, ns, ns, NULL); + Py_DECREF(ns); + if (result == NULL) { + goto error; + } + else { + Py_DECREF(result); // We throw away the result. + } + + *exc = NULL; + return 0; + +error: + PyErr_Fetch(&exctype, &excval, &tb); + + _sharedexception *sharedexc = _sharedexception_bind(exctype, excval, tb); + Py_XDECREF(exctype); + Py_XDECREF(excval); + Py_XDECREF(tb); + if (sharedexc == NULL) { + fprintf(stderr, "RunFailedError: script raised an uncaught exception"); + PyErr_Clear(); + sharedexc = NULL; + } + else { + assert(!PyErr_Occurred()); + } + *exc = sharedexc; + return -1; +} + +static int +_run_script_in_interpreter(PyInterpreterState *interp, const char *codestr, + PyObject *shareables) +{ + if (_ensure_not_running(interp) < 0) { + return -1; + } + + _sharedns *shared = _get_shared_ns(shareables); + if (shared == NULL && PyErr_Occurred()) { + return -1; + } + + // Switch to interpreter. + PyThreadState *save_tstate = NULL; + if (interp != PyInterpreterState_Get()) { + // XXX Using the "head" thread isn't strictly correct. + PyThreadState *tstate = PyInterpreterState_ThreadHead(interp); + // XXX Possible GILState issues? + save_tstate = PyThreadState_Swap(tstate); + } + + // Run the script. + _sharedexception *exc = NULL; + int result = _run_script(interp, codestr, shared, &exc); + + // Switch back. + if (save_tstate != NULL) { + PyThreadState_Swap(save_tstate); + } + + // Propagate any exception out to the caller. + if (exc != NULL) { + _sharedexception_apply(exc, RunFailedError); + _sharedexception_free(exc); + } + else if (result != 0) { + // We were unable to allocate a shared exception. + PyErr_NoMemory(); + } + + if (shared != NULL) { + _sharedns_free(shared); + } + + return result; +} + + +/* module level code ********************************************************/ + +/* globals is the process-global state for the module. It holds all + the data that we need to share between interpreters, so it cannot + hold PyObject values. */ +static struct globals { + _channels channels; +} _globals = {{0}}; + +static int +_init_globals(void) +{ + if (_channels_init(&_globals.channels) != 0) { + return -1; + } + return 0; +} + +static _channels * +_global_channels(void) { + return &_globals.channels; +} + +static PyObject * +interp_create(PyObject *self, PyObject *args, PyObject *kwds) +{ + + static char *kwlist[] = {"isolated", NULL}; + int isolated = 1; + if (!PyArg_ParseTupleAndKeywords(args, kwds, "|$i:create", kwlist, + &isolated)) { + return NULL; + } + + // Create and initialize the new interpreter. + PyThreadState *save_tstate = PyThreadState_Get(); + // XXX Possible GILState issues? + PyThreadState *tstate = _Py_NewInterpreter(isolated); + PyThreadState_Swap(save_tstate); + if (tstate == NULL) { + /* Since no new thread state was created, there is no exception to + propagate; raise a fresh one after swapping in the old thread + state. */ + PyErr_SetString(PyExc_RuntimeError, "interpreter creation failed"); + return NULL; + } + PyInterpreterState *interp = PyThreadState_GetInterpreter(tstate); + PyObject *idobj = _PyInterpreterState_GetIDObject(interp); + if (idobj == NULL) { + // XXX Possible GILState issues? + save_tstate = PyThreadState_Swap(tstate); + Py_EndInterpreter(tstate); + PyThreadState_Swap(save_tstate); + return NULL; + } + _PyInterpreterState_RequireIDRef(interp, 1); + return idobj; +} + +PyDoc_STRVAR(create_doc, +"create() -> ID\n\ +\n\ +Create a new interpreter and return a unique generated ID."); + + +static PyObject * +interp_destroy(PyObject *self, PyObject *args, PyObject *kwds) +{ + static char *kwlist[] = {"id", NULL}; + PyObject *id; + // XXX Use "L" for id? + if (!PyArg_ParseTupleAndKeywords(args, kwds, + "O:destroy", kwlist, &id)) { + return NULL; + } + + // Look up the interpreter. + PyInterpreterState *interp = _PyInterpreterID_LookUp(id); + if (interp == NULL) { + return NULL; + } + + // Ensure we don't try to destroy the current interpreter. + PyInterpreterState *current = _get_current(); + if (current == NULL) { + return NULL; + } + if (interp == current) { + PyErr_SetString(PyExc_RuntimeError, + "cannot destroy the current interpreter"); + return NULL; + } + + // Ensure the interpreter isn't running. + /* XXX We *could* support destroying a running interpreter but + aren't going to worry about it for now. */ + if (_ensure_not_running(interp) < 0) { + return NULL; + } + + // Destroy the interpreter. + PyThreadState *tstate = PyInterpreterState_ThreadHead(interp); + // XXX Possible GILState issues? + PyThreadState *save_tstate = PyThreadState_Swap(tstate); + Py_EndInterpreter(tstate); + PyThreadState_Swap(save_tstate); + + Py_RETURN_NONE; +} + +PyDoc_STRVAR(destroy_doc, +"destroy(id)\n\ +\n\ +Destroy the identified interpreter.\n\ +\n\ +Attempting to destroy the current interpreter results in a RuntimeError.\n\ +So does an unrecognized ID."); + + +static PyObject * +interp_list_all(PyObject *self, PyObject *Py_UNUSED(ignored)) +{ + PyObject *ids, *id; + PyInterpreterState *interp; + + ids = PyList_New(0); + if (ids == NULL) { + return NULL; + } + + interp = PyInterpreterState_Head(); + while (interp != NULL) { + id = _PyInterpreterState_GetIDObject(interp); + if (id == NULL) { + Py_DECREF(ids); + return NULL; + } + // insert at front of list + int res = PyList_Insert(ids, 0, id); + Py_DECREF(id); + if (res < 0) { + Py_DECREF(ids); + return NULL; + } + + interp = PyInterpreterState_Next(interp); + } + + return ids; +} + +PyDoc_STRVAR(list_all_doc, +"list_all() -> [ID]\n\ +\n\ +Return a list containing the ID of every existing interpreter."); + + +static PyObject * +interp_get_current(PyObject *self, PyObject *Py_UNUSED(ignored)) +{ + PyInterpreterState *interp =_get_current(); + if (interp == NULL) { + return NULL; + } + return _PyInterpreterState_GetIDObject(interp); +} + +PyDoc_STRVAR(get_current_doc, +"get_current() -> ID\n\ +\n\ +Return the ID of current interpreter."); + + +static PyObject * +interp_get_main(PyObject *self, PyObject *Py_UNUSED(ignored)) +{ + // Currently, 0 is always the main interpreter. + int64_t id = 0; + return _PyInterpreterID_New(id); +} + +PyDoc_STRVAR(get_main_doc, +"get_main() -> ID\n\ +\n\ +Return the ID of main interpreter."); + + +static PyObject * +interp_run_string(PyObject *self, PyObject *args, PyObject *kwds) +{ + static char *kwlist[] = {"id", "script", "shared", NULL}; + PyObject *id, *code; + PyObject *shared = NULL; + if (!PyArg_ParseTupleAndKeywords(args, kwds, + "OU|O:run_string", kwlist, + &id, &code, &shared)) { + return NULL; + } + + // Look up the interpreter. + PyInterpreterState *interp = _PyInterpreterID_LookUp(id); + if (interp == NULL) { + return NULL; + } + + // Extract code. + Py_ssize_t size; + const char *codestr = PyUnicode_AsUTF8AndSize(code, &size); + if (codestr == NULL) { + return NULL; + } + if (strlen(codestr) != (size_t)size) { + PyErr_SetString(PyExc_ValueError, + "source code string cannot contain null bytes"); + return NULL; + } + + // Run the code in the interpreter. + if (_run_script_in_interpreter(interp, codestr, shared) != 0) { + return NULL; + } + Py_RETURN_NONE; +} + +PyDoc_STRVAR(run_string_doc, +"run_string(id, script, shared)\n\ +\n\ +Execute the provided string in the identified interpreter.\n\ +\n\ +See PyRun_SimpleStrings."); + + +static PyObject * +object_is_shareable(PyObject *self, PyObject *args, PyObject *kwds) +{ + static char *kwlist[] = {"obj", NULL}; + PyObject *obj; + if (!PyArg_ParseTupleAndKeywords(args, kwds, + "O:is_shareable", kwlist, &obj)) { + return NULL; + } + + if (_PyObject_CheckCrossInterpreterData(obj) == 0) { + Py_RETURN_TRUE; + } + PyErr_Clear(); + Py_RETURN_FALSE; +} + +PyDoc_STRVAR(is_shareable_doc, +"is_shareable(obj) -> bool\n\ +\n\ +Return True if the object's data may be shared between interpreters and\n\ +False otherwise."); + + +static PyObject * +interp_is_running(PyObject *self, PyObject *args, PyObject *kwds) +{ + static char *kwlist[] = {"id", NULL}; + PyObject *id; + if (!PyArg_ParseTupleAndKeywords(args, kwds, + "O:is_running", kwlist, &id)) { + return NULL; + } + + PyInterpreterState *interp = _PyInterpreterID_LookUp(id); + if (interp == NULL) { + return NULL; + } + int is_running = _is_running(interp); + if (is_running < 0) { + return NULL; + } + if (is_running) { + Py_RETURN_TRUE; + } + Py_RETURN_FALSE; +} + +PyDoc_STRVAR(is_running_doc, +"is_running(id) -> bool\n\ +\n\ +Return whether or not the identified interpreter is running."); + +static PyObject * +channel_create(PyObject *self, PyObject *Py_UNUSED(ignored)) +{ + int64_t cid = _channel_create(&_globals.channels); + if (cid < 0) { + return NULL; + } + PyObject *id = (PyObject *)newchannelid(&ChannelIDtype, cid, 0, + &_globals.channels, 0, 0); + if (id == NULL) { + if (_channel_destroy(&_globals.channels, cid) != 0) { + // XXX issue a warning? + } + return NULL; + } + assert(((channelid *)id)->channels != NULL); + return id; +} + +PyDoc_STRVAR(channel_create_doc, +"channel_create() -> cid\n\ +\n\ +Create a new cross-interpreter channel and return a unique generated ID."); + +static PyObject * +channel_destroy(PyObject *self, PyObject *args, PyObject *kwds) +{ + static char *kwlist[] = {"cid", NULL}; + int64_t cid; + if (!PyArg_ParseTupleAndKeywords(args, kwds, "O&:channel_destroy", kwlist, + channel_id_converter, &cid)) { + return NULL; + } + + if (_channel_destroy(&_globals.channels, cid) != 0) { + return NULL; + } + Py_RETURN_NONE; +} + +PyDoc_STRVAR(channel_destroy_doc, +"channel_destroy(cid)\n\ +\n\ +Close and finalize the channel. Afterward attempts to use the channel\n\ +will behave as though it never existed."); + +static PyObject * +channel_list_all(PyObject *self, PyObject *Py_UNUSED(ignored)) +{ + int64_t count = 0; + int64_t *cids = _channels_list_all(&_globals.channels, &count); + if (cids == NULL) { + if (count == 0) { + return PyList_New(0); + } + return NULL; + } + PyObject *ids = PyList_New((Py_ssize_t)count); + if (ids == NULL) { + goto finally; + } + int64_t *cur = cids; + for (int64_t i=0; i < count; cur++, i++) { + PyObject *id = (PyObject *)newchannelid(&ChannelIDtype, *cur, 0, + &_globals.channels, 0, 0); + if (id == NULL) { + Py_DECREF(ids); + ids = NULL; + break; + } + PyList_SET_ITEM(ids, i, id); + } + +finally: + PyMem_Free(cids); + return ids; +} + +PyDoc_STRVAR(channel_list_all_doc, +"channel_list_all() -> [cid]\n\ +\n\ +Return the list of all IDs for active channels."); + +static PyObject * +channel_list_interpreters(PyObject *self, PyObject *args, PyObject *kwds) +{ + static char *kwlist[] = {"cid", "send", NULL}; + int64_t cid; /* Channel ID */ + int send = 0; /* Send or receive end? */ + int64_t id; + PyObject *ids, *id_obj; + PyInterpreterState *interp; + + if (!PyArg_ParseTupleAndKeywords( + args, kwds, "O&$p:channel_list_interpreters", + kwlist, channel_id_converter, &cid, &send)) { + return NULL; + } + + ids = PyList_New(0); + if (ids == NULL) { + goto except; + } + + interp = PyInterpreterState_Head(); + while (interp != NULL) { + id = PyInterpreterState_GetID(interp); + assert(id >= 0); + int res = _channel_is_associated(&_globals.channels, cid, id, send); + if (res < 0) { + goto except; + } + if (res) { + id_obj = _PyInterpreterState_GetIDObject(interp); + if (id_obj == NULL) { + goto except; + } + res = PyList_Insert(ids, 0, id_obj); + Py_DECREF(id_obj); + if (res < 0) { + goto except; + } + } + interp = PyInterpreterState_Next(interp); + } + + goto finally; + +except: + Py_XDECREF(ids); + ids = NULL; + +finally: + return ids; +} + +PyDoc_STRVAR(channel_list_interpreters_doc, +"channel_list_interpreters(cid, *, send) -> [id]\n\ +\n\ +Return the list of all interpreter IDs associated with an end of the channel.\n\ +\n\ +The 'send' argument should be a boolean indicating whether to use the send or\n\ +receive end."); + + +static PyObject * +channel_send(PyObject *self, PyObject *args, PyObject *kwds) +{ + static char *kwlist[] = {"cid", "obj", NULL}; + int64_t cid; + PyObject *obj; + if (!PyArg_ParseTupleAndKeywords(args, kwds, "O&O:channel_send", kwlist, + channel_id_converter, &cid, &obj)) { + return NULL; + } + + if (_channel_send(&_globals.channels, cid, obj) != 0) { + return NULL; + } + Py_RETURN_NONE; +} + +PyDoc_STRVAR(channel_send_doc, +"channel_send(cid, obj)\n\ +\n\ +Add the object's data to the channel's queue."); + +static PyObject * +channel_recv(PyObject *self, PyObject *args, PyObject *kwds) +{ + static char *kwlist[] = {"cid", "default", NULL}; + int64_t cid; + PyObject *dflt = NULL; + if (!PyArg_ParseTupleAndKeywords(args, kwds, "O&|O:channel_recv", kwlist, + channel_id_converter, &cid, &dflt)) { + return NULL; + } + Py_XINCREF(dflt); + + PyObject *obj = _channel_recv(&_globals.channels, cid); + if (obj != NULL) { + Py_XDECREF(dflt); + return obj; + } else if (PyErr_Occurred()) { + Py_XDECREF(dflt); + return NULL; + } else if (dflt != NULL) { + return dflt; + } else { + PyErr_Format(ChannelEmptyError, "channel %" PRId64 " is empty", cid); + return NULL; + } +} + +PyDoc_STRVAR(channel_recv_doc, +"channel_recv(cid, [default]) -> obj\n\ +\n\ +Return a new object from the data at the front of the channel's queue.\n\ +\n\ +If there is nothing to receive then raise ChannelEmptyError, unless\n\ +a default value is provided. In that case return it."); + +static PyObject * +channel_close(PyObject *self, PyObject *args, PyObject *kwds) +{ + static char *kwlist[] = {"cid", "send", "recv", "force", NULL}; + int64_t cid; + int send = 0; + int recv = 0; + int force = 0; + if (!PyArg_ParseTupleAndKeywords(args, kwds, + "O&|$ppp:channel_close", kwlist, + channel_id_converter, &cid, &send, &recv, &force)) { + return NULL; + } + + if (_channel_close(&_globals.channels, cid, send-recv, force) != 0) { + return NULL; + } + Py_RETURN_NONE; +} + +PyDoc_STRVAR(channel_close_doc, +"channel_close(cid, *, send=None, recv=None, force=False)\n\ +\n\ +Close the channel for all interpreters.\n\ +\n\ +If the channel is empty then the keyword args are ignored and both\n\ +ends are immediately closed. Otherwise, if 'force' is True then\n\ +all queued items are released and both ends are immediately\n\ +closed.\n\ +\n\ +If the channel is not empty *and* 'force' is False then following\n\ +happens:\n\ +\n\ + * recv is True (regardless of send):\n\ + - raise ChannelNotEmptyError\n\ + * recv is None and send is None:\n\ + - raise ChannelNotEmptyError\n\ + * send is True and recv is not True:\n\ + - fully close the 'send' end\n\ + - close the 'recv' end to interpreters not already receiving\n\ + - fully close it once empty\n\ +\n\ +Closing an already closed channel results in a ChannelClosedError.\n\ +\n\ +Once the channel's ID has no more ref counts in any interpreter\n\ +the channel will be destroyed."); + +static PyObject * +channel_release(PyObject *self, PyObject *args, PyObject *kwds) +{ + // Note that only the current interpreter is affected. + static char *kwlist[] = {"cid", "send", "recv", "force", NULL}; + int64_t cid; + int send = 0; + int recv = 0; + int force = 0; + if (!PyArg_ParseTupleAndKeywords(args, kwds, + "O&|$ppp:channel_release", kwlist, + channel_id_converter, &cid, &send, &recv, &force)) { + return NULL; + } + if (send == 0 && recv == 0) { + send = 1; + recv = 1; + } + + // XXX Handle force is True. + // XXX Fix implicit release. + + if (_channel_drop(&_globals.channels, cid, send, recv) != 0) { + return NULL; + } + Py_RETURN_NONE; +} + +PyDoc_STRVAR(channel_release_doc, +"channel_release(cid, *, send=None, recv=None, force=True)\n\ +\n\ +Close the channel for the current interpreter. 'send' and 'recv'\n\ +(bool) may be used to indicate the ends to close. By default both\n\ +ends are closed. Closing an already closed end is a noop."); + +static PyObject * +channel__channel_id(PyObject *self, PyObject *args, PyObject *kwds) +{ + return channelid_new(&ChannelIDtype, args, kwds); +} + +static PyMethodDef module_functions[] = { + {"create", (PyCFunction)(void(*)(void))interp_create, + METH_VARARGS | METH_KEYWORDS, create_doc}, + {"destroy", (PyCFunction)(void(*)(void))interp_destroy, + METH_VARARGS | METH_KEYWORDS, destroy_doc}, + {"list_all", interp_list_all, + METH_NOARGS, list_all_doc}, + {"get_current", interp_get_current, + METH_NOARGS, get_current_doc}, + {"get_main", interp_get_main, + METH_NOARGS, get_main_doc}, + {"is_running", (PyCFunction)(void(*)(void))interp_is_running, + METH_VARARGS | METH_KEYWORDS, is_running_doc}, + {"run_string", (PyCFunction)(void(*)(void))interp_run_string, + METH_VARARGS | METH_KEYWORDS, run_string_doc}, + + {"is_shareable", (PyCFunction)(void(*)(void))object_is_shareable, + METH_VARARGS | METH_KEYWORDS, is_shareable_doc}, + + {"channel_create", channel_create, + METH_NOARGS, channel_create_doc}, + {"channel_destroy", (PyCFunction)(void(*)(void))channel_destroy, + METH_VARARGS | METH_KEYWORDS, channel_destroy_doc}, + {"channel_list_all", channel_list_all, + METH_NOARGS, channel_list_all_doc}, + {"channel_list_interpreters", (PyCFunction)(void(*)(void))channel_list_interpreters, + METH_VARARGS | METH_KEYWORDS, channel_list_interpreters_doc}, + {"channel_send", (PyCFunction)(void(*)(void))channel_send, + METH_VARARGS | METH_KEYWORDS, channel_send_doc}, + {"channel_recv", (PyCFunction)(void(*)(void))channel_recv, + METH_VARARGS | METH_KEYWORDS, channel_recv_doc}, + {"channel_close", (PyCFunction)(void(*)(void))channel_close, + METH_VARARGS | METH_KEYWORDS, channel_close_doc}, + {"channel_release", (PyCFunction)(void(*)(void))channel_release, + METH_VARARGS | METH_KEYWORDS, channel_release_doc}, + {"_channel_id", (PyCFunction)(void(*)(void))channel__channel_id, + METH_VARARGS | METH_KEYWORDS, NULL}, + + {NULL, NULL} /* sentinel */ +}; + + +/* initialization function */ + +PyDoc_STRVAR(module_doc, +"This module provides primitive operations to manage Python interpreters.\n\ +The 'interpreters' module provides a more convenient interface."); + +static struct PyModuleDef interpretersmodule = { + PyModuleDef_HEAD_INIT, + "_xxsubinterpreters", /* m_name */ + module_doc, /* m_doc */ + -1, /* m_size */ + module_functions, /* m_methods */ + NULL, /* m_slots */ + NULL, /* m_traverse */ + NULL, /* m_clear */ + NULL /* m_free */ +}; + + +PyMODINIT_FUNC +PyInit__xxsubinterpreters(void) +{ + if (_init_globals() != 0) { + return NULL; + } + + /* Initialize types */ + if (PyType_Ready(&ChannelIDtype) != 0) { + return NULL; + } + + /* Create the module */ + PyObject *module = PyModule_Create(&interpretersmodule); + if (module == NULL) { + return NULL; + } + + /* Add exception types */ + PyObject *ns = PyModule_GetDict(module); // borrowed + if (interp_exceptions_init(ns) != 0) { + return NULL; + } + if (channel_exceptions_init(ns) != 0) { + return NULL; + } + + /* Add other types */ + Py_INCREF(&ChannelIDtype); + if (PyDict_SetItemString(ns, "ChannelID", (PyObject *)&ChannelIDtype) != 0) { + return NULL; + } + Py_INCREF(&_PyInterpreterID_Type); + if (PyDict_SetItemString(ns, "InterpreterID", (PyObject *)&_PyInterpreterID_Type) != 0) { + return NULL; + } + + if (_PyCrossInterpreterData_RegisterClass(&ChannelIDtype, _channelid_shared)) { + return NULL; + } + + return module; +} |