path: root/contrib/restricted/aws/aws-c-io/source/posix/pipe.c
author     orivej <orivej@yandex-team.ru>  2022-02-10 16:44:49 +0300
committer  Daniil Cherednik <dcherednik@yandex-team.ru>  2022-02-10 16:44:49 +0300
commit     718c552901d703c502ccbefdfc3c9028d608b947 (patch)
tree       46534a98bbefcd7b1f3faa5b52c138ab27db75b7 /contrib/restricted/aws/aws-c-io/source/posix/pipe.c
parent     e9656aae26e0358d5378e5b63dcac5c8dbe0e4d0 (diff)
download   ydb-718c552901d703c502ccbefdfc3c9028d608b947.tar.gz
Restoring authorship annotation for <orivej@yandex-team.ru>. Commit 1 of 2.
Diffstat (limited to 'contrib/restricted/aws/aws-c-io/source/posix/pipe.c')
-rw-r--r--  contrib/restricted/aws/aws-c-io/source/posix/pipe.c  1166
1 file changed, 583 insertions, 583 deletions
diff --git a/contrib/restricted/aws/aws-c-io/source/posix/pipe.c b/contrib/restricted/aws/aws-c-io/source/posix/pipe.c
index 141cd05cbe..049a15d690 100644
--- a/contrib/restricted/aws/aws-c-io/source/posix/pipe.c
+++ b/contrib/restricted/aws/aws-c-io/source/posix/pipe.c
@@ -1,583 +1,583 @@
+/**
+ * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
+ * SPDX-License-Identifier: Apache-2.0.
+ */
+
+#include <aws/io/pipe.h>
+
+#include <aws/io/event_loop.h>
+
+#ifdef __GLIBC__
+# define __USE_GNU
+#endif
+
+/* TODO: move this detection to CMake and a config header */
+#if !defined(COMPAT_MODE) && defined(__GLIBC__) && (__GLIBC__ > 2 || (__GLIBC__ == 2 && __GLIBC_MINOR__ >= 9))
+# define HAVE_PIPE2 1
+#else
+# define HAVE_PIPE2 0
+#endif
+
+#include <errno.h>
+#include <fcntl.h>
+#include <unistd.h>
+
+/* O_CLOEXEC isn't defined on ancient Linux distros (breaking those builds).
+ * However, for a prebuilt binary we purposely build on an ancient system while still wanting the
+ * kernel calls to match a modern build, since a modern system is likely the target of the
+ * application calling this code. So just define the flag if it isn't there already; glibc and the
+ * kernel don't care how the flag gets passed, as long as it does.
+ */
+#ifndef O_CLOEXEC
+# define O_CLOEXEC 02000000
+#endif
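+
+/* 02000000 (octal) is the O_CLOEXEC value from Linux's asm-generic/fcntl.h, which most
+ * architectures share, so the fallback define passes the same bit the kernel expects. */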
+
+struct read_end_impl {
+ struct aws_allocator *alloc;
+ struct aws_io_handle handle;
+ struct aws_event_loop *event_loop;
+ aws_pipe_on_readable_fn *on_readable_user_callback;
+ void *on_readable_user_data;
+
+ /* Used in handshake for detecting whether user callback resulted in read-end being cleaned up.
+ * If clean_up() sees that the pointer is set, the bool it points to will get set true. */
+ bool *did_user_callback_clean_up_read_end;
+
+ bool is_subscribed;
+};
+
+struct write_request {
+ struct aws_byte_cursor original_cursor;
+ struct aws_byte_cursor cursor; /* tracks progress of write */
+ size_t num_bytes_written;
+ aws_pipe_on_write_completed_fn *user_callback;
+ void *user_data;
+ struct aws_linked_list_node list_node;
+
+ /* True if the write-end is cleaned up while the user callback is being invoked */
+ bool did_user_callback_clean_up_write_end;
+};
+
+struct write_end_impl {
+ struct aws_allocator *alloc;
+ struct aws_io_handle handle;
+ struct aws_event_loop *event_loop;
+ struct aws_linked_list write_list;
+
+ /* Valid while invoking user callback on a completed write request. */
+ struct write_request *currently_invoking_write_callback;
+
+ bool is_writable;
+
+ /* Future optimization idea: avoid an allocation on each write by keeping 1 pre-allocated write_request around
+ * and re-using it whenever possible */
+};
+
+static void s_write_end_on_event(
+ struct aws_event_loop *event_loop,
+ struct aws_io_handle *handle,
+ int events,
+ void *user_data);
+
+static int s_translate_posix_error(int err) {
+ AWS_ASSERT(err);
+
+ switch (err) {
+ case EPIPE:
+ return AWS_IO_BROKEN_PIPE;
+ default:
+ return AWS_ERROR_SYS_CALL_FAILURE;
+ }
+}
+
+static int s_raise_posix_error(int err) {
+ return aws_raise_error(s_translate_posix_error(err));
+}
+
+AWS_IO_API int aws_open_nonblocking_posix_pipe(int pipe_fds[2]) {
+ int err;
+
+#if HAVE_PIPE2
+ err = pipe2(pipe_fds, O_NONBLOCK | O_CLOEXEC);
+ if (err) {
+        return s_raise_posix_error(errno); /* pipe2() returns -1 and reports the failure via errno */
+ }
+
+ return AWS_OP_SUCCESS;
+#else
+ err = pipe(pipe_fds);
+ if (err) {
+        return s_raise_posix_error(errno); /* pipe() returns -1 and reports the failure via errno */
+ }
+
+ for (int i = 0; i < 2; ++i) {
+ int flags = fcntl(pipe_fds[i], F_GETFL);
+ if (flags == -1) {
+            s_raise_posix_error(errno); /* fcntl() reports the failure via errno */
+ goto error;
+ }
+
+ flags |= O_NONBLOCK | O_CLOEXEC;
+ if (fcntl(pipe_fds[i], F_SETFL, flags) == -1) {
+            s_raise_posix_error(errno);
+ goto error;
+ }
+ }
+
+ return AWS_OP_SUCCESS;
+error:
+ close(pipe_fds[0]);
+ close(pipe_fds[1]);
+ return AWS_OP_ERR;
+#endif
+}
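+
+/* Usage sketch (illustrative only): both descriptors come back O_NONBLOCK | O_CLOEXEC, so a read
+ * on an empty pipe fails immediately with EAGAIN/EWOULDBLOCK instead of blocking the caller.
+ *
+ *   int fds[2];
+ *   if (aws_open_nonblocking_posix_pipe(fds) == AWS_OP_SUCCESS) {
+ *       char c;
+ *       ssize_t r = read(fds[0], &c, 1); // r == -1 with errno == EAGAIN: pipe is empty, not broken
+ *       close(fds[0]);
+ *       close(fds[1]);
+ *   }
+ */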
+
+int aws_pipe_init(
+ struct aws_pipe_read_end *read_end,
+ struct aws_event_loop *read_end_event_loop,
+ struct aws_pipe_write_end *write_end,
+ struct aws_event_loop *write_end_event_loop,
+ struct aws_allocator *allocator) {
+
+ AWS_ASSERT(read_end);
+ AWS_ASSERT(read_end_event_loop);
+ AWS_ASSERT(write_end);
+ AWS_ASSERT(write_end_event_loop);
+ AWS_ASSERT(allocator);
+
+ AWS_ZERO_STRUCT(*read_end);
+ AWS_ZERO_STRUCT(*write_end);
+
+ struct read_end_impl *read_impl = NULL;
+ struct write_end_impl *write_impl = NULL;
+ int err;
+
+ /* Open pipe */
+ int pipe_fds[2];
+ err = aws_open_nonblocking_posix_pipe(pipe_fds);
+ if (err) {
+ return AWS_OP_ERR;
+ }
+
+ /* Init read-end */
+ read_impl = aws_mem_calloc(allocator, 1, sizeof(struct read_end_impl));
+ if (!read_impl) {
+ goto error;
+ }
+
+ read_impl->alloc = allocator;
+ read_impl->handle.data.fd = pipe_fds[0];
+ read_impl->event_loop = read_end_event_loop;
+
+ /* Init write-end */
+ write_impl = aws_mem_calloc(allocator, 1, sizeof(struct write_end_impl));
+ if (!write_impl) {
+ goto error;
+ }
+
+ write_impl->alloc = allocator;
+ write_impl->handle.data.fd = pipe_fds[1];
+ write_impl->event_loop = write_end_event_loop;
+ write_impl->is_writable = true; /* Assume pipe is writable to start. Even if it's not, things shouldn't break */
+ aws_linked_list_init(&write_impl->write_list);
+
+ read_end->impl_data = read_impl;
+ write_end->impl_data = write_impl;
+
+ err = aws_event_loop_subscribe_to_io_events(
+ write_end_event_loop, &write_impl->handle, AWS_IO_EVENT_TYPE_WRITABLE, s_write_end_on_event, write_end);
+ if (err) {
+ goto error;
+ }
+
+ return AWS_OP_SUCCESS;
+
+error:
+ close(pipe_fds[0]);
+ close(pipe_fds[1]);
+
+ if (read_impl) {
+ aws_mem_release(allocator, read_impl);
+ }
+
+ if (write_impl) {
+ aws_mem_release(allocator, write_impl);
+ }
+
+ read_end->impl_data = NULL;
+ write_end->impl_data = NULL;
+
+ return AWS_OP_ERR;
+}
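+
+/* Init sketch (hypothetical names; assumes an allocator and running event loops): the two ends
+ * may live on different event loops, and every later call on an end must be made from its own
+ * loop's thread.
+ *
+ *   struct aws_pipe_read_end read_end;
+ *   struct aws_pipe_write_end write_end;
+ *   if (aws_pipe_init(&read_end, read_loop, &write_end, write_loop, alloc)) {
+ *       return AWS_OP_ERR; // aws_last_error() holds the reason
+ *   }
+ */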
+
+int aws_pipe_clean_up_read_end(struct aws_pipe_read_end *read_end) {
+ struct read_end_impl *read_impl = read_end->impl_data;
+ if (!read_impl) {
+ return aws_raise_error(AWS_IO_BROKEN_PIPE);
+ }
+
+ if (!aws_event_loop_thread_is_callers_thread(read_impl->event_loop)) {
+ return aws_raise_error(AWS_ERROR_IO_EVENT_LOOP_THREAD_ONLY);
+ }
+
+ if (read_impl->is_subscribed) {
+ int err = aws_pipe_unsubscribe_from_readable_events(read_end);
+ if (err) {
+ return AWS_OP_ERR;
+ }
+ }
+
+ /* If the event-handler is invoking a user callback, let it know that the read-end was cleaned up */
+ if (read_impl->did_user_callback_clean_up_read_end) {
+ *read_impl->did_user_callback_clean_up_read_end = true;
+ }
+
+ close(read_impl->handle.data.fd);
+
+ aws_mem_release(read_impl->alloc, read_impl);
+ AWS_ZERO_STRUCT(*read_end);
+ return AWS_OP_SUCCESS;
+}
+
+struct aws_event_loop *aws_pipe_get_read_end_event_loop(const struct aws_pipe_read_end *read_end) {
+ const struct read_end_impl *read_impl = read_end->impl_data;
+ if (!read_impl) {
+ aws_raise_error(AWS_IO_BROKEN_PIPE);
+ return NULL;
+ }
+
+ return read_impl->event_loop;
+}
+
+struct aws_event_loop *aws_pipe_get_write_end_event_loop(const struct aws_pipe_write_end *write_end) {
+ const struct write_end_impl *write_impl = write_end->impl_data;
+ if (!write_impl) {
+ aws_raise_error(AWS_IO_BROKEN_PIPE);
+ return NULL;
+ }
+
+ return write_impl->event_loop;
+}
+
+int aws_pipe_read(struct aws_pipe_read_end *read_end, struct aws_byte_buf *dst_buffer, size_t *num_bytes_read) {
+ AWS_ASSERT(dst_buffer && dst_buffer->buffer);
+
+ struct read_end_impl *read_impl = read_end->impl_data;
+ if (!read_impl) {
+ return aws_raise_error(AWS_IO_BROKEN_PIPE);
+ }
+
+ if (num_bytes_read) {
+ *num_bytes_read = 0;
+ }
+
+ size_t num_bytes_to_read = dst_buffer->capacity - dst_buffer->len;
+
+ ssize_t read_val = read(read_impl->handle.data.fd, dst_buffer->buffer + dst_buffer->len, num_bytes_to_read);
+
+ if (read_val < 0) {
+ if (errno == EAGAIN || errno == EWOULDBLOCK) {
+ return aws_raise_error(AWS_IO_READ_WOULD_BLOCK);
+ }
+ return s_raise_posix_error(errno);
+ }
+
+ /* Success */
+ dst_buffer->len += read_val;
+
+ if (num_bytes_read) {
+ *num_bytes_read = read_val;
+ }
+
+ return AWS_OP_SUCCESS;
+}
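+
+/* Drain sketch (hypothetical buffer): keep reading until the buffer fills or the pipe empties;
+ * AWS_IO_READ_WOULD_BLOCK is the normal "nothing left to read" signal, not a failure.
+ *
+ *   struct aws_byte_buf buf;
+ *   aws_byte_buf_init(&buf, alloc, 256);
+ *   size_t n = 0;
+ *   while (buf.len < buf.capacity && aws_pipe_read(&read_end, &buf, &n) == AWS_OP_SUCCESS) {
+ *       // n bytes were appended to buf
+ *   }
+ *   aws_byte_buf_clean_up(&buf);
+ */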
+
+static void s_read_end_on_event(
+ struct aws_event_loop *event_loop,
+ struct aws_io_handle *handle,
+ int events,
+ void *user_data) {
+
+ (void)event_loop;
+ (void)handle;
+
+ /* Note that it should be impossible for this to run after read-end has been unsubscribed or cleaned up */
+ struct aws_pipe_read_end *read_end = user_data;
+ struct read_end_impl *read_impl = read_end->impl_data;
+ AWS_ASSERT(read_impl);
+ AWS_ASSERT(read_impl->event_loop == event_loop);
+ AWS_ASSERT(&read_impl->handle == handle);
+ AWS_ASSERT(read_impl->is_subscribed);
+ AWS_ASSERT(events != 0);
+ AWS_ASSERT(read_impl->did_user_callback_clean_up_read_end == NULL);
+
+ /* Set up handshake, so we can be informed if the read-end is cleaned up while invoking a user callback */
+ bool did_user_callback_clean_up_read_end = false;
+ read_impl->did_user_callback_clean_up_read_end = &did_user_callback_clean_up_read_end;
+
+ /* If readable event received, tell user to try and read, even if "error" events have also occurred. */
+ if (events & AWS_IO_EVENT_TYPE_READABLE) {
+ read_impl->on_readable_user_callback(read_end, AWS_ERROR_SUCCESS, read_impl->on_readable_user_data);
+
+ if (did_user_callback_clean_up_read_end) {
+ return;
+ }
+
+ events &= ~AWS_IO_EVENT_TYPE_READABLE;
+ }
+
+ if (events) {
+ /* Check that user didn't unsubscribe in the previous callback */
+ if (read_impl->is_subscribed) {
+ read_impl->on_readable_user_callback(read_end, AWS_IO_BROKEN_PIPE, read_impl->on_readable_user_data);
+
+ if (did_user_callback_clean_up_read_end) {
+ return;
+ }
+ }
+ }
+
+ read_impl->did_user_callback_clean_up_read_end = NULL;
+}
+
+int aws_pipe_subscribe_to_readable_events(
+ struct aws_pipe_read_end *read_end,
+ aws_pipe_on_readable_fn *on_readable,
+ void *user_data) {
+
+ AWS_ASSERT(on_readable);
+
+ struct read_end_impl *read_impl = read_end->impl_data;
+ if (!read_impl) {
+ return aws_raise_error(AWS_IO_BROKEN_PIPE);
+ }
+
+ if (!aws_event_loop_thread_is_callers_thread(read_impl->event_loop)) {
+ return aws_raise_error(AWS_ERROR_IO_EVENT_LOOP_THREAD_ONLY);
+ }
+
+ if (read_impl->is_subscribed) {
+ return aws_raise_error(AWS_ERROR_IO_ALREADY_SUBSCRIBED);
+ }
+
+ read_impl->is_subscribed = true;
+ read_impl->on_readable_user_callback = on_readable;
+ read_impl->on_readable_user_data = user_data;
+
+ int err = aws_event_loop_subscribe_to_io_events(
+ read_impl->event_loop, &read_impl->handle, AWS_IO_EVENT_TYPE_READABLE, s_read_end_on_event, read_end);
+ if (err) {
+ read_impl->is_subscribed = false;
+ read_impl->on_readable_user_callback = NULL;
+ read_impl->on_readable_user_data = NULL;
+
+ return AWS_OP_ERR;
+ }
+
+ return AWS_OP_SUCCESS;
+}
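+
+/* Readable-callback sketch (hypothetical): depending on the event loop's trigger mode the event
+ * may not fire again until new data arrives, so a callback should read until it sees
+ * AWS_IO_READ_WOULD_BLOCK.
+ *
+ *   static void s_on_readable(struct aws_pipe_read_end *read_end, int error_code, void *user_data) {
+ *       if (error_code) {
+ *           return; // AWS_IO_BROKEN_PIPE: the write-end is gone
+ *       }
+ *       // ... call aws_pipe_read() in a loop until it raises AWS_IO_READ_WOULD_BLOCK ...
+ *   }
+ *
+ *   aws_pipe_subscribe_to_readable_events(&read_end, s_on_readable, NULL);
+ */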
+
+int aws_pipe_unsubscribe_from_readable_events(struct aws_pipe_read_end *read_end) {
+ struct read_end_impl *read_impl = read_end->impl_data;
+ if (!read_impl) {
+ return aws_raise_error(AWS_IO_BROKEN_PIPE);
+ }
+
+ if (!aws_event_loop_thread_is_callers_thread(read_impl->event_loop)) {
+ return aws_raise_error(AWS_ERROR_IO_EVENT_LOOP_THREAD_ONLY);
+ }
+
+ if (!read_impl->is_subscribed) {
+ return aws_raise_error(AWS_ERROR_IO_NOT_SUBSCRIBED);
+ }
+
+ int err = aws_event_loop_unsubscribe_from_io_events(read_impl->event_loop, &read_impl->handle);
+ if (err) {
+ return AWS_OP_ERR;
+ }
+
+ read_impl->is_subscribed = false;
+ read_impl->on_readable_user_callback = NULL;
+ read_impl->on_readable_user_data = NULL;
+
+ return AWS_OP_SUCCESS;
+}
+
+/* Pop front write request, invoke its callback, and delete it.
+ * Returns whether the callback resulted in the write-end getting cleaned up */
+static bool s_write_end_complete_front_write_request(struct aws_pipe_write_end *write_end, int error_code) {
+ struct write_end_impl *write_impl = write_end->impl_data;
+
+ AWS_ASSERT(!aws_linked_list_empty(&write_impl->write_list));
+ struct aws_linked_list_node *node = aws_linked_list_pop_front(&write_impl->write_list);
+ struct write_request *request = AWS_CONTAINER_OF(node, struct write_request, list_node);
+
+ struct aws_allocator *alloc = write_impl->alloc;
+
+ /* Let the write-end know that a callback is in process, so the write-end can inform the callback
+ * whether it resulted in clean_up() being called. */
+ bool write_end_cleaned_up_during_callback = false;
+ struct write_request *prev_invoking_request = write_impl->currently_invoking_write_callback;
+ write_impl->currently_invoking_write_callback = request;
+
+ if (request->user_callback) {
+ request->user_callback(write_end, error_code, request->original_cursor, request->user_data);
+ write_end_cleaned_up_during_callback = request->did_user_callback_clean_up_write_end;
+ }
+
+ if (!write_end_cleaned_up_during_callback) {
+ write_impl->currently_invoking_write_callback = prev_invoking_request;
+ }
+
+ aws_mem_release(alloc, request);
+
+ return write_end_cleaned_up_during_callback;
+}
+
+/* Process write requests as long as the pipe remains writable */
+static void s_write_end_process_requests(struct aws_pipe_write_end *write_end) {
+ struct write_end_impl *write_impl = write_end->impl_data;
+ AWS_ASSERT(write_impl);
+
+ while (!aws_linked_list_empty(&write_impl->write_list)) {
+ struct aws_linked_list_node *node = aws_linked_list_front(&write_impl->write_list);
+ struct write_request *request = AWS_CONTAINER_OF(node, struct write_request, list_node);
+
+ int completed_error_code = AWS_ERROR_SUCCESS;
+
+ if (request->cursor.len > 0) {
+ ssize_t write_val = write(write_impl->handle.data.fd, request->cursor.ptr, request->cursor.len);
+
+ if (write_val < 0) {
+ if (errno == EAGAIN || errno == EWOULDBLOCK) {
+ /* The pipe is no longer writable. Bail out */
+ write_impl->is_writable = false;
+ return;
+ }
+
+ /* A non-recoverable error occurred during this write */
+ completed_error_code = s_translate_posix_error(errno);
+
+ } else {
+ aws_byte_cursor_advance(&request->cursor, write_val);
+
+ if (request->cursor.len > 0) {
+ /* There was a partial write, loop again to try and write the rest. */
+ continue;
+ }
+ }
+ }
+
+ /* If we got this far in the loop, then the write request is complete.
+ * Note that the callback may result in the pipe being cleaned up. */
+ bool write_end_cleaned_up = s_write_end_complete_front_write_request(write_end, completed_error_code);
+ if (write_end_cleaned_up) {
+ /* Bail out! Any remaining requests were canceled during clean_up() */
+ return;
+ }
+ }
+}
+
+/* Handle events on the write-end's file handle */
+static void s_write_end_on_event(
+ struct aws_event_loop *event_loop,
+ struct aws_io_handle *handle,
+ int events,
+ void *user_data) {
+
+ (void)event_loop;
+ (void)handle;
+
+ /* Note that it should be impossible for this to run after write-end has been unsubscribed or cleaned up */
+ struct aws_pipe_write_end *write_end = user_data;
+ struct write_end_impl *write_impl = write_end->impl_data;
+ AWS_ASSERT(write_impl);
+ AWS_ASSERT(write_impl->event_loop == event_loop);
+ AWS_ASSERT(&write_impl->handle == handle);
+
+ /* Only care about the writable event. */
+ if ((events & AWS_IO_EVENT_TYPE_WRITABLE) == 0) {
+ return;
+ }
+
+ write_impl->is_writable = true;
+
+ s_write_end_process_requests(write_end);
+}
+
+int aws_pipe_write(
+ struct aws_pipe_write_end *write_end,
+ struct aws_byte_cursor src_buffer,
+ aws_pipe_on_write_completed_fn *on_completed,
+ void *user_data) {
+
+ AWS_ASSERT(src_buffer.ptr);
+
+ struct write_end_impl *write_impl = write_end->impl_data;
+ if (!write_impl) {
+ return aws_raise_error(AWS_IO_BROKEN_PIPE);
+ }
+
+ if (!aws_event_loop_thread_is_callers_thread(write_impl->event_loop)) {
+ return aws_raise_error(AWS_ERROR_IO_EVENT_LOOP_THREAD_ONLY);
+ }
+
+ struct write_request *request = aws_mem_calloc(write_impl->alloc, 1, sizeof(struct write_request));
+ if (!request) {
+ return AWS_OP_ERR;
+ }
+
+ request->original_cursor = src_buffer;
+ request->cursor = src_buffer;
+ request->user_callback = on_completed;
+ request->user_data = user_data;
+
+ aws_linked_list_push_back(&write_impl->write_list, &request->list_node);
+
+    /* If the pipe is writable, process the request now (unless the pipe is already in the middle
+     * of processing, which can happen when this aws_pipe_write() call is made from another write's
+     * completion callback). */
+ if (write_impl->is_writable && !write_impl->currently_invoking_write_callback) {
+ s_write_end_process_requests(write_end);
+ }
+
+ return AWS_OP_SUCCESS;
+}
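+
+/* Write sketch (hypothetical callback): the bytes aren't copied, so src_buffer must stay valid
+ * until completion; the callback hands the original cursor back so the caller can release it.
+ *
+ *   static void s_on_write_done(
+ *       struct aws_pipe_write_end *write_end,
+ *       int error_code,
+ *       struct aws_byte_cursor src_buffer,
+ *       void *user_data) {
+ *       // error_code is AWS_ERROR_SUCCESS, or AWS_IO_BROKEN_PIPE if the write-end was cleaned up
+ *   }
+ *
+ *   aws_pipe_write(&write_end, aws_byte_cursor_from_c_str("ping"), s_on_write_done, NULL);
+ */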
+
+int aws_pipe_clean_up_write_end(struct aws_pipe_write_end *write_end) {
+ struct write_end_impl *write_impl = write_end->impl_data;
+ if (!write_impl) {
+ return aws_raise_error(AWS_IO_BROKEN_PIPE);
+ }
+
+ if (!aws_event_loop_thread_is_callers_thread(write_impl->event_loop)) {
+ return aws_raise_error(AWS_ERROR_IO_EVENT_LOOP_THREAD_ONLY);
+ }
+
+ int err = aws_event_loop_unsubscribe_from_io_events(write_impl->event_loop, &write_impl->handle);
+ if (err) {
+ return AWS_OP_ERR;
+ }
+
+ close(write_impl->handle.data.fd);
+
+    /* Zero out the write-end before invoking user callbacks, so that public functions no longer work on it. */
+ AWS_ZERO_STRUCT(*write_end);
+
+ /* If a request callback is currently being invoked, let it know that the write-end was cleaned up */
+ if (write_impl->currently_invoking_write_callback) {
+ write_impl->currently_invoking_write_callback->did_user_callback_clean_up_write_end = true;
+ }
+
+ /* Force any outstanding write requests to complete with an error status. */
+ while (!aws_linked_list_empty(&write_impl->write_list)) {
+ struct aws_linked_list_node *node = aws_linked_list_pop_front(&write_impl->write_list);
+ struct write_request *request = AWS_CONTAINER_OF(node, struct write_request, list_node);
+ if (request->user_callback) {
+ request->user_callback(NULL, AWS_IO_BROKEN_PIPE, request->original_cursor, request->user_data);
+ }
+ aws_mem_release(write_impl->alloc, request);
+ }
+
+ aws_mem_release(write_impl->alloc, write_impl);
+ return AWS_OP_SUCCESS;
+}
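+
+/* Teardown sketch (hypothetical): each end is cleaned up from its own event-loop thread; any
+ * writes still queued complete with AWS_IO_BROKEN_PIPE and a NULL write_end in their callbacks.
+ *
+ *   aws_pipe_clean_up_read_end(&read_end);   // on the read-end's event-loop thread
+ *   aws_pipe_clean_up_write_end(&write_end); // on the write-end's event-loop thread
+ */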