aboutsummaryrefslogtreecommitdiffstats
path: root/contrib/tools/python/src/Modules/_multiprocessing/socket_connection.c
diff options
context:
space:
mode:
authornkozlovskiy <nmk@ydb.tech>2023-09-29 12:24:06 +0300
committernkozlovskiy <nmk@ydb.tech>2023-09-29 12:41:34 +0300
commite0e3e1717e3d33762ce61950504f9637a6e669ed (patch)
treebca3ff6939b10ed60c3d5c12439963a1146b9711 /contrib/tools/python/src/Modules/_multiprocessing/socket_connection.c
parent38f2c5852db84c7b4d83adfcb009eb61541d1ccd (diff)
downloadydb-e0e3e1717e3d33762ce61950504f9637a6e669ed.tar.gz
add ydb deps
Diffstat (limited to 'contrib/tools/python/src/Modules/_multiprocessing/socket_connection.c')
-rw-r--r--contrib/tools/python/src/Modules/_multiprocessing/socket_connection.c277
1 files changed, 277 insertions, 0 deletions
diff --git a/contrib/tools/python/src/Modules/_multiprocessing/socket_connection.c b/contrib/tools/python/src/Modules/_multiprocessing/socket_connection.c
new file mode 100644
index 0000000000..bdb0a32e37
--- /dev/null
+++ b/contrib/tools/python/src/Modules/_multiprocessing/socket_connection.c
@@ -0,0 +1,277 @@
+/*
+ * A type which wraps a socket
+ *
+ * socket_connection.c
+ *
+ * Copyright (c) 2006-2008, R Oudkerk --- see COPYING.txt
+ */
+
+#include "multiprocessing.h"
+
+#if defined(HAVE_POLL) && !defined(HAVE_BROKEN_POLL)
+# include "poll.h"
+#endif
+
+#ifdef MS_WINDOWS
+# define WRITE(h, buffer, length) send((SOCKET)h, buffer, length, 0)
+# define READ(h, buffer, length) recv((SOCKET)h, buffer, length, 0)
+# define CLOSE(h) closesocket((SOCKET)h)
+#else
+# define WRITE(h, buffer, length) write(h, buffer, length)
+# define READ(h, buffer, length) read(h, buffer, length)
+# define CLOSE(h) close(h)
+#endif
+
+/*
+ * Wrapper for PyErr_CheckSignals() which can be called without the GIL
+ */
+
+static int
+check_signals(void)
+{
+ PyGILState_STATE state;
+ int res;
+ state = PyGILState_Ensure();
+ res = PyErr_CheckSignals();
+ PyGILState_Release(state);
+ return res;
+}
+
+/*
+ * Send string to file descriptor
+ */
+
+static Py_ssize_t
+_conn_sendall(HANDLE h, char *string, size_t length)
+{
+ char *p = string;
+ Py_ssize_t res;
+
+ while (length > 0) {
+ res = WRITE(h, p, length);
+ if (res < 0) {
+ if (errno == EINTR) {
+ if (check_signals() < 0)
+ return MP_EXCEPTION_HAS_BEEN_SET;
+ continue;
+ }
+ return MP_SOCKET_ERROR;
+ }
+ length -= res;
+ p += res;
+ }
+
+ return MP_SUCCESS;
+}
+
+/*
+ * Receive string of exact length from file descriptor
+ */
+
+static Py_ssize_t
+_conn_recvall(HANDLE h, char *buffer, size_t length)
+{
+ size_t remaining = length;
+ Py_ssize_t temp;
+ char *p = buffer;
+
+ while (remaining > 0) {
+ temp = READ(h, p, remaining);
+ if (temp < 0) {
+ if (errno == EINTR) {
+ if (check_signals() < 0)
+ return MP_EXCEPTION_HAS_BEEN_SET;
+ continue;
+ }
+ return temp;
+ }
+ else if (temp == 0) {
+ return remaining == length ? MP_END_OF_FILE : MP_EARLY_END_OF_FILE;
+ }
+ remaining -= temp;
+ p += temp;
+ }
+
+ return MP_SUCCESS;
+}
+
+/*
+ * Send a string prepended by the string length in network byte order
+ */
+
+static Py_ssize_t
+conn_send_string(ConnectionObject *conn, char *string, size_t length)
+{
+ Py_ssize_t res;
+ /* The "header" of the message is a 32 bit unsigned number (in
+ network order) which specifies the length of the "body". If
+ the message is shorter than about 16kb then it is quicker to
+ combine the "header" and the "body" of the message and send
+ them at once. */
+ if (length < (16*1024)) {
+ char *message;
+
+ message = PyMem_Malloc(length+4);
+ if (message == NULL)
+ return MP_MEMORY_ERROR;
+
+ *(UINT32*)message = htonl((UINT32)length);
+ memcpy(message+4, string, length);
+ Py_BEGIN_ALLOW_THREADS
+ res = _conn_sendall(conn->handle, message, length+4);
+ Py_END_ALLOW_THREADS
+ PyMem_Free(message);
+ } else {
+ UINT32 lenbuff;
+
+ if (length > MAX_MESSAGE_LENGTH)
+ return MP_BAD_MESSAGE_LENGTH;
+
+ lenbuff = htonl((UINT32)length);
+ Py_BEGIN_ALLOW_THREADS
+ res = _conn_sendall(conn->handle, (char*)&lenbuff, 4) ||
+ _conn_sendall(conn->handle, string, length);
+ Py_END_ALLOW_THREADS
+ }
+ return res;
+}
+
+/*
+ * Attempts to read into buffer, or failing that into *newbuffer
+ *
+ * Returns number of bytes read.
+ */
+
+static Py_ssize_t
+conn_recv_string(ConnectionObject *conn, char *buffer,
+ size_t buflength, char **newbuffer, size_t maxlength)
+{
+ Py_ssize_t res;
+ UINT32 ulength;
+
+ *newbuffer = NULL;
+
+ Py_BEGIN_ALLOW_THREADS
+ res = _conn_recvall(conn->handle, (char*)&ulength, 4);
+ Py_END_ALLOW_THREADS
+ if (res < 0)
+ return res;
+
+ ulength = ntohl(ulength);
+ if (ulength > maxlength)
+ return MP_BAD_MESSAGE_LENGTH;
+
+ if (ulength > buflength) {
+ *newbuffer = buffer = PyMem_Malloc((size_t)ulength);
+ if (buffer == NULL)
+ return MP_MEMORY_ERROR;
+ }
+
+ Py_BEGIN_ALLOW_THREADS
+ res = _conn_recvall(conn->handle, buffer, (size_t)ulength);
+ Py_END_ALLOW_THREADS
+
+ if (res >= 0) {
+ res = (Py_ssize_t)ulength;
+ } else if (*newbuffer != NULL) {
+ PyMem_Free(*newbuffer);
+ *newbuffer = NULL;
+ }
+ return res;
+}
+
+/*
+ * Check whether any data is available for reading -- neg timeout blocks
+ */
+
+static int
+conn_poll(ConnectionObject *conn, double timeout, PyThreadState *_save)
+{
+#if defined(HAVE_POLL) && !defined(HAVE_BROKEN_POLL)
+ int res;
+ struct pollfd p;
+
+ p.fd = (int)conn->handle;
+ p.events = POLLIN | POLLPRI;
+ p.revents = 0;
+
+ if (timeout < 0) {
+ do {
+ res = poll(&p, 1, -1);
+ } while (res < 0 && errno == EINTR);
+ } else {
+ res = poll(&p, 1, (int)(timeout * 1000 + 0.5));
+ if (res < 0 && errno == EINTR) {
+ /* We were interrupted by a signal. Just indicate a
+ timeout even though we are early. */
+ return FALSE;
+ }
+ }
+
+ if (res < 0) {
+ return MP_SOCKET_ERROR;
+ } else if (p.revents & (POLLNVAL|POLLERR)) {
+ Py_BLOCK_THREADS
+ PyErr_SetString(PyExc_IOError, "poll() gave POLLNVAL or POLLERR");
+ Py_UNBLOCK_THREADS
+ return MP_EXCEPTION_HAS_BEEN_SET;
+ } else if (p.revents != 0) {
+ return TRUE;
+ } else {
+ assert(res == 0);
+ return FALSE;
+ }
+#else
+ int res;
+ fd_set rfds;
+
+ /*
+ * Verify the handle, issue 3321. Not required for windows.
+ */
+ #ifndef MS_WINDOWS
+ if (((int)conn->handle) < 0 || ((int)conn->handle) >= FD_SETSIZE) {
+ Py_BLOCK_THREADS
+ PyErr_SetString(PyExc_IOError, "handle out of range in select()");
+ Py_UNBLOCK_THREADS
+ return MP_EXCEPTION_HAS_BEEN_SET;
+ }
+ #endif
+
+ FD_ZERO(&rfds);
+ FD_SET((SOCKET)conn->handle, &rfds);
+
+ if (timeout < 0.0) {
+ do {
+ res = select((int)conn->handle+1, &rfds, NULL, NULL, NULL);
+ } while (res < 0 && errno == EINTR);
+ } else {
+ struct timeval tv;
+ tv.tv_sec = (long)timeout;
+ tv.tv_usec = (long)((timeout - tv.tv_sec) * 1e6 + 0.5);
+ res = select((int)conn->handle+1, &rfds, NULL, NULL, &tv);
+ if (res < 0 && errno == EINTR) {
+ /* We were interrupted by a signal. Just indicate a
+ timeout even though we are early. */
+ return FALSE;
+ }
+ }
+
+ if (res < 0) {
+ return MP_SOCKET_ERROR;
+ } else if (FD_ISSET(conn->handle, &rfds)) {
+ return TRUE;
+ } else {
+ assert(res == 0);
+ return FALSE;
+ }
+#endif
+}
+
+/*
+ * "connection.h" defines the Connection type using defs above
+ */
+
+#define CONNECTION_NAME "Connection"
+#define CONNECTION_TYPE ConnectionType
+
+#include "connection.h"