diff options
author | orivej <orivej@yandex-team.ru> | 2022-02-10 16:44:49 +0300 |
---|---|---|
committer | Daniil Cherednik <dcherednik@yandex-team.ru> | 2022-02-10 16:44:49 +0300 |
commit | 718c552901d703c502ccbefdfc3c9028d608b947 (patch) | |
tree | 46534a98bbefcd7b1f3faa5b52c138ab27db75b7 /contrib/restricted/aws/aws-c-io/source/posix/pipe.c | |
parent | e9656aae26e0358d5378e5b63dcac5c8dbe0e4d0 (diff) | |
download | ydb-718c552901d703c502ccbefdfc3c9028d608b947.tar.gz |
Restoring authorship annotation for <orivej@yandex-team.ru>. Commit 1 of 2.
Diffstat (limited to 'contrib/restricted/aws/aws-c-io/source/posix/pipe.c')
-rw-r--r-- | contrib/restricted/aws/aws-c-io/source/posix/pipe.c | 1166 |
1 files changed, 583 insertions, 583 deletions
diff --git a/contrib/restricted/aws/aws-c-io/source/posix/pipe.c b/contrib/restricted/aws/aws-c-io/source/posix/pipe.c index 141cd05cbe..049a15d690 100644 --- a/contrib/restricted/aws/aws-c-io/source/posix/pipe.c +++ b/contrib/restricted/aws/aws-c-io/source/posix/pipe.c @@ -1,583 +1,583 @@ -/** - * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. - * SPDX-License-Identifier: Apache-2.0. - */ - -#include <aws/io/pipe.h> - -#include <aws/io/event_loop.h> - -#ifdef __GLIBC__ -# define __USE_GNU -#endif - -/* TODO: move this detection to CMAKE and a config header */ -#if !defined(COMPAT_MODE) && defined(__GLIBC__) && __GLIBC__ >= 2 && __GLIBC_MINOR__ >= 9 -# define HAVE_PIPE2 1 -#else -# define HAVE_PIPE2 0 -#endif - -#include <errno.h> -#include <fcntl.h> -#include <unistd.h> - -/* This isn't defined on ancient linux distros (breaking the builds). - * However, if this is a prebuild, we purposely build on an ancient system, but - * we want the kernel calls to still be the same as a modern build since that's likely the target of the application - * calling this code. Just define this if it isn't there already. GlibC and the kernel don't really care how the flag - * gets passed as long as it does. - */ -#ifndef O_CLOEXEC -# define O_CLOEXEC 02000000 -#endif - -struct read_end_impl { - struct aws_allocator *alloc; - struct aws_io_handle handle; - struct aws_event_loop *event_loop; - aws_pipe_on_readable_fn *on_readable_user_callback; - void *on_readable_user_data; - - /* Used in handshake for detecting whether user callback resulted in read-end being cleaned up. - * If clean_up() sees that the pointer is set, the bool it points to will get set true. */ - bool *did_user_callback_clean_up_read_end; - - bool is_subscribed; -}; - -struct write_request { - struct aws_byte_cursor original_cursor; - struct aws_byte_cursor cursor; /* tracks progress of write */ - size_t num_bytes_written; - aws_pipe_on_write_completed_fn *user_callback; - void *user_data; - struct aws_linked_list_node list_node; - - /* True if the write-end is cleaned up while the user callback is being invoked */ - bool did_user_callback_clean_up_write_end; -}; - -struct write_end_impl { - struct aws_allocator *alloc; - struct aws_io_handle handle; - struct aws_event_loop *event_loop; - struct aws_linked_list write_list; - - /* Valid while invoking user callback on a completed write request. */ - struct write_request *currently_invoking_write_callback; - - bool is_writable; - - /* Future optimization idea: avoid an allocation on each write by keeping 1 pre-allocated write_request around - * and re-using it whenever possible */ -}; - -static void s_write_end_on_event( - struct aws_event_loop *event_loop, - struct aws_io_handle *handle, - int events, - void *user_data); - -static int s_translate_posix_error(int err) { - AWS_ASSERT(err); - - switch (err) { - case EPIPE: - return AWS_IO_BROKEN_PIPE; - default: - return AWS_ERROR_SYS_CALL_FAILURE; - } -} - -static int s_raise_posix_error(int err) { - return aws_raise_error(s_translate_posix_error(err)); -} - -AWS_IO_API int aws_open_nonblocking_posix_pipe(int pipe_fds[2]) { - int err; - -#if HAVE_PIPE2 - err = pipe2(pipe_fds, O_NONBLOCK | O_CLOEXEC); - if (err) { - return s_raise_posix_error(err); - } - - return AWS_OP_SUCCESS; -#else - err = pipe(pipe_fds); - if (err) { - return s_raise_posix_error(err); - } - - for (int i = 0; i < 2; ++i) { - int flags = fcntl(pipe_fds[i], F_GETFL); - if (flags == -1) { - s_raise_posix_error(err); - goto error; - } - - flags |= O_NONBLOCK | O_CLOEXEC; - if (fcntl(pipe_fds[i], F_SETFL, flags) == -1) { - s_raise_posix_error(err); - goto error; - } - } - - return AWS_OP_SUCCESS; -error: - close(pipe_fds[0]); - close(pipe_fds[1]); - return AWS_OP_ERR; -#endif -} - -int aws_pipe_init( - struct aws_pipe_read_end *read_end, - struct aws_event_loop *read_end_event_loop, - struct aws_pipe_write_end *write_end, - struct aws_event_loop *write_end_event_loop, - struct aws_allocator *allocator) { - - AWS_ASSERT(read_end); - AWS_ASSERT(read_end_event_loop); - AWS_ASSERT(write_end); - AWS_ASSERT(write_end_event_loop); - AWS_ASSERT(allocator); - - AWS_ZERO_STRUCT(*read_end); - AWS_ZERO_STRUCT(*write_end); - - struct read_end_impl *read_impl = NULL; - struct write_end_impl *write_impl = NULL; - int err; - - /* Open pipe */ - int pipe_fds[2]; - err = aws_open_nonblocking_posix_pipe(pipe_fds); - if (err) { - return AWS_OP_ERR; - } - - /* Init read-end */ - read_impl = aws_mem_calloc(allocator, 1, sizeof(struct read_end_impl)); - if (!read_impl) { - goto error; - } - - read_impl->alloc = allocator; - read_impl->handle.data.fd = pipe_fds[0]; - read_impl->event_loop = read_end_event_loop; - - /* Init write-end */ - write_impl = aws_mem_calloc(allocator, 1, sizeof(struct write_end_impl)); - if (!write_impl) { - goto error; - } - - write_impl->alloc = allocator; - write_impl->handle.data.fd = pipe_fds[1]; - write_impl->event_loop = write_end_event_loop; - write_impl->is_writable = true; /* Assume pipe is writable to start. Even if it's not, things shouldn't break */ - aws_linked_list_init(&write_impl->write_list); - - read_end->impl_data = read_impl; - write_end->impl_data = write_impl; - - err = aws_event_loop_subscribe_to_io_events( - write_end_event_loop, &write_impl->handle, AWS_IO_EVENT_TYPE_WRITABLE, s_write_end_on_event, write_end); - if (err) { - goto error; - } - - return AWS_OP_SUCCESS; - -error: - close(pipe_fds[0]); - close(pipe_fds[1]); - - if (read_impl) { - aws_mem_release(allocator, read_impl); - } - - if (write_impl) { - aws_mem_release(allocator, write_impl); - } - - read_end->impl_data = NULL; - write_end->impl_data = NULL; - - return AWS_OP_ERR; -} - -int aws_pipe_clean_up_read_end(struct aws_pipe_read_end *read_end) { - struct read_end_impl *read_impl = read_end->impl_data; - if (!read_impl) { - return aws_raise_error(AWS_IO_BROKEN_PIPE); - } - - if (!aws_event_loop_thread_is_callers_thread(read_impl->event_loop)) { - return aws_raise_error(AWS_ERROR_IO_EVENT_LOOP_THREAD_ONLY); - } - - if (read_impl->is_subscribed) { - int err = aws_pipe_unsubscribe_from_readable_events(read_end); - if (err) { - return AWS_OP_ERR; - } - } - - /* If the event-handler is invoking a user callback, let it know that the read-end was cleaned up */ - if (read_impl->did_user_callback_clean_up_read_end) { - *read_impl->did_user_callback_clean_up_read_end = true; - } - - close(read_impl->handle.data.fd); - - aws_mem_release(read_impl->alloc, read_impl); - AWS_ZERO_STRUCT(*read_end); - return AWS_OP_SUCCESS; -} - -struct aws_event_loop *aws_pipe_get_read_end_event_loop(const struct aws_pipe_read_end *read_end) { - const struct read_end_impl *read_impl = read_end->impl_data; - if (!read_impl) { - aws_raise_error(AWS_IO_BROKEN_PIPE); - return NULL; - } - - return read_impl->event_loop; -} - -struct aws_event_loop *aws_pipe_get_write_end_event_loop(const struct aws_pipe_write_end *write_end) { - const struct write_end_impl *write_impl = write_end->impl_data; - if (!write_impl) { - aws_raise_error(AWS_IO_BROKEN_PIPE); - return NULL; - } - - return write_impl->event_loop; -} - -int aws_pipe_read(struct aws_pipe_read_end *read_end, struct aws_byte_buf *dst_buffer, size_t *num_bytes_read) { - AWS_ASSERT(dst_buffer && dst_buffer->buffer); - - struct read_end_impl *read_impl = read_end->impl_data; - if (!read_impl) { - return aws_raise_error(AWS_IO_BROKEN_PIPE); - } - - if (num_bytes_read) { - *num_bytes_read = 0; - } - - size_t num_bytes_to_read = dst_buffer->capacity - dst_buffer->len; - - ssize_t read_val = read(read_impl->handle.data.fd, dst_buffer->buffer + dst_buffer->len, num_bytes_to_read); - - if (read_val < 0) { - if (errno == EAGAIN || errno == EWOULDBLOCK) { - return aws_raise_error(AWS_IO_READ_WOULD_BLOCK); - } - return s_raise_posix_error(errno); - } - - /* Success */ - dst_buffer->len += read_val; - - if (num_bytes_read) { - *num_bytes_read = read_val; - } - - return AWS_OP_SUCCESS; -} - -static void s_read_end_on_event( - struct aws_event_loop *event_loop, - struct aws_io_handle *handle, - int events, - void *user_data) { - - (void)event_loop; - (void)handle; - - /* Note that it should be impossible for this to run after read-end has been unsubscribed or cleaned up */ - struct aws_pipe_read_end *read_end = user_data; - struct read_end_impl *read_impl = read_end->impl_data; - AWS_ASSERT(read_impl); - AWS_ASSERT(read_impl->event_loop == event_loop); - AWS_ASSERT(&read_impl->handle == handle); - AWS_ASSERT(read_impl->is_subscribed); - AWS_ASSERT(events != 0); - AWS_ASSERT(read_impl->did_user_callback_clean_up_read_end == NULL); - - /* Set up handshake, so we can be informed if the read-end is cleaned up while invoking a user callback */ - bool did_user_callback_clean_up_read_end = false; - read_impl->did_user_callback_clean_up_read_end = &did_user_callback_clean_up_read_end; - - /* If readable event received, tell user to try and read, even if "error" events have also occurred. */ - if (events & AWS_IO_EVENT_TYPE_READABLE) { - read_impl->on_readable_user_callback(read_end, AWS_ERROR_SUCCESS, read_impl->on_readable_user_data); - - if (did_user_callback_clean_up_read_end) { - return; - } - - events &= ~AWS_IO_EVENT_TYPE_READABLE; - } - - if (events) { - /* Check that user didn't unsubscribe in the previous callback */ - if (read_impl->is_subscribed) { - read_impl->on_readable_user_callback(read_end, AWS_IO_BROKEN_PIPE, read_impl->on_readable_user_data); - - if (did_user_callback_clean_up_read_end) { - return; - } - } - } - - read_impl->did_user_callback_clean_up_read_end = NULL; -} - -int aws_pipe_subscribe_to_readable_events( - struct aws_pipe_read_end *read_end, - aws_pipe_on_readable_fn *on_readable, - void *user_data) { - - AWS_ASSERT(on_readable); - - struct read_end_impl *read_impl = read_end->impl_data; - if (!read_impl) { - return aws_raise_error(AWS_IO_BROKEN_PIPE); - } - - if (!aws_event_loop_thread_is_callers_thread(read_impl->event_loop)) { - return aws_raise_error(AWS_ERROR_IO_EVENT_LOOP_THREAD_ONLY); - } - - if (read_impl->is_subscribed) { - return aws_raise_error(AWS_ERROR_IO_ALREADY_SUBSCRIBED); - } - - read_impl->is_subscribed = true; - read_impl->on_readable_user_callback = on_readable; - read_impl->on_readable_user_data = user_data; - - int err = aws_event_loop_subscribe_to_io_events( - read_impl->event_loop, &read_impl->handle, AWS_IO_EVENT_TYPE_READABLE, s_read_end_on_event, read_end); - if (err) { - read_impl->is_subscribed = false; - read_impl->on_readable_user_callback = NULL; - read_impl->on_readable_user_data = NULL; - - return AWS_OP_ERR; - } - - return AWS_OP_SUCCESS; -} - -int aws_pipe_unsubscribe_from_readable_events(struct aws_pipe_read_end *read_end) { - struct read_end_impl *read_impl = read_end->impl_data; - if (!read_impl) { - return aws_raise_error(AWS_IO_BROKEN_PIPE); - } - - if (!aws_event_loop_thread_is_callers_thread(read_impl->event_loop)) { - return aws_raise_error(AWS_ERROR_IO_EVENT_LOOP_THREAD_ONLY); - } - - if (!read_impl->is_subscribed) { - return aws_raise_error(AWS_ERROR_IO_NOT_SUBSCRIBED); - } - - int err = aws_event_loop_unsubscribe_from_io_events(read_impl->event_loop, &read_impl->handle); - if (err) { - return AWS_OP_ERR; - } - - read_impl->is_subscribed = false; - read_impl->on_readable_user_callback = NULL; - read_impl->on_readable_user_data = NULL; - - return AWS_OP_SUCCESS; -} - -/* Pop front write request, invoke its callback, and delete it. - * Returns whether the callback resulted in the write-end getting cleaned up */ -static bool s_write_end_complete_front_write_request(struct aws_pipe_write_end *write_end, int error_code) { - struct write_end_impl *write_impl = write_end->impl_data; - - AWS_ASSERT(!aws_linked_list_empty(&write_impl->write_list)); - struct aws_linked_list_node *node = aws_linked_list_pop_front(&write_impl->write_list); - struct write_request *request = AWS_CONTAINER_OF(node, struct write_request, list_node); - - struct aws_allocator *alloc = write_impl->alloc; - - /* Let the write-end know that a callback is in process, so the write-end can inform the callback - * whether it resulted in clean_up() being called. */ - bool write_end_cleaned_up_during_callback = false; - struct write_request *prev_invoking_request = write_impl->currently_invoking_write_callback; - write_impl->currently_invoking_write_callback = request; - - if (request->user_callback) { - request->user_callback(write_end, error_code, request->original_cursor, request->user_data); - write_end_cleaned_up_during_callback = request->did_user_callback_clean_up_write_end; - } - - if (!write_end_cleaned_up_during_callback) { - write_impl->currently_invoking_write_callback = prev_invoking_request; - } - - aws_mem_release(alloc, request); - - return write_end_cleaned_up_during_callback; -} - -/* Process write requests as long as the pipe remains writable */ -static void s_write_end_process_requests(struct aws_pipe_write_end *write_end) { - struct write_end_impl *write_impl = write_end->impl_data; - AWS_ASSERT(write_impl); - - while (!aws_linked_list_empty(&write_impl->write_list)) { - struct aws_linked_list_node *node = aws_linked_list_front(&write_impl->write_list); - struct write_request *request = AWS_CONTAINER_OF(node, struct write_request, list_node); - - int completed_error_code = AWS_ERROR_SUCCESS; - - if (request->cursor.len > 0) { - ssize_t write_val = write(write_impl->handle.data.fd, request->cursor.ptr, request->cursor.len); - - if (write_val < 0) { - if (errno == EAGAIN || errno == EWOULDBLOCK) { - /* The pipe is no longer writable. Bail out */ - write_impl->is_writable = false; - return; - } - - /* A non-recoverable error occurred during this write */ - completed_error_code = s_translate_posix_error(errno); - - } else { - aws_byte_cursor_advance(&request->cursor, write_val); - - if (request->cursor.len > 0) { - /* There was a partial write, loop again to try and write the rest. */ - continue; - } - } - } - - /* If we got this far in the loop, then the write request is complete. - * Note that the callback may result in the pipe being cleaned up. */ - bool write_end_cleaned_up = s_write_end_complete_front_write_request(write_end, completed_error_code); - if (write_end_cleaned_up) { - /* Bail out! Any remaining requests were canceled during clean_up() */ - return; - } - } -} - -/* Handle events on the write-end's file handle */ -static void s_write_end_on_event( - struct aws_event_loop *event_loop, - struct aws_io_handle *handle, - int events, - void *user_data) { - - (void)event_loop; - (void)handle; - - /* Note that it should be impossible for this to run after write-end has been unsubscribed or cleaned up */ - struct aws_pipe_write_end *write_end = user_data; - struct write_end_impl *write_impl = write_end->impl_data; - AWS_ASSERT(write_impl); - AWS_ASSERT(write_impl->event_loop == event_loop); - AWS_ASSERT(&write_impl->handle == handle); - - /* Only care about the writable event. */ - if ((events & AWS_IO_EVENT_TYPE_WRITABLE) == 0) { - return; - } - - write_impl->is_writable = true; - - s_write_end_process_requests(write_end); -} - -int aws_pipe_write( - struct aws_pipe_write_end *write_end, - struct aws_byte_cursor src_buffer, - aws_pipe_on_write_completed_fn *on_completed, - void *user_data) { - - AWS_ASSERT(src_buffer.ptr); - - struct write_end_impl *write_impl = write_end->impl_data; - if (!write_impl) { - return aws_raise_error(AWS_IO_BROKEN_PIPE); - } - - if (!aws_event_loop_thread_is_callers_thread(write_impl->event_loop)) { - return aws_raise_error(AWS_ERROR_IO_EVENT_LOOP_THREAD_ONLY); - } - - struct write_request *request = aws_mem_calloc(write_impl->alloc, 1, sizeof(struct write_request)); - if (!request) { - return AWS_OP_ERR; - } - - request->original_cursor = src_buffer; - request->cursor = src_buffer; - request->user_callback = on_completed; - request->user_data = user_data; - - aws_linked_list_push_back(&write_impl->write_list, &request->list_node); - - /* If the pipe is writable, process the request (unless pipe is already in the middle of processing, which could - * happen if a this aws_pipe_write() call was made by another write's completion callback */ - if (write_impl->is_writable && !write_impl->currently_invoking_write_callback) { - s_write_end_process_requests(write_end); - } - - return AWS_OP_SUCCESS; -} - -int aws_pipe_clean_up_write_end(struct aws_pipe_write_end *write_end) { - struct write_end_impl *write_impl = write_end->impl_data; - if (!write_impl) { - return aws_raise_error(AWS_IO_BROKEN_PIPE); - } - - if (!aws_event_loop_thread_is_callers_thread(write_impl->event_loop)) { - return aws_raise_error(AWS_ERROR_IO_EVENT_LOOP_THREAD_ONLY); - } - - int err = aws_event_loop_unsubscribe_from_io_events(write_impl->event_loop, &write_impl->handle); - if (err) { - return AWS_OP_ERR; - } - - close(write_impl->handle.data.fd); - - /* Zero out write-end before invoking user callbacks so that it won't work anymore with public functions. */ - AWS_ZERO_STRUCT(*write_end); - - /* If a request callback is currently being invoked, let it know that the write-end was cleaned up */ - if (write_impl->currently_invoking_write_callback) { - write_impl->currently_invoking_write_callback->did_user_callback_clean_up_write_end = true; - } - - /* Force any outstanding write requests to complete with an error status. */ - while (!aws_linked_list_empty(&write_impl->write_list)) { - struct aws_linked_list_node *node = aws_linked_list_pop_front(&write_impl->write_list); - struct write_request *request = AWS_CONTAINER_OF(node, struct write_request, list_node); - if (request->user_callback) { - request->user_callback(NULL, AWS_IO_BROKEN_PIPE, request->original_cursor, request->user_data); - } - aws_mem_release(write_impl->alloc, request); - } - - aws_mem_release(write_impl->alloc, write_impl); - return AWS_OP_SUCCESS; -} +/** + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0. + */ + +#include <aws/io/pipe.h> + +#include <aws/io/event_loop.h> + +#ifdef __GLIBC__ +# define __USE_GNU +#endif + +/* TODO: move this detection to CMAKE and a config header */ +#if !defined(COMPAT_MODE) && defined(__GLIBC__) && __GLIBC__ >= 2 && __GLIBC_MINOR__ >= 9 +# define HAVE_PIPE2 1 +#else +# define HAVE_PIPE2 0 +#endif + +#include <errno.h> +#include <fcntl.h> +#include <unistd.h> + +/* This isn't defined on ancient linux distros (breaking the builds). + * However, if this is a prebuild, we purposely build on an ancient system, but + * we want the kernel calls to still be the same as a modern build since that's likely the target of the application + * calling this code. Just define this if it isn't there already. GlibC and the kernel don't really care how the flag + * gets passed as long as it does. + */ +#ifndef O_CLOEXEC +# define O_CLOEXEC 02000000 +#endif + +struct read_end_impl { + struct aws_allocator *alloc; + struct aws_io_handle handle; + struct aws_event_loop *event_loop; + aws_pipe_on_readable_fn *on_readable_user_callback; + void *on_readable_user_data; + + /* Used in handshake for detecting whether user callback resulted in read-end being cleaned up. + * If clean_up() sees that the pointer is set, the bool it points to will get set true. */ + bool *did_user_callback_clean_up_read_end; + + bool is_subscribed; +}; + +struct write_request { + struct aws_byte_cursor original_cursor; + struct aws_byte_cursor cursor; /* tracks progress of write */ + size_t num_bytes_written; + aws_pipe_on_write_completed_fn *user_callback; + void *user_data; + struct aws_linked_list_node list_node; + + /* True if the write-end is cleaned up while the user callback is being invoked */ + bool did_user_callback_clean_up_write_end; +}; + +struct write_end_impl { + struct aws_allocator *alloc; + struct aws_io_handle handle; + struct aws_event_loop *event_loop; + struct aws_linked_list write_list; + + /* Valid while invoking user callback on a completed write request. */ + struct write_request *currently_invoking_write_callback; + + bool is_writable; + + /* Future optimization idea: avoid an allocation on each write by keeping 1 pre-allocated write_request around + * and re-using it whenever possible */ +}; + +static void s_write_end_on_event( + struct aws_event_loop *event_loop, + struct aws_io_handle *handle, + int events, + void *user_data); + +static int s_translate_posix_error(int err) { + AWS_ASSERT(err); + + switch (err) { + case EPIPE: + return AWS_IO_BROKEN_PIPE; + default: + return AWS_ERROR_SYS_CALL_FAILURE; + } +} + +static int s_raise_posix_error(int err) { + return aws_raise_error(s_translate_posix_error(err)); +} + +AWS_IO_API int aws_open_nonblocking_posix_pipe(int pipe_fds[2]) { + int err; + +#if HAVE_PIPE2 + err = pipe2(pipe_fds, O_NONBLOCK | O_CLOEXEC); + if (err) { + return s_raise_posix_error(err); + } + + return AWS_OP_SUCCESS; +#else + err = pipe(pipe_fds); + if (err) { + return s_raise_posix_error(err); + } + + for (int i = 0; i < 2; ++i) { + int flags = fcntl(pipe_fds[i], F_GETFL); + if (flags == -1) { + s_raise_posix_error(err); + goto error; + } + + flags |= O_NONBLOCK | O_CLOEXEC; + if (fcntl(pipe_fds[i], F_SETFL, flags) == -1) { + s_raise_posix_error(err); + goto error; + } + } + + return AWS_OP_SUCCESS; +error: + close(pipe_fds[0]); + close(pipe_fds[1]); + return AWS_OP_ERR; +#endif +} + +int aws_pipe_init( + struct aws_pipe_read_end *read_end, + struct aws_event_loop *read_end_event_loop, + struct aws_pipe_write_end *write_end, + struct aws_event_loop *write_end_event_loop, + struct aws_allocator *allocator) { + + AWS_ASSERT(read_end); + AWS_ASSERT(read_end_event_loop); + AWS_ASSERT(write_end); + AWS_ASSERT(write_end_event_loop); + AWS_ASSERT(allocator); + + AWS_ZERO_STRUCT(*read_end); + AWS_ZERO_STRUCT(*write_end); + + struct read_end_impl *read_impl = NULL; + struct write_end_impl *write_impl = NULL; + int err; + + /* Open pipe */ + int pipe_fds[2]; + err = aws_open_nonblocking_posix_pipe(pipe_fds); + if (err) { + return AWS_OP_ERR; + } + + /* Init read-end */ + read_impl = aws_mem_calloc(allocator, 1, sizeof(struct read_end_impl)); + if (!read_impl) { + goto error; + } + + read_impl->alloc = allocator; + read_impl->handle.data.fd = pipe_fds[0]; + read_impl->event_loop = read_end_event_loop; + + /* Init write-end */ + write_impl = aws_mem_calloc(allocator, 1, sizeof(struct write_end_impl)); + if (!write_impl) { + goto error; + } + + write_impl->alloc = allocator; + write_impl->handle.data.fd = pipe_fds[1]; + write_impl->event_loop = write_end_event_loop; + write_impl->is_writable = true; /* Assume pipe is writable to start. Even if it's not, things shouldn't break */ + aws_linked_list_init(&write_impl->write_list); + + read_end->impl_data = read_impl; + write_end->impl_data = write_impl; + + err = aws_event_loop_subscribe_to_io_events( + write_end_event_loop, &write_impl->handle, AWS_IO_EVENT_TYPE_WRITABLE, s_write_end_on_event, write_end); + if (err) { + goto error; + } + + return AWS_OP_SUCCESS; + +error: + close(pipe_fds[0]); + close(pipe_fds[1]); + + if (read_impl) { + aws_mem_release(allocator, read_impl); + } + + if (write_impl) { + aws_mem_release(allocator, write_impl); + } + + read_end->impl_data = NULL; + write_end->impl_data = NULL; + + return AWS_OP_ERR; +} + +int aws_pipe_clean_up_read_end(struct aws_pipe_read_end *read_end) { + struct read_end_impl *read_impl = read_end->impl_data; + if (!read_impl) { + return aws_raise_error(AWS_IO_BROKEN_PIPE); + } + + if (!aws_event_loop_thread_is_callers_thread(read_impl->event_loop)) { + return aws_raise_error(AWS_ERROR_IO_EVENT_LOOP_THREAD_ONLY); + } + + if (read_impl->is_subscribed) { + int err = aws_pipe_unsubscribe_from_readable_events(read_end); + if (err) { + return AWS_OP_ERR; + } + } + + /* If the event-handler is invoking a user callback, let it know that the read-end was cleaned up */ + if (read_impl->did_user_callback_clean_up_read_end) { + *read_impl->did_user_callback_clean_up_read_end = true; + } + + close(read_impl->handle.data.fd); + + aws_mem_release(read_impl->alloc, read_impl); + AWS_ZERO_STRUCT(*read_end); + return AWS_OP_SUCCESS; +} + +struct aws_event_loop *aws_pipe_get_read_end_event_loop(const struct aws_pipe_read_end *read_end) { + const struct read_end_impl *read_impl = read_end->impl_data; + if (!read_impl) { + aws_raise_error(AWS_IO_BROKEN_PIPE); + return NULL; + } + + return read_impl->event_loop; +} + +struct aws_event_loop *aws_pipe_get_write_end_event_loop(const struct aws_pipe_write_end *write_end) { + const struct write_end_impl *write_impl = write_end->impl_data; + if (!write_impl) { + aws_raise_error(AWS_IO_BROKEN_PIPE); + return NULL; + } + + return write_impl->event_loop; +} + +int aws_pipe_read(struct aws_pipe_read_end *read_end, struct aws_byte_buf *dst_buffer, size_t *num_bytes_read) { + AWS_ASSERT(dst_buffer && dst_buffer->buffer); + + struct read_end_impl *read_impl = read_end->impl_data; + if (!read_impl) { + return aws_raise_error(AWS_IO_BROKEN_PIPE); + } + + if (num_bytes_read) { + *num_bytes_read = 0; + } + + size_t num_bytes_to_read = dst_buffer->capacity - dst_buffer->len; + + ssize_t read_val = read(read_impl->handle.data.fd, dst_buffer->buffer + dst_buffer->len, num_bytes_to_read); + + if (read_val < 0) { + if (errno == EAGAIN || errno == EWOULDBLOCK) { + return aws_raise_error(AWS_IO_READ_WOULD_BLOCK); + } + return s_raise_posix_error(errno); + } + + /* Success */ + dst_buffer->len += read_val; + + if (num_bytes_read) { + *num_bytes_read = read_val; + } + + return AWS_OP_SUCCESS; +} + +static void s_read_end_on_event( + struct aws_event_loop *event_loop, + struct aws_io_handle *handle, + int events, + void *user_data) { + + (void)event_loop; + (void)handle; + + /* Note that it should be impossible for this to run after read-end has been unsubscribed or cleaned up */ + struct aws_pipe_read_end *read_end = user_data; + struct read_end_impl *read_impl = read_end->impl_data; + AWS_ASSERT(read_impl); + AWS_ASSERT(read_impl->event_loop == event_loop); + AWS_ASSERT(&read_impl->handle == handle); + AWS_ASSERT(read_impl->is_subscribed); + AWS_ASSERT(events != 0); + AWS_ASSERT(read_impl->did_user_callback_clean_up_read_end == NULL); + + /* Set up handshake, so we can be informed if the read-end is cleaned up while invoking a user callback */ + bool did_user_callback_clean_up_read_end = false; + read_impl->did_user_callback_clean_up_read_end = &did_user_callback_clean_up_read_end; + + /* If readable event received, tell user to try and read, even if "error" events have also occurred. */ + if (events & AWS_IO_EVENT_TYPE_READABLE) { + read_impl->on_readable_user_callback(read_end, AWS_ERROR_SUCCESS, read_impl->on_readable_user_data); + + if (did_user_callback_clean_up_read_end) { + return; + } + + events &= ~AWS_IO_EVENT_TYPE_READABLE; + } + + if (events) { + /* Check that user didn't unsubscribe in the previous callback */ + if (read_impl->is_subscribed) { + read_impl->on_readable_user_callback(read_end, AWS_IO_BROKEN_PIPE, read_impl->on_readable_user_data); + + if (did_user_callback_clean_up_read_end) { + return; + } + } + } + + read_impl->did_user_callback_clean_up_read_end = NULL; +} + +int aws_pipe_subscribe_to_readable_events( + struct aws_pipe_read_end *read_end, + aws_pipe_on_readable_fn *on_readable, + void *user_data) { + + AWS_ASSERT(on_readable); + + struct read_end_impl *read_impl = read_end->impl_data; + if (!read_impl) { + return aws_raise_error(AWS_IO_BROKEN_PIPE); + } + + if (!aws_event_loop_thread_is_callers_thread(read_impl->event_loop)) { + return aws_raise_error(AWS_ERROR_IO_EVENT_LOOP_THREAD_ONLY); + } + + if (read_impl->is_subscribed) { + return aws_raise_error(AWS_ERROR_IO_ALREADY_SUBSCRIBED); + } + + read_impl->is_subscribed = true; + read_impl->on_readable_user_callback = on_readable; + read_impl->on_readable_user_data = user_data; + + int err = aws_event_loop_subscribe_to_io_events( + read_impl->event_loop, &read_impl->handle, AWS_IO_EVENT_TYPE_READABLE, s_read_end_on_event, read_end); + if (err) { + read_impl->is_subscribed = false; + read_impl->on_readable_user_callback = NULL; + read_impl->on_readable_user_data = NULL; + + return AWS_OP_ERR; + } + + return AWS_OP_SUCCESS; +} + +int aws_pipe_unsubscribe_from_readable_events(struct aws_pipe_read_end *read_end) { + struct read_end_impl *read_impl = read_end->impl_data; + if (!read_impl) { + return aws_raise_error(AWS_IO_BROKEN_PIPE); + } + + if (!aws_event_loop_thread_is_callers_thread(read_impl->event_loop)) { + return aws_raise_error(AWS_ERROR_IO_EVENT_LOOP_THREAD_ONLY); + } + + if (!read_impl->is_subscribed) { + return aws_raise_error(AWS_ERROR_IO_NOT_SUBSCRIBED); + } + + int err = aws_event_loop_unsubscribe_from_io_events(read_impl->event_loop, &read_impl->handle); + if (err) { + return AWS_OP_ERR; + } + + read_impl->is_subscribed = false; + read_impl->on_readable_user_callback = NULL; + read_impl->on_readable_user_data = NULL; + + return AWS_OP_SUCCESS; +} + +/* Pop front write request, invoke its callback, and delete it. + * Returns whether the callback resulted in the write-end getting cleaned up */ +static bool s_write_end_complete_front_write_request(struct aws_pipe_write_end *write_end, int error_code) { + struct write_end_impl *write_impl = write_end->impl_data; + + AWS_ASSERT(!aws_linked_list_empty(&write_impl->write_list)); + struct aws_linked_list_node *node = aws_linked_list_pop_front(&write_impl->write_list); + struct write_request *request = AWS_CONTAINER_OF(node, struct write_request, list_node); + + struct aws_allocator *alloc = write_impl->alloc; + + /* Let the write-end know that a callback is in process, so the write-end can inform the callback + * whether it resulted in clean_up() being called. */ + bool write_end_cleaned_up_during_callback = false; + struct write_request *prev_invoking_request = write_impl->currently_invoking_write_callback; + write_impl->currently_invoking_write_callback = request; + + if (request->user_callback) { + request->user_callback(write_end, error_code, request->original_cursor, request->user_data); + write_end_cleaned_up_during_callback = request->did_user_callback_clean_up_write_end; + } + + if (!write_end_cleaned_up_during_callback) { + write_impl->currently_invoking_write_callback = prev_invoking_request; + } + + aws_mem_release(alloc, request); + + return write_end_cleaned_up_during_callback; +} + +/* Process write requests as long as the pipe remains writable */ +static void s_write_end_process_requests(struct aws_pipe_write_end *write_end) { + struct write_end_impl *write_impl = write_end->impl_data; + AWS_ASSERT(write_impl); + + while (!aws_linked_list_empty(&write_impl->write_list)) { + struct aws_linked_list_node *node = aws_linked_list_front(&write_impl->write_list); + struct write_request *request = AWS_CONTAINER_OF(node, struct write_request, list_node); + + int completed_error_code = AWS_ERROR_SUCCESS; + + if (request->cursor.len > 0) { + ssize_t write_val = write(write_impl->handle.data.fd, request->cursor.ptr, request->cursor.len); + + if (write_val < 0) { + if (errno == EAGAIN || errno == EWOULDBLOCK) { + /* The pipe is no longer writable. Bail out */ + write_impl->is_writable = false; + return; + } + + /* A non-recoverable error occurred during this write */ + completed_error_code = s_translate_posix_error(errno); + + } else { + aws_byte_cursor_advance(&request->cursor, write_val); + + if (request->cursor.len > 0) { + /* There was a partial write, loop again to try and write the rest. */ + continue; + } + } + } + + /* If we got this far in the loop, then the write request is complete. + * Note that the callback may result in the pipe being cleaned up. */ + bool write_end_cleaned_up = s_write_end_complete_front_write_request(write_end, completed_error_code); + if (write_end_cleaned_up) { + /* Bail out! Any remaining requests were canceled during clean_up() */ + return; + } + } +} + +/* Handle events on the write-end's file handle */ +static void s_write_end_on_event( + struct aws_event_loop *event_loop, + struct aws_io_handle *handle, + int events, + void *user_data) { + + (void)event_loop; + (void)handle; + + /* Note that it should be impossible for this to run after write-end has been unsubscribed or cleaned up */ + struct aws_pipe_write_end *write_end = user_data; + struct write_end_impl *write_impl = write_end->impl_data; + AWS_ASSERT(write_impl); + AWS_ASSERT(write_impl->event_loop == event_loop); + AWS_ASSERT(&write_impl->handle == handle); + + /* Only care about the writable event. */ + if ((events & AWS_IO_EVENT_TYPE_WRITABLE) == 0) { + return; + } + + write_impl->is_writable = true; + + s_write_end_process_requests(write_end); +} + +int aws_pipe_write( + struct aws_pipe_write_end *write_end, + struct aws_byte_cursor src_buffer, + aws_pipe_on_write_completed_fn *on_completed, + void *user_data) { + + AWS_ASSERT(src_buffer.ptr); + + struct write_end_impl *write_impl = write_end->impl_data; + if (!write_impl) { + return aws_raise_error(AWS_IO_BROKEN_PIPE); + } + + if (!aws_event_loop_thread_is_callers_thread(write_impl->event_loop)) { + return aws_raise_error(AWS_ERROR_IO_EVENT_LOOP_THREAD_ONLY); + } + + struct write_request *request = aws_mem_calloc(write_impl->alloc, 1, sizeof(struct write_request)); + if (!request) { + return AWS_OP_ERR; + } + + request->original_cursor = src_buffer; + request->cursor = src_buffer; + request->user_callback = on_completed; + request->user_data = user_data; + + aws_linked_list_push_back(&write_impl->write_list, &request->list_node); + + /* If the pipe is writable, process the request (unless pipe is already in the middle of processing, which could + * happen if a this aws_pipe_write() call was made by another write's completion callback */ + if (write_impl->is_writable && !write_impl->currently_invoking_write_callback) { + s_write_end_process_requests(write_end); + } + + return AWS_OP_SUCCESS; +} + +int aws_pipe_clean_up_write_end(struct aws_pipe_write_end *write_end) { + struct write_end_impl *write_impl = write_end->impl_data; + if (!write_impl) { + return aws_raise_error(AWS_IO_BROKEN_PIPE); + } + + if (!aws_event_loop_thread_is_callers_thread(write_impl->event_loop)) { + return aws_raise_error(AWS_ERROR_IO_EVENT_LOOP_THREAD_ONLY); + } + + int err = aws_event_loop_unsubscribe_from_io_events(write_impl->event_loop, &write_impl->handle); + if (err) { + return AWS_OP_ERR; + } + + close(write_impl->handle.data.fd); + + /* Zero out write-end before invoking user callbacks so that it won't work anymore with public functions. */ + AWS_ZERO_STRUCT(*write_end); + + /* If a request callback is currently being invoked, let it know that the write-end was cleaned up */ + if (write_impl->currently_invoking_write_callback) { + write_impl->currently_invoking_write_callback->did_user_callback_clean_up_write_end = true; + } + + /* Force any outstanding write requests to complete with an error status. */ + while (!aws_linked_list_empty(&write_impl->write_list)) { + struct aws_linked_list_node *node = aws_linked_list_pop_front(&write_impl->write_list); + struct write_request *request = AWS_CONTAINER_OF(node, struct write_request, list_node); + if (request->user_callback) { + request->user_callback(NULL, AWS_IO_BROKEN_PIPE, request->original_cursor, request->user_data); + } + aws_mem_release(write_impl->alloc, request); + } + + aws_mem_release(write_impl->alloc, write_impl); + return AWS_OP_SUCCESS; +} |