| author | orivej <orivej@yandex-team.ru> | 2022-02-10 16:44:49 +0300 |
|---|---|---|
| committer | Daniil Cherednik <dcherednik@yandex-team.ru> | 2022-02-10 16:44:49 +0300 |
| commit | 718c552901d703c502ccbefdfc3c9028d608b947 (patch) | |
| tree | 46534a98bbefcd7b1f3faa5b52c138ab27db75b7 /contrib/restricted/aws/aws-c-io/source/posix | |
| parent | e9656aae26e0358d5378e5b63dcac5c8dbe0e4d0 (diff) | |
| download | ydb-718c552901d703c502ccbefdfc3c9028d608b947.tar.gz | |
Restoring authorship annotation for <orivej@yandex-team.ru>. Commit 1 of 2.
Diffstat (limited to 'contrib/restricted/aws/aws-c-io/source/posix')
| mode | file | lines changed |
|---|---|---|
| -rw-r--r-- | contrib/restricted/aws/aws-c-io/source/posix/file_utils.c | 138 |
| -rw-r--r-- | contrib/restricted/aws/aws-c-io/source/posix/host_resolver.c | 236 |
| -rw-r--r-- | contrib/restricted/aws/aws-c-io/source/posix/pipe.c | 1166 |
| -rw-r--r-- | contrib/restricted/aws/aws-c-io/source/posix/shared_library.c | 132 |
| -rw-r--r-- | contrib/restricted/aws/aws-c-io/source/posix/socket.c | 3554 |
5 files changed, 2613 insertions, 2613 deletions
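The files listed above are thin POSIX wrappers (file utilities, DNS resolution, pipes, shared-library loading, sockets); the diff below removes and re-adds their contents unchanged to restore authorship. For orientation only, here is a minimal sketch, not part of this commit, of how the file_utils helpers touched in the first file might be called from application code. It assumes the headers and default allocator provided by aws-c-common at the time of this snapshot (header locations have since moved in newer releases).

```c
/*
 * Illustrative sketch only -- not part of this commit.
 * Assumes aws-c-common's default allocator and the aws/io/file_utils.h
 * header as it exists in the vendored snapshot shown in this diff.
 */
#include <aws/common/allocator.h>
#include <aws/common/string.h>
#include <aws/io/file_utils.h>

#include <stdint.h>
#include <stdio.h>

int main(void) {
    struct aws_allocator *alloc = aws_default_allocator();

    /* aws_get_home_directory() returns NULL when HOME is unset. */
    struct aws_string *home = aws_get_home_directory(alloc);
    if (home != NULL) {
        printf("home: %s\n", aws_string_c_str(home));
        aws_string_destroy(home);
    }

    /* aws_path_exists() is a stat()-based existence check. */
    if (aws_path_exists("/etc/hosts")) {
        FILE *fp = fopen("/etc/hosts", "rb");
        if (fp != NULL) {
            int64_t length = 0;
            /* aws_file_get_length() uses fstat() on the underlying fd. */
            if (aws_file_get_length(fp, &length) == AWS_OP_SUCCESS) {
                printf("/etc/hosts is %lld bytes\n", (long long)length);
            }
            fclose(fp);
        }
    }
    return 0;
}
```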
diff --git a/contrib/restricted/aws/aws-c-io/source/posix/file_utils.c b/contrib/restricted/aws/aws-c-io/source/posix/file_utils.c index fcb96260eb..03b5f6c734 100644 --- a/contrib/restricted/aws/aws-c-io/source/posix/file_utils.c +++ b/contrib/restricted/aws/aws-c-io/source/posix/file_utils.c @@ -1,69 +1,69 @@ -/** - * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. - * SPDX-License-Identifier: Apache-2.0. - */ - -#include <aws/io/file_utils.h> - -#include <aws/common/environment.h> -#include <aws/common/string.h> - -#include <errno.h> -#include <sys/stat.h> -#include <unistd.h> - -char aws_get_platform_directory_separator(void) { - return '/'; -} - -AWS_STATIC_STRING_FROM_LITERAL(s_home_env_var, "HOME"); - -struct aws_string *aws_get_home_directory(struct aws_allocator *allocator) { - - /* ToDo: check getpwuid_r if environment check fails */ - struct aws_string *home_env_var_value = NULL; - if (aws_get_environment_value(allocator, s_home_env_var, &home_env_var_value) == 0 && home_env_var_value != NULL) { - return home_env_var_value; - } - - return NULL; -} - -bool aws_path_exists(const char *path) { - struct stat buffer; - return stat(path, &buffer) == 0; -} - -int aws_fseek(FILE *file, aws_off_t offset, int whence) { - - int result = -#if _FILE_OFFSET_BITS == 64 || _POSIX_C_SOURCE >= 200112L - fseeko(file, offset, whence); -#else - fseek(file, offset, whence); -#endif - - if (result != 0) { - return aws_translate_and_raise_io_error(errno); - } - - return AWS_OP_SUCCESS; -} - -int aws_file_get_length(FILE *file, int64_t *length) { - - struct stat file_stats; - - int fd = fileno(file); - if (fd == -1) { - return aws_raise_error(AWS_IO_INVALID_FILE_HANDLE); - } - - if (fstat(fd, &file_stats)) { - return aws_translate_and_raise_io_error(errno); - } - - *length = file_stats.st_size; - - return AWS_OP_SUCCESS; -} +/** + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0. 
+ */ + +#include <aws/io/file_utils.h> + +#include <aws/common/environment.h> +#include <aws/common/string.h> + +#include <errno.h> +#include <sys/stat.h> +#include <unistd.h> + +char aws_get_platform_directory_separator(void) { + return '/'; +} + +AWS_STATIC_STRING_FROM_LITERAL(s_home_env_var, "HOME"); + +struct aws_string *aws_get_home_directory(struct aws_allocator *allocator) { + + /* ToDo: check getpwuid_r if environment check fails */ + struct aws_string *home_env_var_value = NULL; + if (aws_get_environment_value(allocator, s_home_env_var, &home_env_var_value) == 0 && home_env_var_value != NULL) { + return home_env_var_value; + } + + return NULL; +} + +bool aws_path_exists(const char *path) { + struct stat buffer; + return stat(path, &buffer) == 0; +} + +int aws_fseek(FILE *file, aws_off_t offset, int whence) { + + int result = +#if _FILE_OFFSET_BITS == 64 || _POSIX_C_SOURCE >= 200112L + fseeko(file, offset, whence); +#else + fseek(file, offset, whence); +#endif + + if (result != 0) { + return aws_translate_and_raise_io_error(errno); + } + + return AWS_OP_SUCCESS; +} + +int aws_file_get_length(FILE *file, int64_t *length) { + + struct stat file_stats; + + int fd = fileno(file); + if (fd == -1) { + return aws_raise_error(AWS_IO_INVALID_FILE_HANDLE); + } + + if (fstat(fd, &file_stats)) { + return aws_translate_and_raise_io_error(errno); + } + + *length = file_stats.st_size; + + return AWS_OP_SUCCESS; +} diff --git a/contrib/restricted/aws/aws-c-io/source/posix/host_resolver.c b/contrib/restricted/aws/aws-c-io/source/posix/host_resolver.c index 6594723bb8..e9604107d7 100644 --- a/contrib/restricted/aws/aws-c-io/source/posix/host_resolver.c +++ b/contrib/restricted/aws/aws-c-io/source/posix/host_resolver.c @@ -1,118 +1,118 @@ -/** - * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. - * SPDX-License-Identifier: Apache-2.0. - */ - -#include <aws/io/host_resolver.h> - -#include <aws/io/logging.h> - -#include <aws/common/string.h> - -#include <arpa/inet.h> -#include <netdb.h> -#include <sys/socket.h> -#include <sys/types.h> - -int aws_default_dns_resolve( - struct aws_allocator *allocator, - const struct aws_string *host_name, - struct aws_array_list *output_addresses, - void *user_data) { - - (void)user_data; - struct addrinfo *result = NULL; - struct addrinfo *iter = NULL; - /* max string length for ipv6. 
*/ - socklen_t max_len = INET6_ADDRSTRLEN; - char address_buffer[max_len]; - - const char *hostname_cstr = aws_string_c_str(host_name); - AWS_LOGF_DEBUG(AWS_LS_IO_DNS, "static: resolving host %s", hostname_cstr); - - /* Android would prefer NO HINTS IF YOU DON'T MIND, SIR */ -#ifdef ANDROID - int err_code = getaddrinfo(hostname_cstr, NULL, NULL, &result); -#else - struct addrinfo hints; - AWS_ZERO_STRUCT(hints); - hints.ai_family = AF_UNSPEC; - hints.ai_socktype = SOCK_STREAM; - hints.ai_flags = AI_ALL | AI_V4MAPPED; - - int err_code = getaddrinfo(hostname_cstr, NULL, &hints, &result); -#endif - - if (err_code) { - AWS_LOGF_ERROR(AWS_LS_IO_DNS, "static: getaddrinfo failed with error_code %d", err_code); - goto clean_up; - } - - for (iter = result; iter != NULL; iter = iter->ai_next) { - struct aws_host_address host_address; - - AWS_ZERO_ARRAY(address_buffer); - - if (iter->ai_family == AF_INET6) { - host_address.record_type = AWS_ADDRESS_RECORD_TYPE_AAAA; - inet_ntop(iter->ai_family, &((struct sockaddr_in6 *)iter->ai_addr)->sin6_addr, address_buffer, max_len); - } else { - host_address.record_type = AWS_ADDRESS_RECORD_TYPE_A; - inet_ntop(iter->ai_family, &((struct sockaddr_in *)iter->ai_addr)->sin_addr, address_buffer, max_len); - } - - size_t address_len = strlen(address_buffer); - const struct aws_string *address = - aws_string_new_from_array(allocator, (const uint8_t *)address_buffer, address_len); - - if (!address) { - goto clean_up; - } - - const struct aws_string *host_cpy = aws_string_new_from_string(allocator, host_name); - - if (!host_cpy) { - aws_string_destroy((void *)address); - goto clean_up; - } - - AWS_LOGF_DEBUG(AWS_LS_IO_DNS, "static: resolved record: %s", address_buffer); - - host_address.address = address; - host_address.weight = 0; - host_address.allocator = allocator; - host_address.use_count = 0; - host_address.connection_failure_count = 0; - host_address.host = host_cpy; - - if (aws_array_list_push_back(output_addresses, &host_address)) { - aws_host_address_clean_up(&host_address); - goto clean_up; - } - } - - freeaddrinfo(result); - return AWS_OP_SUCCESS; - -clean_up: - if (result) { - freeaddrinfo(result); - } - - if (err_code) { - switch (err_code) { - case EAI_FAIL: - case EAI_AGAIN: - return aws_raise_error(AWS_IO_DNS_QUERY_FAILED); - case EAI_MEMORY: - return aws_raise_error(AWS_ERROR_OOM); - case EAI_NONAME: - case EAI_SERVICE: - return aws_raise_error(AWS_IO_DNS_INVALID_NAME); - default: - return aws_raise_error(AWS_ERROR_SYS_CALL_FAILURE); - } - } - - return AWS_OP_ERR; -} +/** + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0. + */ + +#include <aws/io/host_resolver.h> + +#include <aws/io/logging.h> + +#include <aws/common/string.h> + +#include <arpa/inet.h> +#include <netdb.h> +#include <sys/socket.h> +#include <sys/types.h> + +int aws_default_dns_resolve( + struct aws_allocator *allocator, + const struct aws_string *host_name, + struct aws_array_list *output_addresses, + void *user_data) { + + (void)user_data; + struct addrinfo *result = NULL; + struct addrinfo *iter = NULL; + /* max string length for ipv6. 
*/ + socklen_t max_len = INET6_ADDRSTRLEN; + char address_buffer[max_len]; + + const char *hostname_cstr = aws_string_c_str(host_name); + AWS_LOGF_DEBUG(AWS_LS_IO_DNS, "static: resolving host %s", hostname_cstr); + + /* Android would prefer NO HINTS IF YOU DON'T MIND, SIR */ +#ifdef ANDROID + int err_code = getaddrinfo(hostname_cstr, NULL, NULL, &result); +#else + struct addrinfo hints; + AWS_ZERO_STRUCT(hints); + hints.ai_family = AF_UNSPEC; + hints.ai_socktype = SOCK_STREAM; + hints.ai_flags = AI_ALL | AI_V4MAPPED; + + int err_code = getaddrinfo(hostname_cstr, NULL, &hints, &result); +#endif + + if (err_code) { + AWS_LOGF_ERROR(AWS_LS_IO_DNS, "static: getaddrinfo failed with error_code %d", err_code); + goto clean_up; + } + + for (iter = result; iter != NULL; iter = iter->ai_next) { + struct aws_host_address host_address; + + AWS_ZERO_ARRAY(address_buffer); + + if (iter->ai_family == AF_INET6) { + host_address.record_type = AWS_ADDRESS_RECORD_TYPE_AAAA; + inet_ntop(iter->ai_family, &((struct sockaddr_in6 *)iter->ai_addr)->sin6_addr, address_buffer, max_len); + } else { + host_address.record_type = AWS_ADDRESS_RECORD_TYPE_A; + inet_ntop(iter->ai_family, &((struct sockaddr_in *)iter->ai_addr)->sin_addr, address_buffer, max_len); + } + + size_t address_len = strlen(address_buffer); + const struct aws_string *address = + aws_string_new_from_array(allocator, (const uint8_t *)address_buffer, address_len); + + if (!address) { + goto clean_up; + } + + const struct aws_string *host_cpy = aws_string_new_from_string(allocator, host_name); + + if (!host_cpy) { + aws_string_destroy((void *)address); + goto clean_up; + } + + AWS_LOGF_DEBUG(AWS_LS_IO_DNS, "static: resolved record: %s", address_buffer); + + host_address.address = address; + host_address.weight = 0; + host_address.allocator = allocator; + host_address.use_count = 0; + host_address.connection_failure_count = 0; + host_address.host = host_cpy; + + if (aws_array_list_push_back(output_addresses, &host_address)) { + aws_host_address_clean_up(&host_address); + goto clean_up; + } + } + + freeaddrinfo(result); + return AWS_OP_SUCCESS; + +clean_up: + if (result) { + freeaddrinfo(result); + } + + if (err_code) { + switch (err_code) { + case EAI_FAIL: + case EAI_AGAIN: + return aws_raise_error(AWS_IO_DNS_QUERY_FAILED); + case EAI_MEMORY: + return aws_raise_error(AWS_ERROR_OOM); + case EAI_NONAME: + case EAI_SERVICE: + return aws_raise_error(AWS_IO_DNS_INVALID_NAME); + default: + return aws_raise_error(AWS_ERROR_SYS_CALL_FAILURE); + } + } + + return AWS_OP_ERR; +} diff --git a/contrib/restricted/aws/aws-c-io/source/posix/pipe.c b/contrib/restricted/aws/aws-c-io/source/posix/pipe.c index 141cd05cbe..049a15d690 100644 --- a/contrib/restricted/aws/aws-c-io/source/posix/pipe.c +++ b/contrib/restricted/aws/aws-c-io/source/posix/pipe.c @@ -1,583 +1,583 @@ -/** - * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. - * SPDX-License-Identifier: Apache-2.0. - */ - -#include <aws/io/pipe.h> - -#include <aws/io/event_loop.h> - -#ifdef __GLIBC__ -# define __USE_GNU -#endif - -/* TODO: move this detection to CMAKE and a config header */ -#if !defined(COMPAT_MODE) && defined(__GLIBC__) && __GLIBC__ >= 2 && __GLIBC_MINOR__ >= 9 -# define HAVE_PIPE2 1 -#else -# define HAVE_PIPE2 0 -#endif - -#include <errno.h> -#include <fcntl.h> -#include <unistd.h> - -/* This isn't defined on ancient linux distros (breaking the builds). 
- * However, if this is a prebuild, we purposely build on an ancient system, but - * we want the kernel calls to still be the same as a modern build since that's likely the target of the application - * calling this code. Just define this if it isn't there already. GlibC and the kernel don't really care how the flag - * gets passed as long as it does. - */ -#ifndef O_CLOEXEC -# define O_CLOEXEC 02000000 -#endif - -struct read_end_impl { - struct aws_allocator *alloc; - struct aws_io_handle handle; - struct aws_event_loop *event_loop; - aws_pipe_on_readable_fn *on_readable_user_callback; - void *on_readable_user_data; - - /* Used in handshake for detecting whether user callback resulted in read-end being cleaned up. - * If clean_up() sees that the pointer is set, the bool it points to will get set true. */ - bool *did_user_callback_clean_up_read_end; - - bool is_subscribed; -}; - -struct write_request { - struct aws_byte_cursor original_cursor; - struct aws_byte_cursor cursor; /* tracks progress of write */ - size_t num_bytes_written; - aws_pipe_on_write_completed_fn *user_callback; - void *user_data; - struct aws_linked_list_node list_node; - - /* True if the write-end is cleaned up while the user callback is being invoked */ - bool did_user_callback_clean_up_write_end; -}; - -struct write_end_impl { - struct aws_allocator *alloc; - struct aws_io_handle handle; - struct aws_event_loop *event_loop; - struct aws_linked_list write_list; - - /* Valid while invoking user callback on a completed write request. */ - struct write_request *currently_invoking_write_callback; - - bool is_writable; - - /* Future optimization idea: avoid an allocation on each write by keeping 1 pre-allocated write_request around - * and re-using it whenever possible */ -}; - -static void s_write_end_on_event( - struct aws_event_loop *event_loop, - struct aws_io_handle *handle, - int events, - void *user_data); - -static int s_translate_posix_error(int err) { - AWS_ASSERT(err); - - switch (err) { - case EPIPE: - return AWS_IO_BROKEN_PIPE; - default: - return AWS_ERROR_SYS_CALL_FAILURE; - } -} - -static int s_raise_posix_error(int err) { - return aws_raise_error(s_translate_posix_error(err)); -} - -AWS_IO_API int aws_open_nonblocking_posix_pipe(int pipe_fds[2]) { - int err; - -#if HAVE_PIPE2 - err = pipe2(pipe_fds, O_NONBLOCK | O_CLOEXEC); - if (err) { - return s_raise_posix_error(err); - } - - return AWS_OP_SUCCESS; -#else - err = pipe(pipe_fds); - if (err) { - return s_raise_posix_error(err); - } - - for (int i = 0; i < 2; ++i) { - int flags = fcntl(pipe_fds[i], F_GETFL); - if (flags == -1) { - s_raise_posix_error(err); - goto error; - } - - flags |= O_NONBLOCK | O_CLOEXEC; - if (fcntl(pipe_fds[i], F_SETFL, flags) == -1) { - s_raise_posix_error(err); - goto error; - } - } - - return AWS_OP_SUCCESS; -error: - close(pipe_fds[0]); - close(pipe_fds[1]); - return AWS_OP_ERR; -#endif -} - -int aws_pipe_init( - struct aws_pipe_read_end *read_end, - struct aws_event_loop *read_end_event_loop, - struct aws_pipe_write_end *write_end, - struct aws_event_loop *write_end_event_loop, - struct aws_allocator *allocator) { - - AWS_ASSERT(read_end); - AWS_ASSERT(read_end_event_loop); - AWS_ASSERT(write_end); - AWS_ASSERT(write_end_event_loop); - AWS_ASSERT(allocator); - - AWS_ZERO_STRUCT(*read_end); - AWS_ZERO_STRUCT(*write_end); - - struct read_end_impl *read_impl = NULL; - struct write_end_impl *write_impl = NULL; - int err; - - /* Open pipe */ - int pipe_fds[2]; - err = aws_open_nonblocking_posix_pipe(pipe_fds); - if (err) { - return 
AWS_OP_ERR; - } - - /* Init read-end */ - read_impl = aws_mem_calloc(allocator, 1, sizeof(struct read_end_impl)); - if (!read_impl) { - goto error; - } - - read_impl->alloc = allocator; - read_impl->handle.data.fd = pipe_fds[0]; - read_impl->event_loop = read_end_event_loop; - - /* Init write-end */ - write_impl = aws_mem_calloc(allocator, 1, sizeof(struct write_end_impl)); - if (!write_impl) { - goto error; - } - - write_impl->alloc = allocator; - write_impl->handle.data.fd = pipe_fds[1]; - write_impl->event_loop = write_end_event_loop; - write_impl->is_writable = true; /* Assume pipe is writable to start. Even if it's not, things shouldn't break */ - aws_linked_list_init(&write_impl->write_list); - - read_end->impl_data = read_impl; - write_end->impl_data = write_impl; - - err = aws_event_loop_subscribe_to_io_events( - write_end_event_loop, &write_impl->handle, AWS_IO_EVENT_TYPE_WRITABLE, s_write_end_on_event, write_end); - if (err) { - goto error; - } - - return AWS_OP_SUCCESS; - -error: - close(pipe_fds[0]); - close(pipe_fds[1]); - - if (read_impl) { - aws_mem_release(allocator, read_impl); - } - - if (write_impl) { - aws_mem_release(allocator, write_impl); - } - - read_end->impl_data = NULL; - write_end->impl_data = NULL; - - return AWS_OP_ERR; -} - -int aws_pipe_clean_up_read_end(struct aws_pipe_read_end *read_end) { - struct read_end_impl *read_impl = read_end->impl_data; - if (!read_impl) { - return aws_raise_error(AWS_IO_BROKEN_PIPE); - } - - if (!aws_event_loop_thread_is_callers_thread(read_impl->event_loop)) { - return aws_raise_error(AWS_ERROR_IO_EVENT_LOOP_THREAD_ONLY); - } - - if (read_impl->is_subscribed) { - int err = aws_pipe_unsubscribe_from_readable_events(read_end); - if (err) { - return AWS_OP_ERR; - } - } - - /* If the event-handler is invoking a user callback, let it know that the read-end was cleaned up */ - if (read_impl->did_user_callback_clean_up_read_end) { - *read_impl->did_user_callback_clean_up_read_end = true; - } - - close(read_impl->handle.data.fd); - - aws_mem_release(read_impl->alloc, read_impl); - AWS_ZERO_STRUCT(*read_end); - return AWS_OP_SUCCESS; -} - -struct aws_event_loop *aws_pipe_get_read_end_event_loop(const struct aws_pipe_read_end *read_end) { - const struct read_end_impl *read_impl = read_end->impl_data; - if (!read_impl) { - aws_raise_error(AWS_IO_BROKEN_PIPE); - return NULL; - } - - return read_impl->event_loop; -} - -struct aws_event_loop *aws_pipe_get_write_end_event_loop(const struct aws_pipe_write_end *write_end) { - const struct write_end_impl *write_impl = write_end->impl_data; - if (!write_impl) { - aws_raise_error(AWS_IO_BROKEN_PIPE); - return NULL; - } - - return write_impl->event_loop; -} - -int aws_pipe_read(struct aws_pipe_read_end *read_end, struct aws_byte_buf *dst_buffer, size_t *num_bytes_read) { - AWS_ASSERT(dst_buffer && dst_buffer->buffer); - - struct read_end_impl *read_impl = read_end->impl_data; - if (!read_impl) { - return aws_raise_error(AWS_IO_BROKEN_PIPE); - } - - if (num_bytes_read) { - *num_bytes_read = 0; - } - - size_t num_bytes_to_read = dst_buffer->capacity - dst_buffer->len; - - ssize_t read_val = read(read_impl->handle.data.fd, dst_buffer->buffer + dst_buffer->len, num_bytes_to_read); - - if (read_val < 0) { - if (errno == EAGAIN || errno == EWOULDBLOCK) { - return aws_raise_error(AWS_IO_READ_WOULD_BLOCK); - } - return s_raise_posix_error(errno); - } - - /* Success */ - dst_buffer->len += read_val; - - if (num_bytes_read) { - *num_bytes_read = read_val; - } - - return AWS_OP_SUCCESS; -} - -static void 
s_read_end_on_event( - struct aws_event_loop *event_loop, - struct aws_io_handle *handle, - int events, - void *user_data) { - - (void)event_loop; - (void)handle; - - /* Note that it should be impossible for this to run after read-end has been unsubscribed or cleaned up */ - struct aws_pipe_read_end *read_end = user_data; - struct read_end_impl *read_impl = read_end->impl_data; - AWS_ASSERT(read_impl); - AWS_ASSERT(read_impl->event_loop == event_loop); - AWS_ASSERT(&read_impl->handle == handle); - AWS_ASSERT(read_impl->is_subscribed); - AWS_ASSERT(events != 0); - AWS_ASSERT(read_impl->did_user_callback_clean_up_read_end == NULL); - - /* Set up handshake, so we can be informed if the read-end is cleaned up while invoking a user callback */ - bool did_user_callback_clean_up_read_end = false; - read_impl->did_user_callback_clean_up_read_end = &did_user_callback_clean_up_read_end; - - /* If readable event received, tell user to try and read, even if "error" events have also occurred. */ - if (events & AWS_IO_EVENT_TYPE_READABLE) { - read_impl->on_readable_user_callback(read_end, AWS_ERROR_SUCCESS, read_impl->on_readable_user_data); - - if (did_user_callback_clean_up_read_end) { - return; - } - - events &= ~AWS_IO_EVENT_TYPE_READABLE; - } - - if (events) { - /* Check that user didn't unsubscribe in the previous callback */ - if (read_impl->is_subscribed) { - read_impl->on_readable_user_callback(read_end, AWS_IO_BROKEN_PIPE, read_impl->on_readable_user_data); - - if (did_user_callback_clean_up_read_end) { - return; - } - } - } - - read_impl->did_user_callback_clean_up_read_end = NULL; -} - -int aws_pipe_subscribe_to_readable_events( - struct aws_pipe_read_end *read_end, - aws_pipe_on_readable_fn *on_readable, - void *user_data) { - - AWS_ASSERT(on_readable); - - struct read_end_impl *read_impl = read_end->impl_data; - if (!read_impl) { - return aws_raise_error(AWS_IO_BROKEN_PIPE); - } - - if (!aws_event_loop_thread_is_callers_thread(read_impl->event_loop)) { - return aws_raise_error(AWS_ERROR_IO_EVENT_LOOP_THREAD_ONLY); - } - - if (read_impl->is_subscribed) { - return aws_raise_error(AWS_ERROR_IO_ALREADY_SUBSCRIBED); - } - - read_impl->is_subscribed = true; - read_impl->on_readable_user_callback = on_readable; - read_impl->on_readable_user_data = user_data; - - int err = aws_event_loop_subscribe_to_io_events( - read_impl->event_loop, &read_impl->handle, AWS_IO_EVENT_TYPE_READABLE, s_read_end_on_event, read_end); - if (err) { - read_impl->is_subscribed = false; - read_impl->on_readable_user_callback = NULL; - read_impl->on_readable_user_data = NULL; - - return AWS_OP_ERR; - } - - return AWS_OP_SUCCESS; -} - -int aws_pipe_unsubscribe_from_readable_events(struct aws_pipe_read_end *read_end) { - struct read_end_impl *read_impl = read_end->impl_data; - if (!read_impl) { - return aws_raise_error(AWS_IO_BROKEN_PIPE); - } - - if (!aws_event_loop_thread_is_callers_thread(read_impl->event_loop)) { - return aws_raise_error(AWS_ERROR_IO_EVENT_LOOP_THREAD_ONLY); - } - - if (!read_impl->is_subscribed) { - return aws_raise_error(AWS_ERROR_IO_NOT_SUBSCRIBED); - } - - int err = aws_event_loop_unsubscribe_from_io_events(read_impl->event_loop, &read_impl->handle); - if (err) { - return AWS_OP_ERR; - } - - read_impl->is_subscribed = false; - read_impl->on_readable_user_callback = NULL; - read_impl->on_readable_user_data = NULL; - - return AWS_OP_SUCCESS; -} - -/* Pop front write request, invoke its callback, and delete it. 
- * Returns whether the callback resulted in the write-end getting cleaned up */ -static bool s_write_end_complete_front_write_request(struct aws_pipe_write_end *write_end, int error_code) { - struct write_end_impl *write_impl = write_end->impl_data; - - AWS_ASSERT(!aws_linked_list_empty(&write_impl->write_list)); - struct aws_linked_list_node *node = aws_linked_list_pop_front(&write_impl->write_list); - struct write_request *request = AWS_CONTAINER_OF(node, struct write_request, list_node); - - struct aws_allocator *alloc = write_impl->alloc; - - /* Let the write-end know that a callback is in process, so the write-end can inform the callback - * whether it resulted in clean_up() being called. */ - bool write_end_cleaned_up_during_callback = false; - struct write_request *prev_invoking_request = write_impl->currently_invoking_write_callback; - write_impl->currently_invoking_write_callback = request; - - if (request->user_callback) { - request->user_callback(write_end, error_code, request->original_cursor, request->user_data); - write_end_cleaned_up_during_callback = request->did_user_callback_clean_up_write_end; - } - - if (!write_end_cleaned_up_during_callback) { - write_impl->currently_invoking_write_callback = prev_invoking_request; - } - - aws_mem_release(alloc, request); - - return write_end_cleaned_up_during_callback; -} - -/* Process write requests as long as the pipe remains writable */ -static void s_write_end_process_requests(struct aws_pipe_write_end *write_end) { - struct write_end_impl *write_impl = write_end->impl_data; - AWS_ASSERT(write_impl); - - while (!aws_linked_list_empty(&write_impl->write_list)) { - struct aws_linked_list_node *node = aws_linked_list_front(&write_impl->write_list); - struct write_request *request = AWS_CONTAINER_OF(node, struct write_request, list_node); - - int completed_error_code = AWS_ERROR_SUCCESS; - - if (request->cursor.len > 0) { - ssize_t write_val = write(write_impl->handle.data.fd, request->cursor.ptr, request->cursor.len); - - if (write_val < 0) { - if (errno == EAGAIN || errno == EWOULDBLOCK) { - /* The pipe is no longer writable. Bail out */ - write_impl->is_writable = false; - return; - } - - /* A non-recoverable error occurred during this write */ - completed_error_code = s_translate_posix_error(errno); - - } else { - aws_byte_cursor_advance(&request->cursor, write_val); - - if (request->cursor.len > 0) { - /* There was a partial write, loop again to try and write the rest. */ - continue; - } - } - } - - /* If we got this far in the loop, then the write request is complete. - * Note that the callback may result in the pipe being cleaned up. */ - bool write_end_cleaned_up = s_write_end_complete_front_write_request(write_end, completed_error_code); - if (write_end_cleaned_up) { - /* Bail out! Any remaining requests were canceled during clean_up() */ - return; - } - } -} - -/* Handle events on the write-end's file handle */ -static void s_write_end_on_event( - struct aws_event_loop *event_loop, - struct aws_io_handle *handle, - int events, - void *user_data) { - - (void)event_loop; - (void)handle; - - /* Note that it should be impossible for this to run after write-end has been unsubscribed or cleaned up */ - struct aws_pipe_write_end *write_end = user_data; - struct write_end_impl *write_impl = write_end->impl_data; - AWS_ASSERT(write_impl); - AWS_ASSERT(write_impl->event_loop == event_loop); - AWS_ASSERT(&write_impl->handle == handle); - - /* Only care about the writable event. 
*/ - if ((events & AWS_IO_EVENT_TYPE_WRITABLE) == 0) { - return; - } - - write_impl->is_writable = true; - - s_write_end_process_requests(write_end); -} - -int aws_pipe_write( - struct aws_pipe_write_end *write_end, - struct aws_byte_cursor src_buffer, - aws_pipe_on_write_completed_fn *on_completed, - void *user_data) { - - AWS_ASSERT(src_buffer.ptr); - - struct write_end_impl *write_impl = write_end->impl_data; - if (!write_impl) { - return aws_raise_error(AWS_IO_BROKEN_PIPE); - } - - if (!aws_event_loop_thread_is_callers_thread(write_impl->event_loop)) { - return aws_raise_error(AWS_ERROR_IO_EVENT_LOOP_THREAD_ONLY); - } - - struct write_request *request = aws_mem_calloc(write_impl->alloc, 1, sizeof(struct write_request)); - if (!request) { - return AWS_OP_ERR; - } - - request->original_cursor = src_buffer; - request->cursor = src_buffer; - request->user_callback = on_completed; - request->user_data = user_data; - - aws_linked_list_push_back(&write_impl->write_list, &request->list_node); - - /* If the pipe is writable, process the request (unless pipe is already in the middle of processing, which could - * happen if a this aws_pipe_write() call was made by another write's completion callback */ - if (write_impl->is_writable && !write_impl->currently_invoking_write_callback) { - s_write_end_process_requests(write_end); - } - - return AWS_OP_SUCCESS; -} - -int aws_pipe_clean_up_write_end(struct aws_pipe_write_end *write_end) { - struct write_end_impl *write_impl = write_end->impl_data; - if (!write_impl) { - return aws_raise_error(AWS_IO_BROKEN_PIPE); - } - - if (!aws_event_loop_thread_is_callers_thread(write_impl->event_loop)) { - return aws_raise_error(AWS_ERROR_IO_EVENT_LOOP_THREAD_ONLY); - } - - int err = aws_event_loop_unsubscribe_from_io_events(write_impl->event_loop, &write_impl->handle); - if (err) { - return AWS_OP_ERR; - } - - close(write_impl->handle.data.fd); - - /* Zero out write-end before invoking user callbacks so that it won't work anymore with public functions. */ - AWS_ZERO_STRUCT(*write_end); - - /* If a request callback is currently being invoked, let it know that the write-end was cleaned up */ - if (write_impl->currently_invoking_write_callback) { - write_impl->currently_invoking_write_callback->did_user_callback_clean_up_write_end = true; - } - - /* Force any outstanding write requests to complete with an error status. */ - while (!aws_linked_list_empty(&write_impl->write_list)) { - struct aws_linked_list_node *node = aws_linked_list_pop_front(&write_impl->write_list); - struct write_request *request = AWS_CONTAINER_OF(node, struct write_request, list_node); - if (request->user_callback) { - request->user_callback(NULL, AWS_IO_BROKEN_PIPE, request->original_cursor, request->user_data); - } - aws_mem_release(write_impl->alloc, request); - } - - aws_mem_release(write_impl->alloc, write_impl); - return AWS_OP_SUCCESS; -} +/** + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0. + */ + +#include <aws/io/pipe.h> + +#include <aws/io/event_loop.h> + +#ifdef __GLIBC__ +# define __USE_GNU +#endif + +/* TODO: move this detection to CMAKE and a config header */ +#if !defined(COMPAT_MODE) && defined(__GLIBC__) && __GLIBC__ >= 2 && __GLIBC_MINOR__ >= 9 +# define HAVE_PIPE2 1 +#else +# define HAVE_PIPE2 0 +#endif + +#include <errno.h> +#include <fcntl.h> +#include <unistd.h> + +/* This isn't defined on ancient linux distros (breaking the builds). 
+ * However, if this is a prebuild, we purposely build on an ancient system, but + * we want the kernel calls to still be the same as a modern build since that's likely the target of the application + * calling this code. Just define this if it isn't there already. GlibC and the kernel don't really care how the flag + * gets passed as long as it does. + */ +#ifndef O_CLOEXEC +# define O_CLOEXEC 02000000 +#endif + +struct read_end_impl { + struct aws_allocator *alloc; + struct aws_io_handle handle; + struct aws_event_loop *event_loop; + aws_pipe_on_readable_fn *on_readable_user_callback; + void *on_readable_user_data; + + /* Used in handshake for detecting whether user callback resulted in read-end being cleaned up. + * If clean_up() sees that the pointer is set, the bool it points to will get set true. */ + bool *did_user_callback_clean_up_read_end; + + bool is_subscribed; +}; + +struct write_request { + struct aws_byte_cursor original_cursor; + struct aws_byte_cursor cursor; /* tracks progress of write */ + size_t num_bytes_written; + aws_pipe_on_write_completed_fn *user_callback; + void *user_data; + struct aws_linked_list_node list_node; + + /* True if the write-end is cleaned up while the user callback is being invoked */ + bool did_user_callback_clean_up_write_end; +}; + +struct write_end_impl { + struct aws_allocator *alloc; + struct aws_io_handle handle; + struct aws_event_loop *event_loop; + struct aws_linked_list write_list; + + /* Valid while invoking user callback on a completed write request. */ + struct write_request *currently_invoking_write_callback; + + bool is_writable; + + /* Future optimization idea: avoid an allocation on each write by keeping 1 pre-allocated write_request around + * and re-using it whenever possible */ +}; + +static void s_write_end_on_event( + struct aws_event_loop *event_loop, + struct aws_io_handle *handle, + int events, + void *user_data); + +static int s_translate_posix_error(int err) { + AWS_ASSERT(err); + + switch (err) { + case EPIPE: + return AWS_IO_BROKEN_PIPE; + default: + return AWS_ERROR_SYS_CALL_FAILURE; + } +} + +static int s_raise_posix_error(int err) { + return aws_raise_error(s_translate_posix_error(err)); +} + +AWS_IO_API int aws_open_nonblocking_posix_pipe(int pipe_fds[2]) { + int err; + +#if HAVE_PIPE2 + err = pipe2(pipe_fds, O_NONBLOCK | O_CLOEXEC); + if (err) { + return s_raise_posix_error(err); + } + + return AWS_OP_SUCCESS; +#else + err = pipe(pipe_fds); + if (err) { + return s_raise_posix_error(err); + } + + for (int i = 0; i < 2; ++i) { + int flags = fcntl(pipe_fds[i], F_GETFL); + if (flags == -1) { + s_raise_posix_error(err); + goto error; + } + + flags |= O_NONBLOCK | O_CLOEXEC; + if (fcntl(pipe_fds[i], F_SETFL, flags) == -1) { + s_raise_posix_error(err); + goto error; + } + } + + return AWS_OP_SUCCESS; +error: + close(pipe_fds[0]); + close(pipe_fds[1]); + return AWS_OP_ERR; +#endif +} + +int aws_pipe_init( + struct aws_pipe_read_end *read_end, + struct aws_event_loop *read_end_event_loop, + struct aws_pipe_write_end *write_end, + struct aws_event_loop *write_end_event_loop, + struct aws_allocator *allocator) { + + AWS_ASSERT(read_end); + AWS_ASSERT(read_end_event_loop); + AWS_ASSERT(write_end); + AWS_ASSERT(write_end_event_loop); + AWS_ASSERT(allocator); + + AWS_ZERO_STRUCT(*read_end); + AWS_ZERO_STRUCT(*write_end); + + struct read_end_impl *read_impl = NULL; + struct write_end_impl *write_impl = NULL; + int err; + + /* Open pipe */ + int pipe_fds[2]; + err = aws_open_nonblocking_posix_pipe(pipe_fds); + if (err) { + return 
AWS_OP_ERR; + } + + /* Init read-end */ + read_impl = aws_mem_calloc(allocator, 1, sizeof(struct read_end_impl)); + if (!read_impl) { + goto error; + } + + read_impl->alloc = allocator; + read_impl->handle.data.fd = pipe_fds[0]; + read_impl->event_loop = read_end_event_loop; + + /* Init write-end */ + write_impl = aws_mem_calloc(allocator, 1, sizeof(struct write_end_impl)); + if (!write_impl) { + goto error; + } + + write_impl->alloc = allocator; + write_impl->handle.data.fd = pipe_fds[1]; + write_impl->event_loop = write_end_event_loop; + write_impl->is_writable = true; /* Assume pipe is writable to start. Even if it's not, things shouldn't break */ + aws_linked_list_init(&write_impl->write_list); + + read_end->impl_data = read_impl; + write_end->impl_data = write_impl; + + err = aws_event_loop_subscribe_to_io_events( + write_end_event_loop, &write_impl->handle, AWS_IO_EVENT_TYPE_WRITABLE, s_write_end_on_event, write_end); + if (err) { + goto error; + } + + return AWS_OP_SUCCESS; + +error: + close(pipe_fds[0]); + close(pipe_fds[1]); + + if (read_impl) { + aws_mem_release(allocator, read_impl); + } + + if (write_impl) { + aws_mem_release(allocator, write_impl); + } + + read_end->impl_data = NULL; + write_end->impl_data = NULL; + + return AWS_OP_ERR; +} + +int aws_pipe_clean_up_read_end(struct aws_pipe_read_end *read_end) { + struct read_end_impl *read_impl = read_end->impl_data; + if (!read_impl) { + return aws_raise_error(AWS_IO_BROKEN_PIPE); + } + + if (!aws_event_loop_thread_is_callers_thread(read_impl->event_loop)) { + return aws_raise_error(AWS_ERROR_IO_EVENT_LOOP_THREAD_ONLY); + } + + if (read_impl->is_subscribed) { + int err = aws_pipe_unsubscribe_from_readable_events(read_end); + if (err) { + return AWS_OP_ERR; + } + } + + /* If the event-handler is invoking a user callback, let it know that the read-end was cleaned up */ + if (read_impl->did_user_callback_clean_up_read_end) { + *read_impl->did_user_callback_clean_up_read_end = true; + } + + close(read_impl->handle.data.fd); + + aws_mem_release(read_impl->alloc, read_impl); + AWS_ZERO_STRUCT(*read_end); + return AWS_OP_SUCCESS; +} + +struct aws_event_loop *aws_pipe_get_read_end_event_loop(const struct aws_pipe_read_end *read_end) { + const struct read_end_impl *read_impl = read_end->impl_data; + if (!read_impl) { + aws_raise_error(AWS_IO_BROKEN_PIPE); + return NULL; + } + + return read_impl->event_loop; +} + +struct aws_event_loop *aws_pipe_get_write_end_event_loop(const struct aws_pipe_write_end *write_end) { + const struct write_end_impl *write_impl = write_end->impl_data; + if (!write_impl) { + aws_raise_error(AWS_IO_BROKEN_PIPE); + return NULL; + } + + return write_impl->event_loop; +} + +int aws_pipe_read(struct aws_pipe_read_end *read_end, struct aws_byte_buf *dst_buffer, size_t *num_bytes_read) { + AWS_ASSERT(dst_buffer && dst_buffer->buffer); + + struct read_end_impl *read_impl = read_end->impl_data; + if (!read_impl) { + return aws_raise_error(AWS_IO_BROKEN_PIPE); + } + + if (num_bytes_read) { + *num_bytes_read = 0; + } + + size_t num_bytes_to_read = dst_buffer->capacity - dst_buffer->len; + + ssize_t read_val = read(read_impl->handle.data.fd, dst_buffer->buffer + dst_buffer->len, num_bytes_to_read); + + if (read_val < 0) { + if (errno == EAGAIN || errno == EWOULDBLOCK) { + return aws_raise_error(AWS_IO_READ_WOULD_BLOCK); + } + return s_raise_posix_error(errno); + } + + /* Success */ + dst_buffer->len += read_val; + + if (num_bytes_read) { + *num_bytes_read = read_val; + } + + return AWS_OP_SUCCESS; +} + +static void 
s_read_end_on_event( + struct aws_event_loop *event_loop, + struct aws_io_handle *handle, + int events, + void *user_data) { + + (void)event_loop; + (void)handle; + + /* Note that it should be impossible for this to run after read-end has been unsubscribed or cleaned up */ + struct aws_pipe_read_end *read_end = user_data; + struct read_end_impl *read_impl = read_end->impl_data; + AWS_ASSERT(read_impl); + AWS_ASSERT(read_impl->event_loop == event_loop); + AWS_ASSERT(&read_impl->handle == handle); + AWS_ASSERT(read_impl->is_subscribed); + AWS_ASSERT(events != 0); + AWS_ASSERT(read_impl->did_user_callback_clean_up_read_end == NULL); + + /* Set up handshake, so we can be informed if the read-end is cleaned up while invoking a user callback */ + bool did_user_callback_clean_up_read_end = false; + read_impl->did_user_callback_clean_up_read_end = &did_user_callback_clean_up_read_end; + + /* If readable event received, tell user to try and read, even if "error" events have also occurred. */ + if (events & AWS_IO_EVENT_TYPE_READABLE) { + read_impl->on_readable_user_callback(read_end, AWS_ERROR_SUCCESS, read_impl->on_readable_user_data); + + if (did_user_callback_clean_up_read_end) { + return; + } + + events &= ~AWS_IO_EVENT_TYPE_READABLE; + } + + if (events) { + /* Check that user didn't unsubscribe in the previous callback */ + if (read_impl->is_subscribed) { + read_impl->on_readable_user_callback(read_end, AWS_IO_BROKEN_PIPE, read_impl->on_readable_user_data); + + if (did_user_callback_clean_up_read_end) { + return; + } + } + } + + read_impl->did_user_callback_clean_up_read_end = NULL; +} + +int aws_pipe_subscribe_to_readable_events( + struct aws_pipe_read_end *read_end, + aws_pipe_on_readable_fn *on_readable, + void *user_data) { + + AWS_ASSERT(on_readable); + + struct read_end_impl *read_impl = read_end->impl_data; + if (!read_impl) { + return aws_raise_error(AWS_IO_BROKEN_PIPE); + } + + if (!aws_event_loop_thread_is_callers_thread(read_impl->event_loop)) { + return aws_raise_error(AWS_ERROR_IO_EVENT_LOOP_THREAD_ONLY); + } + + if (read_impl->is_subscribed) { + return aws_raise_error(AWS_ERROR_IO_ALREADY_SUBSCRIBED); + } + + read_impl->is_subscribed = true; + read_impl->on_readable_user_callback = on_readable; + read_impl->on_readable_user_data = user_data; + + int err = aws_event_loop_subscribe_to_io_events( + read_impl->event_loop, &read_impl->handle, AWS_IO_EVENT_TYPE_READABLE, s_read_end_on_event, read_end); + if (err) { + read_impl->is_subscribed = false; + read_impl->on_readable_user_callback = NULL; + read_impl->on_readable_user_data = NULL; + + return AWS_OP_ERR; + } + + return AWS_OP_SUCCESS; +} + +int aws_pipe_unsubscribe_from_readable_events(struct aws_pipe_read_end *read_end) { + struct read_end_impl *read_impl = read_end->impl_data; + if (!read_impl) { + return aws_raise_error(AWS_IO_BROKEN_PIPE); + } + + if (!aws_event_loop_thread_is_callers_thread(read_impl->event_loop)) { + return aws_raise_error(AWS_ERROR_IO_EVENT_LOOP_THREAD_ONLY); + } + + if (!read_impl->is_subscribed) { + return aws_raise_error(AWS_ERROR_IO_NOT_SUBSCRIBED); + } + + int err = aws_event_loop_unsubscribe_from_io_events(read_impl->event_loop, &read_impl->handle); + if (err) { + return AWS_OP_ERR; + } + + read_impl->is_subscribed = false; + read_impl->on_readable_user_callback = NULL; + read_impl->on_readable_user_data = NULL; + + return AWS_OP_SUCCESS; +} + +/* Pop front write request, invoke its callback, and delete it. 
+ * Returns whether the callback resulted in the write-end getting cleaned up */ +static bool s_write_end_complete_front_write_request(struct aws_pipe_write_end *write_end, int error_code) { + struct write_end_impl *write_impl = write_end->impl_data; + + AWS_ASSERT(!aws_linked_list_empty(&write_impl->write_list)); + struct aws_linked_list_node *node = aws_linked_list_pop_front(&write_impl->write_list); + struct write_request *request = AWS_CONTAINER_OF(node, struct write_request, list_node); + + struct aws_allocator *alloc = write_impl->alloc; + + /* Let the write-end know that a callback is in process, so the write-end can inform the callback + * whether it resulted in clean_up() being called. */ + bool write_end_cleaned_up_during_callback = false; + struct write_request *prev_invoking_request = write_impl->currently_invoking_write_callback; + write_impl->currently_invoking_write_callback = request; + + if (request->user_callback) { + request->user_callback(write_end, error_code, request->original_cursor, request->user_data); + write_end_cleaned_up_during_callback = request->did_user_callback_clean_up_write_end; + } + + if (!write_end_cleaned_up_during_callback) { + write_impl->currently_invoking_write_callback = prev_invoking_request; + } + + aws_mem_release(alloc, request); + + return write_end_cleaned_up_during_callback; +} + +/* Process write requests as long as the pipe remains writable */ +static void s_write_end_process_requests(struct aws_pipe_write_end *write_end) { + struct write_end_impl *write_impl = write_end->impl_data; + AWS_ASSERT(write_impl); + + while (!aws_linked_list_empty(&write_impl->write_list)) { + struct aws_linked_list_node *node = aws_linked_list_front(&write_impl->write_list); + struct write_request *request = AWS_CONTAINER_OF(node, struct write_request, list_node); + + int completed_error_code = AWS_ERROR_SUCCESS; + + if (request->cursor.len > 0) { + ssize_t write_val = write(write_impl->handle.data.fd, request->cursor.ptr, request->cursor.len); + + if (write_val < 0) { + if (errno == EAGAIN || errno == EWOULDBLOCK) { + /* The pipe is no longer writable. Bail out */ + write_impl->is_writable = false; + return; + } + + /* A non-recoverable error occurred during this write */ + completed_error_code = s_translate_posix_error(errno); + + } else { + aws_byte_cursor_advance(&request->cursor, write_val); + + if (request->cursor.len > 0) { + /* There was a partial write, loop again to try and write the rest. */ + continue; + } + } + } + + /* If we got this far in the loop, then the write request is complete. + * Note that the callback may result in the pipe being cleaned up. */ + bool write_end_cleaned_up = s_write_end_complete_front_write_request(write_end, completed_error_code); + if (write_end_cleaned_up) { + /* Bail out! Any remaining requests were canceled during clean_up() */ + return; + } + } +} + +/* Handle events on the write-end's file handle */ +static void s_write_end_on_event( + struct aws_event_loop *event_loop, + struct aws_io_handle *handle, + int events, + void *user_data) { + + (void)event_loop; + (void)handle; + + /* Note that it should be impossible for this to run after write-end has been unsubscribed or cleaned up */ + struct aws_pipe_write_end *write_end = user_data; + struct write_end_impl *write_impl = write_end->impl_data; + AWS_ASSERT(write_impl); + AWS_ASSERT(write_impl->event_loop == event_loop); + AWS_ASSERT(&write_impl->handle == handle); + + /* Only care about the writable event. 
*/ + if ((events & AWS_IO_EVENT_TYPE_WRITABLE) == 0) { + return; + } + + write_impl->is_writable = true; + + s_write_end_process_requests(write_end); +} + +int aws_pipe_write( + struct aws_pipe_write_end *write_end, + struct aws_byte_cursor src_buffer, + aws_pipe_on_write_completed_fn *on_completed, + void *user_data) { + + AWS_ASSERT(src_buffer.ptr); + + struct write_end_impl *write_impl = write_end->impl_data; + if (!write_impl) { + return aws_raise_error(AWS_IO_BROKEN_PIPE); + } + + if (!aws_event_loop_thread_is_callers_thread(write_impl->event_loop)) { + return aws_raise_error(AWS_ERROR_IO_EVENT_LOOP_THREAD_ONLY); + } + + struct write_request *request = aws_mem_calloc(write_impl->alloc, 1, sizeof(struct write_request)); + if (!request) { + return AWS_OP_ERR; + } + + request->original_cursor = src_buffer; + request->cursor = src_buffer; + request->user_callback = on_completed; + request->user_data = user_data; + + aws_linked_list_push_back(&write_impl->write_list, &request->list_node); + + /* If the pipe is writable, process the request (unless pipe is already in the middle of processing, which could + * happen if a this aws_pipe_write() call was made by another write's completion callback */ + if (write_impl->is_writable && !write_impl->currently_invoking_write_callback) { + s_write_end_process_requests(write_end); + } + + return AWS_OP_SUCCESS; +} + +int aws_pipe_clean_up_write_end(struct aws_pipe_write_end *write_end) { + struct write_end_impl *write_impl = write_end->impl_data; + if (!write_impl) { + return aws_raise_error(AWS_IO_BROKEN_PIPE); + } + + if (!aws_event_loop_thread_is_callers_thread(write_impl->event_loop)) { + return aws_raise_error(AWS_ERROR_IO_EVENT_LOOP_THREAD_ONLY); + } + + int err = aws_event_loop_unsubscribe_from_io_events(write_impl->event_loop, &write_impl->handle); + if (err) { + return AWS_OP_ERR; + } + + close(write_impl->handle.data.fd); + + /* Zero out write-end before invoking user callbacks so that it won't work anymore with public functions. */ + AWS_ZERO_STRUCT(*write_end); + + /* If a request callback is currently being invoked, let it know that the write-end was cleaned up */ + if (write_impl->currently_invoking_write_callback) { + write_impl->currently_invoking_write_callback->did_user_callback_clean_up_write_end = true; + } + + /* Force any outstanding write requests to complete with an error status. */ + while (!aws_linked_list_empty(&write_impl->write_list)) { + struct aws_linked_list_node *node = aws_linked_list_pop_front(&write_impl->write_list); + struct write_request *request = AWS_CONTAINER_OF(node, struct write_request, list_node); + if (request->user_callback) { + request->user_callback(NULL, AWS_IO_BROKEN_PIPE, request->original_cursor, request->user_data); + } + aws_mem_release(write_impl->alloc, request); + } + + aws_mem_release(write_impl->alloc, write_impl); + return AWS_OP_SUCCESS; +} diff --git a/contrib/restricted/aws/aws-c-io/source/posix/shared_library.c b/contrib/restricted/aws/aws-c-io/source/posix/shared_library.c index 751c99bc23..6261ea9ea8 100644 --- a/contrib/restricted/aws/aws-c-io/source/posix/shared_library.c +++ b/contrib/restricted/aws/aws-c-io/source/posix/shared_library.c @@ -1,66 +1,66 @@ -/** - * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. - * SPDX-License-Identifier: Apache-2.0. 
- */ - -#include <aws/io/shared_library.h> - -#include <aws/io/logging.h> - -#include <dlfcn.h> - -static const char *s_null = "<NULL>"; -static const char *s_unknown_error = "<Unknown>"; - -int aws_shared_library_init(struct aws_shared_library *library, const char *library_path) { - AWS_ZERO_STRUCT(*library); - - library->library_handle = dlopen(library_path, RTLD_LAZY); - if (library->library_handle == NULL) { - const char *error = dlerror(); - AWS_LOGF_ERROR( - AWS_LS_IO_SHARED_LIBRARY, - "id=%p: Failed to load shared library at path \"%s\" with error: %s", - (void *)library, - library_path ? library_path : s_null, - error ? error : s_unknown_error); - return aws_raise_error(AWS_IO_SHARED_LIBRARY_LOAD_FAILURE); - } - - return AWS_OP_SUCCESS; -} - -void aws_shared_library_clean_up(struct aws_shared_library *library) { - if (library && library->library_handle) { - dlclose(library->library_handle); - library->library_handle = NULL; - } -} - -int aws_shared_library_find_function( - struct aws_shared_library *library, - const char *symbol_name, - aws_generic_function *function_address) { - if (library == NULL || library->library_handle == NULL) { - return aws_raise_error(AWS_IO_SHARED_LIBRARY_FIND_SYMBOL_FAILURE); - } - - /* - * Suggested work around for (undefined behavior) cast from void * to function pointer - * in POSIX.1-2003 standard, at least according to dlsym man page code sample. - */ - *(void **)(function_address) = dlsym(library->library_handle, symbol_name); - - if (*function_address == NULL) { - const char *error = dlerror(); - AWS_LOGF_ERROR( - AWS_LS_IO_SHARED_LIBRARY, - "id=%p: Failed to find shared library symbol \"%s\" with error: %s", - (void *)library, - symbol_name ? symbol_name : s_null, - error ? error : s_unknown_error); - return aws_raise_error(AWS_IO_SHARED_LIBRARY_FIND_SYMBOL_FAILURE); - } - - return AWS_OP_SUCCESS; -} +/** + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0. + */ + +#include <aws/io/shared_library.h> + +#include <aws/io/logging.h> + +#include <dlfcn.h> + +static const char *s_null = "<NULL>"; +static const char *s_unknown_error = "<Unknown>"; + +int aws_shared_library_init(struct aws_shared_library *library, const char *library_path) { + AWS_ZERO_STRUCT(*library); + + library->library_handle = dlopen(library_path, RTLD_LAZY); + if (library->library_handle == NULL) { + const char *error = dlerror(); + AWS_LOGF_ERROR( + AWS_LS_IO_SHARED_LIBRARY, + "id=%p: Failed to load shared library at path \"%s\" with error: %s", + (void *)library, + library_path ? library_path : s_null, + error ? error : s_unknown_error); + return aws_raise_error(AWS_IO_SHARED_LIBRARY_LOAD_FAILURE); + } + + return AWS_OP_SUCCESS; +} + +void aws_shared_library_clean_up(struct aws_shared_library *library) { + if (library && library->library_handle) { + dlclose(library->library_handle); + library->library_handle = NULL; + } +} + +int aws_shared_library_find_function( + struct aws_shared_library *library, + const char *symbol_name, + aws_generic_function *function_address) { + if (library == NULL || library->library_handle == NULL) { + return aws_raise_error(AWS_IO_SHARED_LIBRARY_FIND_SYMBOL_FAILURE); + } + + /* + * Suggested work around for (undefined behavior) cast from void * to function pointer + * in POSIX.1-2003 standard, at least according to dlsym man page code sample. 
+ */ + *(void **)(function_address) = dlsym(library->library_handle, symbol_name); + + if (*function_address == NULL) { + const char *error = dlerror(); + AWS_LOGF_ERROR( + AWS_LS_IO_SHARED_LIBRARY, + "id=%p: Failed to find shared library symbol \"%s\" with error: %s", + (void *)library, + symbol_name ? symbol_name : s_null, + error ? error : s_unknown_error); + return aws_raise_error(AWS_IO_SHARED_LIBRARY_FIND_SYMBOL_FAILURE); + } + + return AWS_OP_SUCCESS; +} diff --git a/contrib/restricted/aws/aws-c-io/source/posix/socket.c b/contrib/restricted/aws/aws-c-io/source/posix/socket.c index 5f11cdff52..7ac30b39c2 100644 --- a/contrib/restricted/aws/aws-c-io/source/posix/socket.c +++ b/contrib/restricted/aws/aws-c-io/source/posix/socket.c @@ -1,1777 +1,1777 @@ -/** - * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. - * SPDX-License-Identifier: Apache-2.0. - */ - -#include <aws/io/socket.h> - -#include <aws/common/clock.h> -#include <aws/common/condition_variable.h> -#include <aws/common/mutex.h> -#include <aws/common/string.h> - -#include <aws/io/event_loop.h> -#include <aws/io/logging.h> - -#include <arpa/inet.h> -#include <aws/io/io.h> -#include <errno.h> -#include <fcntl.h> -#include <netinet/tcp.h> -#include <sys/socket.h> -#include <sys/types.h> -#include <unistd.h> - -#if defined(__MACH__) -# define NO_SIGNAL SO_NOSIGPIPE -# define TCP_KEEPIDLE TCP_KEEPALIVE -#else -# define NO_SIGNAL MSG_NOSIGNAL -#endif - -/* This isn't defined on ancient linux distros (breaking the builds). - * However, if this is a prebuild, we purposely build on an ancient system, but - * we want the kernel calls to still be the same as a modern build since that's likely the target of the application - * calling this code. Just define this if it isn't there already. GlibC and the kernel don't really care how the flag - * gets passed as long as it does. - */ -#ifndef O_CLOEXEC -# define O_CLOEXEC 02000000 -#endif - -#ifdef USE_VSOCK -# if defined(__linux__) && defined(AF_VSOCK) -# include <linux/vm_sockets.h> -# else -# error "USE_VSOCK not supported on current platform" -# endif -#endif - -/* other than CONNECTED_READ | CONNECTED_WRITE - * a socket is only in one of these states at a time. 
*/ -enum socket_state { - INIT = 0x01, - CONNECTING = 0x02, - CONNECTED_READ = 0x04, - CONNECTED_WRITE = 0x08, - BOUND = 0x10, - LISTENING = 0x20, - TIMEDOUT = 0x40, - ERROR = 0x80, - CLOSED, -}; - -static int s_convert_domain(enum aws_socket_domain domain) { - switch (domain) { - case AWS_SOCKET_IPV4: - return AF_INET; - case AWS_SOCKET_IPV6: - return AF_INET6; - case AWS_SOCKET_LOCAL: - return AF_UNIX; -#ifdef USE_VSOCK - case AWS_SOCKET_VSOCK: - return AF_VSOCK; -#endif - default: - AWS_ASSERT(0); - return AF_INET; - } -} - -static int s_convert_type(enum aws_socket_type type) { - switch (type) { - case AWS_SOCKET_STREAM: - return SOCK_STREAM; - case AWS_SOCKET_DGRAM: - return SOCK_DGRAM; - default: - AWS_ASSERT(0); - return SOCK_STREAM; - } -} - -static int s_determine_socket_error(int error) { - switch (error) { - case ECONNREFUSED: - return AWS_IO_SOCKET_CONNECTION_REFUSED; - case ETIMEDOUT: - return AWS_IO_SOCKET_TIMEOUT; - case EHOSTUNREACH: - case ENETUNREACH: - return AWS_IO_SOCKET_NO_ROUTE_TO_HOST; - case EADDRNOTAVAIL: - return AWS_IO_SOCKET_INVALID_ADDRESS; - case ENETDOWN: - return AWS_IO_SOCKET_NETWORK_DOWN; - case ECONNABORTED: - return AWS_IO_SOCKET_CONNECT_ABORTED; - case EADDRINUSE: - return AWS_IO_SOCKET_ADDRESS_IN_USE; - case ENOBUFS: - case ENOMEM: - return AWS_ERROR_OOM; - case EAGAIN: - return AWS_IO_READ_WOULD_BLOCK; - case EMFILE: - case ENFILE: - return AWS_ERROR_MAX_FDS_EXCEEDED; - case ENOENT: - case EINVAL: - return AWS_ERROR_FILE_INVALID_PATH; - case EAFNOSUPPORT: - return AWS_IO_SOCKET_UNSUPPORTED_ADDRESS_FAMILY; - case EACCES: - return AWS_ERROR_NO_PERMISSION; - default: - return AWS_IO_SOCKET_NOT_CONNECTED; - } -} - -static int s_create_socket(struct aws_socket *sock, const struct aws_socket_options *options) { - - int fd = socket(s_convert_domain(options->domain), s_convert_type(options->type), 0); - AWS_LOGF_DEBUG( - AWS_LS_IO_SOCKET, - "id=%p fd=%d: initializing with domain %d and type %d", - (void *)sock, - fd, - options->domain, - options->type); - if (fd != -1) { - int flags = fcntl(fd, F_GETFL, 0); - flags |= O_NONBLOCK | O_CLOEXEC; - int success = fcntl(fd, F_SETFL, flags); - (void)success; - sock->io_handle.data.fd = fd; - sock->io_handle.additional_data = NULL; - return aws_socket_set_options(sock, options); - } - - int aws_error = s_determine_socket_error(errno); - return aws_raise_error(aws_error); -} - -struct posix_socket_connect_args { - struct aws_task task; - struct aws_allocator *allocator; - struct aws_socket *socket; -}; - -struct posix_socket { - struct aws_linked_list write_queue; - struct posix_socket_connect_args *connect_args; - bool write_in_progress; - bool currently_subscribed; - bool continue_accept; - bool currently_in_event; - bool clean_yourself_up; - bool *close_happened; -}; - -static int s_socket_init( - struct aws_socket *socket, - struct aws_allocator *alloc, - const struct aws_socket_options *options, - int existing_socket_fd) { - AWS_ASSERT(options); - AWS_ZERO_STRUCT(*socket); - - struct posix_socket *posix_socket = aws_mem_calloc(alloc, 1, sizeof(struct posix_socket)); - if (!posix_socket) { - socket->impl = NULL; - return AWS_OP_ERR; - } - - socket->allocator = alloc; - socket->io_handle.data.fd = -1; - socket->state = INIT; - socket->options = *options; - - if (existing_socket_fd < 0) { - int err = s_create_socket(socket, options); - if (err) { - aws_mem_release(alloc, posix_socket); - socket->impl = NULL; - return AWS_OP_ERR; - } - } else { - socket->io_handle = (struct aws_io_handle){ - .data = {.fd = 
existing_socket_fd}, - .additional_data = NULL, - }; - aws_socket_set_options(socket, options); - } - - aws_linked_list_init(&posix_socket->write_queue); - posix_socket->write_in_progress = false; - posix_socket->currently_subscribed = false; - posix_socket->continue_accept = false; - posix_socket->currently_in_event = false; - posix_socket->clean_yourself_up = false; - posix_socket->connect_args = NULL; - posix_socket->close_happened = NULL; - socket->impl = posix_socket; - return AWS_OP_SUCCESS; -} - -int aws_socket_init(struct aws_socket *socket, struct aws_allocator *alloc, const struct aws_socket_options *options) { - AWS_ASSERT(options); - return s_socket_init(socket, alloc, options, -1); -} - -void aws_socket_clean_up(struct aws_socket *socket) { - if (!socket->impl) { - /* protect from double clean */ - return; - } - if (aws_socket_is_open(socket)) { - AWS_LOGF_DEBUG( - AWS_LS_IO_SOCKET, "id=%p fd=%d: is still open, closing...", (void *)socket, socket->io_handle.data.fd); - aws_socket_close(socket); - } - struct posix_socket *socket_impl = socket->impl; - - if (!socket_impl->currently_in_event) { - aws_mem_release(socket->allocator, socket->impl); - } else { - AWS_LOGF_DEBUG( - AWS_LS_IO_SOCKET, - "id=%p fd=%d: is still pending io letting it dangle and cleaning up later.", - (void *)socket, - socket->io_handle.data.fd); - socket_impl->clean_yourself_up = true; - } - - AWS_ZERO_STRUCT(*socket); - socket->io_handle.data.fd = -1; -} - -static void s_on_connection_error(struct aws_socket *socket, int error); - -static int s_on_connection_success(struct aws_socket *socket) { - - struct aws_event_loop *event_loop = socket->event_loop; - struct posix_socket *socket_impl = socket->impl; - - if (socket_impl->currently_subscribed) { - aws_event_loop_unsubscribe_from_io_events(socket->event_loop, &socket->io_handle); - socket_impl->currently_subscribed = false; - } - - socket->event_loop = NULL; - - int connect_result; - socklen_t result_length = sizeof(connect_result); - - if (getsockopt(socket->io_handle.data.fd, SOL_SOCKET, SO_ERROR, &connect_result, &result_length) < 0) { - AWS_LOGF_ERROR( - AWS_LS_IO_SOCKET, - "id=%p fd=%d: failed to determine connection error %d", - (void *)socket, - socket->io_handle.data.fd, - errno); - int aws_error = s_determine_socket_error(errno); - aws_raise_error(aws_error); - s_on_connection_error(socket, aws_error); - return AWS_OP_ERR; - } - - if (connect_result) { - AWS_LOGF_ERROR( - AWS_LS_IO_SOCKET, - "id=%p fd=%d: connection error %d", - (void *)socket, - socket->io_handle.data.fd, - connect_result); - int aws_error = s_determine_socket_error(connect_result); - aws_raise_error(aws_error); - s_on_connection_error(socket, aws_error); - return AWS_OP_ERR; - } - - AWS_LOGF_INFO(AWS_LS_IO_SOCKET, "id=%p fd=%d: connection success", (void *)socket, socket->io_handle.data.fd); - - struct sockaddr_storage address; - AWS_ZERO_STRUCT(address); - socklen_t address_size = sizeof(address); - if (!getsockname(socket->io_handle.data.fd, (struct sockaddr *)&address, &address_size)) { - uint16_t port = 0; - - if (address.ss_family == AF_INET) { - struct sockaddr_in *s = (struct sockaddr_in *)&address; - port = ntohs(s->sin_port); - /* this comes straight from the kernal. a.) they won't fail. b.) even if they do, it's not fatal - * once we add logging, we can log this if it fails. 
*/ - if (inet_ntop( - AF_INET, &s->sin_addr, socket->local_endpoint.address, sizeof(socket->local_endpoint.address))) { - AWS_LOGF_DEBUG( - AWS_LS_IO_SOCKET, - "id=%p fd=%d: local endpoint %s:%d", - (void *)socket, - socket->io_handle.data.fd, - socket->local_endpoint.address, - port); - } else { - AWS_LOGF_WARN( - AWS_LS_IO_SOCKET, - "id=%p fd=%d: determining local endpoint failed", - (void *)socket, - socket->io_handle.data.fd); - } - } else if (address.ss_family == AF_INET6) { - struct sockaddr_in6 *s = (struct sockaddr_in6 *)&address; - port = ntohs(s->sin6_port); - /* this comes straight from the kernal. a.) they won't fail. b.) even if they do, it's not fatal - * once we add logging, we can log this if it fails. */ - if (inet_ntop( - AF_INET6, &s->sin6_addr, socket->local_endpoint.address, sizeof(socket->local_endpoint.address))) { - AWS_LOGF_DEBUG( - AWS_LS_IO_SOCKET, - "id=%p fd %d: local endpoint %s:%d", - (void *)socket, - socket->io_handle.data.fd, - socket->local_endpoint.address, - port); - } else { - AWS_LOGF_WARN( - AWS_LS_IO_SOCKET, - "id=%p fd=%d: determining local endpoint failed", - (void *)socket, - socket->io_handle.data.fd); - } - } - - socket->local_endpoint.port = port; - } else { - AWS_LOGF_ERROR( - AWS_LS_IO_SOCKET, - "id=%p fd=%d: getsockname() failed with error %d", - (void *)socket, - socket->io_handle.data.fd, - errno); - int aws_error = s_determine_socket_error(errno); - aws_raise_error(aws_error); - s_on_connection_error(socket, aws_error); - return AWS_OP_ERR; - } - - socket->state = CONNECTED_WRITE | CONNECTED_READ; - - if (aws_socket_assign_to_event_loop(socket, event_loop)) { - AWS_LOGF_ERROR( - AWS_LS_IO_SOCKET, - "id=%p fd=%d: assignment to event loop %p failed with error %d", - (void *)socket, - socket->io_handle.data.fd, - (void *)event_loop, - aws_last_error()); - s_on_connection_error(socket, aws_last_error()); - return AWS_OP_ERR; - } - - socket->connection_result_fn(socket, AWS_ERROR_SUCCESS, socket->connect_accept_user_data); - - return AWS_OP_SUCCESS; -} - -static void s_on_connection_error(struct aws_socket *socket, int error) { - socket->state = ERROR; - AWS_LOGF_ERROR(AWS_LS_IO_SOCKET, "id=%p fd=%d: connection failure", (void *)socket, socket->io_handle.data.fd); - if (socket->connection_result_fn) { - socket->connection_result_fn(socket, error, socket->connect_accept_user_data); - } else if (socket->accept_result_fn) { - socket->accept_result_fn(socket, error, NULL, socket->connect_accept_user_data); - } -} - -/* the next two callbacks compete based on which one runs first. if s_socket_connect_event - * comes back first, then we set socket_args->socket = NULL and continue on with the connection. - * if s_handle_socket_timeout() runs first, is sees socket_args->socket is NULL and just cleans up its memory. - * s_handle_socket_timeout() will always run so the memory for socket_connect_args is always cleaned up there. 
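The core of s_on_connection_success() above is the deferred-error query on a non-blocking connect. A hedged standalone sketch with generic names (fd stands for an in-progress, non-blocking socket):

    int so_error = 0;
    socklen_t len = sizeof(so_error);
    if (getsockopt(fd, SOL_SOCKET, SO_ERROR, &so_error, &len) < 0) {
        /* the query itself failed; fall back to errno */
    } else if (so_error != 0) {
        /* the asynchronous connect failed; so_error is an errno-style code,
         * translated by s_determine_socket_error() above */
    } else {
        /* connect completed successfully */
    }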
*/ -static void s_socket_connect_event( - struct aws_event_loop *event_loop, - struct aws_io_handle *handle, - int events, - void *user_data) { - - (void)event_loop; - (void)handle; - - struct posix_socket_connect_args *socket_args = (struct posix_socket_connect_args *)user_data; - AWS_LOGF_TRACE(AWS_LS_IO_SOCKET, "fd=%d: connection activity handler triggered ", handle->data.fd); - - if (socket_args->socket) { - AWS_LOGF_TRACE( - AWS_LS_IO_SOCKET, - "id=%p fd=%d: has not timed out yet proceeding with connection.", - (void *)socket_args->socket, - handle->data.fd); - - struct posix_socket *socket_impl = socket_args->socket->impl; - if (!(events & AWS_IO_EVENT_TYPE_ERROR || events & AWS_IO_EVENT_TYPE_CLOSED) && - (events & AWS_IO_EVENT_TYPE_READABLE || events & AWS_IO_EVENT_TYPE_WRITABLE)) { - struct aws_socket *socket = socket_args->socket; - socket_args->socket = NULL; - socket_impl->connect_args = NULL; - s_on_connection_success(socket); - return; - } - - int aws_error = aws_socket_get_error(socket_args->socket); - /* we'll get another notification. */ - if (aws_error == AWS_IO_READ_WOULD_BLOCK) { - AWS_LOGF_TRACE( - AWS_LS_IO_SOCKET, - "id=%p fd=%d: spurious event, waiting for another notification.", - (void *)socket_args->socket, - handle->data.fd); - return; - } - - struct aws_socket *socket = socket_args->socket; - socket_args->socket = NULL; - socket_impl->connect_args = NULL; - aws_raise_error(aws_error); - s_on_connection_error(socket, aws_error); - } -} - -static void s_handle_socket_timeout(struct aws_task *task, void *args, aws_task_status status) { - (void)task; - (void)status; - - struct posix_socket_connect_args *socket_args = args; - - AWS_LOGF_TRACE(AWS_LS_IO_SOCKET, "task_id=%p: timeout task triggered, evaluating timeouts.", (void *)task); - /* successful connection will have nulled out connect_args->socket */ - if (socket_args->socket) { - AWS_LOGF_ERROR( - AWS_LS_IO_SOCKET, - "id=%p fd=%d: timed out, shutting down.", - (void *)socket_args->socket, - socket_args->socket->io_handle.data.fd); - - socket_args->socket->state = TIMEDOUT; - int error_code = AWS_IO_SOCKET_TIMEOUT; - - if (status == AWS_TASK_STATUS_RUN_READY) { - aws_event_loop_unsubscribe_from_io_events(socket_args->socket->event_loop, &socket_args->socket->io_handle); - } else { - error_code = AWS_IO_EVENT_LOOP_SHUTDOWN; - aws_event_loop_free_io_event_resources(socket_args->socket->event_loop, &socket_args->socket->io_handle); - } - socket_args->socket->event_loop = NULL; - struct posix_socket *socket_impl = socket_args->socket->impl; - socket_impl->currently_subscribed = false; - aws_raise_error(error_code); - struct aws_socket *socket = socket_args->socket; - /*socket close sets socket_args->socket to NULL and - * socket_impl->connect_args to NULL. */ - aws_socket_close(socket); - s_on_connection_error(socket, error_code); - } - - aws_mem_release(socket_args->allocator, socket_args); -} - -/* this is used simply for moving a connect_success callback when the connect finished immediately - * (like for unix domain sockets) into the event loop's thread. Also note, in that case there was no - * timeout task scheduled, so in this case the socket_args are cleaned up. 
*/ -static void s_run_connect_success(struct aws_task *task, void *arg, enum aws_task_status status) { - (void)task; - struct posix_socket_connect_args *socket_args = arg; - - if (socket_args->socket) { - struct posix_socket *socket_impl = socket_args->socket->impl; - if (status == AWS_TASK_STATUS_RUN_READY) { - s_on_connection_success(socket_args->socket); - } else { - aws_raise_error(AWS_IO_SOCKET_CONNECT_ABORTED); - socket_args->socket->event_loop = NULL; - s_on_connection_error(socket_args->socket, AWS_IO_SOCKET_CONNECT_ABORTED); - } - socket_impl->connect_args = NULL; - } - - aws_mem_release(socket_args->allocator, socket_args); -} - -static inline int s_convert_pton_error(int pton_code) { - if (pton_code == 0) { - return AWS_IO_SOCKET_INVALID_ADDRESS; - } - - return s_determine_socket_error(errno); -} - -struct socket_address { - union sock_addr_types { - struct sockaddr_in addr_in; - struct sockaddr_in6 addr_in6; - struct sockaddr_un un_addr; -#ifdef USE_VSOCK - struct sockaddr_vm vm_addr; -#endif - } sock_addr_types; -}; - -#ifdef USE_VSOCK -/** Convert a string to a VSOCK CID. Respects the calling convetion of inet_pton: - * 0 on error, 1 on success. */ -static int parse_cid(const char *cid_str, unsigned int *value) { - if (cid_str == NULL || value == NULL) { - errno = EINVAL; - return 0; - } - /* strtoll returns 0 as both error and correct value */ - errno = 0; - /* unsigned long long to handle edge cases in convention explicitly */ - long long cid = strtoll(cid_str, NULL, 10); - if (errno != 0) { - return 0; - } - - /* -1U means any, so it's a valid value, but it needs to be converted to - * unsigned int. */ - if (cid == -1) { - *value = VMADDR_CID_ANY; - return 1; - } - - if (cid < 0 || cid > UINT_MAX) { - errno = ERANGE; - return 0; - } - - /* cast is safe here, edge cases already checked */ - *value = (unsigned int)cid; - return 1; -} -#endif - -int aws_socket_connect( - struct aws_socket *socket, - const struct aws_socket_endpoint *remote_endpoint, - struct aws_event_loop *event_loop, - aws_socket_on_connection_result_fn *on_connection_result, - void *user_data) { - AWS_ASSERT(event_loop); - AWS_ASSERT(!socket->event_loop); - - AWS_LOGF_DEBUG(AWS_LS_IO_SOCKET, "id=%p fd=%d: beginning connect.", (void *)socket, socket->io_handle.data.fd); - - if (socket->event_loop) { - return aws_raise_error(AWS_IO_EVENT_LOOP_ALREADY_ASSIGNED); - } - - if (socket->options.type != AWS_SOCKET_DGRAM) { - AWS_ASSERT(on_connection_result); - if (socket->state != INIT) { - return aws_raise_error(AWS_IO_SOCKET_ILLEGAL_OPERATION_FOR_STATE); - } - } else { /* UDP socket */ - /* UDP sockets jump to CONNECT_READ if bind is called first */ - if (socket->state != CONNECTED_READ && socket->state != INIT) { - return aws_raise_error(AWS_IO_SOCKET_ILLEGAL_OPERATION_FOR_STATE); - } - } - - size_t address_strlen; - if (aws_secure_strlen(remote_endpoint->address, AWS_ADDRESS_MAX_LEN, &address_strlen)) { - return AWS_OP_ERR; - } - - struct socket_address address; - AWS_ZERO_STRUCT(address); - socklen_t sock_size = 0; - int pton_err = 1; - if (socket->options.domain == AWS_SOCKET_IPV4) { - pton_err = inet_pton(AF_INET, remote_endpoint->address, &address.sock_addr_types.addr_in.sin_addr); - address.sock_addr_types.addr_in.sin_port = htons(remote_endpoint->port); - address.sock_addr_types.addr_in.sin_family = AF_INET; - sock_size = sizeof(address.sock_addr_types.addr_in); - } else if (socket->options.domain == AWS_SOCKET_IPV6) { - pton_err = inet_pton(AF_INET6, remote_endpoint->address, 
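The pton error handling above relies on the inet_pton() return convention: 1 means parsed, 0 means the text was not a valid address (errno untouched), -1 means an unsupported family (errno set). A hedged fragment with placeholder address and port; <arpa/inet.h> is already included by this file:

    struct sockaddr_in addr;
    memset(&addr, 0, sizeof(addr));
    int rc = inet_pton(AF_INET, "127.0.0.1", &addr.sin_addr);
    if (rc != 1) {
        /* rc == 0 maps to AWS_IO_SOCKET_INVALID_ADDRESS via s_convert_pton_error();
         * rc == -1 is translated from errno */
    }
    addr.sin_family = AF_INET;
    addr.sin_port = htons(8080);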
&address.sock_addr_types.addr_in6.sin6_addr); - address.sock_addr_types.addr_in6.sin6_port = htons(remote_endpoint->port); - address.sock_addr_types.addr_in6.sin6_family = AF_INET6; - sock_size = sizeof(address.sock_addr_types.addr_in6); - } else if (socket->options.domain == AWS_SOCKET_LOCAL) { - address.sock_addr_types.un_addr.sun_family = AF_UNIX; - strncpy(address.sock_addr_types.un_addr.sun_path, remote_endpoint->address, AWS_ADDRESS_MAX_LEN); - sock_size = sizeof(address.sock_addr_types.un_addr); -#ifdef USE_VSOCK - } else if (socket->options.domain == AWS_SOCKET_VSOCK) { - pton_err = parse_cid(remote_endpoint->address, &address.sock_addr_types.vm_addr.svm_cid); - address.sock_addr_types.vm_addr.svm_family = AF_VSOCK; - address.sock_addr_types.vm_addr.svm_port = (unsigned int)remote_endpoint->port; - sock_size = sizeof(address.sock_addr_types.vm_addr); -#endif - } else { - AWS_ASSERT(0); - return aws_raise_error(AWS_IO_SOCKET_UNSUPPORTED_ADDRESS_FAMILY); - } - - if (pton_err != 1) { - AWS_LOGF_ERROR( - AWS_LS_IO_SOCKET, - "id=%p fd=%d: failed to parse address %s:%d.", - (void *)socket, - socket->io_handle.data.fd, - remote_endpoint->address, - (int)remote_endpoint->port); - return aws_raise_error(s_convert_pton_error(pton_err)); - } - - AWS_LOGF_DEBUG( - AWS_LS_IO_SOCKET, - "id=%p fd=%d: connecting to endpoint %s:%d.", - (void *)socket, - socket->io_handle.data.fd, - remote_endpoint->address, - (int)remote_endpoint->port); - - socket->state = CONNECTING; - socket->remote_endpoint = *remote_endpoint; - socket->connect_accept_user_data = user_data; - socket->connection_result_fn = on_connection_result; - - struct posix_socket *socket_impl = socket->impl; - - socket_impl->connect_args = aws_mem_calloc(socket->allocator, 1, sizeof(struct posix_socket_connect_args)); - if (!socket_impl->connect_args) { - return AWS_OP_ERR; - } - - socket_impl->connect_args->socket = socket; - socket_impl->connect_args->allocator = socket->allocator; - - socket_impl->connect_args->task.fn = s_handle_socket_timeout; - socket_impl->connect_args->task.arg = socket_impl->connect_args; - - int error_code = connect(socket->io_handle.data.fd, (struct sockaddr *)&address.sock_addr_types, sock_size); - socket->event_loop = event_loop; - - if (!error_code) { - AWS_LOGF_INFO( - AWS_LS_IO_SOCKET, - "id=%p fd=%d: connected immediately, not scheduling timeout.", - (void *)socket, - socket->io_handle.data.fd); - socket_impl->connect_args->task.fn = s_run_connect_success; - /* the subscription for IO will happen once we setup the connection in the task. Since we already - * know the connection succeeded, we don't need to register for events yet. */ - aws_event_loop_schedule_task_now(event_loop, &socket_impl->connect_args->task); - } - - if (error_code) { - error_code = errno; - if (error_code == EINPROGRESS || error_code == EALREADY) { - AWS_LOGF_TRACE( - AWS_LS_IO_SOCKET, - "id=%p fd=%d: connection pending waiting on event-loop notification or timeout.", - (void *)socket, - socket->io_handle.data.fd); - /* cache the timeout task; it is possible for the IO subscription to come back virtually immediately - * and null out the connect args */ - struct aws_task *timeout_task = &socket_impl->connect_args->task; - - socket_impl->currently_subscribed = true; - /* This event is for when the connection finishes. (the fd will flip writable). 
*/ - if (aws_event_loop_subscribe_to_io_events( - event_loop, - &socket->io_handle, - AWS_IO_EVENT_TYPE_WRITABLE, - s_socket_connect_event, - socket_impl->connect_args)) { - AWS_LOGF_ERROR( - AWS_LS_IO_SOCKET, - "id=%p fd=%d: failed to register with event-loop %p.", - (void *)socket, - socket->io_handle.data.fd, - (void *)event_loop); - socket_impl->currently_subscribed = false; - socket->event_loop = NULL; - goto err_clean_up; - } - - /* schedule a task to run at the connect timeout interval, if this task runs before the connect - * happens, we consider that a timeout. */ - uint64_t timeout = 0; - aws_event_loop_current_clock_time(event_loop, &timeout); - timeout += aws_timestamp_convert( - socket->options.connect_timeout_ms, AWS_TIMESTAMP_MILLIS, AWS_TIMESTAMP_NANOS, NULL); - AWS_LOGF_TRACE( - AWS_LS_IO_SOCKET, - "id=%p fd=%d: scheduling timeout task for %llu.", - (void *)socket, - socket->io_handle.data.fd, - (unsigned long long)timeout); - aws_event_loop_schedule_task_future(event_loop, timeout_task, timeout); - } else { - AWS_LOGF_ERROR( - AWS_LS_IO_SOCKET, - "id=%p fd=%d: connect failed with error code %d.", - (void *)socket, - socket->io_handle.data.fd, - error_code); - int aws_error = s_determine_socket_error(error_code); - aws_raise_error(aws_error); - socket->event_loop = NULL; - socket_impl->currently_subscribed = false; - goto err_clean_up; - } - } - return AWS_OP_SUCCESS; - -err_clean_up: - aws_mem_release(socket->allocator, socket_impl->connect_args); - socket_impl->connect_args = NULL; - return AWS_OP_ERR; -} - -int aws_socket_bind(struct aws_socket *socket, const struct aws_socket_endpoint *local_endpoint) { - if (socket->state != INIT) { - AWS_LOGF_ERROR( - AWS_LS_IO_SOCKET, - "id=%p fd=%d: invalid state for bind operation.", - (void *)socket, - socket->io_handle.data.fd); - return aws_raise_error(AWS_IO_SOCKET_ILLEGAL_OPERATION_FOR_STATE); - } - - size_t address_strlen; - if (aws_secure_strlen(local_endpoint->address, AWS_ADDRESS_MAX_LEN, &address_strlen)) { - return AWS_OP_ERR; - } - - int error_code = -1; - - socket->local_endpoint = *local_endpoint; - AWS_LOGF_INFO( - AWS_LS_IO_SOCKET, - "id=%p fd=%d: binding to %s:%d.", - (void *)socket, - socket->io_handle.data.fd, - local_endpoint->address, - (int)local_endpoint->port); - - struct socket_address address; - AWS_ZERO_STRUCT(address); - socklen_t sock_size = 0; - int pton_err = 1; - if (socket->options.domain == AWS_SOCKET_IPV4) { - pton_err = inet_pton(AF_INET, local_endpoint->address, &address.sock_addr_types.addr_in.sin_addr); - address.sock_addr_types.addr_in.sin_port = htons(local_endpoint->port); - address.sock_addr_types.addr_in.sin_family = AF_INET; - sock_size = sizeof(address.sock_addr_types.addr_in); - } else if (socket->options.domain == AWS_SOCKET_IPV6) { - pton_err = inet_pton(AF_INET6, local_endpoint->address, &address.sock_addr_types.addr_in6.sin6_addr); - address.sock_addr_types.addr_in6.sin6_port = htons(local_endpoint->port); - address.sock_addr_types.addr_in6.sin6_family = AF_INET6; - sock_size = sizeof(address.sock_addr_types.addr_in6); - } else if (socket->options.domain == AWS_SOCKET_LOCAL) { - address.sock_addr_types.un_addr.sun_family = AF_UNIX; - strncpy(address.sock_addr_types.un_addr.sun_path, local_endpoint->address, AWS_ADDRESS_MAX_LEN); - sock_size = sizeof(address.sock_addr_types.un_addr); -#ifdef USE_VSOCK - } else if (socket->options.domain == AWS_SOCKET_VSOCK) { - pton_err = parse_cid(local_endpoint->address, &address.sock_addr_types.vm_addr.svm_cid); - 
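Condensed from the pending-connect branch above, a sketch of how the connect deadline is computed and scheduled; event_loop, options and timeout_task stand in for the fields used in aws_socket_connect():

    uint64_t deadline_ns = 0;
    aws_event_loop_current_clock_time(event_loop, &deadline_ns);
    deadline_ns += aws_timestamp_convert(
        options.connect_timeout_ms, AWS_TIMESTAMP_MILLIS, AWS_TIMESTAMP_NANOS, NULL);
    /* if this task runs before s_socket_connect_event() fires, the connect counts as timed out */
    aws_event_loop_schedule_task_future(event_loop, timeout_task, deadline_ns);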
address.sock_addr_types.vm_addr.svm_family = AF_VSOCK; - address.sock_addr_types.vm_addr.svm_port = (unsigned int)local_endpoint->port; - sock_size = sizeof(address.sock_addr_types.vm_addr); -#endif - } else { - AWS_ASSERT(0); - return aws_raise_error(AWS_IO_SOCKET_UNSUPPORTED_ADDRESS_FAMILY); - } - - if (pton_err != 1) { - AWS_LOGF_ERROR( - AWS_LS_IO_SOCKET, - "id=%p fd=%d: failed to parse address %s:%d.", - (void *)socket, - socket->io_handle.data.fd, - local_endpoint->address, - (int)local_endpoint->port); - return aws_raise_error(s_convert_pton_error(pton_err)); - } - - error_code = bind(socket->io_handle.data.fd, (struct sockaddr *)&address.sock_addr_types, sock_size); - - if (!error_code) { - if (socket->options.type == AWS_SOCKET_STREAM) { - socket->state = BOUND; - } else { - /* e.g. UDP is now readable */ - socket->state = CONNECTED_READ; - } - AWS_LOGF_DEBUG(AWS_LS_IO_SOCKET, "id=%p fd=%d: successfully bound", (void *)socket, socket->io_handle.data.fd); - - return AWS_OP_SUCCESS; - } - - socket->state = ERROR; - error_code = errno; - AWS_LOGF_ERROR( - AWS_LS_IO_SOCKET, - "id=%p fd=%d: bind failed with error code %d", - (void *)socket, - socket->io_handle.data.fd, - error_code); - - int aws_error = s_determine_socket_error(error_code); - return aws_raise_error(aws_error); -} - -int aws_socket_listen(struct aws_socket *socket, int backlog_size) { - if (socket->state != BOUND) { - AWS_LOGF_ERROR( - AWS_LS_IO_SOCKET, - "id=%p fd=%d: invalid state for listen operation. You must call bind first.", - (void *)socket, - socket->io_handle.data.fd); - return aws_raise_error(AWS_IO_SOCKET_ILLEGAL_OPERATION_FOR_STATE); - } - - int error_code = listen(socket->io_handle.data.fd, backlog_size); - - if (!error_code) { - AWS_LOGF_INFO( - AWS_LS_IO_SOCKET, "id=%p fd=%d: successfully listening", (void *)socket, socket->io_handle.data.fd); - socket->state = LISTENING; - return AWS_OP_SUCCESS; - } - - AWS_LOGF_ERROR( - AWS_LS_IO_SOCKET, - "id=%p fd=%d: listen failed with error code %d", - (void *)socket, - socket->io_handle.data.fd, - error_code); - error_code = errno; - socket->state = ERROR; - - return aws_raise_error(s_determine_socket_error(error_code)); -} - -/* this is called by the event loop handler that was installed in start_accept(). It runs once the FD goes readable, - * accepts as many as it can and then returns control to the event loop. 
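A hedged caller-side sketch of setting up a listener with the two functions above; the endpoint values and the already-initialized `listener` socket are illustrative assumptions:

    struct aws_socket_endpoint local = {
        .address = "127.0.0.1", /* for AWS_SOCKET_LOCAL this would be a unix-domain socket path */
        .port = 8126,
    };
    if (aws_socket_bind(&listener, &local)) {
        /* bind failed; aws_last_error() carries the translated errno */
    }
    if (aws_socket_listen(&listener, 1024 /* backlog */)) {
        /* listen failed; the socket state is now ERROR */
    }
    /* state is now LISTENING; aws_socket_start_accept() may be called next */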
*/ -static void s_socket_accept_event( - struct aws_event_loop *event_loop, - struct aws_io_handle *handle, - int events, - void *user_data) { - - (void)event_loop; - - struct aws_socket *socket = user_data; - struct posix_socket *socket_impl = socket->impl; - - AWS_LOGF_DEBUG( - AWS_LS_IO_SOCKET, "id=%p fd=%d: listening event received", (void *)socket, socket->io_handle.data.fd); - - if (socket_impl->continue_accept && events & AWS_IO_EVENT_TYPE_READABLE) { - int in_fd = 0; - while (socket_impl->continue_accept && in_fd != -1) { - struct sockaddr_storage in_addr; - socklen_t in_len = sizeof(struct sockaddr_storage); - - in_fd = accept(handle->data.fd, (struct sockaddr *)&in_addr, &in_len); - if (in_fd == -1) { - int error = errno; - - if (error == EAGAIN || error == EWOULDBLOCK) { - break; - } - - int aws_error = aws_socket_get_error(socket); - aws_raise_error(aws_error); - s_on_connection_error(socket, aws_error); - break; - } - - AWS_LOGF_DEBUG( - AWS_LS_IO_SOCKET, "id=%p fd=%d: incoming connection", (void *)socket, socket->io_handle.data.fd); - - struct aws_socket *new_sock = aws_mem_acquire(socket->allocator, sizeof(struct aws_socket)); - - if (!new_sock) { - close(in_fd); - s_on_connection_error(socket, aws_last_error()); - continue; - } - - if (s_socket_init(new_sock, socket->allocator, &socket->options, in_fd)) { - aws_mem_release(socket->allocator, new_sock); - s_on_connection_error(socket, aws_last_error()); - continue; - } - - new_sock->local_endpoint = socket->local_endpoint; - new_sock->state = CONNECTED_READ | CONNECTED_WRITE; - uint16_t port = 0; - - /* get the info on the incoming socket's address */ - if (in_addr.ss_family == AF_INET) { - struct sockaddr_in *s = (struct sockaddr_in *)&in_addr; - port = ntohs(s->sin_port); - /* this came from the kernel, a.) it won't fail. b.) even if it does - * its not fatal. come back and add logging later. */ - if (!inet_ntop( - AF_INET, - &s->sin_addr, - new_sock->remote_endpoint.address, - sizeof(new_sock->remote_endpoint.address))) { - AWS_LOGF_WARN( - AWS_LS_IO_SOCKET, - "id=%p fd=%d:. Failed to determine remote address.", - (void *)socket, - socket->io_handle.data.fd) - } - new_sock->options.domain = AWS_SOCKET_IPV4; - } else if (in_addr.ss_family == AF_INET6) { - /* this came from the kernel, a.) it won't fail. b.) even if it does - * its not fatal. come back and add logging later. */ - struct sockaddr_in6 *s = (struct sockaddr_in6 *)&in_addr; - port = ntohs(s->sin6_port); - if (!inet_ntop( - AF_INET6, - &s->sin6_addr, - new_sock->remote_endpoint.address, - sizeof(new_sock->remote_endpoint.address))) { - AWS_LOGF_WARN( - AWS_LS_IO_SOCKET, - "id=%p fd=%d:. 
Failed to determine remote address.", - (void *)socket, - socket->io_handle.data.fd) - } - new_sock->options.domain = AWS_SOCKET_IPV6; - } else if (in_addr.ss_family == AF_UNIX) { - new_sock->remote_endpoint = socket->local_endpoint; - new_sock->options.domain = AWS_SOCKET_LOCAL; - } - - new_sock->remote_endpoint.port = port; - - AWS_LOGF_INFO( - AWS_LS_IO_SOCKET, - "id=%p fd=%d: connected to %s:%d, incoming fd %d", - (void *)socket, - socket->io_handle.data.fd, - new_sock->remote_endpoint.address, - new_sock->remote_endpoint.port, - in_fd); - - int flags = fcntl(in_fd, F_GETFL, 0); - - flags |= O_NONBLOCK | O_CLOEXEC; - fcntl(in_fd, F_SETFL, flags); - - bool close_occurred = false; - socket_impl->close_happened = &close_occurred; - socket->accept_result_fn(socket, AWS_ERROR_SUCCESS, new_sock, socket->connect_accept_user_data); - - if (close_occurred) { - return; - } - - socket_impl->close_happened = NULL; - } - } - - AWS_LOGF_TRACE( - AWS_LS_IO_SOCKET, - "id=%p fd=%d: finished processing incoming connections, " - "waiting on event-loop notification", - (void *)socket, - socket->io_handle.data.fd); -} - -int aws_socket_start_accept( - struct aws_socket *socket, - struct aws_event_loop *accept_loop, - aws_socket_on_accept_result_fn *on_accept_result, - void *user_data) { - AWS_ASSERT(on_accept_result); - AWS_ASSERT(accept_loop); - - if (socket->event_loop) { - AWS_LOGF_ERROR( - AWS_LS_IO_SOCKET, - "id=%p fd=%d: is already assigned to event-loop %p.", - (void *)socket, - socket->io_handle.data.fd, - (void *)socket->event_loop); - return aws_raise_error(AWS_IO_EVENT_LOOP_ALREADY_ASSIGNED); - } - - if (socket->state != LISTENING) { - AWS_LOGF_ERROR( - AWS_LS_IO_SOCKET, - "id=%p fd=%d: invalid state for start_accept operation. You must call listen first.", - (void *)socket, - socket->io_handle.data.fd); - return aws_raise_error(AWS_IO_SOCKET_ILLEGAL_OPERATION_FOR_STATE); - } - - socket->accept_result_fn = on_accept_result; - socket->connect_accept_user_data = user_data; - socket->event_loop = accept_loop; - struct posix_socket *socket_impl = socket->impl; - socket_impl->continue_accept = true; - socket_impl->currently_subscribed = true; - - if (aws_event_loop_subscribe_to_io_events( - socket->event_loop, &socket->io_handle, AWS_IO_EVENT_TYPE_READABLE, s_socket_accept_event, socket)) { - AWS_LOGF_ERROR( - AWS_LS_IO_SOCKET, - "id=%p fd=%d: failed to subscribe to event-loop %p.", - (void *)socket, - socket->io_handle.data.fd, - (void *)socket->event_loop); - socket_impl->continue_accept = false; - socket_impl->currently_subscribed = false; - socket->event_loop = NULL; - - return AWS_OP_ERR; - } - - return AWS_OP_SUCCESS; -} - -struct stop_accept_args { - struct aws_task task; - struct aws_mutex mutex; - struct aws_condition_variable condition_variable; - struct aws_socket *socket; - int ret_code; - bool invoked; -}; - -static bool s_stop_accept_pred(void *arg) { - struct stop_accept_args *stop_accept_args = arg; - return stop_accept_args->invoked; -} - -static void s_stop_accept_task(struct aws_task *task, void *arg, enum aws_task_status status) { - (void)task; - (void)status; - - struct stop_accept_args *stop_accept_args = arg; - aws_mutex_lock(&stop_accept_args->mutex); - stop_accept_args->ret_code = AWS_OP_SUCCESS; - if (aws_socket_stop_accept(stop_accept_args->socket)) { - stop_accept_args->ret_code = aws_last_error(); - } - stop_accept_args->invoked = true; - aws_condition_variable_notify_one(&stop_accept_args->condition_variable); - aws_mutex_unlock(&stop_accept_args->mutex); -} - -int 
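A hedged sketch of wiring up aws_socket_start_accept() as defined above; the callback body and the `accept_loop`/`assign_loop` variables are illustrative assumptions:

    static void s_on_accept(
        struct aws_socket *listener,
        int error_code,
        struct aws_socket *new_socket,
        void *user_data) {
        (void)listener;
        (void)user_data;
        if (error_code) {
            return; /* accept failed; error_code is one of the AWS_IO_SOCKET_* codes */
        }
        /* new_socket arrives CONNECTED_READ | CONNECTED_WRITE and non-blocking,
         * but still has to be assigned to an event loop before reading or writing */
        aws_socket_assign_to_event_loop(new_socket, assign_loop);
    }

    /* on the listening socket, after aws_socket_listen() succeeded: */
    aws_socket_start_accept(&listener, accept_loop, s_on_accept, NULL);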
aws_socket_stop_accept(struct aws_socket *socket) { - if (socket->state != LISTENING) { - AWS_LOGF_ERROR( - AWS_LS_IO_SOCKET, - "id=%p fd=%d: is not in a listening state, can't stop_accept.", - (void *)socket, - socket->io_handle.data.fd); - return aws_raise_error(AWS_IO_SOCKET_ILLEGAL_OPERATION_FOR_STATE); - } - - AWS_LOGF_INFO( - AWS_LS_IO_SOCKET, "id=%p fd=%d: stopping accepting new connections", (void *)socket, socket->io_handle.data.fd); - - if (!aws_event_loop_thread_is_callers_thread(socket->event_loop)) { - struct stop_accept_args args = {.mutex = AWS_MUTEX_INIT, - .condition_variable = AWS_CONDITION_VARIABLE_INIT, - .invoked = false, - .socket = socket, - .ret_code = AWS_OP_SUCCESS, - .task = {.fn = s_stop_accept_task}}; - AWS_LOGF_INFO( - AWS_LS_IO_SOCKET, - "id=%p fd=%d: stopping accepting new connections from a different thread than " - "the socket is running from. Blocking until it shuts down.", - (void *)socket, - socket->io_handle.data.fd); - /* Look.... I know what I'm doing.... trust me, I'm an engineer. - * We wait on the completion before 'args' goes out of scope. - * NOLINTNEXTLINE */ - args.task.arg = &args; - aws_mutex_lock(&args.mutex); - aws_event_loop_schedule_task_now(socket->event_loop, &args.task); - aws_condition_variable_wait_pred(&args.condition_variable, &args.mutex, s_stop_accept_pred, &args); - aws_mutex_unlock(&args.mutex); - AWS_LOGF_INFO( - AWS_LS_IO_SOCKET, - "id=%p fd=%d: stop accept task finished running.", - (void *)socket, - socket->io_handle.data.fd); - - if (args.ret_code) { - return aws_raise_error(args.ret_code); - } - return AWS_OP_SUCCESS; - } - - int ret_val = AWS_OP_SUCCESS; - struct posix_socket *socket_impl = socket->impl; - if (socket_impl->currently_subscribed) { - ret_val = aws_event_loop_unsubscribe_from_io_events(socket->event_loop, &socket->io_handle); - socket_impl->currently_subscribed = false; - socket_impl->continue_accept = false; - socket->event_loop = NULL; - } - - return ret_val; -} - -int aws_socket_set_options(struct aws_socket *socket, const struct aws_socket_options *options) { - if (socket->options.domain != options->domain || socket->options.type != options->type) { - return aws_raise_error(AWS_IO_SOCKET_INVALID_OPTIONS); - } - - AWS_LOGF_DEBUG( - AWS_LS_IO_SOCKET, - "id=%p fd=%d: setting socket options to: keep-alive %d, keep idle %d, keep-alive interval %d, keep-alive probe " - "count %d.", - (void *)socket, - socket->io_handle.data.fd, - (int)options->keepalive, - (int)options->keep_alive_timeout_sec, - (int)options->keep_alive_interval_sec, - (int)options->keep_alive_max_failed_probes); - - socket->options = *options; - - int option_value = 1; - if (AWS_UNLIKELY( - setsockopt(socket->io_handle.data.fd, SOL_SOCKET, NO_SIGNAL, &option_value, sizeof(option_value)))) { - AWS_LOGF_WARN( - AWS_LS_IO_SOCKET, - "id=%p fd=%d: setsockopt() for NO_SIGNAL failed with errno %d. 
If you are having SIGPIPE signals thrown, " - "you may" - " want to install a signal trap in your application layer.", - (void *)socket, - socket->io_handle.data.fd, - errno); - } - - int reuse = 1; - if (AWS_UNLIKELY(setsockopt(socket->io_handle.data.fd, SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof(int)))) { - AWS_LOGF_WARN( - AWS_LS_IO_SOCKET, - "id=%p fd=%d: setsockopt() for SO_REUSEADDR failed with errno %d.", - (void *)socket, - socket->io_handle.data.fd, - errno); - } - - if (options->type == AWS_SOCKET_STREAM && options->domain != AWS_SOCKET_LOCAL) { - if (socket->options.keepalive) { - int keep_alive = 1; - if (AWS_UNLIKELY( - setsockopt(socket->io_handle.data.fd, SOL_SOCKET, SO_KEEPALIVE, &keep_alive, sizeof(int)))) { - AWS_LOGF_WARN( - AWS_LS_IO_SOCKET, - "id=%p fd=%d: setsockopt() for enabling SO_KEEPALIVE failed with errno %d.", - (void *)socket, - socket->io_handle.data.fd, - errno); - } - } - - if (socket->options.keep_alive_interval_sec && socket->options.keep_alive_timeout_sec) { - int ival_in_secs = socket->options.keep_alive_interval_sec; - if (AWS_UNLIKELY(setsockopt( - socket->io_handle.data.fd, IPPROTO_TCP, TCP_KEEPIDLE, &ival_in_secs, sizeof(ival_in_secs)))) { - AWS_LOGF_WARN( - AWS_LS_IO_SOCKET, - "id=%p fd=%d: setsockopt() for enabling TCP_KEEPIDLE for TCP failed with errno %d.", - (void *)socket, - socket->io_handle.data.fd, - errno); - } - - ival_in_secs = socket->options.keep_alive_timeout_sec; - if (AWS_UNLIKELY(setsockopt( - socket->io_handle.data.fd, IPPROTO_TCP, TCP_KEEPINTVL, &ival_in_secs, sizeof(ival_in_secs)))) { - AWS_LOGF_WARN( - AWS_LS_IO_SOCKET, - "id=%p fd=%d: setsockopt() for enabling TCP_KEEPINTVL for TCP failed with errno %d.", - (void *)socket, - socket->io_handle.data.fd, - errno); - } - } - - if (socket->options.keep_alive_max_failed_probes) { - int max_probes = socket->options.keep_alive_max_failed_probes; - if (AWS_UNLIKELY( - setsockopt(socket->io_handle.data.fd, IPPROTO_TCP, TCP_KEEPCNT, &max_probes, sizeof(max_probes)))) { - AWS_LOGF_WARN( - AWS_LS_IO_SOCKET, - "id=%p fd=%d: setsockopt() for enabling TCP_KEEPCNT for TCP failed with errno %d.", - (void *)socket, - socket->io_handle.data.fd, - errno); - } - } - } - - return AWS_OP_SUCCESS; -} - -struct write_request { - struct aws_byte_cursor cursor_cpy; - aws_socket_on_write_completed_fn *written_fn; - void *write_user_data; - struct aws_linked_list_node node; - size_t original_buffer_len; -}; - -struct posix_socket_close_args { - struct aws_mutex mutex; - struct aws_condition_variable condition_variable; - struct aws_socket *socket; - bool invoked; - int ret_code; -}; - -static bool s_close_predicate(void *arg) { - struct posix_socket_close_args *close_args = arg; - return close_args->invoked; -} - -static void s_close_task(struct aws_task *task, void *arg, enum aws_task_status status) { - (void)task; - (void)status; - - struct posix_socket_close_args *close_args = arg; - aws_mutex_lock(&close_args->mutex); - close_args->ret_code = AWS_OP_SUCCESS; - - if (aws_socket_close(close_args->socket)) { - close_args->ret_code = aws_last_error(); - } - - close_args->invoked = true; - aws_condition_variable_notify_one(&close_args->condition_variable); - aws_mutex_unlock(&close_args->mutex); -} - -int aws_socket_close(struct aws_socket *socket) { - struct posix_socket *socket_impl = socket->impl; - AWS_LOGF_DEBUG(AWS_LS_IO_SOCKET, "id=%p fd=%d: closing", (void *)socket, socket->io_handle.data.fd); - if (socket->event_loop) { - /* don't freak out on me, this almost never happens, and never occurs inside a 
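To make the option mapping above easier to follow, a hedged sketch of the fields involved; the values are placeholders and the comments mirror the setsockopt() calls in this function (keep-alive settings are only applied to stream sockets that are not AWS_SOCKET_LOCAL):

    struct aws_socket_options options = {
        .type = AWS_SOCKET_STREAM,
        .domain = AWS_SOCKET_IPV4,
        .keepalive = true,                 /* enables SO_KEEPALIVE */
        .keep_alive_interval_sec = 60,     /* written to TCP_KEEPIDLE (TCP_KEEPALIVE on __MACH__) */
        .keep_alive_timeout_sec = 10,      /* written to TCP_KEEPINTVL */
        .keep_alive_max_failed_probes = 5, /* written to TCP_KEEPCNT */
    };
    /* note: interval and timeout must both be non-zero for either to be applied */
    aws_socket_set_options(&socket, &options);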
channel - * it only gets hit from a listening socket shutting down or from a unit test. */ - if (!aws_event_loop_thread_is_callers_thread(socket->event_loop)) { - AWS_LOGF_INFO( - AWS_LS_IO_SOCKET, - "id=%p fd=%d: closing from a different thread than " - "the socket is running from. Blocking until it closes down.", - (void *)socket, - socket->io_handle.data.fd); - /* the only time we allow this kind of thing is when you're a listener.*/ - if (socket->state != LISTENING) { - return aws_raise_error(AWS_IO_SOCKET_ILLEGAL_OPERATION_FOR_STATE); - } - - struct posix_socket_close_args args = { - .mutex = AWS_MUTEX_INIT, - .condition_variable = AWS_CONDITION_VARIABLE_INIT, - .socket = socket, - .ret_code = AWS_OP_SUCCESS, - .invoked = false, - }; - - struct aws_task close_task = { - .fn = s_close_task, - .arg = &args, - }; - - aws_mutex_lock(&args.mutex); - aws_event_loop_schedule_task_now(socket->event_loop, &close_task); - aws_condition_variable_wait_pred(&args.condition_variable, &args.mutex, s_close_predicate, &args); - aws_mutex_unlock(&args.mutex); - AWS_LOGF_INFO( - AWS_LS_IO_SOCKET, "id=%p fd=%d: close task completed.", (void *)socket, socket->io_handle.data.fd); - if (args.ret_code) { - return aws_raise_error(args.ret_code); - } - - return AWS_OP_SUCCESS; - } - - if (socket_impl->currently_subscribed) { - if (socket->state & LISTENING) { - aws_socket_stop_accept(socket); - } else { - int err_code = aws_event_loop_unsubscribe_from_io_events(socket->event_loop, &socket->io_handle); - - if (err_code) { - return AWS_OP_ERR; - } - } - socket_impl->currently_subscribed = false; - socket->event_loop = NULL; - } - } - - if (socket_impl->close_happened) { - *socket_impl->close_happened = true; - } - - if (socket_impl->connect_args) { - socket_impl->connect_args->socket = NULL; - socket_impl->connect_args = NULL; - } - - if (aws_socket_is_open(socket)) { - close(socket->io_handle.data.fd); - socket->io_handle.data.fd = -1; - socket->state = CLOSED; - - /* after close, just go ahead and clear out the pending writes queue - * and tell the user they were cancelled. */ - while (!aws_linked_list_empty(&socket_impl->write_queue)) { - struct aws_linked_list_node *node = aws_linked_list_pop_front(&socket_impl->write_queue); - struct write_request *write_request = AWS_CONTAINER_OF(node, struct write_request, node); - - write_request->written_fn( - socket, AWS_IO_SOCKET_CLOSED, write_request->original_buffer_len, write_request->write_user_data); - aws_mem_release(socket->allocator, write_request); - } - } - - return AWS_OP_SUCCESS; -} - -int aws_socket_shutdown_dir(struct aws_socket *socket, enum aws_channel_direction dir) { - int how = dir == AWS_CHANNEL_DIR_READ ? 0 : 1; - AWS_LOGF_DEBUG( - AWS_LS_IO_SOCKET, "id=%p fd=%d: shutting down in direction %d", (void *)socket, socket->io_handle.data.fd, dir); - if (shutdown(socket->io_handle.data.fd, how)) { - int aws_error = s_determine_socket_error(errno); - return aws_raise_error(aws_error); - } - - if (dir == AWS_CHANNEL_DIR_READ) { - socket->state &= ~CONNECTED_READ; - } else { - socket->state &= ~CONNECTED_WRITE; - } - - return AWS_OP_SUCCESS; -} - -/* this gets called in two scenarios. - * 1st scenario, someone called aws_socket_write() and we want to try writing now, so an error can be returned - * immediately if something bad has happened to the socket. In this case, `parent_request` is set. - * 2nd scenario, the event loop notified us that the socket went writable. 
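A brief, hedged sketch of the half-close helper defined above; `socket` is assumed to be a connected stream socket:

    /* stop sending but keep receiving: shutdown(fd, 1) and clear the CONNECTED_WRITE bit */
    if (aws_socket_shutdown_dir(&socket, AWS_CHANNEL_DIR_WRITE)) {
        /* shutdown() failed; aws_last_error() carries the translated errno */
    }
    /* the read side stays usable until AWS_CHANNEL_DIR_READ is shut down or the socket is closed */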
In this case `parent_request` is NULL */ -static int s_process_write_requests(struct aws_socket *socket, struct write_request *parent_request) { - struct posix_socket *socket_impl = socket->impl; - struct aws_allocator *allocator = socket->allocator; - - AWS_LOGF_TRACE( - AWS_LS_IO_SOCKET, "id=%p fd=%d: processing write requests.", (void *)socket, socket->io_handle.data.fd); - - /* there's a potential deadlock where we notify the user that we wrote some data, the user - * says, "cool, now I can write more and then immediately calls aws_socket_write(). We need to make sure - * that we don't allow reentrancy in that case. */ - socket_impl->write_in_progress = true; - - if (parent_request) { - AWS_LOGF_TRACE( - AWS_LS_IO_SOCKET, - "id=%p fd=%d: processing write requests, called from aws_socket_write", - (void *)socket, - socket->io_handle.data.fd); - socket_impl->currently_in_event = true; - } else { - AWS_LOGF_TRACE( - AWS_LS_IO_SOCKET, - "id=%p fd=%d: processing write requests, invoked by the event-loop", - (void *)socket, - socket->io_handle.data.fd); - } - - bool purge = false; - int aws_error = AWS_OP_SUCCESS; - bool parent_request_failed = false; - - /* if a close call happens in the middle, this queue will have been cleaned out from under us. */ - while (!aws_linked_list_empty(&socket_impl->write_queue)) { - struct aws_linked_list_node *node = aws_linked_list_front(&socket_impl->write_queue); - struct write_request *write_request = AWS_CONTAINER_OF(node, struct write_request, node); - - AWS_LOGF_TRACE( - AWS_LS_IO_SOCKET, - "id=%p fd=%d: dequeued write request of size %llu, remaining to write %llu", - (void *)socket, - socket->io_handle.data.fd, - (unsigned long long)write_request->original_buffer_len, - (unsigned long long)write_request->cursor_cpy.len); - - ssize_t written = - send(socket->io_handle.data.fd, write_request->cursor_cpy.ptr, write_request->cursor_cpy.len, NO_SIGNAL); - - AWS_LOGF_TRACE( - AWS_LS_IO_SOCKET, - "id=%p fd=%d: send written size %d", - (void *)socket, - socket->io_handle.data.fd, - (int)written); - - if (written < 0) { - int error = errno; - if (error == EAGAIN) { - AWS_LOGF_TRACE( - AWS_LS_IO_SOCKET, "id=%p fd=%d: returned would block", (void *)socket, socket->io_handle.data.fd); - break; - } - - if (error == EPIPE) { - AWS_LOGF_DEBUG( - AWS_LS_IO_SOCKET, - "id=%p fd=%d: already closed before write", - (void *)socket, - socket->io_handle.data.fd); - aws_error = AWS_IO_SOCKET_CLOSED; - aws_raise_error(aws_error); - purge = true; - break; - } - - purge = true; - AWS_LOGF_DEBUG( - AWS_LS_IO_SOCKET, - "id=%p fd=%d: write error with error code %d", - (void *)socket, - socket->io_handle.data.fd, - error); - aws_error = s_determine_socket_error(error); - aws_raise_error(aws_error); - break; - } - - size_t remaining_to_write = write_request->cursor_cpy.len; - - aws_byte_cursor_advance(&write_request->cursor_cpy, (size_t)written); - AWS_LOGF_TRACE( - AWS_LS_IO_SOCKET, - "id=%p fd=%d: remaining write request to write %llu", - (void *)socket, - socket->io_handle.data.fd, - (unsigned long long)write_request->cursor_cpy.len); - - if ((size_t)written == remaining_to_write) { - AWS_LOGF_TRACE( - AWS_LS_IO_SOCKET, "id=%p fd=%d: write request completed", (void *)socket, socket->io_handle.data.fd); - - aws_linked_list_remove(node); - write_request->written_fn( - socket, AWS_OP_SUCCESS, write_request->original_buffer_len, write_request->write_user_data); - aws_mem_release(allocator, write_request); - } - } - - if (purge) { - while 
(!aws_linked_list_empty(&socket_impl->write_queue)) { - struct aws_linked_list_node *node = aws_linked_list_pop_front(&socket_impl->write_queue); - struct write_request *write_request = AWS_CONTAINER_OF(node, struct write_request, node); - - /* If this fn was invoked directly from aws_socket_write(), don't invoke the error callback - * as the user will be able to rely on the return value from aws_socket_write() */ - if (write_request == parent_request) { - parent_request_failed = true; - } else { - write_request->written_fn(socket, aws_error, 0, write_request->write_user_data); - } - - aws_mem_release(socket->allocator, write_request); - } - } - - socket_impl->write_in_progress = false; - - if (parent_request) { - socket_impl->currently_in_event = false; - } - - if (socket_impl->clean_yourself_up) { - aws_mem_release(allocator, socket_impl); - } - - /* Only report error if aws_socket_write() invoked this function and its write_request failed */ - if (!parent_request_failed) { - return AWS_OP_SUCCESS; - } - - aws_raise_error(aws_error); - return AWS_OP_ERR; -} - -static void s_on_socket_io_event( - struct aws_event_loop *event_loop, - struct aws_io_handle *handle, - int events, - void *user_data) { - (void)event_loop; - (void)handle; - /* this is to handle a race condition when an error kicks off a cleanup, or the user decides - * to close the socket based on something they read (SSL validation failed for example). - * if clean_up happens when currently_in_event is true, socket_impl is kept dangling but currently - * subscribed is set to false. */ - struct aws_socket *socket = user_data; - struct posix_socket *socket_impl = socket->impl; - struct aws_allocator *allocator = socket->allocator; - - socket_impl->currently_in_event = true; - - if (events & AWS_IO_EVENT_TYPE_REMOTE_HANG_UP || events & AWS_IO_EVENT_TYPE_CLOSED) { - aws_raise_error(AWS_IO_SOCKET_CLOSED); - AWS_LOGF_TRACE(AWS_LS_IO_SOCKET, "id=%p fd=%d: closed remotely", (void *)socket, socket->io_handle.data.fd); - if (socket->readable_fn) { - socket->readable_fn(socket, AWS_IO_SOCKET_CLOSED, socket->readable_user_data); - } - goto end_check; - } - - if (socket_impl->currently_subscribed && events & AWS_IO_EVENT_TYPE_ERROR) { - int aws_error = aws_socket_get_error(socket); - aws_raise_error(aws_error); - AWS_LOGF_TRACE( - AWS_LS_IO_SOCKET, "id=%p fd=%d: error event occurred", (void *)socket, socket->io_handle.data.fd); - if (socket->readable_fn) { - socket->readable_fn(socket, aws_error, socket->readable_user_data); - } - goto end_check; - } - - if (socket_impl->currently_subscribed && events & AWS_IO_EVENT_TYPE_READABLE) { - AWS_LOGF_TRACE(AWS_LS_IO_SOCKET, "id=%p fd=%d: is readable", (void *)socket, socket->io_handle.data.fd); - if (socket->readable_fn) { - socket->readable_fn(socket, AWS_OP_SUCCESS, socket->readable_user_data); - } - } - /* if socket closed in between these branches, the currently_subscribed will be false and socket_impl will not - * have been cleaned up, so this next branch is safe. 
*/ - if (socket_impl->currently_subscribed && events & AWS_IO_EVENT_TYPE_WRITABLE) { - AWS_LOGF_TRACE(AWS_LS_IO_SOCKET, "id=%p fd=%d: is writable", (void *)socket, socket->io_handle.data.fd); - s_process_write_requests(socket, NULL); - } - -end_check: - socket_impl->currently_in_event = false; - - if (socket_impl->clean_yourself_up) { - aws_mem_release(allocator, socket_impl); - } -} - -int aws_socket_assign_to_event_loop(struct aws_socket *socket, struct aws_event_loop *event_loop) { - if (!socket->event_loop) { - AWS_LOGF_DEBUG( - AWS_LS_IO_SOCKET, - "id=%p fd=%d: assigning to event loop %p", - (void *)socket, - socket->io_handle.data.fd, - (void *)event_loop); - socket->event_loop = event_loop; - struct posix_socket *socket_impl = socket->impl; - socket_impl->currently_subscribed = true; - if (aws_event_loop_subscribe_to_io_events( - event_loop, - &socket->io_handle, - AWS_IO_EVENT_TYPE_WRITABLE | AWS_IO_EVENT_TYPE_READABLE, - s_on_socket_io_event, - socket)) { - AWS_LOGF_ERROR( - AWS_LS_IO_SOCKET, - "id=%p fd=%d: assigning to event loop %p failed with error %d", - (void *)socket, - socket->io_handle.data.fd, - (void *)event_loop, - aws_last_error()); - socket_impl->currently_subscribed = false; - socket->event_loop = NULL; - return AWS_OP_ERR; - } - - return AWS_OP_SUCCESS; - } - - return aws_raise_error(AWS_IO_EVENT_LOOP_ALREADY_ASSIGNED); -} - -struct aws_event_loop *aws_socket_get_event_loop(struct aws_socket *socket) { - return socket->event_loop; -} - -int aws_socket_subscribe_to_readable_events( - struct aws_socket *socket, - aws_socket_on_readable_fn *on_readable, - void *user_data) { - - AWS_LOGF_TRACE( - AWS_LS_IO_SOCKET, " id=%p fd=%d: subscribing to readable events", (void *)socket, socket->io_handle.data.fd); - if (!(socket->state & CONNECTED_READ)) { - AWS_LOGF_ERROR( - AWS_LS_IO_SOCKET, - "id=%p fd=%d: can't subscribe to readable events since the socket is not connected", - (void *)socket, - socket->io_handle.data.fd); - return aws_raise_error(AWS_IO_SOCKET_NOT_CONNECTED); - } - - if (socket->readable_fn) { - AWS_LOGF_ERROR( - AWS_LS_IO_SOCKET, - "id=%p fd=%d: can't subscribe to readable events since it is already subscribed", - (void *)socket, - socket->io_handle.data.fd); - return aws_raise_error(AWS_ERROR_IO_ALREADY_SUBSCRIBED); - } - - AWS_ASSERT(on_readable); - socket->readable_user_data = user_data; - socket->readable_fn = on_readable; - - return AWS_OP_SUCCESS; -} - -int aws_socket_read(struct aws_socket *socket, struct aws_byte_buf *buffer, size_t *amount_read) { - AWS_ASSERT(amount_read); - - if (!aws_event_loop_thread_is_callers_thread(socket->event_loop)) { - AWS_LOGF_ERROR( - AWS_LS_IO_SOCKET, - "id=%p fd=%d: cannot read from a different thread than event loop %p", - (void *)socket, - socket->io_handle.data.fd, - (void *)socket->event_loop); - return aws_raise_error(AWS_ERROR_IO_EVENT_LOOP_THREAD_ONLY); - } - - if (!(socket->state & CONNECTED_READ)) { - AWS_LOGF_ERROR( - AWS_LS_IO_SOCKET, - "id=%p fd=%d: cannot read because it is not connected", - (void *)socket, - socket->io_handle.data.fd); - return aws_raise_error(AWS_IO_SOCKET_NOT_CONNECTED); - } - - ssize_t read_val = read(socket->io_handle.data.fd, buffer->buffer + buffer->len, buffer->capacity - buffer->len); - AWS_LOGF_TRACE( - AWS_LS_IO_SOCKET, "id=%p fd=%d: read of %d", (void *)socket, socket->io_handle.data.fd, (int)read_val); - - if (read_val > 0) { - *amount_read = (size_t)read_val; - buffer->len += *amount_read; - return AWS_OP_SUCCESS; - } - - /* read_val of 0 means EOF which we'll treat as 
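A hedged sketch of the read path as a caller would drive it from the event-loop thread; aws_byte_buf_init()/aws_byte_buf_clean_up() are assumed from aws-c-common and the buffer size is arbitrary:

    static void s_on_readable(struct aws_socket *socket, int error_code, void *user_data) {
        (void)user_data;
        if (error_code) {
            return; /* e.g. AWS_IO_SOCKET_CLOSED on a remote hang-up */
        }
        struct aws_byte_buf buf;
        aws_byte_buf_init(&buf, socket->allocator, 1024);
        size_t amount_read = 0;
        while (aws_socket_read(socket, &buf, &amount_read) == AWS_OP_SUCCESS) {
            /* consume buf.buffer[0 .. buf.len), then reuse the buffer */
            buf.len = 0;
        }
        if (aws_last_error() == AWS_IO_READ_WOULD_BLOCK) {
            /* drained for now; another readable event will arrive later */
        }
        aws_byte_buf_clean_up(&buf);
    }

    /* after the socket is connected and assigned to an event loop: */
    aws_socket_subscribe_to_readable_events(&socket, s_on_readable, NULL);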
AWS_IO_SOCKET_CLOSED */ - if (read_val == 0) { - AWS_LOGF_INFO( - AWS_LS_IO_SOCKET, "id=%p fd=%d: zero read, socket is closed", (void *)socket, socket->io_handle.data.fd); - *amount_read = 0; - - if (buffer->capacity - buffer->len > 0) { - return aws_raise_error(AWS_IO_SOCKET_CLOSED); - } - - return AWS_OP_SUCCESS; - } - - int error = errno; -#if defined(EWOULDBLOCK) - if (error == EAGAIN || error == EWOULDBLOCK) { -#else - if (error == EAGAIN) { -#endif - AWS_LOGF_TRACE(AWS_LS_IO_SOCKET, "id=%p fd=%d: read would block", (void *)socket, socket->io_handle.data.fd); - return aws_raise_error(AWS_IO_READ_WOULD_BLOCK); - } - - if (error == EPIPE) { - AWS_LOGF_INFO(AWS_LS_IO_SOCKET, "id=%p fd=%d: socket is closed.", (void *)socket, socket->io_handle.data.fd); - return aws_raise_error(AWS_IO_SOCKET_CLOSED); - } - - if (error == ETIMEDOUT) { - AWS_LOGF_ERROR(AWS_LS_IO_SOCKET, "id=%p fd=%d: socket timed out.", (void *)socket, socket->io_handle.data.fd); - return aws_raise_error(AWS_IO_SOCKET_TIMEOUT); - } - - AWS_LOGF_ERROR( - AWS_LS_IO_SOCKET, - "id=%p fd=%d: read failed with error: %s", - (void *)socket, - socket->io_handle.data.fd, - strerror(error)); - return aws_raise_error(AWS_ERROR_SYS_CALL_FAILURE); -} - -int aws_socket_write( - struct aws_socket *socket, - const struct aws_byte_cursor *cursor, - aws_socket_on_write_completed_fn *written_fn, - void *user_data) { - if (!aws_event_loop_thread_is_callers_thread(socket->event_loop)) { - return aws_raise_error(AWS_ERROR_IO_EVENT_LOOP_THREAD_ONLY); - } - - if (!(socket->state & CONNECTED_WRITE)) { - AWS_LOGF_ERROR( - AWS_LS_IO_SOCKET, - "id=%p fd=%d: cannot write to because it is not connected", - (void *)socket, - socket->io_handle.data.fd); - return aws_raise_error(AWS_IO_SOCKET_NOT_CONNECTED); - } - - AWS_ASSERT(written_fn); - struct posix_socket *socket_impl = socket->impl; - struct write_request *write_request = aws_mem_calloc(socket->allocator, 1, sizeof(struct write_request)); - - if (!write_request) { - return AWS_OP_ERR; - } - - write_request->original_buffer_len = cursor->len; - write_request->written_fn = written_fn; - write_request->write_user_data = user_data; - write_request->cursor_cpy = *cursor; - aws_linked_list_push_back(&socket_impl->write_queue, &write_request->node); - - /* avoid reentrancy when a user calls write after receiving their completion callback. */ - if (!socket_impl->write_in_progress) { - return s_process_write_requests(socket, write_request); - } - - return AWS_OP_SUCCESS; -} - -int aws_socket_get_error(struct aws_socket *socket) { - int connect_result; - socklen_t result_length = sizeof(connect_result); - - if (getsockopt(socket->io_handle.data.fd, SOL_SOCKET, SO_ERROR, &connect_result, &result_length) < 0) { - return AWS_OP_ERR; - } - - if (connect_result) { - return s_determine_socket_error(connect_result); - } - - return AWS_OP_SUCCESS; -} - -bool aws_socket_is_open(struct aws_socket *socket) { - return socket->io_handle.data.fd >= 0; -} +/** + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0. 
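A hedged sketch of queuing a write through aws_socket_write() above; aws_byte_cursor_from_c_str() is assumed from aws-c-common, and the call must come from the socket's event-loop thread:

    static void s_on_write_done(struct aws_socket *socket, int error_code, size_t bytes_written, void *user_data) {
        (void)socket;
        (void)bytes_written;
        (void)user_data;
        if (error_code) {
            return; /* the request was purged or cancelled, e.g. AWS_IO_SOCKET_CLOSED */
        }
        /* on success, bytes_written equals the original cursor length */
    }

    struct aws_byte_cursor cursor = aws_byte_cursor_from_c_str("hello");
    if (aws_socket_write(&socket, &cursor, s_on_write_done, NULL)) {
        /* an immediate failure is reported through the return value, not the callback */
    }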
+ */ + +#include <aws/io/socket.h> + +#include <aws/common/clock.h> +#include <aws/common/condition_variable.h> +#include <aws/common/mutex.h> +#include <aws/common/string.h> + +#include <aws/io/event_loop.h> +#include <aws/io/logging.h> + +#include <arpa/inet.h> +#include <aws/io/io.h> +#include <errno.h> +#include <fcntl.h> +#include <netinet/tcp.h> +#include <sys/socket.h> +#include <sys/types.h> +#include <unistd.h> + +#if defined(__MACH__) +# define NO_SIGNAL SO_NOSIGPIPE +# define TCP_KEEPIDLE TCP_KEEPALIVE +#else +# define NO_SIGNAL MSG_NOSIGNAL +#endif + +/* This isn't defined on ancient linux distros (breaking the builds). + * However, if this is a prebuild, we purposely build on an ancient system, but + * we want the kernel calls to still be the same as a modern build since that's likely the target of the application + * calling this code. Just define this if it isn't there already. GlibC and the kernel don't really care how the flag + * gets passed as long as it does. + */ +#ifndef O_CLOEXEC +# define O_CLOEXEC 02000000 +#endif + +#ifdef USE_VSOCK +# if defined(__linux__) && defined(AF_VSOCK) +# include <linux/vm_sockets.h> +# else +# error "USE_VSOCK not supported on current platform" +# endif +#endif + +/* other than CONNECTED_READ | CONNECTED_WRITE + * a socket is only in one of these states at a time. */ +enum socket_state { + INIT = 0x01, + CONNECTING = 0x02, + CONNECTED_READ = 0x04, + CONNECTED_WRITE = 0x08, + BOUND = 0x10, + LISTENING = 0x20, + TIMEDOUT = 0x40, + ERROR = 0x80, + CLOSED, +}; + +static int s_convert_domain(enum aws_socket_domain domain) { + switch (domain) { + case AWS_SOCKET_IPV4: + return AF_INET; + case AWS_SOCKET_IPV6: + return AF_INET6; + case AWS_SOCKET_LOCAL: + return AF_UNIX; +#ifdef USE_VSOCK + case AWS_SOCKET_VSOCK: + return AF_VSOCK; +#endif + default: + AWS_ASSERT(0); + return AF_INET; + } +} + +static int s_convert_type(enum aws_socket_type type) { + switch (type) { + case AWS_SOCKET_STREAM: + return SOCK_STREAM; + case AWS_SOCKET_DGRAM: + return SOCK_DGRAM; + default: + AWS_ASSERT(0); + return SOCK_STREAM; + } +} + +static int s_determine_socket_error(int error) { + switch (error) { + case ECONNREFUSED: + return AWS_IO_SOCKET_CONNECTION_REFUSED; + case ETIMEDOUT: + return AWS_IO_SOCKET_TIMEOUT; + case EHOSTUNREACH: + case ENETUNREACH: + return AWS_IO_SOCKET_NO_ROUTE_TO_HOST; + case EADDRNOTAVAIL: + return AWS_IO_SOCKET_INVALID_ADDRESS; + case ENETDOWN: + return AWS_IO_SOCKET_NETWORK_DOWN; + case ECONNABORTED: + return AWS_IO_SOCKET_CONNECT_ABORTED; + case EADDRINUSE: + return AWS_IO_SOCKET_ADDRESS_IN_USE; + case ENOBUFS: + case ENOMEM: + return AWS_ERROR_OOM; + case EAGAIN: + return AWS_IO_READ_WOULD_BLOCK; + case EMFILE: + case ENFILE: + return AWS_ERROR_MAX_FDS_EXCEEDED; + case ENOENT: + case EINVAL: + return AWS_ERROR_FILE_INVALID_PATH; + case EAFNOSUPPORT: + return AWS_IO_SOCKET_UNSUPPORTED_ADDRESS_FAMILY; + case EACCES: + return AWS_ERROR_NO_PERMISSION; + default: + return AWS_IO_SOCKET_NOT_CONNECTED; + } +} + +static int s_create_socket(struct aws_socket *sock, const struct aws_socket_options *options) { + + int fd = socket(s_convert_domain(options->domain), s_convert_type(options->type), 0); + AWS_LOGF_DEBUG( + AWS_LS_IO_SOCKET, + "id=%p fd=%d: initializing with domain %d and type %d", + (void *)sock, + fd, + options->domain, + options->type); + if (fd != -1) { + int flags = fcntl(fd, F_GETFL, 0); + flags |= O_NONBLOCK | O_CLOEXEC; + int success = fcntl(fd, F_SETFL, flags); + (void)success; + sock->io_handle.data.fd = fd; + 
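On the NO_SIGNAL macro defined above: it expands to MSG_NOSIGNAL on Linux and SO_NOSIGPIPE on Apple platforms; aws_socket_set_options() applies it via setsockopt() and s_process_write_requests() passes it to send(). A hedged fragment with placeholder fd/buf/len:

    ssize_t written = send(fd, buf, len, NO_SIGNAL); /* avoids SIGPIPE on a closed peer (Linux: MSG_NOSIGNAL) */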
sock->io_handle.additional_data = NULL; + return aws_socket_set_options(sock, options); + } + + int aws_error = s_determine_socket_error(errno); + return aws_raise_error(aws_error); +} + +struct posix_socket_connect_args { + struct aws_task task; + struct aws_allocator *allocator; + struct aws_socket *socket; +}; + +struct posix_socket { + struct aws_linked_list write_queue; + struct posix_socket_connect_args *connect_args; + bool write_in_progress; + bool currently_subscribed; + bool continue_accept; + bool currently_in_event; + bool clean_yourself_up; + bool *close_happened; +}; + +static int s_socket_init( + struct aws_socket *socket, + struct aws_allocator *alloc, + const struct aws_socket_options *options, + int existing_socket_fd) { + AWS_ASSERT(options); + AWS_ZERO_STRUCT(*socket); + + struct posix_socket *posix_socket = aws_mem_calloc(alloc, 1, sizeof(struct posix_socket)); + if (!posix_socket) { + socket->impl = NULL; + return AWS_OP_ERR; + } + + socket->allocator = alloc; + socket->io_handle.data.fd = -1; + socket->state = INIT; + socket->options = *options; + + if (existing_socket_fd < 0) { + int err = s_create_socket(socket, options); + if (err) { + aws_mem_release(alloc, posix_socket); + socket->impl = NULL; + return AWS_OP_ERR; + } + } else { + socket->io_handle = (struct aws_io_handle){ + .data = {.fd = existing_socket_fd}, + .additional_data = NULL, + }; + aws_socket_set_options(socket, options); + } + + aws_linked_list_init(&posix_socket->write_queue); + posix_socket->write_in_progress = false; + posix_socket->currently_subscribed = false; + posix_socket->continue_accept = false; + posix_socket->currently_in_event = false; + posix_socket->clean_yourself_up = false; + posix_socket->connect_args = NULL; + posix_socket->close_happened = NULL; + socket->impl = posix_socket; + return AWS_OP_SUCCESS; +} + +int aws_socket_init(struct aws_socket *socket, struct aws_allocator *alloc, const struct aws_socket_options *options) { + AWS_ASSERT(options); + return s_socket_init(socket, alloc, options, -1); +} + +void aws_socket_clean_up(struct aws_socket *socket) { + if (!socket->impl) { + /* protect from double clean */ + return; + } + if (aws_socket_is_open(socket)) { + AWS_LOGF_DEBUG( + AWS_LS_IO_SOCKET, "id=%p fd=%d: is still open, closing...", (void *)socket, socket->io_handle.data.fd); + aws_socket_close(socket); + } + struct posix_socket *socket_impl = socket->impl; + + if (!socket_impl->currently_in_event) { + aws_mem_release(socket->allocator, socket->impl); + } else { + AWS_LOGF_DEBUG( + AWS_LS_IO_SOCKET, + "id=%p fd=%d: is still pending io letting it dangle and cleaning up later.", + (void *)socket, + socket->io_handle.data.fd); + socket_impl->clean_yourself_up = true; + } + + AWS_ZERO_STRUCT(*socket); + socket->io_handle.data.fd = -1; +} + +static void s_on_connection_error(struct aws_socket *socket, int error); + +static int s_on_connection_success(struct aws_socket *socket) { + + struct aws_event_loop *event_loop = socket->event_loop; + struct posix_socket *socket_impl = socket->impl; + + if (socket_impl->currently_subscribed) { + aws_event_loop_unsubscribe_from_io_events(socket->event_loop, &socket->io_handle); + socket_impl->currently_subscribed = false; + } + + socket->event_loop = NULL; + + int connect_result; + socklen_t result_length = sizeof(connect_result); + + if (getsockopt(socket->io_handle.data.fd, SOL_SOCKET, SO_ERROR, &connect_result, &result_length) < 0) { + AWS_LOGF_ERROR( + AWS_LS_IO_SOCKET, + "id=%p fd=%d: failed to determine connection error %d", + (void 
*)socket, + socket->io_handle.data.fd, + errno); + int aws_error = s_determine_socket_error(errno); + aws_raise_error(aws_error); + s_on_connection_error(socket, aws_error); + return AWS_OP_ERR; + } + + if (connect_result) { + AWS_LOGF_ERROR( + AWS_LS_IO_SOCKET, + "id=%p fd=%d: connection error %d", + (void *)socket, + socket->io_handle.data.fd, + connect_result); + int aws_error = s_determine_socket_error(connect_result); + aws_raise_error(aws_error); + s_on_connection_error(socket, aws_error); + return AWS_OP_ERR; + } + + AWS_LOGF_INFO(AWS_LS_IO_SOCKET, "id=%p fd=%d: connection success", (void *)socket, socket->io_handle.data.fd); + + struct sockaddr_storage address; + AWS_ZERO_STRUCT(address); + socklen_t address_size = sizeof(address); + if (!getsockname(socket->io_handle.data.fd, (struct sockaddr *)&address, &address_size)) { + uint16_t port = 0; + + if (address.ss_family == AF_INET) { + struct sockaddr_in *s = (struct sockaddr_in *)&address; + port = ntohs(s->sin_port); + /* this comes straight from the kernal. a.) they won't fail. b.) even if they do, it's not fatal + * once we add logging, we can log this if it fails. */ + if (inet_ntop( + AF_INET, &s->sin_addr, socket->local_endpoint.address, sizeof(socket->local_endpoint.address))) { + AWS_LOGF_DEBUG( + AWS_LS_IO_SOCKET, + "id=%p fd=%d: local endpoint %s:%d", + (void *)socket, + socket->io_handle.data.fd, + socket->local_endpoint.address, + port); + } else { + AWS_LOGF_WARN( + AWS_LS_IO_SOCKET, + "id=%p fd=%d: determining local endpoint failed", + (void *)socket, + socket->io_handle.data.fd); + } + } else if (address.ss_family == AF_INET6) { + struct sockaddr_in6 *s = (struct sockaddr_in6 *)&address; + port = ntohs(s->sin6_port); + /* this comes straight from the kernal. a.) they won't fail. b.) even if they do, it's not fatal + * once we add logging, we can log this if it fails. 
*/ + if (inet_ntop( + AF_INET6, &s->sin6_addr, socket->local_endpoint.address, sizeof(socket->local_endpoint.address))) { + AWS_LOGF_DEBUG( + AWS_LS_IO_SOCKET, + "id=%p fd %d: local endpoint %s:%d", + (void *)socket, + socket->io_handle.data.fd, + socket->local_endpoint.address, + port); + } else { + AWS_LOGF_WARN( + AWS_LS_IO_SOCKET, + "id=%p fd=%d: determining local endpoint failed", + (void *)socket, + socket->io_handle.data.fd); + } + } + + socket->local_endpoint.port = port; + } else { + AWS_LOGF_ERROR( + AWS_LS_IO_SOCKET, + "id=%p fd=%d: getsockname() failed with error %d", + (void *)socket, + socket->io_handle.data.fd, + errno); + int aws_error = s_determine_socket_error(errno); + aws_raise_error(aws_error); + s_on_connection_error(socket, aws_error); + return AWS_OP_ERR; + } + + socket->state = CONNECTED_WRITE | CONNECTED_READ; + + if (aws_socket_assign_to_event_loop(socket, event_loop)) { + AWS_LOGF_ERROR( + AWS_LS_IO_SOCKET, + "id=%p fd=%d: assignment to event loop %p failed with error %d", + (void *)socket, + socket->io_handle.data.fd, + (void *)event_loop, + aws_last_error()); + s_on_connection_error(socket, aws_last_error()); + return AWS_OP_ERR; + } + + socket->connection_result_fn(socket, AWS_ERROR_SUCCESS, socket->connect_accept_user_data); + + return AWS_OP_SUCCESS; +} + +static void s_on_connection_error(struct aws_socket *socket, int error) { + socket->state = ERROR; + AWS_LOGF_ERROR(AWS_LS_IO_SOCKET, "id=%p fd=%d: connection failure", (void *)socket, socket->io_handle.data.fd); + if (socket->connection_result_fn) { + socket->connection_result_fn(socket, error, socket->connect_accept_user_data); + } else if (socket->accept_result_fn) { + socket->accept_result_fn(socket, error, NULL, socket->connect_accept_user_data); + } +} + +/* the next two callbacks compete based on which one runs first. if s_socket_connect_event + * comes back first, then we set socket_args->socket = NULL and continue on with the connection. + * if s_handle_socket_timeout() runs first, is sees socket_args->socket is NULL and just cleans up its memory. + * s_handle_socket_timeout() will always run so the memory for socket_connect_args is always cleaned up there. */ +static void s_socket_connect_event( + struct aws_event_loop *event_loop, + struct aws_io_handle *handle, + int events, + void *user_data) { + + (void)event_loop; + (void)handle; + + struct posix_socket_connect_args *socket_args = (struct posix_socket_connect_args *)user_data; + AWS_LOGF_TRACE(AWS_LS_IO_SOCKET, "fd=%d: connection activity handler triggered ", handle->data.fd); + + if (socket_args->socket) { + AWS_LOGF_TRACE( + AWS_LS_IO_SOCKET, + "id=%p fd=%d: has not timed out yet proceeding with connection.", + (void *)socket_args->socket, + handle->data.fd); + + struct posix_socket *socket_impl = socket_args->socket->impl; + if (!(events & AWS_IO_EVENT_TYPE_ERROR || events & AWS_IO_EVENT_TYPE_CLOSED) && + (events & AWS_IO_EVENT_TYPE_READABLE || events & AWS_IO_EVENT_TYPE_WRITABLE)) { + struct aws_socket *socket = socket_args->socket; + socket_args->socket = NULL; + socket_impl->connect_args = NULL; + s_on_connection_success(socket); + return; + } + + int aws_error = aws_socket_get_error(socket_args->socket); + /* we'll get another notification. 
*/ + if (aws_error == AWS_IO_READ_WOULD_BLOCK) { + AWS_LOGF_TRACE( + AWS_LS_IO_SOCKET, + "id=%p fd=%d: spurious event, waiting for another notification.", + (void *)socket_args->socket, + handle->data.fd); + return; + } + + struct aws_socket *socket = socket_args->socket; + socket_args->socket = NULL; + socket_impl->connect_args = NULL; + aws_raise_error(aws_error); + s_on_connection_error(socket, aws_error); + } +} + +static void s_handle_socket_timeout(struct aws_task *task, void *args, enum aws_task_status status) { + (void)task; + (void)status; + + struct posix_socket_connect_args *socket_args = args; + + AWS_LOGF_TRACE(AWS_LS_IO_SOCKET, "task_id=%p: timeout task triggered, evaluating timeouts.", (void *)task); + /* successful connection will have nulled out connect_args->socket */ + if (socket_args->socket) { + AWS_LOGF_ERROR( + AWS_LS_IO_SOCKET, + "id=%p fd=%d: timed out, shutting down.", + (void *)socket_args->socket, + socket_args->socket->io_handle.data.fd); + + socket_args->socket->state = TIMEDOUT; + int error_code = AWS_IO_SOCKET_TIMEOUT; + + if (status == AWS_TASK_STATUS_RUN_READY) { + aws_event_loop_unsubscribe_from_io_events(socket_args->socket->event_loop, &socket_args->socket->io_handle); + } else { + error_code = AWS_IO_EVENT_LOOP_SHUTDOWN; + aws_event_loop_free_io_event_resources(socket_args->socket->event_loop, &socket_args->socket->io_handle); + } + socket_args->socket->event_loop = NULL; + struct posix_socket *socket_impl = socket_args->socket->impl; + socket_impl->currently_subscribed = false; + aws_raise_error(error_code); + struct aws_socket *socket = socket_args->socket; + /* socket close sets socket_args->socket to NULL and + * socket_impl->connect_args to NULL. */ + aws_socket_close(socket); + s_on_connection_error(socket, error_code); + } + + aws_mem_release(socket_args->allocator, socket_args); +} + +/* this is used simply for moving a connect_success callback when the connect finished immediately + * (like for unix domain sockets) into the event loop's thread. Also note, in that case there was no + * timeout task scheduled, so in this case the socket_args are cleaned up. */ +static void s_run_connect_success(struct aws_task *task, void *arg, enum aws_task_status status) { + (void)task; + struct posix_socket_connect_args *socket_args = arg; + + if (socket_args->socket) { + struct posix_socket *socket_impl = socket_args->socket->impl; + if (status == AWS_TASK_STATUS_RUN_READY) { + s_on_connection_success(socket_args->socket); + } else { + aws_raise_error(AWS_IO_SOCKET_CONNECT_ABORTED); + socket_args->socket->event_loop = NULL; + s_on_connection_error(socket_args->socket, AWS_IO_SOCKET_CONNECT_ABORTED); + } + socket_impl->connect_args = NULL; + } + + aws_mem_release(socket_args->allocator, socket_args); +} + +static inline int s_convert_pton_error(int pton_code) { + if (pton_code == 0) { + return AWS_IO_SOCKET_INVALID_ADDRESS; + } + + return s_determine_socket_error(errno); +} + +struct socket_address { + union sock_addr_types { + struct sockaddr_in addr_in; + struct sockaddr_in6 addr_in6; + struct sockaddr_un un_addr; +#ifdef USE_VSOCK + struct sockaddr_vm vm_addr; +#endif + } sock_addr_types; +}; + +#ifdef USE_VSOCK +/** Convert a string to a VSOCK CID. Respects the calling convention of inet_pton: + * 0 on error, 1 on success. 
*/ +static int parse_cid(const char *cid_str, unsigned int *value) { + if (cid_str == NULL || value == NULL) { + errno = EINVAL; + return 0; + } + /* strtoll returns 0 as both error and correct value */ + errno = 0; + /* unsigned long long to handle edge cases in convention explicitly */ + long long cid = strtoll(cid_str, NULL, 10); + if (errno != 0) { + return 0; + } + + /* -1U means any, so it's a valid value, but it needs to be converted to + * unsigned int. */ + if (cid == -1) { + *value = VMADDR_CID_ANY; + return 1; + } + + if (cid < 0 || cid > UINT_MAX) { + errno = ERANGE; + return 0; + } + + /* cast is safe here, edge cases already checked */ + *value = (unsigned int)cid; + return 1; +} +#endif + +int aws_socket_connect( + struct aws_socket *socket, + const struct aws_socket_endpoint *remote_endpoint, + struct aws_event_loop *event_loop, + aws_socket_on_connection_result_fn *on_connection_result, + void *user_data) { + AWS_ASSERT(event_loop); + AWS_ASSERT(!socket->event_loop); + + AWS_LOGF_DEBUG(AWS_LS_IO_SOCKET, "id=%p fd=%d: beginning connect.", (void *)socket, socket->io_handle.data.fd); + + if (socket->event_loop) { + return aws_raise_error(AWS_IO_EVENT_LOOP_ALREADY_ASSIGNED); + } + + if (socket->options.type != AWS_SOCKET_DGRAM) { + AWS_ASSERT(on_connection_result); + if (socket->state != INIT) { + return aws_raise_error(AWS_IO_SOCKET_ILLEGAL_OPERATION_FOR_STATE); + } + } else { /* UDP socket */ + /* UDP sockets jump to CONNECT_READ if bind is called first */ + if (socket->state != CONNECTED_READ && socket->state != INIT) { + return aws_raise_error(AWS_IO_SOCKET_ILLEGAL_OPERATION_FOR_STATE); + } + } + + size_t address_strlen; + if (aws_secure_strlen(remote_endpoint->address, AWS_ADDRESS_MAX_LEN, &address_strlen)) { + return AWS_OP_ERR; + } + + struct socket_address address; + AWS_ZERO_STRUCT(address); + socklen_t sock_size = 0; + int pton_err = 1; + if (socket->options.domain == AWS_SOCKET_IPV4) { + pton_err = inet_pton(AF_INET, remote_endpoint->address, &address.sock_addr_types.addr_in.sin_addr); + address.sock_addr_types.addr_in.sin_port = htons(remote_endpoint->port); + address.sock_addr_types.addr_in.sin_family = AF_INET; + sock_size = sizeof(address.sock_addr_types.addr_in); + } else if (socket->options.domain == AWS_SOCKET_IPV6) { + pton_err = inet_pton(AF_INET6, remote_endpoint->address, &address.sock_addr_types.addr_in6.sin6_addr); + address.sock_addr_types.addr_in6.sin6_port = htons(remote_endpoint->port); + address.sock_addr_types.addr_in6.sin6_family = AF_INET6; + sock_size = sizeof(address.sock_addr_types.addr_in6); + } else if (socket->options.domain == AWS_SOCKET_LOCAL) { + address.sock_addr_types.un_addr.sun_family = AF_UNIX; + strncpy(address.sock_addr_types.un_addr.sun_path, remote_endpoint->address, AWS_ADDRESS_MAX_LEN); + sock_size = sizeof(address.sock_addr_types.un_addr); +#ifdef USE_VSOCK + } else if (socket->options.domain == AWS_SOCKET_VSOCK) { + pton_err = parse_cid(remote_endpoint->address, &address.sock_addr_types.vm_addr.svm_cid); + address.sock_addr_types.vm_addr.svm_family = AF_VSOCK; + address.sock_addr_types.vm_addr.svm_port = (unsigned int)remote_endpoint->port; + sock_size = sizeof(address.sock_addr_types.vm_addr); +#endif + } else { + AWS_ASSERT(0); + return aws_raise_error(AWS_IO_SOCKET_UNSUPPORTED_ADDRESS_FAMILY); + } + + if (pton_err != 1) { + AWS_LOGF_ERROR( + AWS_LS_IO_SOCKET, + "id=%p fd=%d: failed to parse address %s:%d.", + (void *)socket, + socket->io_handle.data.fd, + remote_endpoint->address, + (int)remote_endpoint->port); + return 
aws_raise_error(s_convert_pton_error(pton_err)); + } + + AWS_LOGF_DEBUG( + AWS_LS_IO_SOCKET, + "id=%p fd=%d: connecting to endpoint %s:%d.", + (void *)socket, + socket->io_handle.data.fd, + remote_endpoint->address, + (int)remote_endpoint->port); + + socket->state = CONNECTING; + socket->remote_endpoint = *remote_endpoint; + socket->connect_accept_user_data = user_data; + socket->connection_result_fn = on_connection_result; + + struct posix_socket *socket_impl = socket->impl; + + socket_impl->connect_args = aws_mem_calloc(socket->allocator, 1, sizeof(struct posix_socket_connect_args)); + if (!socket_impl->connect_args) { + return AWS_OP_ERR; + } + + socket_impl->connect_args->socket = socket; + socket_impl->connect_args->allocator = socket->allocator; + + socket_impl->connect_args->task.fn = s_handle_socket_timeout; + socket_impl->connect_args->task.arg = socket_impl->connect_args; + + int error_code = connect(socket->io_handle.data.fd, (struct sockaddr *)&address.sock_addr_types, sock_size); + socket->event_loop = event_loop; + + if (!error_code) { + AWS_LOGF_INFO( + AWS_LS_IO_SOCKET, + "id=%p fd=%d: connected immediately, not scheduling timeout.", + (void *)socket, + socket->io_handle.data.fd); + socket_impl->connect_args->task.fn = s_run_connect_success; + /* the subscription for IO will happen once we setup the connection in the task. Since we already + * know the connection succeeded, we don't need to register for events yet. */ + aws_event_loop_schedule_task_now(event_loop, &socket_impl->connect_args->task); + } + + if (error_code) { + error_code = errno; + if (error_code == EINPROGRESS || error_code == EALREADY) { + AWS_LOGF_TRACE( + AWS_LS_IO_SOCKET, + "id=%p fd=%d: connection pending waiting on event-loop notification or timeout.", + (void *)socket, + socket->io_handle.data.fd); + /* cache the timeout task; it is possible for the IO subscription to come back virtually immediately + * and null out the connect args */ + struct aws_task *timeout_task = &socket_impl->connect_args->task; + + socket_impl->currently_subscribed = true; + /* This event is for when the connection finishes. (the fd will flip writable). */ + if (aws_event_loop_subscribe_to_io_events( + event_loop, + &socket->io_handle, + AWS_IO_EVENT_TYPE_WRITABLE, + s_socket_connect_event, + socket_impl->connect_args)) { + AWS_LOGF_ERROR( + AWS_LS_IO_SOCKET, + "id=%p fd=%d: failed to register with event-loop %p.", + (void *)socket, + socket->io_handle.data.fd, + (void *)event_loop); + socket_impl->currently_subscribed = false; + socket->event_loop = NULL; + goto err_clean_up; + } + + /* schedule a task to run at the connect timeout interval, if this task runs before the connect + * happens, we consider that a timeout. 
*/ + uint64_t timeout = 0; + aws_event_loop_current_clock_time(event_loop, &timeout); + timeout += aws_timestamp_convert( + socket->options.connect_timeout_ms, AWS_TIMESTAMP_MILLIS, AWS_TIMESTAMP_NANOS, NULL); + AWS_LOGF_TRACE( + AWS_LS_IO_SOCKET, + "id=%p fd=%d: scheduling timeout task for %llu.", + (void *)socket, + socket->io_handle.data.fd, + (unsigned long long)timeout); + aws_event_loop_schedule_task_future(event_loop, timeout_task, timeout); + } else { + AWS_LOGF_ERROR( + AWS_LS_IO_SOCKET, + "id=%p fd=%d: connect failed with error code %d.", + (void *)socket, + socket->io_handle.data.fd, + error_code); + int aws_error = s_determine_socket_error(error_code); + aws_raise_error(aws_error); + socket->event_loop = NULL; + socket_impl->currently_subscribed = false; + goto err_clean_up; + } + } + return AWS_OP_SUCCESS; + +err_clean_up: + aws_mem_release(socket->allocator, socket_impl->connect_args); + socket_impl->connect_args = NULL; + return AWS_OP_ERR; +} + +int aws_socket_bind(struct aws_socket *socket, const struct aws_socket_endpoint *local_endpoint) { + if (socket->state != INIT) { + AWS_LOGF_ERROR( + AWS_LS_IO_SOCKET, + "id=%p fd=%d: invalid state for bind operation.", + (void *)socket, + socket->io_handle.data.fd); + return aws_raise_error(AWS_IO_SOCKET_ILLEGAL_OPERATION_FOR_STATE); + } + + size_t address_strlen; + if (aws_secure_strlen(local_endpoint->address, AWS_ADDRESS_MAX_LEN, &address_strlen)) { + return AWS_OP_ERR; + } + + int error_code = -1; + + socket->local_endpoint = *local_endpoint; + AWS_LOGF_INFO( + AWS_LS_IO_SOCKET, + "id=%p fd=%d: binding to %s:%d.", + (void *)socket, + socket->io_handle.data.fd, + local_endpoint->address, + (int)local_endpoint->port); + + struct socket_address address; + AWS_ZERO_STRUCT(address); + socklen_t sock_size = 0; + int pton_err = 1; + if (socket->options.domain == AWS_SOCKET_IPV4) { + pton_err = inet_pton(AF_INET, local_endpoint->address, &address.sock_addr_types.addr_in.sin_addr); + address.sock_addr_types.addr_in.sin_port = htons(local_endpoint->port); + address.sock_addr_types.addr_in.sin_family = AF_INET; + sock_size = sizeof(address.sock_addr_types.addr_in); + } else if (socket->options.domain == AWS_SOCKET_IPV6) { + pton_err = inet_pton(AF_INET6, local_endpoint->address, &address.sock_addr_types.addr_in6.sin6_addr); + address.sock_addr_types.addr_in6.sin6_port = htons(local_endpoint->port); + address.sock_addr_types.addr_in6.sin6_family = AF_INET6; + sock_size = sizeof(address.sock_addr_types.addr_in6); + } else if (socket->options.domain == AWS_SOCKET_LOCAL) { + address.sock_addr_types.un_addr.sun_family = AF_UNIX; + strncpy(address.sock_addr_types.un_addr.sun_path, local_endpoint->address, AWS_ADDRESS_MAX_LEN); + sock_size = sizeof(address.sock_addr_types.un_addr); +#ifdef USE_VSOCK + } else if (socket->options.domain == AWS_SOCKET_VSOCK) { + pton_err = parse_cid(local_endpoint->address, &address.sock_addr_types.vm_addr.svm_cid); + address.sock_addr_types.vm_addr.svm_family = AF_VSOCK; + address.sock_addr_types.vm_addr.svm_port = (unsigned int)local_endpoint->port; + sock_size = sizeof(address.sock_addr_types.vm_addr); +#endif + } else { + AWS_ASSERT(0); + return aws_raise_error(AWS_IO_SOCKET_UNSUPPORTED_ADDRESS_FAMILY); + } + + if (pton_err != 1) { + AWS_LOGF_ERROR( + AWS_LS_IO_SOCKET, + "id=%p fd=%d: failed to parse address %s:%d.", + (void *)socket, + socket->io_handle.data.fd, + local_endpoint->address, + (int)local_endpoint->port); + return aws_raise_error(s_convert_pton_error(pton_err)); + } + + error_code = 
bind(socket->io_handle.data.fd, (struct sockaddr *)&address.sock_addr_types, sock_size); + + if (!error_code) { + if (socket->options.type == AWS_SOCKET_STREAM) { + socket->state = BOUND; + } else { + /* e.g. UDP is now readable */ + socket->state = CONNECTED_READ; + } + AWS_LOGF_DEBUG(AWS_LS_IO_SOCKET, "id=%p fd=%d: successfully bound", (void *)socket, socket->io_handle.data.fd); + + return AWS_OP_SUCCESS; + } + + socket->state = ERROR; + error_code = errno; + AWS_LOGF_ERROR( + AWS_LS_IO_SOCKET, + "id=%p fd=%d: bind failed with error code %d", + (void *)socket, + socket->io_handle.data.fd, + error_code); + + int aws_error = s_determine_socket_error(error_code); + return aws_raise_error(aws_error); +} + +int aws_socket_listen(struct aws_socket *socket, int backlog_size) { + if (socket->state != BOUND) { + AWS_LOGF_ERROR( + AWS_LS_IO_SOCKET, + "id=%p fd=%d: invalid state for listen operation. You must call bind first.", + (void *)socket, + socket->io_handle.data.fd); + return aws_raise_error(AWS_IO_SOCKET_ILLEGAL_OPERATION_FOR_STATE); + } + + int error_code = listen(socket->io_handle.data.fd, backlog_size); + + if (!error_code) { + AWS_LOGF_INFO( + AWS_LS_IO_SOCKET, "id=%p fd=%d: successfully listening", (void *)socket, socket->io_handle.data.fd); + socket->state = LISTENING; + return AWS_OP_SUCCESS; + } + + AWS_LOGF_ERROR( + AWS_LS_IO_SOCKET, + "id=%p fd=%d: listen failed with error code %d", + (void *)socket, + socket->io_handle.data.fd, + error_code); + error_code = errno; + socket->state = ERROR; + + return aws_raise_error(s_determine_socket_error(error_code)); +} + +/* this is called by the event loop handler that was installed in start_accept(). It runs once the FD goes readable, + * accepts as many as it can and then returns control to the event loop. 
*/ +static void s_socket_accept_event( + struct aws_event_loop *event_loop, + struct aws_io_handle *handle, + int events, + void *user_data) { + + (void)event_loop; + + struct aws_socket *socket = user_data; + struct posix_socket *socket_impl = socket->impl; + + AWS_LOGF_DEBUG( + AWS_LS_IO_SOCKET, "id=%p fd=%d: listening event received", (void *)socket, socket->io_handle.data.fd); + + if (socket_impl->continue_accept && events & AWS_IO_EVENT_TYPE_READABLE) { + int in_fd = 0; + while (socket_impl->continue_accept && in_fd != -1) { + struct sockaddr_storage in_addr; + socklen_t in_len = sizeof(struct sockaddr_storage); + + in_fd = accept(handle->data.fd, (struct sockaddr *)&in_addr, &in_len); + if (in_fd == -1) { + int error = errno; + + if (error == EAGAIN || error == EWOULDBLOCK) { + break; + } + + int aws_error = aws_socket_get_error(socket); + aws_raise_error(aws_error); + s_on_connection_error(socket, aws_error); + break; + } + + AWS_LOGF_DEBUG( + AWS_LS_IO_SOCKET, "id=%p fd=%d: incoming connection", (void *)socket, socket->io_handle.data.fd); + + struct aws_socket *new_sock = aws_mem_acquire(socket->allocator, sizeof(struct aws_socket)); + + if (!new_sock) { + close(in_fd); + s_on_connection_error(socket, aws_last_error()); + continue; + } + + if (s_socket_init(new_sock, socket->allocator, &socket->options, in_fd)) { + aws_mem_release(socket->allocator, new_sock); + s_on_connection_error(socket, aws_last_error()); + continue; + } + + new_sock->local_endpoint = socket->local_endpoint; + new_sock->state = CONNECTED_READ | CONNECTED_WRITE; + uint16_t port = 0; + + /* get the info on the incoming socket's address */ + if (in_addr.ss_family == AF_INET) { + struct sockaddr_in *s = (struct sockaddr_in *)&in_addr; + port = ntohs(s->sin_port); + /* this came from the kernel, a.) it won't fail. b.) even if it does + * its not fatal. come back and add logging later. */ + if (!inet_ntop( + AF_INET, + &s->sin_addr, + new_sock->remote_endpoint.address, + sizeof(new_sock->remote_endpoint.address))) { + AWS_LOGF_WARN( + AWS_LS_IO_SOCKET, + "id=%p fd=%d:. Failed to determine remote address.", + (void *)socket, + socket->io_handle.data.fd) + } + new_sock->options.domain = AWS_SOCKET_IPV4; + } else if (in_addr.ss_family == AF_INET6) { + /* this came from the kernel, a.) it won't fail. b.) even if it does + * its not fatal. come back and add logging later. */ + struct sockaddr_in6 *s = (struct sockaddr_in6 *)&in_addr; + port = ntohs(s->sin6_port); + if (!inet_ntop( + AF_INET6, + &s->sin6_addr, + new_sock->remote_endpoint.address, + sizeof(new_sock->remote_endpoint.address))) { + AWS_LOGF_WARN( + AWS_LS_IO_SOCKET, + "id=%p fd=%d:. 
Failed to determine remote address.", + (void *)socket, + socket->io_handle.data.fd) + } + new_sock->options.domain = AWS_SOCKET_IPV6; + } else if (in_addr.ss_family == AF_UNIX) { + new_sock->remote_endpoint = socket->local_endpoint; + new_sock->options.domain = AWS_SOCKET_LOCAL; + } + + new_sock->remote_endpoint.port = port; + + AWS_LOGF_INFO( + AWS_LS_IO_SOCKET, + "id=%p fd=%d: connected to %s:%d, incoming fd %d", + (void *)socket, + socket->io_handle.data.fd, + new_sock->remote_endpoint.address, + new_sock->remote_endpoint.port, + in_fd); + + int flags = fcntl(in_fd, F_GETFL, 0); + + flags |= O_NONBLOCK | O_CLOEXEC; + fcntl(in_fd, F_SETFL, flags); + + bool close_occurred = false; + socket_impl->close_happened = &close_occurred; + socket->accept_result_fn(socket, AWS_ERROR_SUCCESS, new_sock, socket->connect_accept_user_data); + + if (close_occurred) { + return; + } + + socket_impl->close_happened = NULL; + } + } + + AWS_LOGF_TRACE( + AWS_LS_IO_SOCKET, + "id=%p fd=%d: finished processing incoming connections, " + "waiting on event-loop notification", + (void *)socket, + socket->io_handle.data.fd); +} + +int aws_socket_start_accept( + struct aws_socket *socket, + struct aws_event_loop *accept_loop, + aws_socket_on_accept_result_fn *on_accept_result, + void *user_data) { + AWS_ASSERT(on_accept_result); + AWS_ASSERT(accept_loop); + + if (socket->event_loop) { + AWS_LOGF_ERROR( + AWS_LS_IO_SOCKET, + "id=%p fd=%d: is already assigned to event-loop %p.", + (void *)socket, + socket->io_handle.data.fd, + (void *)socket->event_loop); + return aws_raise_error(AWS_IO_EVENT_LOOP_ALREADY_ASSIGNED); + } + + if (socket->state != LISTENING) { + AWS_LOGF_ERROR( + AWS_LS_IO_SOCKET, + "id=%p fd=%d: invalid state for start_accept operation. You must call listen first.", + (void *)socket, + socket->io_handle.data.fd); + return aws_raise_error(AWS_IO_SOCKET_ILLEGAL_OPERATION_FOR_STATE); + } + + socket->accept_result_fn = on_accept_result; + socket->connect_accept_user_data = user_data; + socket->event_loop = accept_loop; + struct posix_socket *socket_impl = socket->impl; + socket_impl->continue_accept = true; + socket_impl->currently_subscribed = true; + + if (aws_event_loop_subscribe_to_io_events( + socket->event_loop, &socket->io_handle, AWS_IO_EVENT_TYPE_READABLE, s_socket_accept_event, socket)) { + AWS_LOGF_ERROR( + AWS_LS_IO_SOCKET, + "id=%p fd=%d: failed to subscribe to event-loop %p.", + (void *)socket, + socket->io_handle.data.fd, + (void *)socket->event_loop); + socket_impl->continue_accept = false; + socket_impl->currently_subscribed = false; + socket->event_loop = NULL; + + return AWS_OP_ERR; + } + + return AWS_OP_SUCCESS; +} + +struct stop_accept_args { + struct aws_task task; + struct aws_mutex mutex; + struct aws_condition_variable condition_variable; + struct aws_socket *socket; + int ret_code; + bool invoked; +}; + +static bool s_stop_accept_pred(void *arg) { + struct stop_accept_args *stop_accept_args = arg; + return stop_accept_args->invoked; +} + +static void s_stop_accept_task(struct aws_task *task, void *arg, enum aws_task_status status) { + (void)task; + (void)status; + + struct stop_accept_args *stop_accept_args = arg; + aws_mutex_lock(&stop_accept_args->mutex); + stop_accept_args->ret_code = AWS_OP_SUCCESS; + if (aws_socket_stop_accept(stop_accept_args->socket)) { + stop_accept_args->ret_code = aws_last_error(); + } + stop_accept_args->invoked = true; + aws_condition_variable_notify_one(&stop_accept_args->condition_variable); + aws_mutex_unlock(&stop_accept_args->mutex); +} + +int 
aws_socket_stop_accept(struct aws_socket *socket) { + if (socket->state != LISTENING) { + AWS_LOGF_ERROR( + AWS_LS_IO_SOCKET, + "id=%p fd=%d: is not in a listening state, can't stop_accept.", + (void *)socket, + socket->io_handle.data.fd); + return aws_raise_error(AWS_IO_SOCKET_ILLEGAL_OPERATION_FOR_STATE); + } + + AWS_LOGF_INFO( + AWS_LS_IO_SOCKET, "id=%p fd=%d: stopping accepting new connections", (void *)socket, socket->io_handle.data.fd); + + if (!aws_event_loop_thread_is_callers_thread(socket->event_loop)) { + struct stop_accept_args args = {.mutex = AWS_MUTEX_INIT, + .condition_variable = AWS_CONDITION_VARIABLE_INIT, + .invoked = false, + .socket = socket, + .ret_code = AWS_OP_SUCCESS, + .task = {.fn = s_stop_accept_task}}; + AWS_LOGF_INFO( + AWS_LS_IO_SOCKET, + "id=%p fd=%d: stopping accepting new connections from a different thread than " + "the socket is running from. Blocking until it shuts down.", + (void *)socket, + socket->io_handle.data.fd); + /* Look.... I know what I'm doing.... trust me, I'm an engineer. + * We wait on the completion before 'args' goes out of scope. + * NOLINTNEXTLINE */ + args.task.arg = &args; + aws_mutex_lock(&args.mutex); + aws_event_loop_schedule_task_now(socket->event_loop, &args.task); + aws_condition_variable_wait_pred(&args.condition_variable, &args.mutex, s_stop_accept_pred, &args); + aws_mutex_unlock(&args.mutex); + AWS_LOGF_INFO( + AWS_LS_IO_SOCKET, + "id=%p fd=%d: stop accept task finished running.", + (void *)socket, + socket->io_handle.data.fd); + + if (args.ret_code) { + return aws_raise_error(args.ret_code); + } + return AWS_OP_SUCCESS; + } + + int ret_val = AWS_OP_SUCCESS; + struct posix_socket *socket_impl = socket->impl; + if (socket_impl->currently_subscribed) { + ret_val = aws_event_loop_unsubscribe_from_io_events(socket->event_loop, &socket->io_handle); + socket_impl->currently_subscribed = false; + socket_impl->continue_accept = false; + socket->event_loop = NULL; + } + + return ret_val; +} + +int aws_socket_set_options(struct aws_socket *socket, const struct aws_socket_options *options) { + if (socket->options.domain != options->domain || socket->options.type != options->type) { + return aws_raise_error(AWS_IO_SOCKET_INVALID_OPTIONS); + } + + AWS_LOGF_DEBUG( + AWS_LS_IO_SOCKET, + "id=%p fd=%d: setting socket options to: keep-alive %d, keep idle %d, keep-alive interval %d, keep-alive probe " + "count %d.", + (void *)socket, + socket->io_handle.data.fd, + (int)options->keepalive, + (int)options->keep_alive_timeout_sec, + (int)options->keep_alive_interval_sec, + (int)options->keep_alive_max_failed_probes); + + socket->options = *options; + + int option_value = 1; + if (AWS_UNLIKELY( + setsockopt(socket->io_handle.data.fd, SOL_SOCKET, NO_SIGNAL, &option_value, sizeof(option_value)))) { + AWS_LOGF_WARN( + AWS_LS_IO_SOCKET, + "id=%p fd=%d: setsockopt() for NO_SIGNAL failed with errno %d. 
If you are having SIGPIPE signals thrown, " + "you may" + " want to install a signal trap in your application layer.", + (void *)socket, + socket->io_handle.data.fd, + errno); + } + + int reuse = 1; + if (AWS_UNLIKELY(setsockopt(socket->io_handle.data.fd, SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof(int)))) { + AWS_LOGF_WARN( + AWS_LS_IO_SOCKET, + "id=%p fd=%d: setsockopt() for SO_REUSEADDR failed with errno %d.", + (void *)socket, + socket->io_handle.data.fd, + errno); + } + + if (options->type == AWS_SOCKET_STREAM && options->domain != AWS_SOCKET_LOCAL) { + if (socket->options.keepalive) { + int keep_alive = 1; + if (AWS_UNLIKELY( + setsockopt(socket->io_handle.data.fd, SOL_SOCKET, SO_KEEPALIVE, &keep_alive, sizeof(int)))) { + AWS_LOGF_WARN( + AWS_LS_IO_SOCKET, + "id=%p fd=%d: setsockopt() for enabling SO_KEEPALIVE failed with errno %d.", + (void *)socket, + socket->io_handle.data.fd, + errno); + } + } + + if (socket->options.keep_alive_interval_sec && socket->options.keep_alive_timeout_sec) { + int ival_in_secs = socket->options.keep_alive_interval_sec; + if (AWS_UNLIKELY(setsockopt( + socket->io_handle.data.fd, IPPROTO_TCP, TCP_KEEPIDLE, &ival_in_secs, sizeof(ival_in_secs)))) { + AWS_LOGF_WARN( + AWS_LS_IO_SOCKET, + "id=%p fd=%d: setsockopt() for enabling TCP_KEEPIDLE for TCP failed with errno %d.", + (void *)socket, + socket->io_handle.data.fd, + errno); + } + + ival_in_secs = socket->options.keep_alive_timeout_sec; + if (AWS_UNLIKELY(setsockopt( + socket->io_handle.data.fd, IPPROTO_TCP, TCP_KEEPINTVL, &ival_in_secs, sizeof(ival_in_secs)))) { + AWS_LOGF_WARN( + AWS_LS_IO_SOCKET, + "id=%p fd=%d: setsockopt() for enabling TCP_KEEPINTVL for TCP failed with errno %d.", + (void *)socket, + socket->io_handle.data.fd, + errno); + } + } + + if (socket->options.keep_alive_max_failed_probes) { + int max_probes = socket->options.keep_alive_max_failed_probes; + if (AWS_UNLIKELY( + setsockopt(socket->io_handle.data.fd, IPPROTO_TCP, TCP_KEEPCNT, &max_probes, sizeof(max_probes)))) { + AWS_LOGF_WARN( + AWS_LS_IO_SOCKET, + "id=%p fd=%d: setsockopt() for enabling TCP_KEEPCNT for TCP failed with errno %d.", + (void *)socket, + socket->io_handle.data.fd, + errno); + } + } + } + + return AWS_OP_SUCCESS; +} + +struct write_request { + struct aws_byte_cursor cursor_cpy; + aws_socket_on_write_completed_fn *written_fn; + void *write_user_data; + struct aws_linked_list_node node; + size_t original_buffer_len; +}; + +struct posix_socket_close_args { + struct aws_mutex mutex; + struct aws_condition_variable condition_variable; + struct aws_socket *socket; + bool invoked; + int ret_code; +}; + +static bool s_close_predicate(void *arg) { + struct posix_socket_close_args *close_args = arg; + return close_args->invoked; +} + +static void s_close_task(struct aws_task *task, void *arg, enum aws_task_status status) { + (void)task; + (void)status; + + struct posix_socket_close_args *close_args = arg; + aws_mutex_lock(&close_args->mutex); + close_args->ret_code = AWS_OP_SUCCESS; + + if (aws_socket_close(close_args->socket)) { + close_args->ret_code = aws_last_error(); + } + + close_args->invoked = true; + aws_condition_variable_notify_one(&close_args->condition_variable); + aws_mutex_unlock(&close_args->mutex); +} + +int aws_socket_close(struct aws_socket *socket) { + struct posix_socket *socket_impl = socket->impl; + AWS_LOGF_DEBUG(AWS_LS_IO_SOCKET, "id=%p fd=%d: closing", (void *)socket, socket->io_handle.data.fd); + if (socket->event_loop) { + /* don't freak out on me, this almost never happens, and never occurs inside a 
channel + * it only gets hit from a listening socket shutting down or from a unit test. */ + if (!aws_event_loop_thread_is_callers_thread(socket->event_loop)) { + AWS_LOGF_INFO( + AWS_LS_IO_SOCKET, + "id=%p fd=%d: closing from a different thread than " + "the socket is running from. Blocking until it closes down.", + (void *)socket, + socket->io_handle.data.fd); + /* the only time we allow this kind of thing is when you're a listener.*/ + if (socket->state != LISTENING) { + return aws_raise_error(AWS_IO_SOCKET_ILLEGAL_OPERATION_FOR_STATE); + } + + struct posix_socket_close_args args = { + .mutex = AWS_MUTEX_INIT, + .condition_variable = AWS_CONDITION_VARIABLE_INIT, + .socket = socket, + .ret_code = AWS_OP_SUCCESS, + .invoked = false, + }; + + struct aws_task close_task = { + .fn = s_close_task, + .arg = &args, + }; + + aws_mutex_lock(&args.mutex); + aws_event_loop_schedule_task_now(socket->event_loop, &close_task); + aws_condition_variable_wait_pred(&args.condition_variable, &args.mutex, s_close_predicate, &args); + aws_mutex_unlock(&args.mutex); + AWS_LOGF_INFO( + AWS_LS_IO_SOCKET, "id=%p fd=%d: close task completed.", (void *)socket, socket->io_handle.data.fd); + if (args.ret_code) { + return aws_raise_error(args.ret_code); + } + + return AWS_OP_SUCCESS; + } + + if (socket_impl->currently_subscribed) { + if (socket->state & LISTENING) { + aws_socket_stop_accept(socket); + } else { + int err_code = aws_event_loop_unsubscribe_from_io_events(socket->event_loop, &socket->io_handle); + + if (err_code) { + return AWS_OP_ERR; + } + } + socket_impl->currently_subscribed = false; + socket->event_loop = NULL; + } + } + + if (socket_impl->close_happened) { + *socket_impl->close_happened = true; + } + + if (socket_impl->connect_args) { + socket_impl->connect_args->socket = NULL; + socket_impl->connect_args = NULL; + } + + if (aws_socket_is_open(socket)) { + close(socket->io_handle.data.fd); + socket->io_handle.data.fd = -1; + socket->state = CLOSED; + + /* after close, just go ahead and clear out the pending writes queue + * and tell the user they were cancelled. */ + while (!aws_linked_list_empty(&socket_impl->write_queue)) { + struct aws_linked_list_node *node = aws_linked_list_pop_front(&socket_impl->write_queue); + struct write_request *write_request = AWS_CONTAINER_OF(node, struct write_request, node); + + write_request->written_fn( + socket, AWS_IO_SOCKET_CLOSED, write_request->original_buffer_len, write_request->write_user_data); + aws_mem_release(socket->allocator, write_request); + } + } + + return AWS_OP_SUCCESS; +} + +int aws_socket_shutdown_dir(struct aws_socket *socket, enum aws_channel_direction dir) { + int how = dir == AWS_CHANNEL_DIR_READ ? 0 : 1; + AWS_LOGF_DEBUG( + AWS_LS_IO_SOCKET, "id=%p fd=%d: shutting down in direction %d", (void *)socket, socket->io_handle.data.fd, dir); + if (shutdown(socket->io_handle.data.fd, how)) { + int aws_error = s_determine_socket_error(errno); + return aws_raise_error(aws_error); + } + + if (dir == AWS_CHANNEL_DIR_READ) { + socket->state &= ~CONNECTED_READ; + } else { + socket->state &= ~CONNECTED_WRITE; + } + + return AWS_OP_SUCCESS; +} + +/* this gets called in two scenarios. + * 1st scenario, someone called aws_socket_write() and we want to try writing now, so an error can be returned + * immediately if something bad has happened to the socket. In this case, `parent_request` is set. + * 2nd scenario, the event loop notified us that the socket went writable. 
In this case `parent_request` is NULL */ +static int s_process_write_requests(struct aws_socket *socket, struct write_request *parent_request) { + struct posix_socket *socket_impl = socket->impl; + struct aws_allocator *allocator = socket->allocator; + + AWS_LOGF_TRACE( + AWS_LS_IO_SOCKET, "id=%p fd=%d: processing write requests.", (void *)socket, socket->io_handle.data.fd); + + /* there's a potential deadlock where we notify the user that we wrote some data, the user + * says, "cool, now I can write more and then immediately calls aws_socket_write(). We need to make sure + * that we don't allow reentrancy in that case. */ + socket_impl->write_in_progress = true; + + if (parent_request) { + AWS_LOGF_TRACE( + AWS_LS_IO_SOCKET, + "id=%p fd=%d: processing write requests, called from aws_socket_write", + (void *)socket, + socket->io_handle.data.fd); + socket_impl->currently_in_event = true; + } else { + AWS_LOGF_TRACE( + AWS_LS_IO_SOCKET, + "id=%p fd=%d: processing write requests, invoked by the event-loop", + (void *)socket, + socket->io_handle.data.fd); + } + + bool purge = false; + int aws_error = AWS_OP_SUCCESS; + bool parent_request_failed = false; + + /* if a close call happens in the middle, this queue will have been cleaned out from under us. */ + while (!aws_linked_list_empty(&socket_impl->write_queue)) { + struct aws_linked_list_node *node = aws_linked_list_front(&socket_impl->write_queue); + struct write_request *write_request = AWS_CONTAINER_OF(node, struct write_request, node); + + AWS_LOGF_TRACE( + AWS_LS_IO_SOCKET, + "id=%p fd=%d: dequeued write request of size %llu, remaining to write %llu", + (void *)socket, + socket->io_handle.data.fd, + (unsigned long long)write_request->original_buffer_len, + (unsigned long long)write_request->cursor_cpy.len); + + ssize_t written = + send(socket->io_handle.data.fd, write_request->cursor_cpy.ptr, write_request->cursor_cpy.len, NO_SIGNAL); + + AWS_LOGF_TRACE( + AWS_LS_IO_SOCKET, + "id=%p fd=%d: send written size %d", + (void *)socket, + socket->io_handle.data.fd, + (int)written); + + if (written < 0) { + int error = errno; + if (error == EAGAIN) { + AWS_LOGF_TRACE( + AWS_LS_IO_SOCKET, "id=%p fd=%d: returned would block", (void *)socket, socket->io_handle.data.fd); + break; + } + + if (error == EPIPE) { + AWS_LOGF_DEBUG( + AWS_LS_IO_SOCKET, + "id=%p fd=%d: already closed before write", + (void *)socket, + socket->io_handle.data.fd); + aws_error = AWS_IO_SOCKET_CLOSED; + aws_raise_error(aws_error); + purge = true; + break; + } + + purge = true; + AWS_LOGF_DEBUG( + AWS_LS_IO_SOCKET, + "id=%p fd=%d: write error with error code %d", + (void *)socket, + socket->io_handle.data.fd, + error); + aws_error = s_determine_socket_error(error); + aws_raise_error(aws_error); + break; + } + + size_t remaining_to_write = write_request->cursor_cpy.len; + + aws_byte_cursor_advance(&write_request->cursor_cpy, (size_t)written); + AWS_LOGF_TRACE( + AWS_LS_IO_SOCKET, + "id=%p fd=%d: remaining write request to write %llu", + (void *)socket, + socket->io_handle.data.fd, + (unsigned long long)write_request->cursor_cpy.len); + + if ((size_t)written == remaining_to_write) { + AWS_LOGF_TRACE( + AWS_LS_IO_SOCKET, "id=%p fd=%d: write request completed", (void *)socket, socket->io_handle.data.fd); + + aws_linked_list_remove(node); + write_request->written_fn( + socket, AWS_OP_SUCCESS, write_request->original_buffer_len, write_request->write_user_data); + aws_mem_release(allocator, write_request); + } + } + + if (purge) { + while 
(!aws_linked_list_empty(&socket_impl->write_queue)) { + struct aws_linked_list_node *node = aws_linked_list_pop_front(&socket_impl->write_queue); + struct write_request *write_request = AWS_CONTAINER_OF(node, struct write_request, node); + + /* If this fn was invoked directly from aws_socket_write(), don't invoke the error callback + * as the user will be able to rely on the return value from aws_socket_write() */ + if (write_request == parent_request) { + parent_request_failed = true; + } else { + write_request->written_fn(socket, aws_error, 0, write_request->write_user_data); + } + + aws_mem_release(socket->allocator, write_request); + } + } + + socket_impl->write_in_progress = false; + + if (parent_request) { + socket_impl->currently_in_event = false; + } + + if (socket_impl->clean_yourself_up) { + aws_mem_release(allocator, socket_impl); + } + + /* Only report error if aws_socket_write() invoked this function and its write_request failed */ + if (!parent_request_failed) { + return AWS_OP_SUCCESS; + } + + aws_raise_error(aws_error); + return AWS_OP_ERR; +} + +static void s_on_socket_io_event( + struct aws_event_loop *event_loop, + struct aws_io_handle *handle, + int events, + void *user_data) { + (void)event_loop; + (void)handle; + /* this is to handle a race condition when an error kicks off a cleanup, or the user decides + * to close the socket based on something they read (SSL validation failed for example). + * if clean_up happens when currently_in_event is true, socket_impl is kept dangling but currently + * subscribed is set to false. */ + struct aws_socket *socket = user_data; + struct posix_socket *socket_impl = socket->impl; + struct aws_allocator *allocator = socket->allocator; + + socket_impl->currently_in_event = true; + + if (events & AWS_IO_EVENT_TYPE_REMOTE_HANG_UP || events & AWS_IO_EVENT_TYPE_CLOSED) { + aws_raise_error(AWS_IO_SOCKET_CLOSED); + AWS_LOGF_TRACE(AWS_LS_IO_SOCKET, "id=%p fd=%d: closed remotely", (void *)socket, socket->io_handle.data.fd); + if (socket->readable_fn) { + socket->readable_fn(socket, AWS_IO_SOCKET_CLOSED, socket->readable_user_data); + } + goto end_check; + } + + if (socket_impl->currently_subscribed && events & AWS_IO_EVENT_TYPE_ERROR) { + int aws_error = aws_socket_get_error(socket); + aws_raise_error(aws_error); + AWS_LOGF_TRACE( + AWS_LS_IO_SOCKET, "id=%p fd=%d: error event occurred", (void *)socket, socket->io_handle.data.fd); + if (socket->readable_fn) { + socket->readable_fn(socket, aws_error, socket->readable_user_data); + } + goto end_check; + } + + if (socket_impl->currently_subscribed && events & AWS_IO_EVENT_TYPE_READABLE) { + AWS_LOGF_TRACE(AWS_LS_IO_SOCKET, "id=%p fd=%d: is readable", (void *)socket, socket->io_handle.data.fd); + if (socket->readable_fn) { + socket->readable_fn(socket, AWS_OP_SUCCESS, socket->readable_user_data); + } + } + /* if socket closed in between these branches, the currently_subscribed will be false and socket_impl will not + * have been cleaned up, so this next branch is safe. 
*/ + if (socket_impl->currently_subscribed && events & AWS_IO_EVENT_TYPE_WRITABLE) { + AWS_LOGF_TRACE(AWS_LS_IO_SOCKET, "id=%p fd=%d: is writable", (void *)socket, socket->io_handle.data.fd); + s_process_write_requests(socket, NULL); + } + +end_check: + socket_impl->currently_in_event = false; + + if (socket_impl->clean_yourself_up) { + aws_mem_release(allocator, socket_impl); + } +} + +int aws_socket_assign_to_event_loop(struct aws_socket *socket, struct aws_event_loop *event_loop) { + if (!socket->event_loop) { + AWS_LOGF_DEBUG( + AWS_LS_IO_SOCKET, + "id=%p fd=%d: assigning to event loop %p", + (void *)socket, + socket->io_handle.data.fd, + (void *)event_loop); + socket->event_loop = event_loop; + struct posix_socket *socket_impl = socket->impl; + socket_impl->currently_subscribed = true; + if (aws_event_loop_subscribe_to_io_events( + event_loop, + &socket->io_handle, + AWS_IO_EVENT_TYPE_WRITABLE | AWS_IO_EVENT_TYPE_READABLE, + s_on_socket_io_event, + socket)) { + AWS_LOGF_ERROR( + AWS_LS_IO_SOCKET, + "id=%p fd=%d: assigning to event loop %p failed with error %d", + (void *)socket, + socket->io_handle.data.fd, + (void *)event_loop, + aws_last_error()); + socket_impl->currently_subscribed = false; + socket->event_loop = NULL; + return AWS_OP_ERR; + } + + return AWS_OP_SUCCESS; + } + + return aws_raise_error(AWS_IO_EVENT_LOOP_ALREADY_ASSIGNED); +} + +struct aws_event_loop *aws_socket_get_event_loop(struct aws_socket *socket) { + return socket->event_loop; +} + +int aws_socket_subscribe_to_readable_events( + struct aws_socket *socket, + aws_socket_on_readable_fn *on_readable, + void *user_data) { + + AWS_LOGF_TRACE( + AWS_LS_IO_SOCKET, " id=%p fd=%d: subscribing to readable events", (void *)socket, socket->io_handle.data.fd); + if (!(socket->state & CONNECTED_READ)) { + AWS_LOGF_ERROR( + AWS_LS_IO_SOCKET, + "id=%p fd=%d: can't subscribe to readable events since the socket is not connected", + (void *)socket, + socket->io_handle.data.fd); + return aws_raise_error(AWS_IO_SOCKET_NOT_CONNECTED); + } + + if (socket->readable_fn) { + AWS_LOGF_ERROR( + AWS_LS_IO_SOCKET, + "id=%p fd=%d: can't subscribe to readable events since it is already subscribed", + (void *)socket, + socket->io_handle.data.fd); + return aws_raise_error(AWS_ERROR_IO_ALREADY_SUBSCRIBED); + } + + AWS_ASSERT(on_readable); + socket->readable_user_data = user_data; + socket->readable_fn = on_readable; + + return AWS_OP_SUCCESS; +} + +int aws_socket_read(struct aws_socket *socket, struct aws_byte_buf *buffer, size_t *amount_read) { + AWS_ASSERT(amount_read); + + if (!aws_event_loop_thread_is_callers_thread(socket->event_loop)) { + AWS_LOGF_ERROR( + AWS_LS_IO_SOCKET, + "id=%p fd=%d: cannot read from a different thread than event loop %p", + (void *)socket, + socket->io_handle.data.fd, + (void *)socket->event_loop); + return aws_raise_error(AWS_ERROR_IO_EVENT_LOOP_THREAD_ONLY); + } + + if (!(socket->state & CONNECTED_READ)) { + AWS_LOGF_ERROR( + AWS_LS_IO_SOCKET, + "id=%p fd=%d: cannot read because it is not connected", + (void *)socket, + socket->io_handle.data.fd); + return aws_raise_error(AWS_IO_SOCKET_NOT_CONNECTED); + } + + ssize_t read_val = read(socket->io_handle.data.fd, buffer->buffer + buffer->len, buffer->capacity - buffer->len); + AWS_LOGF_TRACE( + AWS_LS_IO_SOCKET, "id=%p fd=%d: read of %d", (void *)socket, socket->io_handle.data.fd, (int)read_val); + + if (read_val > 0) { + *amount_read = (size_t)read_val; + buffer->len += *amount_read; + return AWS_OP_SUCCESS; + } + + /* read_val of 0 means EOF which we'll treat as 
AWS_IO_SOCKET_CLOSED */ + if (read_val == 0) { + AWS_LOGF_INFO( + AWS_LS_IO_SOCKET, "id=%p fd=%d: zero read, socket is closed", (void *)socket, socket->io_handle.data.fd); + *amount_read = 0; + + if (buffer->capacity - buffer->len > 0) { + return aws_raise_error(AWS_IO_SOCKET_CLOSED); + } + + return AWS_OP_SUCCESS; + } + + int error = errno; +#if defined(EWOULDBLOCK) + if (error == EAGAIN || error == EWOULDBLOCK) { +#else + if (error == EAGAIN) { +#endif + AWS_LOGF_TRACE(AWS_LS_IO_SOCKET, "id=%p fd=%d: read would block", (void *)socket, socket->io_handle.data.fd); + return aws_raise_error(AWS_IO_READ_WOULD_BLOCK); + } + + if (error == EPIPE) { + AWS_LOGF_INFO(AWS_LS_IO_SOCKET, "id=%p fd=%d: socket is closed.", (void *)socket, socket->io_handle.data.fd); + return aws_raise_error(AWS_IO_SOCKET_CLOSED); + } + + if (error == ETIMEDOUT) { + AWS_LOGF_ERROR(AWS_LS_IO_SOCKET, "id=%p fd=%d: socket timed out.", (void *)socket, socket->io_handle.data.fd); + return aws_raise_error(AWS_IO_SOCKET_TIMEOUT); + } + + AWS_LOGF_ERROR( + AWS_LS_IO_SOCKET, + "id=%p fd=%d: read failed with error: %s", + (void *)socket, + socket->io_handle.data.fd, + strerror(error)); + return aws_raise_error(AWS_ERROR_SYS_CALL_FAILURE); +} + +int aws_socket_write( + struct aws_socket *socket, + const struct aws_byte_cursor *cursor, + aws_socket_on_write_completed_fn *written_fn, + void *user_data) { + if (!aws_event_loop_thread_is_callers_thread(socket->event_loop)) { + return aws_raise_error(AWS_ERROR_IO_EVENT_LOOP_THREAD_ONLY); + } + + if (!(socket->state & CONNECTED_WRITE)) { + AWS_LOGF_ERROR( + AWS_LS_IO_SOCKET, + "id=%p fd=%d: cannot write to because it is not connected", + (void *)socket, + socket->io_handle.data.fd); + return aws_raise_error(AWS_IO_SOCKET_NOT_CONNECTED); + } + + AWS_ASSERT(written_fn); + struct posix_socket *socket_impl = socket->impl; + struct write_request *write_request = aws_mem_calloc(socket->allocator, 1, sizeof(struct write_request)); + + if (!write_request) { + return AWS_OP_ERR; + } + + write_request->original_buffer_len = cursor->len; + write_request->written_fn = written_fn; + write_request->write_user_data = user_data; + write_request->cursor_cpy = *cursor; + aws_linked_list_push_back(&socket_impl->write_queue, &write_request->node); + + /* avoid reentrancy when a user calls write after receiving their completion callback. */ + if (!socket_impl->write_in_progress) { + return s_process_write_requests(socket, write_request); + } + + return AWS_OP_SUCCESS; +} + +int aws_socket_get_error(struct aws_socket *socket) { + int connect_result; + socklen_t result_length = sizeof(connect_result); + + if (getsockopt(socket->io_handle.data.fd, SOL_SOCKET, SO_ERROR, &connect_result, &result_length) < 0) { + return AWS_OP_ERR; + } + + if (connect_result) { + return s_determine_socket_error(connect_result); + } + + return AWS_OP_SUCCESS; +} + +bool aws_socket_is_open(struct aws_socket *socket) { + return socket->io_handle.data.fd >= 0; +} |
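The connect path restored above is fully asynchronous: aws_socket_connect() returns as soon as the fd is registered, and the result callback fires later on the event-loop thread, either through the immediate-success task or through the race between s_socket_connect_event() and s_handle_socket_timeout(). The sketch below shows how a caller typically drives that flow end to end. It is illustrative only: connect_example, s_on_connected, s_on_write_done, s_example_socket and the endpoint values are assumed names, and the event loop is presumed to be supplied by the application (for instance from an aws_event_loop_group it already owns).

#include <aws/common/byte_buf.h>
#include <aws/io/event_loop.h>
#include <aws/io/socket.h>

/* Static so the socket outlives the asynchronous connect started below. */
static struct aws_socket s_example_socket;

static void s_on_write_done(struct aws_socket *sock, int error_code, size_t bytes_written, void *user_data) {
    (void)sock;
    (void)bytes_written;
    (void)user_data;
    /* error_code is AWS_OP_SUCCESS on completion, or AWS_IO_SOCKET_CLOSED if the
     * pending-write queue was purged by aws_socket_close(). */
    (void)error_code;
}

static void s_on_connected(struct aws_socket *sock, int error_code, void *user_data) {
    (void)user_data;
    if (error_code) {
        /* Covers the s_on_connection_error() paths as well as AWS_IO_SOCKET_TIMEOUT. */
        aws_socket_clean_up(sock);
        return;
    }

    /* This callback runs on the event-loop thread, the only thread from which
     * aws_socket_read()/aws_socket_write() may be called. */
    struct aws_byte_cursor payload = aws_byte_cursor_from_c_str("hello");
    if (aws_socket_write(sock, &payload, s_on_write_done, NULL)) {
        aws_socket_clean_up(sock);
    }
}

int connect_example(struct aws_allocator *alloc, struct aws_event_loop *loop) {
    struct aws_socket_options options = {
        .type = AWS_SOCKET_STREAM,
        .domain = AWS_SOCKET_IPV4,
        .connect_timeout_ms = 3000,
    };

    if (aws_socket_init(&s_example_socket, alloc, &options)) {
        return AWS_OP_ERR;
    }

    struct aws_socket_endpoint endpoint = {.address = "127.0.0.1", .port = 443};

    /* Returns immediately; s_on_connected fires later on the loop thread, either
     * via the immediate-success task or the s_socket_connect_event() /
     * s_handle_socket_timeout() race described above. */
    return aws_socket_connect(&s_example_socket, &endpoint, loop, s_on_connected, NULL);
}

Two constraints from the implementation are worth keeping in mind when adapting this sketch: the aws_socket must stay alive until the connection result callback has run (here it has static storage), and all reads and writes must be issued from the event-loop thread the socket is assigned to, which is why the write is kicked off inside the connection callback.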