diff options
author | orivej <orivej@yandex-team.ru> | 2022-02-10 16:44:49 +0300 |
---|---|---|
committer | Daniil Cherednik <dcherednik@yandex-team.ru> | 2022-02-10 16:44:49 +0300 |
commit | 718c552901d703c502ccbefdfc3c9028d608b947 (patch) | |
tree | 46534a98bbefcd7b1f3faa5b52c138ab27db75b7 /contrib/restricted/aws/aws-c-io/source/posix/socket.c | |
parent | e9656aae26e0358d5378e5b63dcac5c8dbe0e4d0 (diff) | |
download | ydb-718c552901d703c502ccbefdfc3c9028d608b947.tar.gz |
Restoring authorship annotation for <orivej@yandex-team.ru>. Commit 1 of 2.
Diffstat (limited to 'contrib/restricted/aws/aws-c-io/source/posix/socket.c')
-rw-r--r-- | contrib/restricted/aws/aws-c-io/source/posix/socket.c | 3554 |
1 files changed, 1777 insertions, 1777 deletions
diff --git a/contrib/restricted/aws/aws-c-io/source/posix/socket.c b/contrib/restricted/aws/aws-c-io/source/posix/socket.c index 5f11cdff52..7ac30b39c2 100644 --- a/contrib/restricted/aws/aws-c-io/source/posix/socket.c +++ b/contrib/restricted/aws/aws-c-io/source/posix/socket.c @@ -1,1777 +1,1777 @@ -/** - * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. - * SPDX-License-Identifier: Apache-2.0. - */ - -#include <aws/io/socket.h> - -#include <aws/common/clock.h> -#include <aws/common/condition_variable.h> -#include <aws/common/mutex.h> -#include <aws/common/string.h> - -#include <aws/io/event_loop.h> -#include <aws/io/logging.h> - -#include <arpa/inet.h> -#include <aws/io/io.h> -#include <errno.h> -#include <fcntl.h> -#include <netinet/tcp.h> -#include <sys/socket.h> -#include <sys/types.h> -#include <unistd.h> - -#if defined(__MACH__) -# define NO_SIGNAL SO_NOSIGPIPE -# define TCP_KEEPIDLE TCP_KEEPALIVE -#else -# define NO_SIGNAL MSG_NOSIGNAL -#endif - -/* This isn't defined on ancient linux distros (breaking the builds). - * However, if this is a prebuild, we purposely build on an ancient system, but - * we want the kernel calls to still be the same as a modern build since that's likely the target of the application - * calling this code. Just define this if it isn't there already. GlibC and the kernel don't really care how the flag - * gets passed as long as it does. - */ -#ifndef O_CLOEXEC -# define O_CLOEXEC 02000000 -#endif - -#ifdef USE_VSOCK -# if defined(__linux__) && defined(AF_VSOCK) -# include <linux/vm_sockets.h> -# else -# error "USE_VSOCK not supported on current platform" -# endif -#endif - -/* other than CONNECTED_READ | CONNECTED_WRITE - * a socket is only in one of these states at a time. */ -enum socket_state { - INIT = 0x01, - CONNECTING = 0x02, - CONNECTED_READ = 0x04, - CONNECTED_WRITE = 0x08, - BOUND = 0x10, - LISTENING = 0x20, - TIMEDOUT = 0x40, - ERROR = 0x80, - CLOSED, -}; - -static int s_convert_domain(enum aws_socket_domain domain) { - switch (domain) { - case AWS_SOCKET_IPV4: - return AF_INET; - case AWS_SOCKET_IPV6: - return AF_INET6; - case AWS_SOCKET_LOCAL: - return AF_UNIX; -#ifdef USE_VSOCK - case AWS_SOCKET_VSOCK: - return AF_VSOCK; -#endif - default: - AWS_ASSERT(0); - return AF_INET; - } -} - -static int s_convert_type(enum aws_socket_type type) { - switch (type) { - case AWS_SOCKET_STREAM: - return SOCK_STREAM; - case AWS_SOCKET_DGRAM: - return SOCK_DGRAM; - default: - AWS_ASSERT(0); - return SOCK_STREAM; - } -} - -static int s_determine_socket_error(int error) { - switch (error) { - case ECONNREFUSED: - return AWS_IO_SOCKET_CONNECTION_REFUSED; - case ETIMEDOUT: - return AWS_IO_SOCKET_TIMEOUT; - case EHOSTUNREACH: - case ENETUNREACH: - return AWS_IO_SOCKET_NO_ROUTE_TO_HOST; - case EADDRNOTAVAIL: - return AWS_IO_SOCKET_INVALID_ADDRESS; - case ENETDOWN: - return AWS_IO_SOCKET_NETWORK_DOWN; - case ECONNABORTED: - return AWS_IO_SOCKET_CONNECT_ABORTED; - case EADDRINUSE: - return AWS_IO_SOCKET_ADDRESS_IN_USE; - case ENOBUFS: - case ENOMEM: - return AWS_ERROR_OOM; - case EAGAIN: - return AWS_IO_READ_WOULD_BLOCK; - case EMFILE: - case ENFILE: - return AWS_ERROR_MAX_FDS_EXCEEDED; - case ENOENT: - case EINVAL: - return AWS_ERROR_FILE_INVALID_PATH; - case EAFNOSUPPORT: - return AWS_IO_SOCKET_UNSUPPORTED_ADDRESS_FAMILY; - case EACCES: - return AWS_ERROR_NO_PERMISSION; - default: - return AWS_IO_SOCKET_NOT_CONNECTED; - } -} - -static int s_create_socket(struct aws_socket *sock, const struct aws_socket_options *options) { - - int fd = socket(s_convert_domain(options->domain), s_convert_type(options->type), 0); - AWS_LOGF_DEBUG( - AWS_LS_IO_SOCKET, - "id=%p fd=%d: initializing with domain %d and type %d", - (void *)sock, - fd, - options->domain, - options->type); - if (fd != -1) { - int flags = fcntl(fd, F_GETFL, 0); - flags |= O_NONBLOCK | O_CLOEXEC; - int success = fcntl(fd, F_SETFL, flags); - (void)success; - sock->io_handle.data.fd = fd; - sock->io_handle.additional_data = NULL; - return aws_socket_set_options(sock, options); - } - - int aws_error = s_determine_socket_error(errno); - return aws_raise_error(aws_error); -} - -struct posix_socket_connect_args { - struct aws_task task; - struct aws_allocator *allocator; - struct aws_socket *socket; -}; - -struct posix_socket { - struct aws_linked_list write_queue; - struct posix_socket_connect_args *connect_args; - bool write_in_progress; - bool currently_subscribed; - bool continue_accept; - bool currently_in_event; - bool clean_yourself_up; - bool *close_happened; -}; - -static int s_socket_init( - struct aws_socket *socket, - struct aws_allocator *alloc, - const struct aws_socket_options *options, - int existing_socket_fd) { - AWS_ASSERT(options); - AWS_ZERO_STRUCT(*socket); - - struct posix_socket *posix_socket = aws_mem_calloc(alloc, 1, sizeof(struct posix_socket)); - if (!posix_socket) { - socket->impl = NULL; - return AWS_OP_ERR; - } - - socket->allocator = alloc; - socket->io_handle.data.fd = -1; - socket->state = INIT; - socket->options = *options; - - if (existing_socket_fd < 0) { - int err = s_create_socket(socket, options); - if (err) { - aws_mem_release(alloc, posix_socket); - socket->impl = NULL; - return AWS_OP_ERR; - } - } else { - socket->io_handle = (struct aws_io_handle){ - .data = {.fd = existing_socket_fd}, - .additional_data = NULL, - }; - aws_socket_set_options(socket, options); - } - - aws_linked_list_init(&posix_socket->write_queue); - posix_socket->write_in_progress = false; - posix_socket->currently_subscribed = false; - posix_socket->continue_accept = false; - posix_socket->currently_in_event = false; - posix_socket->clean_yourself_up = false; - posix_socket->connect_args = NULL; - posix_socket->close_happened = NULL; - socket->impl = posix_socket; - return AWS_OP_SUCCESS; -} - -int aws_socket_init(struct aws_socket *socket, struct aws_allocator *alloc, const struct aws_socket_options *options) { - AWS_ASSERT(options); - return s_socket_init(socket, alloc, options, -1); -} - -void aws_socket_clean_up(struct aws_socket *socket) { - if (!socket->impl) { - /* protect from double clean */ - return; - } - if (aws_socket_is_open(socket)) { - AWS_LOGF_DEBUG( - AWS_LS_IO_SOCKET, "id=%p fd=%d: is still open, closing...", (void *)socket, socket->io_handle.data.fd); - aws_socket_close(socket); - } - struct posix_socket *socket_impl = socket->impl; - - if (!socket_impl->currently_in_event) { - aws_mem_release(socket->allocator, socket->impl); - } else { - AWS_LOGF_DEBUG( - AWS_LS_IO_SOCKET, - "id=%p fd=%d: is still pending io letting it dangle and cleaning up later.", - (void *)socket, - socket->io_handle.data.fd); - socket_impl->clean_yourself_up = true; - } - - AWS_ZERO_STRUCT(*socket); - socket->io_handle.data.fd = -1; -} - -static void s_on_connection_error(struct aws_socket *socket, int error); - -static int s_on_connection_success(struct aws_socket *socket) { - - struct aws_event_loop *event_loop = socket->event_loop; - struct posix_socket *socket_impl = socket->impl; - - if (socket_impl->currently_subscribed) { - aws_event_loop_unsubscribe_from_io_events(socket->event_loop, &socket->io_handle); - socket_impl->currently_subscribed = false; - } - - socket->event_loop = NULL; - - int connect_result; - socklen_t result_length = sizeof(connect_result); - - if (getsockopt(socket->io_handle.data.fd, SOL_SOCKET, SO_ERROR, &connect_result, &result_length) < 0) { - AWS_LOGF_ERROR( - AWS_LS_IO_SOCKET, - "id=%p fd=%d: failed to determine connection error %d", - (void *)socket, - socket->io_handle.data.fd, - errno); - int aws_error = s_determine_socket_error(errno); - aws_raise_error(aws_error); - s_on_connection_error(socket, aws_error); - return AWS_OP_ERR; - } - - if (connect_result) { - AWS_LOGF_ERROR( - AWS_LS_IO_SOCKET, - "id=%p fd=%d: connection error %d", - (void *)socket, - socket->io_handle.data.fd, - connect_result); - int aws_error = s_determine_socket_error(connect_result); - aws_raise_error(aws_error); - s_on_connection_error(socket, aws_error); - return AWS_OP_ERR; - } - - AWS_LOGF_INFO(AWS_LS_IO_SOCKET, "id=%p fd=%d: connection success", (void *)socket, socket->io_handle.data.fd); - - struct sockaddr_storage address; - AWS_ZERO_STRUCT(address); - socklen_t address_size = sizeof(address); - if (!getsockname(socket->io_handle.data.fd, (struct sockaddr *)&address, &address_size)) { - uint16_t port = 0; - - if (address.ss_family == AF_INET) { - struct sockaddr_in *s = (struct sockaddr_in *)&address; - port = ntohs(s->sin_port); - /* this comes straight from the kernal. a.) they won't fail. b.) even if they do, it's not fatal - * once we add logging, we can log this if it fails. */ - if (inet_ntop( - AF_INET, &s->sin_addr, socket->local_endpoint.address, sizeof(socket->local_endpoint.address))) { - AWS_LOGF_DEBUG( - AWS_LS_IO_SOCKET, - "id=%p fd=%d: local endpoint %s:%d", - (void *)socket, - socket->io_handle.data.fd, - socket->local_endpoint.address, - port); - } else { - AWS_LOGF_WARN( - AWS_LS_IO_SOCKET, - "id=%p fd=%d: determining local endpoint failed", - (void *)socket, - socket->io_handle.data.fd); - } - } else if (address.ss_family == AF_INET6) { - struct sockaddr_in6 *s = (struct sockaddr_in6 *)&address; - port = ntohs(s->sin6_port); - /* this comes straight from the kernal. a.) they won't fail. b.) even if they do, it's not fatal - * once we add logging, we can log this if it fails. */ - if (inet_ntop( - AF_INET6, &s->sin6_addr, socket->local_endpoint.address, sizeof(socket->local_endpoint.address))) { - AWS_LOGF_DEBUG( - AWS_LS_IO_SOCKET, - "id=%p fd %d: local endpoint %s:%d", - (void *)socket, - socket->io_handle.data.fd, - socket->local_endpoint.address, - port); - } else { - AWS_LOGF_WARN( - AWS_LS_IO_SOCKET, - "id=%p fd=%d: determining local endpoint failed", - (void *)socket, - socket->io_handle.data.fd); - } - } - - socket->local_endpoint.port = port; - } else { - AWS_LOGF_ERROR( - AWS_LS_IO_SOCKET, - "id=%p fd=%d: getsockname() failed with error %d", - (void *)socket, - socket->io_handle.data.fd, - errno); - int aws_error = s_determine_socket_error(errno); - aws_raise_error(aws_error); - s_on_connection_error(socket, aws_error); - return AWS_OP_ERR; - } - - socket->state = CONNECTED_WRITE | CONNECTED_READ; - - if (aws_socket_assign_to_event_loop(socket, event_loop)) { - AWS_LOGF_ERROR( - AWS_LS_IO_SOCKET, - "id=%p fd=%d: assignment to event loop %p failed with error %d", - (void *)socket, - socket->io_handle.data.fd, - (void *)event_loop, - aws_last_error()); - s_on_connection_error(socket, aws_last_error()); - return AWS_OP_ERR; - } - - socket->connection_result_fn(socket, AWS_ERROR_SUCCESS, socket->connect_accept_user_data); - - return AWS_OP_SUCCESS; -} - -static void s_on_connection_error(struct aws_socket *socket, int error) { - socket->state = ERROR; - AWS_LOGF_ERROR(AWS_LS_IO_SOCKET, "id=%p fd=%d: connection failure", (void *)socket, socket->io_handle.data.fd); - if (socket->connection_result_fn) { - socket->connection_result_fn(socket, error, socket->connect_accept_user_data); - } else if (socket->accept_result_fn) { - socket->accept_result_fn(socket, error, NULL, socket->connect_accept_user_data); - } -} - -/* the next two callbacks compete based on which one runs first. if s_socket_connect_event - * comes back first, then we set socket_args->socket = NULL and continue on with the connection. - * if s_handle_socket_timeout() runs first, is sees socket_args->socket is NULL and just cleans up its memory. - * s_handle_socket_timeout() will always run so the memory for socket_connect_args is always cleaned up there. */ -static void s_socket_connect_event( - struct aws_event_loop *event_loop, - struct aws_io_handle *handle, - int events, - void *user_data) { - - (void)event_loop; - (void)handle; - - struct posix_socket_connect_args *socket_args = (struct posix_socket_connect_args *)user_data; - AWS_LOGF_TRACE(AWS_LS_IO_SOCKET, "fd=%d: connection activity handler triggered ", handle->data.fd); - - if (socket_args->socket) { - AWS_LOGF_TRACE( - AWS_LS_IO_SOCKET, - "id=%p fd=%d: has not timed out yet proceeding with connection.", - (void *)socket_args->socket, - handle->data.fd); - - struct posix_socket *socket_impl = socket_args->socket->impl; - if (!(events & AWS_IO_EVENT_TYPE_ERROR || events & AWS_IO_EVENT_TYPE_CLOSED) && - (events & AWS_IO_EVENT_TYPE_READABLE || events & AWS_IO_EVENT_TYPE_WRITABLE)) { - struct aws_socket *socket = socket_args->socket; - socket_args->socket = NULL; - socket_impl->connect_args = NULL; - s_on_connection_success(socket); - return; - } - - int aws_error = aws_socket_get_error(socket_args->socket); - /* we'll get another notification. */ - if (aws_error == AWS_IO_READ_WOULD_BLOCK) { - AWS_LOGF_TRACE( - AWS_LS_IO_SOCKET, - "id=%p fd=%d: spurious event, waiting for another notification.", - (void *)socket_args->socket, - handle->data.fd); - return; - } - - struct aws_socket *socket = socket_args->socket; - socket_args->socket = NULL; - socket_impl->connect_args = NULL; - aws_raise_error(aws_error); - s_on_connection_error(socket, aws_error); - } -} - -static void s_handle_socket_timeout(struct aws_task *task, void *args, aws_task_status status) { - (void)task; - (void)status; - - struct posix_socket_connect_args *socket_args = args; - - AWS_LOGF_TRACE(AWS_LS_IO_SOCKET, "task_id=%p: timeout task triggered, evaluating timeouts.", (void *)task); - /* successful connection will have nulled out connect_args->socket */ - if (socket_args->socket) { - AWS_LOGF_ERROR( - AWS_LS_IO_SOCKET, - "id=%p fd=%d: timed out, shutting down.", - (void *)socket_args->socket, - socket_args->socket->io_handle.data.fd); - - socket_args->socket->state = TIMEDOUT; - int error_code = AWS_IO_SOCKET_TIMEOUT; - - if (status == AWS_TASK_STATUS_RUN_READY) { - aws_event_loop_unsubscribe_from_io_events(socket_args->socket->event_loop, &socket_args->socket->io_handle); - } else { - error_code = AWS_IO_EVENT_LOOP_SHUTDOWN; - aws_event_loop_free_io_event_resources(socket_args->socket->event_loop, &socket_args->socket->io_handle); - } - socket_args->socket->event_loop = NULL; - struct posix_socket *socket_impl = socket_args->socket->impl; - socket_impl->currently_subscribed = false; - aws_raise_error(error_code); - struct aws_socket *socket = socket_args->socket; - /*socket close sets socket_args->socket to NULL and - * socket_impl->connect_args to NULL. */ - aws_socket_close(socket); - s_on_connection_error(socket, error_code); - } - - aws_mem_release(socket_args->allocator, socket_args); -} - -/* this is used simply for moving a connect_success callback when the connect finished immediately - * (like for unix domain sockets) into the event loop's thread. Also note, in that case there was no - * timeout task scheduled, so in this case the socket_args are cleaned up. */ -static void s_run_connect_success(struct aws_task *task, void *arg, enum aws_task_status status) { - (void)task; - struct posix_socket_connect_args *socket_args = arg; - - if (socket_args->socket) { - struct posix_socket *socket_impl = socket_args->socket->impl; - if (status == AWS_TASK_STATUS_RUN_READY) { - s_on_connection_success(socket_args->socket); - } else { - aws_raise_error(AWS_IO_SOCKET_CONNECT_ABORTED); - socket_args->socket->event_loop = NULL; - s_on_connection_error(socket_args->socket, AWS_IO_SOCKET_CONNECT_ABORTED); - } - socket_impl->connect_args = NULL; - } - - aws_mem_release(socket_args->allocator, socket_args); -} - -static inline int s_convert_pton_error(int pton_code) { - if (pton_code == 0) { - return AWS_IO_SOCKET_INVALID_ADDRESS; - } - - return s_determine_socket_error(errno); -} - -struct socket_address { - union sock_addr_types { - struct sockaddr_in addr_in; - struct sockaddr_in6 addr_in6; - struct sockaddr_un un_addr; -#ifdef USE_VSOCK - struct sockaddr_vm vm_addr; -#endif - } sock_addr_types; -}; - -#ifdef USE_VSOCK -/** Convert a string to a VSOCK CID. Respects the calling convetion of inet_pton: - * 0 on error, 1 on success. */ -static int parse_cid(const char *cid_str, unsigned int *value) { - if (cid_str == NULL || value == NULL) { - errno = EINVAL; - return 0; - } - /* strtoll returns 0 as both error and correct value */ - errno = 0; - /* unsigned long long to handle edge cases in convention explicitly */ - long long cid = strtoll(cid_str, NULL, 10); - if (errno != 0) { - return 0; - } - - /* -1U means any, so it's a valid value, but it needs to be converted to - * unsigned int. */ - if (cid == -1) { - *value = VMADDR_CID_ANY; - return 1; - } - - if (cid < 0 || cid > UINT_MAX) { - errno = ERANGE; - return 0; - } - - /* cast is safe here, edge cases already checked */ - *value = (unsigned int)cid; - return 1; -} -#endif - -int aws_socket_connect( - struct aws_socket *socket, - const struct aws_socket_endpoint *remote_endpoint, - struct aws_event_loop *event_loop, - aws_socket_on_connection_result_fn *on_connection_result, - void *user_data) { - AWS_ASSERT(event_loop); - AWS_ASSERT(!socket->event_loop); - - AWS_LOGF_DEBUG(AWS_LS_IO_SOCKET, "id=%p fd=%d: beginning connect.", (void *)socket, socket->io_handle.data.fd); - - if (socket->event_loop) { - return aws_raise_error(AWS_IO_EVENT_LOOP_ALREADY_ASSIGNED); - } - - if (socket->options.type != AWS_SOCKET_DGRAM) { - AWS_ASSERT(on_connection_result); - if (socket->state != INIT) { - return aws_raise_error(AWS_IO_SOCKET_ILLEGAL_OPERATION_FOR_STATE); - } - } else { /* UDP socket */ - /* UDP sockets jump to CONNECT_READ if bind is called first */ - if (socket->state != CONNECTED_READ && socket->state != INIT) { - return aws_raise_error(AWS_IO_SOCKET_ILLEGAL_OPERATION_FOR_STATE); - } - } - - size_t address_strlen; - if (aws_secure_strlen(remote_endpoint->address, AWS_ADDRESS_MAX_LEN, &address_strlen)) { - return AWS_OP_ERR; - } - - struct socket_address address; - AWS_ZERO_STRUCT(address); - socklen_t sock_size = 0; - int pton_err = 1; - if (socket->options.domain == AWS_SOCKET_IPV4) { - pton_err = inet_pton(AF_INET, remote_endpoint->address, &address.sock_addr_types.addr_in.sin_addr); - address.sock_addr_types.addr_in.sin_port = htons(remote_endpoint->port); - address.sock_addr_types.addr_in.sin_family = AF_INET; - sock_size = sizeof(address.sock_addr_types.addr_in); - } else if (socket->options.domain == AWS_SOCKET_IPV6) { - pton_err = inet_pton(AF_INET6, remote_endpoint->address, &address.sock_addr_types.addr_in6.sin6_addr); - address.sock_addr_types.addr_in6.sin6_port = htons(remote_endpoint->port); - address.sock_addr_types.addr_in6.sin6_family = AF_INET6; - sock_size = sizeof(address.sock_addr_types.addr_in6); - } else if (socket->options.domain == AWS_SOCKET_LOCAL) { - address.sock_addr_types.un_addr.sun_family = AF_UNIX; - strncpy(address.sock_addr_types.un_addr.sun_path, remote_endpoint->address, AWS_ADDRESS_MAX_LEN); - sock_size = sizeof(address.sock_addr_types.un_addr); -#ifdef USE_VSOCK - } else if (socket->options.domain == AWS_SOCKET_VSOCK) { - pton_err = parse_cid(remote_endpoint->address, &address.sock_addr_types.vm_addr.svm_cid); - address.sock_addr_types.vm_addr.svm_family = AF_VSOCK; - address.sock_addr_types.vm_addr.svm_port = (unsigned int)remote_endpoint->port; - sock_size = sizeof(address.sock_addr_types.vm_addr); -#endif - } else { - AWS_ASSERT(0); - return aws_raise_error(AWS_IO_SOCKET_UNSUPPORTED_ADDRESS_FAMILY); - } - - if (pton_err != 1) { - AWS_LOGF_ERROR( - AWS_LS_IO_SOCKET, - "id=%p fd=%d: failed to parse address %s:%d.", - (void *)socket, - socket->io_handle.data.fd, - remote_endpoint->address, - (int)remote_endpoint->port); - return aws_raise_error(s_convert_pton_error(pton_err)); - } - - AWS_LOGF_DEBUG( - AWS_LS_IO_SOCKET, - "id=%p fd=%d: connecting to endpoint %s:%d.", - (void *)socket, - socket->io_handle.data.fd, - remote_endpoint->address, - (int)remote_endpoint->port); - - socket->state = CONNECTING; - socket->remote_endpoint = *remote_endpoint; - socket->connect_accept_user_data = user_data; - socket->connection_result_fn = on_connection_result; - - struct posix_socket *socket_impl = socket->impl; - - socket_impl->connect_args = aws_mem_calloc(socket->allocator, 1, sizeof(struct posix_socket_connect_args)); - if (!socket_impl->connect_args) { - return AWS_OP_ERR; - } - - socket_impl->connect_args->socket = socket; - socket_impl->connect_args->allocator = socket->allocator; - - socket_impl->connect_args->task.fn = s_handle_socket_timeout; - socket_impl->connect_args->task.arg = socket_impl->connect_args; - - int error_code = connect(socket->io_handle.data.fd, (struct sockaddr *)&address.sock_addr_types, sock_size); - socket->event_loop = event_loop; - - if (!error_code) { - AWS_LOGF_INFO( - AWS_LS_IO_SOCKET, - "id=%p fd=%d: connected immediately, not scheduling timeout.", - (void *)socket, - socket->io_handle.data.fd); - socket_impl->connect_args->task.fn = s_run_connect_success; - /* the subscription for IO will happen once we setup the connection in the task. Since we already - * know the connection succeeded, we don't need to register for events yet. */ - aws_event_loop_schedule_task_now(event_loop, &socket_impl->connect_args->task); - } - - if (error_code) { - error_code = errno; - if (error_code == EINPROGRESS || error_code == EALREADY) { - AWS_LOGF_TRACE( - AWS_LS_IO_SOCKET, - "id=%p fd=%d: connection pending waiting on event-loop notification or timeout.", - (void *)socket, - socket->io_handle.data.fd); - /* cache the timeout task; it is possible for the IO subscription to come back virtually immediately - * and null out the connect args */ - struct aws_task *timeout_task = &socket_impl->connect_args->task; - - socket_impl->currently_subscribed = true; - /* This event is for when the connection finishes. (the fd will flip writable). */ - if (aws_event_loop_subscribe_to_io_events( - event_loop, - &socket->io_handle, - AWS_IO_EVENT_TYPE_WRITABLE, - s_socket_connect_event, - socket_impl->connect_args)) { - AWS_LOGF_ERROR( - AWS_LS_IO_SOCKET, - "id=%p fd=%d: failed to register with event-loop %p.", - (void *)socket, - socket->io_handle.data.fd, - (void *)event_loop); - socket_impl->currently_subscribed = false; - socket->event_loop = NULL; - goto err_clean_up; - } - - /* schedule a task to run at the connect timeout interval, if this task runs before the connect - * happens, we consider that a timeout. */ - uint64_t timeout = 0; - aws_event_loop_current_clock_time(event_loop, &timeout); - timeout += aws_timestamp_convert( - socket->options.connect_timeout_ms, AWS_TIMESTAMP_MILLIS, AWS_TIMESTAMP_NANOS, NULL); - AWS_LOGF_TRACE( - AWS_LS_IO_SOCKET, - "id=%p fd=%d: scheduling timeout task for %llu.", - (void *)socket, - socket->io_handle.data.fd, - (unsigned long long)timeout); - aws_event_loop_schedule_task_future(event_loop, timeout_task, timeout); - } else { - AWS_LOGF_ERROR( - AWS_LS_IO_SOCKET, - "id=%p fd=%d: connect failed with error code %d.", - (void *)socket, - socket->io_handle.data.fd, - error_code); - int aws_error = s_determine_socket_error(error_code); - aws_raise_error(aws_error); - socket->event_loop = NULL; - socket_impl->currently_subscribed = false; - goto err_clean_up; - } - } - return AWS_OP_SUCCESS; - -err_clean_up: - aws_mem_release(socket->allocator, socket_impl->connect_args); - socket_impl->connect_args = NULL; - return AWS_OP_ERR; -} - -int aws_socket_bind(struct aws_socket *socket, const struct aws_socket_endpoint *local_endpoint) { - if (socket->state != INIT) { - AWS_LOGF_ERROR( - AWS_LS_IO_SOCKET, - "id=%p fd=%d: invalid state for bind operation.", - (void *)socket, - socket->io_handle.data.fd); - return aws_raise_error(AWS_IO_SOCKET_ILLEGAL_OPERATION_FOR_STATE); - } - - size_t address_strlen; - if (aws_secure_strlen(local_endpoint->address, AWS_ADDRESS_MAX_LEN, &address_strlen)) { - return AWS_OP_ERR; - } - - int error_code = -1; - - socket->local_endpoint = *local_endpoint; - AWS_LOGF_INFO( - AWS_LS_IO_SOCKET, - "id=%p fd=%d: binding to %s:%d.", - (void *)socket, - socket->io_handle.data.fd, - local_endpoint->address, - (int)local_endpoint->port); - - struct socket_address address; - AWS_ZERO_STRUCT(address); - socklen_t sock_size = 0; - int pton_err = 1; - if (socket->options.domain == AWS_SOCKET_IPV4) { - pton_err = inet_pton(AF_INET, local_endpoint->address, &address.sock_addr_types.addr_in.sin_addr); - address.sock_addr_types.addr_in.sin_port = htons(local_endpoint->port); - address.sock_addr_types.addr_in.sin_family = AF_INET; - sock_size = sizeof(address.sock_addr_types.addr_in); - } else if (socket->options.domain == AWS_SOCKET_IPV6) { - pton_err = inet_pton(AF_INET6, local_endpoint->address, &address.sock_addr_types.addr_in6.sin6_addr); - address.sock_addr_types.addr_in6.sin6_port = htons(local_endpoint->port); - address.sock_addr_types.addr_in6.sin6_family = AF_INET6; - sock_size = sizeof(address.sock_addr_types.addr_in6); - } else if (socket->options.domain == AWS_SOCKET_LOCAL) { - address.sock_addr_types.un_addr.sun_family = AF_UNIX; - strncpy(address.sock_addr_types.un_addr.sun_path, local_endpoint->address, AWS_ADDRESS_MAX_LEN); - sock_size = sizeof(address.sock_addr_types.un_addr); -#ifdef USE_VSOCK - } else if (socket->options.domain == AWS_SOCKET_VSOCK) { - pton_err = parse_cid(local_endpoint->address, &address.sock_addr_types.vm_addr.svm_cid); - address.sock_addr_types.vm_addr.svm_family = AF_VSOCK; - address.sock_addr_types.vm_addr.svm_port = (unsigned int)local_endpoint->port; - sock_size = sizeof(address.sock_addr_types.vm_addr); -#endif - } else { - AWS_ASSERT(0); - return aws_raise_error(AWS_IO_SOCKET_UNSUPPORTED_ADDRESS_FAMILY); - } - - if (pton_err != 1) { - AWS_LOGF_ERROR( - AWS_LS_IO_SOCKET, - "id=%p fd=%d: failed to parse address %s:%d.", - (void *)socket, - socket->io_handle.data.fd, - local_endpoint->address, - (int)local_endpoint->port); - return aws_raise_error(s_convert_pton_error(pton_err)); - } - - error_code = bind(socket->io_handle.data.fd, (struct sockaddr *)&address.sock_addr_types, sock_size); - - if (!error_code) { - if (socket->options.type == AWS_SOCKET_STREAM) { - socket->state = BOUND; - } else { - /* e.g. UDP is now readable */ - socket->state = CONNECTED_READ; - } - AWS_LOGF_DEBUG(AWS_LS_IO_SOCKET, "id=%p fd=%d: successfully bound", (void *)socket, socket->io_handle.data.fd); - - return AWS_OP_SUCCESS; - } - - socket->state = ERROR; - error_code = errno; - AWS_LOGF_ERROR( - AWS_LS_IO_SOCKET, - "id=%p fd=%d: bind failed with error code %d", - (void *)socket, - socket->io_handle.data.fd, - error_code); - - int aws_error = s_determine_socket_error(error_code); - return aws_raise_error(aws_error); -} - -int aws_socket_listen(struct aws_socket *socket, int backlog_size) { - if (socket->state != BOUND) { - AWS_LOGF_ERROR( - AWS_LS_IO_SOCKET, - "id=%p fd=%d: invalid state for listen operation. You must call bind first.", - (void *)socket, - socket->io_handle.data.fd); - return aws_raise_error(AWS_IO_SOCKET_ILLEGAL_OPERATION_FOR_STATE); - } - - int error_code = listen(socket->io_handle.data.fd, backlog_size); - - if (!error_code) { - AWS_LOGF_INFO( - AWS_LS_IO_SOCKET, "id=%p fd=%d: successfully listening", (void *)socket, socket->io_handle.data.fd); - socket->state = LISTENING; - return AWS_OP_SUCCESS; - } - - AWS_LOGF_ERROR( - AWS_LS_IO_SOCKET, - "id=%p fd=%d: listen failed with error code %d", - (void *)socket, - socket->io_handle.data.fd, - error_code); - error_code = errno; - socket->state = ERROR; - - return aws_raise_error(s_determine_socket_error(error_code)); -} - -/* this is called by the event loop handler that was installed in start_accept(). It runs once the FD goes readable, - * accepts as many as it can and then returns control to the event loop. */ -static void s_socket_accept_event( - struct aws_event_loop *event_loop, - struct aws_io_handle *handle, - int events, - void *user_data) { - - (void)event_loop; - - struct aws_socket *socket = user_data; - struct posix_socket *socket_impl = socket->impl; - - AWS_LOGF_DEBUG( - AWS_LS_IO_SOCKET, "id=%p fd=%d: listening event received", (void *)socket, socket->io_handle.data.fd); - - if (socket_impl->continue_accept && events & AWS_IO_EVENT_TYPE_READABLE) { - int in_fd = 0; - while (socket_impl->continue_accept && in_fd != -1) { - struct sockaddr_storage in_addr; - socklen_t in_len = sizeof(struct sockaddr_storage); - - in_fd = accept(handle->data.fd, (struct sockaddr *)&in_addr, &in_len); - if (in_fd == -1) { - int error = errno; - - if (error == EAGAIN || error == EWOULDBLOCK) { - break; - } - - int aws_error = aws_socket_get_error(socket); - aws_raise_error(aws_error); - s_on_connection_error(socket, aws_error); - break; - } - - AWS_LOGF_DEBUG( - AWS_LS_IO_SOCKET, "id=%p fd=%d: incoming connection", (void *)socket, socket->io_handle.data.fd); - - struct aws_socket *new_sock = aws_mem_acquire(socket->allocator, sizeof(struct aws_socket)); - - if (!new_sock) { - close(in_fd); - s_on_connection_error(socket, aws_last_error()); - continue; - } - - if (s_socket_init(new_sock, socket->allocator, &socket->options, in_fd)) { - aws_mem_release(socket->allocator, new_sock); - s_on_connection_error(socket, aws_last_error()); - continue; - } - - new_sock->local_endpoint = socket->local_endpoint; - new_sock->state = CONNECTED_READ | CONNECTED_WRITE; - uint16_t port = 0; - - /* get the info on the incoming socket's address */ - if (in_addr.ss_family == AF_INET) { - struct sockaddr_in *s = (struct sockaddr_in *)&in_addr; - port = ntohs(s->sin_port); - /* this came from the kernel, a.) it won't fail. b.) even if it does - * its not fatal. come back and add logging later. */ - if (!inet_ntop( - AF_INET, - &s->sin_addr, - new_sock->remote_endpoint.address, - sizeof(new_sock->remote_endpoint.address))) { - AWS_LOGF_WARN( - AWS_LS_IO_SOCKET, - "id=%p fd=%d:. Failed to determine remote address.", - (void *)socket, - socket->io_handle.data.fd) - } - new_sock->options.domain = AWS_SOCKET_IPV4; - } else if (in_addr.ss_family == AF_INET6) { - /* this came from the kernel, a.) it won't fail. b.) even if it does - * its not fatal. come back and add logging later. */ - struct sockaddr_in6 *s = (struct sockaddr_in6 *)&in_addr; - port = ntohs(s->sin6_port); - if (!inet_ntop( - AF_INET6, - &s->sin6_addr, - new_sock->remote_endpoint.address, - sizeof(new_sock->remote_endpoint.address))) { - AWS_LOGF_WARN( - AWS_LS_IO_SOCKET, - "id=%p fd=%d:. Failed to determine remote address.", - (void *)socket, - socket->io_handle.data.fd) - } - new_sock->options.domain = AWS_SOCKET_IPV6; - } else if (in_addr.ss_family == AF_UNIX) { - new_sock->remote_endpoint = socket->local_endpoint; - new_sock->options.domain = AWS_SOCKET_LOCAL; - } - - new_sock->remote_endpoint.port = port; - - AWS_LOGF_INFO( - AWS_LS_IO_SOCKET, - "id=%p fd=%d: connected to %s:%d, incoming fd %d", - (void *)socket, - socket->io_handle.data.fd, - new_sock->remote_endpoint.address, - new_sock->remote_endpoint.port, - in_fd); - - int flags = fcntl(in_fd, F_GETFL, 0); - - flags |= O_NONBLOCK | O_CLOEXEC; - fcntl(in_fd, F_SETFL, flags); - - bool close_occurred = false; - socket_impl->close_happened = &close_occurred; - socket->accept_result_fn(socket, AWS_ERROR_SUCCESS, new_sock, socket->connect_accept_user_data); - - if (close_occurred) { - return; - } - - socket_impl->close_happened = NULL; - } - } - - AWS_LOGF_TRACE( - AWS_LS_IO_SOCKET, - "id=%p fd=%d: finished processing incoming connections, " - "waiting on event-loop notification", - (void *)socket, - socket->io_handle.data.fd); -} - -int aws_socket_start_accept( - struct aws_socket *socket, - struct aws_event_loop *accept_loop, - aws_socket_on_accept_result_fn *on_accept_result, - void *user_data) { - AWS_ASSERT(on_accept_result); - AWS_ASSERT(accept_loop); - - if (socket->event_loop) { - AWS_LOGF_ERROR( - AWS_LS_IO_SOCKET, - "id=%p fd=%d: is already assigned to event-loop %p.", - (void *)socket, - socket->io_handle.data.fd, - (void *)socket->event_loop); - return aws_raise_error(AWS_IO_EVENT_LOOP_ALREADY_ASSIGNED); - } - - if (socket->state != LISTENING) { - AWS_LOGF_ERROR( - AWS_LS_IO_SOCKET, - "id=%p fd=%d: invalid state for start_accept operation. You must call listen first.", - (void *)socket, - socket->io_handle.data.fd); - return aws_raise_error(AWS_IO_SOCKET_ILLEGAL_OPERATION_FOR_STATE); - } - - socket->accept_result_fn = on_accept_result; - socket->connect_accept_user_data = user_data; - socket->event_loop = accept_loop; - struct posix_socket *socket_impl = socket->impl; - socket_impl->continue_accept = true; - socket_impl->currently_subscribed = true; - - if (aws_event_loop_subscribe_to_io_events( - socket->event_loop, &socket->io_handle, AWS_IO_EVENT_TYPE_READABLE, s_socket_accept_event, socket)) { - AWS_LOGF_ERROR( - AWS_LS_IO_SOCKET, - "id=%p fd=%d: failed to subscribe to event-loop %p.", - (void *)socket, - socket->io_handle.data.fd, - (void *)socket->event_loop); - socket_impl->continue_accept = false; - socket_impl->currently_subscribed = false; - socket->event_loop = NULL; - - return AWS_OP_ERR; - } - - return AWS_OP_SUCCESS; -} - -struct stop_accept_args { - struct aws_task task; - struct aws_mutex mutex; - struct aws_condition_variable condition_variable; - struct aws_socket *socket; - int ret_code; - bool invoked; -}; - -static bool s_stop_accept_pred(void *arg) { - struct stop_accept_args *stop_accept_args = arg; - return stop_accept_args->invoked; -} - -static void s_stop_accept_task(struct aws_task *task, void *arg, enum aws_task_status status) { - (void)task; - (void)status; - - struct stop_accept_args *stop_accept_args = arg; - aws_mutex_lock(&stop_accept_args->mutex); - stop_accept_args->ret_code = AWS_OP_SUCCESS; - if (aws_socket_stop_accept(stop_accept_args->socket)) { - stop_accept_args->ret_code = aws_last_error(); - } - stop_accept_args->invoked = true; - aws_condition_variable_notify_one(&stop_accept_args->condition_variable); - aws_mutex_unlock(&stop_accept_args->mutex); -} - -int aws_socket_stop_accept(struct aws_socket *socket) { - if (socket->state != LISTENING) { - AWS_LOGF_ERROR( - AWS_LS_IO_SOCKET, - "id=%p fd=%d: is not in a listening state, can't stop_accept.", - (void *)socket, - socket->io_handle.data.fd); - return aws_raise_error(AWS_IO_SOCKET_ILLEGAL_OPERATION_FOR_STATE); - } - - AWS_LOGF_INFO( - AWS_LS_IO_SOCKET, "id=%p fd=%d: stopping accepting new connections", (void *)socket, socket->io_handle.data.fd); - - if (!aws_event_loop_thread_is_callers_thread(socket->event_loop)) { - struct stop_accept_args args = {.mutex = AWS_MUTEX_INIT, - .condition_variable = AWS_CONDITION_VARIABLE_INIT, - .invoked = false, - .socket = socket, - .ret_code = AWS_OP_SUCCESS, - .task = {.fn = s_stop_accept_task}}; - AWS_LOGF_INFO( - AWS_LS_IO_SOCKET, - "id=%p fd=%d: stopping accepting new connections from a different thread than " - "the socket is running from. Blocking until it shuts down.", - (void *)socket, - socket->io_handle.data.fd); - /* Look.... I know what I'm doing.... trust me, I'm an engineer. - * We wait on the completion before 'args' goes out of scope. - * NOLINTNEXTLINE */ - args.task.arg = &args; - aws_mutex_lock(&args.mutex); - aws_event_loop_schedule_task_now(socket->event_loop, &args.task); - aws_condition_variable_wait_pred(&args.condition_variable, &args.mutex, s_stop_accept_pred, &args); - aws_mutex_unlock(&args.mutex); - AWS_LOGF_INFO( - AWS_LS_IO_SOCKET, - "id=%p fd=%d: stop accept task finished running.", - (void *)socket, - socket->io_handle.data.fd); - - if (args.ret_code) { - return aws_raise_error(args.ret_code); - } - return AWS_OP_SUCCESS; - } - - int ret_val = AWS_OP_SUCCESS; - struct posix_socket *socket_impl = socket->impl; - if (socket_impl->currently_subscribed) { - ret_val = aws_event_loop_unsubscribe_from_io_events(socket->event_loop, &socket->io_handle); - socket_impl->currently_subscribed = false; - socket_impl->continue_accept = false; - socket->event_loop = NULL; - } - - return ret_val; -} - -int aws_socket_set_options(struct aws_socket *socket, const struct aws_socket_options *options) { - if (socket->options.domain != options->domain || socket->options.type != options->type) { - return aws_raise_error(AWS_IO_SOCKET_INVALID_OPTIONS); - } - - AWS_LOGF_DEBUG( - AWS_LS_IO_SOCKET, - "id=%p fd=%d: setting socket options to: keep-alive %d, keep idle %d, keep-alive interval %d, keep-alive probe " - "count %d.", - (void *)socket, - socket->io_handle.data.fd, - (int)options->keepalive, - (int)options->keep_alive_timeout_sec, - (int)options->keep_alive_interval_sec, - (int)options->keep_alive_max_failed_probes); - - socket->options = *options; - - int option_value = 1; - if (AWS_UNLIKELY( - setsockopt(socket->io_handle.data.fd, SOL_SOCKET, NO_SIGNAL, &option_value, sizeof(option_value)))) { - AWS_LOGF_WARN( - AWS_LS_IO_SOCKET, - "id=%p fd=%d: setsockopt() for NO_SIGNAL failed with errno %d. If you are having SIGPIPE signals thrown, " - "you may" - " want to install a signal trap in your application layer.", - (void *)socket, - socket->io_handle.data.fd, - errno); - } - - int reuse = 1; - if (AWS_UNLIKELY(setsockopt(socket->io_handle.data.fd, SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof(int)))) { - AWS_LOGF_WARN( - AWS_LS_IO_SOCKET, - "id=%p fd=%d: setsockopt() for SO_REUSEADDR failed with errno %d.", - (void *)socket, - socket->io_handle.data.fd, - errno); - } - - if (options->type == AWS_SOCKET_STREAM && options->domain != AWS_SOCKET_LOCAL) { - if (socket->options.keepalive) { - int keep_alive = 1; - if (AWS_UNLIKELY( - setsockopt(socket->io_handle.data.fd, SOL_SOCKET, SO_KEEPALIVE, &keep_alive, sizeof(int)))) { - AWS_LOGF_WARN( - AWS_LS_IO_SOCKET, - "id=%p fd=%d: setsockopt() for enabling SO_KEEPALIVE failed with errno %d.", - (void *)socket, - socket->io_handle.data.fd, - errno); - } - } - - if (socket->options.keep_alive_interval_sec && socket->options.keep_alive_timeout_sec) { - int ival_in_secs = socket->options.keep_alive_interval_sec; - if (AWS_UNLIKELY(setsockopt( - socket->io_handle.data.fd, IPPROTO_TCP, TCP_KEEPIDLE, &ival_in_secs, sizeof(ival_in_secs)))) { - AWS_LOGF_WARN( - AWS_LS_IO_SOCKET, - "id=%p fd=%d: setsockopt() for enabling TCP_KEEPIDLE for TCP failed with errno %d.", - (void *)socket, - socket->io_handle.data.fd, - errno); - } - - ival_in_secs = socket->options.keep_alive_timeout_sec; - if (AWS_UNLIKELY(setsockopt( - socket->io_handle.data.fd, IPPROTO_TCP, TCP_KEEPINTVL, &ival_in_secs, sizeof(ival_in_secs)))) { - AWS_LOGF_WARN( - AWS_LS_IO_SOCKET, - "id=%p fd=%d: setsockopt() for enabling TCP_KEEPINTVL for TCP failed with errno %d.", - (void *)socket, - socket->io_handle.data.fd, - errno); - } - } - - if (socket->options.keep_alive_max_failed_probes) { - int max_probes = socket->options.keep_alive_max_failed_probes; - if (AWS_UNLIKELY( - setsockopt(socket->io_handle.data.fd, IPPROTO_TCP, TCP_KEEPCNT, &max_probes, sizeof(max_probes)))) { - AWS_LOGF_WARN( - AWS_LS_IO_SOCKET, - "id=%p fd=%d: setsockopt() for enabling TCP_KEEPCNT for TCP failed with errno %d.", - (void *)socket, - socket->io_handle.data.fd, - errno); - } - } - } - - return AWS_OP_SUCCESS; -} - -struct write_request { - struct aws_byte_cursor cursor_cpy; - aws_socket_on_write_completed_fn *written_fn; - void *write_user_data; - struct aws_linked_list_node node; - size_t original_buffer_len; -}; - -struct posix_socket_close_args { - struct aws_mutex mutex; - struct aws_condition_variable condition_variable; - struct aws_socket *socket; - bool invoked; - int ret_code; -}; - -static bool s_close_predicate(void *arg) { - struct posix_socket_close_args *close_args = arg; - return close_args->invoked; -} - -static void s_close_task(struct aws_task *task, void *arg, enum aws_task_status status) { - (void)task; - (void)status; - - struct posix_socket_close_args *close_args = arg; - aws_mutex_lock(&close_args->mutex); - close_args->ret_code = AWS_OP_SUCCESS; - - if (aws_socket_close(close_args->socket)) { - close_args->ret_code = aws_last_error(); - } - - close_args->invoked = true; - aws_condition_variable_notify_one(&close_args->condition_variable); - aws_mutex_unlock(&close_args->mutex); -} - -int aws_socket_close(struct aws_socket *socket) { - struct posix_socket *socket_impl = socket->impl; - AWS_LOGF_DEBUG(AWS_LS_IO_SOCKET, "id=%p fd=%d: closing", (void *)socket, socket->io_handle.data.fd); - if (socket->event_loop) { - /* don't freak out on me, this almost never happens, and never occurs inside a channel - * it only gets hit from a listening socket shutting down or from a unit test. */ - if (!aws_event_loop_thread_is_callers_thread(socket->event_loop)) { - AWS_LOGF_INFO( - AWS_LS_IO_SOCKET, - "id=%p fd=%d: closing from a different thread than " - "the socket is running from. Blocking until it closes down.", - (void *)socket, - socket->io_handle.data.fd); - /* the only time we allow this kind of thing is when you're a listener.*/ - if (socket->state != LISTENING) { - return aws_raise_error(AWS_IO_SOCKET_ILLEGAL_OPERATION_FOR_STATE); - } - - struct posix_socket_close_args args = { - .mutex = AWS_MUTEX_INIT, - .condition_variable = AWS_CONDITION_VARIABLE_INIT, - .socket = socket, - .ret_code = AWS_OP_SUCCESS, - .invoked = false, - }; - - struct aws_task close_task = { - .fn = s_close_task, - .arg = &args, - }; - - aws_mutex_lock(&args.mutex); - aws_event_loop_schedule_task_now(socket->event_loop, &close_task); - aws_condition_variable_wait_pred(&args.condition_variable, &args.mutex, s_close_predicate, &args); - aws_mutex_unlock(&args.mutex); - AWS_LOGF_INFO( - AWS_LS_IO_SOCKET, "id=%p fd=%d: close task completed.", (void *)socket, socket->io_handle.data.fd); - if (args.ret_code) { - return aws_raise_error(args.ret_code); - } - - return AWS_OP_SUCCESS; - } - - if (socket_impl->currently_subscribed) { - if (socket->state & LISTENING) { - aws_socket_stop_accept(socket); - } else { - int err_code = aws_event_loop_unsubscribe_from_io_events(socket->event_loop, &socket->io_handle); - - if (err_code) { - return AWS_OP_ERR; - } - } - socket_impl->currently_subscribed = false; - socket->event_loop = NULL; - } - } - - if (socket_impl->close_happened) { - *socket_impl->close_happened = true; - } - - if (socket_impl->connect_args) { - socket_impl->connect_args->socket = NULL; - socket_impl->connect_args = NULL; - } - - if (aws_socket_is_open(socket)) { - close(socket->io_handle.data.fd); - socket->io_handle.data.fd = -1; - socket->state = CLOSED; - - /* after close, just go ahead and clear out the pending writes queue - * and tell the user they were cancelled. */ - while (!aws_linked_list_empty(&socket_impl->write_queue)) { - struct aws_linked_list_node *node = aws_linked_list_pop_front(&socket_impl->write_queue); - struct write_request *write_request = AWS_CONTAINER_OF(node, struct write_request, node); - - write_request->written_fn( - socket, AWS_IO_SOCKET_CLOSED, write_request->original_buffer_len, write_request->write_user_data); - aws_mem_release(socket->allocator, write_request); - } - } - - return AWS_OP_SUCCESS; -} - -int aws_socket_shutdown_dir(struct aws_socket *socket, enum aws_channel_direction dir) { - int how = dir == AWS_CHANNEL_DIR_READ ? 0 : 1; - AWS_LOGF_DEBUG( - AWS_LS_IO_SOCKET, "id=%p fd=%d: shutting down in direction %d", (void *)socket, socket->io_handle.data.fd, dir); - if (shutdown(socket->io_handle.data.fd, how)) { - int aws_error = s_determine_socket_error(errno); - return aws_raise_error(aws_error); - } - - if (dir == AWS_CHANNEL_DIR_READ) { - socket->state &= ~CONNECTED_READ; - } else { - socket->state &= ~CONNECTED_WRITE; - } - - return AWS_OP_SUCCESS; -} - -/* this gets called in two scenarios. - * 1st scenario, someone called aws_socket_write() and we want to try writing now, so an error can be returned - * immediately if something bad has happened to the socket. In this case, `parent_request` is set. - * 2nd scenario, the event loop notified us that the socket went writable. In this case `parent_request` is NULL */ -static int s_process_write_requests(struct aws_socket *socket, struct write_request *parent_request) { - struct posix_socket *socket_impl = socket->impl; - struct aws_allocator *allocator = socket->allocator; - - AWS_LOGF_TRACE( - AWS_LS_IO_SOCKET, "id=%p fd=%d: processing write requests.", (void *)socket, socket->io_handle.data.fd); - - /* there's a potential deadlock where we notify the user that we wrote some data, the user - * says, "cool, now I can write more and then immediately calls aws_socket_write(). We need to make sure - * that we don't allow reentrancy in that case. */ - socket_impl->write_in_progress = true; - - if (parent_request) { - AWS_LOGF_TRACE( - AWS_LS_IO_SOCKET, - "id=%p fd=%d: processing write requests, called from aws_socket_write", - (void *)socket, - socket->io_handle.data.fd); - socket_impl->currently_in_event = true; - } else { - AWS_LOGF_TRACE( - AWS_LS_IO_SOCKET, - "id=%p fd=%d: processing write requests, invoked by the event-loop", - (void *)socket, - socket->io_handle.data.fd); - } - - bool purge = false; - int aws_error = AWS_OP_SUCCESS; - bool parent_request_failed = false; - - /* if a close call happens in the middle, this queue will have been cleaned out from under us. */ - while (!aws_linked_list_empty(&socket_impl->write_queue)) { - struct aws_linked_list_node *node = aws_linked_list_front(&socket_impl->write_queue); - struct write_request *write_request = AWS_CONTAINER_OF(node, struct write_request, node); - - AWS_LOGF_TRACE( - AWS_LS_IO_SOCKET, - "id=%p fd=%d: dequeued write request of size %llu, remaining to write %llu", - (void *)socket, - socket->io_handle.data.fd, - (unsigned long long)write_request->original_buffer_len, - (unsigned long long)write_request->cursor_cpy.len); - - ssize_t written = - send(socket->io_handle.data.fd, write_request->cursor_cpy.ptr, write_request->cursor_cpy.len, NO_SIGNAL); - - AWS_LOGF_TRACE( - AWS_LS_IO_SOCKET, - "id=%p fd=%d: send written size %d", - (void *)socket, - socket->io_handle.data.fd, - (int)written); - - if (written < 0) { - int error = errno; - if (error == EAGAIN) { - AWS_LOGF_TRACE( - AWS_LS_IO_SOCKET, "id=%p fd=%d: returned would block", (void *)socket, socket->io_handle.data.fd); - break; - } - - if (error == EPIPE) { - AWS_LOGF_DEBUG( - AWS_LS_IO_SOCKET, - "id=%p fd=%d: already closed before write", - (void *)socket, - socket->io_handle.data.fd); - aws_error = AWS_IO_SOCKET_CLOSED; - aws_raise_error(aws_error); - purge = true; - break; - } - - purge = true; - AWS_LOGF_DEBUG( - AWS_LS_IO_SOCKET, - "id=%p fd=%d: write error with error code %d", - (void *)socket, - socket->io_handle.data.fd, - error); - aws_error = s_determine_socket_error(error); - aws_raise_error(aws_error); - break; - } - - size_t remaining_to_write = write_request->cursor_cpy.len; - - aws_byte_cursor_advance(&write_request->cursor_cpy, (size_t)written); - AWS_LOGF_TRACE( - AWS_LS_IO_SOCKET, - "id=%p fd=%d: remaining write request to write %llu", - (void *)socket, - socket->io_handle.data.fd, - (unsigned long long)write_request->cursor_cpy.len); - - if ((size_t)written == remaining_to_write) { - AWS_LOGF_TRACE( - AWS_LS_IO_SOCKET, "id=%p fd=%d: write request completed", (void *)socket, socket->io_handle.data.fd); - - aws_linked_list_remove(node); - write_request->written_fn( - socket, AWS_OP_SUCCESS, write_request->original_buffer_len, write_request->write_user_data); - aws_mem_release(allocator, write_request); - } - } - - if (purge) { - while (!aws_linked_list_empty(&socket_impl->write_queue)) { - struct aws_linked_list_node *node = aws_linked_list_pop_front(&socket_impl->write_queue); - struct write_request *write_request = AWS_CONTAINER_OF(node, struct write_request, node); - - /* If this fn was invoked directly from aws_socket_write(), don't invoke the error callback - * as the user will be able to rely on the return value from aws_socket_write() */ - if (write_request == parent_request) { - parent_request_failed = true; - } else { - write_request->written_fn(socket, aws_error, 0, write_request->write_user_data); - } - - aws_mem_release(socket->allocator, write_request); - } - } - - socket_impl->write_in_progress = false; - - if (parent_request) { - socket_impl->currently_in_event = false; - } - - if (socket_impl->clean_yourself_up) { - aws_mem_release(allocator, socket_impl); - } - - /* Only report error if aws_socket_write() invoked this function and its write_request failed */ - if (!parent_request_failed) { - return AWS_OP_SUCCESS; - } - - aws_raise_error(aws_error); - return AWS_OP_ERR; -} - -static void s_on_socket_io_event( - struct aws_event_loop *event_loop, - struct aws_io_handle *handle, - int events, - void *user_data) { - (void)event_loop; - (void)handle; - /* this is to handle a race condition when an error kicks off a cleanup, or the user decides - * to close the socket based on something they read (SSL validation failed for example). - * if clean_up happens when currently_in_event is true, socket_impl is kept dangling but currently - * subscribed is set to false. */ - struct aws_socket *socket = user_data; - struct posix_socket *socket_impl = socket->impl; - struct aws_allocator *allocator = socket->allocator; - - socket_impl->currently_in_event = true; - - if (events & AWS_IO_EVENT_TYPE_REMOTE_HANG_UP || events & AWS_IO_EVENT_TYPE_CLOSED) { - aws_raise_error(AWS_IO_SOCKET_CLOSED); - AWS_LOGF_TRACE(AWS_LS_IO_SOCKET, "id=%p fd=%d: closed remotely", (void *)socket, socket->io_handle.data.fd); - if (socket->readable_fn) { - socket->readable_fn(socket, AWS_IO_SOCKET_CLOSED, socket->readable_user_data); - } - goto end_check; - } - - if (socket_impl->currently_subscribed && events & AWS_IO_EVENT_TYPE_ERROR) { - int aws_error = aws_socket_get_error(socket); - aws_raise_error(aws_error); - AWS_LOGF_TRACE( - AWS_LS_IO_SOCKET, "id=%p fd=%d: error event occurred", (void *)socket, socket->io_handle.data.fd); - if (socket->readable_fn) { - socket->readable_fn(socket, aws_error, socket->readable_user_data); - } - goto end_check; - } - - if (socket_impl->currently_subscribed && events & AWS_IO_EVENT_TYPE_READABLE) { - AWS_LOGF_TRACE(AWS_LS_IO_SOCKET, "id=%p fd=%d: is readable", (void *)socket, socket->io_handle.data.fd); - if (socket->readable_fn) { - socket->readable_fn(socket, AWS_OP_SUCCESS, socket->readable_user_data); - } - } - /* if socket closed in between these branches, the currently_subscribed will be false and socket_impl will not - * have been cleaned up, so this next branch is safe. */ - if (socket_impl->currently_subscribed && events & AWS_IO_EVENT_TYPE_WRITABLE) { - AWS_LOGF_TRACE(AWS_LS_IO_SOCKET, "id=%p fd=%d: is writable", (void *)socket, socket->io_handle.data.fd); - s_process_write_requests(socket, NULL); - } - -end_check: - socket_impl->currently_in_event = false; - - if (socket_impl->clean_yourself_up) { - aws_mem_release(allocator, socket_impl); - } -} - -int aws_socket_assign_to_event_loop(struct aws_socket *socket, struct aws_event_loop *event_loop) { - if (!socket->event_loop) { - AWS_LOGF_DEBUG( - AWS_LS_IO_SOCKET, - "id=%p fd=%d: assigning to event loop %p", - (void *)socket, - socket->io_handle.data.fd, - (void *)event_loop); - socket->event_loop = event_loop; - struct posix_socket *socket_impl = socket->impl; - socket_impl->currently_subscribed = true; - if (aws_event_loop_subscribe_to_io_events( - event_loop, - &socket->io_handle, - AWS_IO_EVENT_TYPE_WRITABLE | AWS_IO_EVENT_TYPE_READABLE, - s_on_socket_io_event, - socket)) { - AWS_LOGF_ERROR( - AWS_LS_IO_SOCKET, - "id=%p fd=%d: assigning to event loop %p failed with error %d", - (void *)socket, - socket->io_handle.data.fd, - (void *)event_loop, - aws_last_error()); - socket_impl->currently_subscribed = false; - socket->event_loop = NULL; - return AWS_OP_ERR; - } - - return AWS_OP_SUCCESS; - } - - return aws_raise_error(AWS_IO_EVENT_LOOP_ALREADY_ASSIGNED); -} - -struct aws_event_loop *aws_socket_get_event_loop(struct aws_socket *socket) { - return socket->event_loop; -} - -int aws_socket_subscribe_to_readable_events( - struct aws_socket *socket, - aws_socket_on_readable_fn *on_readable, - void *user_data) { - - AWS_LOGF_TRACE( - AWS_LS_IO_SOCKET, " id=%p fd=%d: subscribing to readable events", (void *)socket, socket->io_handle.data.fd); - if (!(socket->state & CONNECTED_READ)) { - AWS_LOGF_ERROR( - AWS_LS_IO_SOCKET, - "id=%p fd=%d: can't subscribe to readable events since the socket is not connected", - (void *)socket, - socket->io_handle.data.fd); - return aws_raise_error(AWS_IO_SOCKET_NOT_CONNECTED); - } - - if (socket->readable_fn) { - AWS_LOGF_ERROR( - AWS_LS_IO_SOCKET, - "id=%p fd=%d: can't subscribe to readable events since it is already subscribed", - (void *)socket, - socket->io_handle.data.fd); - return aws_raise_error(AWS_ERROR_IO_ALREADY_SUBSCRIBED); - } - - AWS_ASSERT(on_readable); - socket->readable_user_data = user_data; - socket->readable_fn = on_readable; - - return AWS_OP_SUCCESS; -} - -int aws_socket_read(struct aws_socket *socket, struct aws_byte_buf *buffer, size_t *amount_read) { - AWS_ASSERT(amount_read); - - if (!aws_event_loop_thread_is_callers_thread(socket->event_loop)) { - AWS_LOGF_ERROR( - AWS_LS_IO_SOCKET, - "id=%p fd=%d: cannot read from a different thread than event loop %p", - (void *)socket, - socket->io_handle.data.fd, - (void *)socket->event_loop); - return aws_raise_error(AWS_ERROR_IO_EVENT_LOOP_THREAD_ONLY); - } - - if (!(socket->state & CONNECTED_READ)) { - AWS_LOGF_ERROR( - AWS_LS_IO_SOCKET, - "id=%p fd=%d: cannot read because it is not connected", - (void *)socket, - socket->io_handle.data.fd); - return aws_raise_error(AWS_IO_SOCKET_NOT_CONNECTED); - } - - ssize_t read_val = read(socket->io_handle.data.fd, buffer->buffer + buffer->len, buffer->capacity - buffer->len); - AWS_LOGF_TRACE( - AWS_LS_IO_SOCKET, "id=%p fd=%d: read of %d", (void *)socket, socket->io_handle.data.fd, (int)read_val); - - if (read_val > 0) { - *amount_read = (size_t)read_val; - buffer->len += *amount_read; - return AWS_OP_SUCCESS; - } - - /* read_val of 0 means EOF which we'll treat as AWS_IO_SOCKET_CLOSED */ - if (read_val == 0) { - AWS_LOGF_INFO( - AWS_LS_IO_SOCKET, "id=%p fd=%d: zero read, socket is closed", (void *)socket, socket->io_handle.data.fd); - *amount_read = 0; - - if (buffer->capacity - buffer->len > 0) { - return aws_raise_error(AWS_IO_SOCKET_CLOSED); - } - - return AWS_OP_SUCCESS; - } - - int error = errno; -#if defined(EWOULDBLOCK) - if (error == EAGAIN || error == EWOULDBLOCK) { -#else - if (error == EAGAIN) { -#endif - AWS_LOGF_TRACE(AWS_LS_IO_SOCKET, "id=%p fd=%d: read would block", (void *)socket, socket->io_handle.data.fd); - return aws_raise_error(AWS_IO_READ_WOULD_BLOCK); - } - - if (error == EPIPE) { - AWS_LOGF_INFO(AWS_LS_IO_SOCKET, "id=%p fd=%d: socket is closed.", (void *)socket, socket->io_handle.data.fd); - return aws_raise_error(AWS_IO_SOCKET_CLOSED); - } - - if (error == ETIMEDOUT) { - AWS_LOGF_ERROR(AWS_LS_IO_SOCKET, "id=%p fd=%d: socket timed out.", (void *)socket, socket->io_handle.data.fd); - return aws_raise_error(AWS_IO_SOCKET_TIMEOUT); - } - - AWS_LOGF_ERROR( - AWS_LS_IO_SOCKET, - "id=%p fd=%d: read failed with error: %s", - (void *)socket, - socket->io_handle.data.fd, - strerror(error)); - return aws_raise_error(AWS_ERROR_SYS_CALL_FAILURE); -} - -int aws_socket_write( - struct aws_socket *socket, - const struct aws_byte_cursor *cursor, - aws_socket_on_write_completed_fn *written_fn, - void *user_data) { - if (!aws_event_loop_thread_is_callers_thread(socket->event_loop)) { - return aws_raise_error(AWS_ERROR_IO_EVENT_LOOP_THREAD_ONLY); - } - - if (!(socket->state & CONNECTED_WRITE)) { - AWS_LOGF_ERROR( - AWS_LS_IO_SOCKET, - "id=%p fd=%d: cannot write to because it is not connected", - (void *)socket, - socket->io_handle.data.fd); - return aws_raise_error(AWS_IO_SOCKET_NOT_CONNECTED); - } - - AWS_ASSERT(written_fn); - struct posix_socket *socket_impl = socket->impl; - struct write_request *write_request = aws_mem_calloc(socket->allocator, 1, sizeof(struct write_request)); - - if (!write_request) { - return AWS_OP_ERR; - } - - write_request->original_buffer_len = cursor->len; - write_request->written_fn = written_fn; - write_request->write_user_data = user_data; - write_request->cursor_cpy = *cursor; - aws_linked_list_push_back(&socket_impl->write_queue, &write_request->node); - - /* avoid reentrancy when a user calls write after receiving their completion callback. */ - if (!socket_impl->write_in_progress) { - return s_process_write_requests(socket, write_request); - } - - return AWS_OP_SUCCESS; -} - -int aws_socket_get_error(struct aws_socket *socket) { - int connect_result; - socklen_t result_length = sizeof(connect_result); - - if (getsockopt(socket->io_handle.data.fd, SOL_SOCKET, SO_ERROR, &connect_result, &result_length) < 0) { - return AWS_OP_ERR; - } - - if (connect_result) { - return s_determine_socket_error(connect_result); - } - - return AWS_OP_SUCCESS; -} - -bool aws_socket_is_open(struct aws_socket *socket) { - return socket->io_handle.data.fd >= 0; -} +/** + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0. + */ + +#include <aws/io/socket.h> + +#include <aws/common/clock.h> +#include <aws/common/condition_variable.h> +#include <aws/common/mutex.h> +#include <aws/common/string.h> + +#include <aws/io/event_loop.h> +#include <aws/io/logging.h> + +#include <arpa/inet.h> +#include <aws/io/io.h> +#include <errno.h> +#include <fcntl.h> +#include <netinet/tcp.h> +#include <sys/socket.h> +#include <sys/types.h> +#include <unistd.h> + +#if defined(__MACH__) +# define NO_SIGNAL SO_NOSIGPIPE +# define TCP_KEEPIDLE TCP_KEEPALIVE +#else +# define NO_SIGNAL MSG_NOSIGNAL +#endif + +/* This isn't defined on ancient linux distros (breaking the builds). + * However, if this is a prebuild, we purposely build on an ancient system, but + * we want the kernel calls to still be the same as a modern build since that's likely the target of the application + * calling this code. Just define this if it isn't there already. GlibC and the kernel don't really care how the flag + * gets passed as long as it does. + */ +#ifndef O_CLOEXEC +# define O_CLOEXEC 02000000 +#endif + +#ifdef USE_VSOCK +# if defined(__linux__) && defined(AF_VSOCK) +# include <linux/vm_sockets.h> +# else +# error "USE_VSOCK not supported on current platform" +# endif +#endif + +/* other than CONNECTED_READ | CONNECTED_WRITE + * a socket is only in one of these states at a time. */ +enum socket_state { + INIT = 0x01, + CONNECTING = 0x02, + CONNECTED_READ = 0x04, + CONNECTED_WRITE = 0x08, + BOUND = 0x10, + LISTENING = 0x20, + TIMEDOUT = 0x40, + ERROR = 0x80, + CLOSED, +}; + +static int s_convert_domain(enum aws_socket_domain domain) { + switch (domain) { + case AWS_SOCKET_IPV4: + return AF_INET; + case AWS_SOCKET_IPV6: + return AF_INET6; + case AWS_SOCKET_LOCAL: + return AF_UNIX; +#ifdef USE_VSOCK + case AWS_SOCKET_VSOCK: + return AF_VSOCK; +#endif + default: + AWS_ASSERT(0); + return AF_INET; + } +} + +static int s_convert_type(enum aws_socket_type type) { + switch (type) { + case AWS_SOCKET_STREAM: + return SOCK_STREAM; + case AWS_SOCKET_DGRAM: + return SOCK_DGRAM; + default: + AWS_ASSERT(0); + return SOCK_STREAM; + } +} + +static int s_determine_socket_error(int error) { + switch (error) { + case ECONNREFUSED: + return AWS_IO_SOCKET_CONNECTION_REFUSED; + case ETIMEDOUT: + return AWS_IO_SOCKET_TIMEOUT; + case EHOSTUNREACH: + case ENETUNREACH: + return AWS_IO_SOCKET_NO_ROUTE_TO_HOST; + case EADDRNOTAVAIL: + return AWS_IO_SOCKET_INVALID_ADDRESS; + case ENETDOWN: + return AWS_IO_SOCKET_NETWORK_DOWN; + case ECONNABORTED: + return AWS_IO_SOCKET_CONNECT_ABORTED; + case EADDRINUSE: + return AWS_IO_SOCKET_ADDRESS_IN_USE; + case ENOBUFS: + case ENOMEM: + return AWS_ERROR_OOM; + case EAGAIN: + return AWS_IO_READ_WOULD_BLOCK; + case EMFILE: + case ENFILE: + return AWS_ERROR_MAX_FDS_EXCEEDED; + case ENOENT: + case EINVAL: + return AWS_ERROR_FILE_INVALID_PATH; + case EAFNOSUPPORT: + return AWS_IO_SOCKET_UNSUPPORTED_ADDRESS_FAMILY; + case EACCES: + return AWS_ERROR_NO_PERMISSION; + default: + return AWS_IO_SOCKET_NOT_CONNECTED; + } +} + +static int s_create_socket(struct aws_socket *sock, const struct aws_socket_options *options) { + + int fd = socket(s_convert_domain(options->domain), s_convert_type(options->type), 0); + AWS_LOGF_DEBUG( + AWS_LS_IO_SOCKET, + "id=%p fd=%d: initializing with domain %d and type %d", + (void *)sock, + fd, + options->domain, + options->type); + if (fd != -1) { + int flags = fcntl(fd, F_GETFL, 0); + flags |= O_NONBLOCK | O_CLOEXEC; + int success = fcntl(fd, F_SETFL, flags); + (void)success; + sock->io_handle.data.fd = fd; + sock->io_handle.additional_data = NULL; + return aws_socket_set_options(sock, options); + } + + int aws_error = s_determine_socket_error(errno); + return aws_raise_error(aws_error); +} + +struct posix_socket_connect_args { + struct aws_task task; + struct aws_allocator *allocator; + struct aws_socket *socket; +}; + +struct posix_socket { + struct aws_linked_list write_queue; + struct posix_socket_connect_args *connect_args; + bool write_in_progress; + bool currently_subscribed; + bool continue_accept; + bool currently_in_event; + bool clean_yourself_up; + bool *close_happened; +}; + +static int s_socket_init( + struct aws_socket *socket, + struct aws_allocator *alloc, + const struct aws_socket_options *options, + int existing_socket_fd) { + AWS_ASSERT(options); + AWS_ZERO_STRUCT(*socket); + + struct posix_socket *posix_socket = aws_mem_calloc(alloc, 1, sizeof(struct posix_socket)); + if (!posix_socket) { + socket->impl = NULL; + return AWS_OP_ERR; + } + + socket->allocator = alloc; + socket->io_handle.data.fd = -1; + socket->state = INIT; + socket->options = *options; + + if (existing_socket_fd < 0) { + int err = s_create_socket(socket, options); + if (err) { + aws_mem_release(alloc, posix_socket); + socket->impl = NULL; + return AWS_OP_ERR; + } + } else { + socket->io_handle = (struct aws_io_handle){ + .data = {.fd = existing_socket_fd}, + .additional_data = NULL, + }; + aws_socket_set_options(socket, options); + } + + aws_linked_list_init(&posix_socket->write_queue); + posix_socket->write_in_progress = false; + posix_socket->currently_subscribed = false; + posix_socket->continue_accept = false; + posix_socket->currently_in_event = false; + posix_socket->clean_yourself_up = false; + posix_socket->connect_args = NULL; + posix_socket->close_happened = NULL; + socket->impl = posix_socket; + return AWS_OP_SUCCESS; +} + +int aws_socket_init(struct aws_socket *socket, struct aws_allocator *alloc, const struct aws_socket_options *options) { + AWS_ASSERT(options); + return s_socket_init(socket, alloc, options, -1); +} + +void aws_socket_clean_up(struct aws_socket *socket) { + if (!socket->impl) { + /* protect from double clean */ + return; + } + if (aws_socket_is_open(socket)) { + AWS_LOGF_DEBUG( + AWS_LS_IO_SOCKET, "id=%p fd=%d: is still open, closing...", (void *)socket, socket->io_handle.data.fd); + aws_socket_close(socket); + } + struct posix_socket *socket_impl = socket->impl; + + if (!socket_impl->currently_in_event) { + aws_mem_release(socket->allocator, socket->impl); + } else { + AWS_LOGF_DEBUG( + AWS_LS_IO_SOCKET, + "id=%p fd=%d: is still pending io letting it dangle and cleaning up later.", + (void *)socket, + socket->io_handle.data.fd); + socket_impl->clean_yourself_up = true; + } + + AWS_ZERO_STRUCT(*socket); + socket->io_handle.data.fd = -1; +} + +static void s_on_connection_error(struct aws_socket *socket, int error); + +static int s_on_connection_success(struct aws_socket *socket) { + + struct aws_event_loop *event_loop = socket->event_loop; + struct posix_socket *socket_impl = socket->impl; + + if (socket_impl->currently_subscribed) { + aws_event_loop_unsubscribe_from_io_events(socket->event_loop, &socket->io_handle); + socket_impl->currently_subscribed = false; + } + + socket->event_loop = NULL; + + int connect_result; + socklen_t result_length = sizeof(connect_result); + + if (getsockopt(socket->io_handle.data.fd, SOL_SOCKET, SO_ERROR, &connect_result, &result_length) < 0) { + AWS_LOGF_ERROR( + AWS_LS_IO_SOCKET, + "id=%p fd=%d: failed to determine connection error %d", + (void *)socket, + socket->io_handle.data.fd, + errno); + int aws_error = s_determine_socket_error(errno); + aws_raise_error(aws_error); + s_on_connection_error(socket, aws_error); + return AWS_OP_ERR; + } + + if (connect_result) { + AWS_LOGF_ERROR( + AWS_LS_IO_SOCKET, + "id=%p fd=%d: connection error %d", + (void *)socket, + socket->io_handle.data.fd, + connect_result); + int aws_error = s_determine_socket_error(connect_result); + aws_raise_error(aws_error); + s_on_connection_error(socket, aws_error); + return AWS_OP_ERR; + } + + AWS_LOGF_INFO(AWS_LS_IO_SOCKET, "id=%p fd=%d: connection success", (void *)socket, socket->io_handle.data.fd); + + struct sockaddr_storage address; + AWS_ZERO_STRUCT(address); + socklen_t address_size = sizeof(address); + if (!getsockname(socket->io_handle.data.fd, (struct sockaddr *)&address, &address_size)) { + uint16_t port = 0; + + if (address.ss_family == AF_INET) { + struct sockaddr_in *s = (struct sockaddr_in *)&address; + port = ntohs(s->sin_port); + /* this comes straight from the kernal. a.) they won't fail. b.) even if they do, it's not fatal + * once we add logging, we can log this if it fails. */ + if (inet_ntop( + AF_INET, &s->sin_addr, socket->local_endpoint.address, sizeof(socket->local_endpoint.address))) { + AWS_LOGF_DEBUG( + AWS_LS_IO_SOCKET, + "id=%p fd=%d: local endpoint %s:%d", + (void *)socket, + socket->io_handle.data.fd, + socket->local_endpoint.address, + port); + } else { + AWS_LOGF_WARN( + AWS_LS_IO_SOCKET, + "id=%p fd=%d: determining local endpoint failed", + (void *)socket, + socket->io_handle.data.fd); + } + } else if (address.ss_family == AF_INET6) { + struct sockaddr_in6 *s = (struct sockaddr_in6 *)&address; + port = ntohs(s->sin6_port); + /* this comes straight from the kernal. a.) they won't fail. b.) even if they do, it's not fatal + * once we add logging, we can log this if it fails. */ + if (inet_ntop( + AF_INET6, &s->sin6_addr, socket->local_endpoint.address, sizeof(socket->local_endpoint.address))) { + AWS_LOGF_DEBUG( + AWS_LS_IO_SOCKET, + "id=%p fd %d: local endpoint %s:%d", + (void *)socket, + socket->io_handle.data.fd, + socket->local_endpoint.address, + port); + } else { + AWS_LOGF_WARN( + AWS_LS_IO_SOCKET, + "id=%p fd=%d: determining local endpoint failed", + (void *)socket, + socket->io_handle.data.fd); + } + } + + socket->local_endpoint.port = port; + } else { + AWS_LOGF_ERROR( + AWS_LS_IO_SOCKET, + "id=%p fd=%d: getsockname() failed with error %d", + (void *)socket, + socket->io_handle.data.fd, + errno); + int aws_error = s_determine_socket_error(errno); + aws_raise_error(aws_error); + s_on_connection_error(socket, aws_error); + return AWS_OP_ERR; + } + + socket->state = CONNECTED_WRITE | CONNECTED_READ; + + if (aws_socket_assign_to_event_loop(socket, event_loop)) { + AWS_LOGF_ERROR( + AWS_LS_IO_SOCKET, + "id=%p fd=%d: assignment to event loop %p failed with error %d", + (void *)socket, + socket->io_handle.data.fd, + (void *)event_loop, + aws_last_error()); + s_on_connection_error(socket, aws_last_error()); + return AWS_OP_ERR; + } + + socket->connection_result_fn(socket, AWS_ERROR_SUCCESS, socket->connect_accept_user_data); + + return AWS_OP_SUCCESS; +} + +static void s_on_connection_error(struct aws_socket *socket, int error) { + socket->state = ERROR; + AWS_LOGF_ERROR(AWS_LS_IO_SOCKET, "id=%p fd=%d: connection failure", (void *)socket, socket->io_handle.data.fd); + if (socket->connection_result_fn) { + socket->connection_result_fn(socket, error, socket->connect_accept_user_data); + } else if (socket->accept_result_fn) { + socket->accept_result_fn(socket, error, NULL, socket->connect_accept_user_data); + } +} + +/* the next two callbacks compete based on which one runs first. if s_socket_connect_event + * comes back first, then we set socket_args->socket = NULL and continue on with the connection. + * if s_handle_socket_timeout() runs first, is sees socket_args->socket is NULL and just cleans up its memory. + * s_handle_socket_timeout() will always run so the memory for socket_connect_args is always cleaned up there. */ +static void s_socket_connect_event( + struct aws_event_loop *event_loop, + struct aws_io_handle *handle, + int events, + void *user_data) { + + (void)event_loop; + (void)handle; + + struct posix_socket_connect_args *socket_args = (struct posix_socket_connect_args *)user_data; + AWS_LOGF_TRACE(AWS_LS_IO_SOCKET, "fd=%d: connection activity handler triggered ", handle->data.fd); + + if (socket_args->socket) { + AWS_LOGF_TRACE( + AWS_LS_IO_SOCKET, + "id=%p fd=%d: has not timed out yet proceeding with connection.", + (void *)socket_args->socket, + handle->data.fd); + + struct posix_socket *socket_impl = socket_args->socket->impl; + if (!(events & AWS_IO_EVENT_TYPE_ERROR || events & AWS_IO_EVENT_TYPE_CLOSED) && + (events & AWS_IO_EVENT_TYPE_READABLE || events & AWS_IO_EVENT_TYPE_WRITABLE)) { + struct aws_socket *socket = socket_args->socket; + socket_args->socket = NULL; + socket_impl->connect_args = NULL; + s_on_connection_success(socket); + return; + } + + int aws_error = aws_socket_get_error(socket_args->socket); + /* we'll get another notification. */ + if (aws_error == AWS_IO_READ_WOULD_BLOCK) { + AWS_LOGF_TRACE( + AWS_LS_IO_SOCKET, + "id=%p fd=%d: spurious event, waiting for another notification.", + (void *)socket_args->socket, + handle->data.fd); + return; + } + + struct aws_socket *socket = socket_args->socket; + socket_args->socket = NULL; + socket_impl->connect_args = NULL; + aws_raise_error(aws_error); + s_on_connection_error(socket, aws_error); + } +} + +static void s_handle_socket_timeout(struct aws_task *task, void *args, aws_task_status status) { + (void)task; + (void)status; + + struct posix_socket_connect_args *socket_args = args; + + AWS_LOGF_TRACE(AWS_LS_IO_SOCKET, "task_id=%p: timeout task triggered, evaluating timeouts.", (void *)task); + /* successful connection will have nulled out connect_args->socket */ + if (socket_args->socket) { + AWS_LOGF_ERROR( + AWS_LS_IO_SOCKET, + "id=%p fd=%d: timed out, shutting down.", + (void *)socket_args->socket, + socket_args->socket->io_handle.data.fd); + + socket_args->socket->state = TIMEDOUT; + int error_code = AWS_IO_SOCKET_TIMEOUT; + + if (status == AWS_TASK_STATUS_RUN_READY) { + aws_event_loop_unsubscribe_from_io_events(socket_args->socket->event_loop, &socket_args->socket->io_handle); + } else { + error_code = AWS_IO_EVENT_LOOP_SHUTDOWN; + aws_event_loop_free_io_event_resources(socket_args->socket->event_loop, &socket_args->socket->io_handle); + } + socket_args->socket->event_loop = NULL; + struct posix_socket *socket_impl = socket_args->socket->impl; + socket_impl->currently_subscribed = false; + aws_raise_error(error_code); + struct aws_socket *socket = socket_args->socket; + /*socket close sets socket_args->socket to NULL and + * socket_impl->connect_args to NULL. */ + aws_socket_close(socket); + s_on_connection_error(socket, error_code); + } + + aws_mem_release(socket_args->allocator, socket_args); +} + +/* this is used simply for moving a connect_success callback when the connect finished immediately + * (like for unix domain sockets) into the event loop's thread. Also note, in that case there was no + * timeout task scheduled, so in this case the socket_args are cleaned up. */ +static void s_run_connect_success(struct aws_task *task, void *arg, enum aws_task_status status) { + (void)task; + struct posix_socket_connect_args *socket_args = arg; + + if (socket_args->socket) { + struct posix_socket *socket_impl = socket_args->socket->impl; + if (status == AWS_TASK_STATUS_RUN_READY) { + s_on_connection_success(socket_args->socket); + } else { + aws_raise_error(AWS_IO_SOCKET_CONNECT_ABORTED); + socket_args->socket->event_loop = NULL; + s_on_connection_error(socket_args->socket, AWS_IO_SOCKET_CONNECT_ABORTED); + } + socket_impl->connect_args = NULL; + } + + aws_mem_release(socket_args->allocator, socket_args); +} + +static inline int s_convert_pton_error(int pton_code) { + if (pton_code == 0) { + return AWS_IO_SOCKET_INVALID_ADDRESS; + } + + return s_determine_socket_error(errno); +} + +struct socket_address { + union sock_addr_types { + struct sockaddr_in addr_in; + struct sockaddr_in6 addr_in6; + struct sockaddr_un un_addr; +#ifdef USE_VSOCK + struct sockaddr_vm vm_addr; +#endif + } sock_addr_types; +}; + +#ifdef USE_VSOCK +/** Convert a string to a VSOCK CID. Respects the calling convetion of inet_pton: + * 0 on error, 1 on success. */ +static int parse_cid(const char *cid_str, unsigned int *value) { + if (cid_str == NULL || value == NULL) { + errno = EINVAL; + return 0; + } + /* strtoll returns 0 as both error and correct value */ + errno = 0; + /* unsigned long long to handle edge cases in convention explicitly */ + long long cid = strtoll(cid_str, NULL, 10); + if (errno != 0) { + return 0; + } + + /* -1U means any, so it's a valid value, but it needs to be converted to + * unsigned int. */ + if (cid == -1) { + *value = VMADDR_CID_ANY; + return 1; + } + + if (cid < 0 || cid > UINT_MAX) { + errno = ERANGE; + return 0; + } + + /* cast is safe here, edge cases already checked */ + *value = (unsigned int)cid; + return 1; +} +#endif + +int aws_socket_connect( + struct aws_socket *socket, + const struct aws_socket_endpoint *remote_endpoint, + struct aws_event_loop *event_loop, + aws_socket_on_connection_result_fn *on_connection_result, + void *user_data) { + AWS_ASSERT(event_loop); + AWS_ASSERT(!socket->event_loop); + + AWS_LOGF_DEBUG(AWS_LS_IO_SOCKET, "id=%p fd=%d: beginning connect.", (void *)socket, socket->io_handle.data.fd); + + if (socket->event_loop) { + return aws_raise_error(AWS_IO_EVENT_LOOP_ALREADY_ASSIGNED); + } + + if (socket->options.type != AWS_SOCKET_DGRAM) { + AWS_ASSERT(on_connection_result); + if (socket->state != INIT) { + return aws_raise_error(AWS_IO_SOCKET_ILLEGAL_OPERATION_FOR_STATE); + } + } else { /* UDP socket */ + /* UDP sockets jump to CONNECT_READ if bind is called first */ + if (socket->state != CONNECTED_READ && socket->state != INIT) { + return aws_raise_error(AWS_IO_SOCKET_ILLEGAL_OPERATION_FOR_STATE); + } + } + + size_t address_strlen; + if (aws_secure_strlen(remote_endpoint->address, AWS_ADDRESS_MAX_LEN, &address_strlen)) { + return AWS_OP_ERR; + } + + struct socket_address address; + AWS_ZERO_STRUCT(address); + socklen_t sock_size = 0; + int pton_err = 1; + if (socket->options.domain == AWS_SOCKET_IPV4) { + pton_err = inet_pton(AF_INET, remote_endpoint->address, &address.sock_addr_types.addr_in.sin_addr); + address.sock_addr_types.addr_in.sin_port = htons(remote_endpoint->port); + address.sock_addr_types.addr_in.sin_family = AF_INET; + sock_size = sizeof(address.sock_addr_types.addr_in); + } else if (socket->options.domain == AWS_SOCKET_IPV6) { + pton_err = inet_pton(AF_INET6, remote_endpoint->address, &address.sock_addr_types.addr_in6.sin6_addr); + address.sock_addr_types.addr_in6.sin6_port = htons(remote_endpoint->port); + address.sock_addr_types.addr_in6.sin6_family = AF_INET6; + sock_size = sizeof(address.sock_addr_types.addr_in6); + } else if (socket->options.domain == AWS_SOCKET_LOCAL) { + address.sock_addr_types.un_addr.sun_family = AF_UNIX; + strncpy(address.sock_addr_types.un_addr.sun_path, remote_endpoint->address, AWS_ADDRESS_MAX_LEN); + sock_size = sizeof(address.sock_addr_types.un_addr); +#ifdef USE_VSOCK + } else if (socket->options.domain == AWS_SOCKET_VSOCK) { + pton_err = parse_cid(remote_endpoint->address, &address.sock_addr_types.vm_addr.svm_cid); + address.sock_addr_types.vm_addr.svm_family = AF_VSOCK; + address.sock_addr_types.vm_addr.svm_port = (unsigned int)remote_endpoint->port; + sock_size = sizeof(address.sock_addr_types.vm_addr); +#endif + } else { + AWS_ASSERT(0); + return aws_raise_error(AWS_IO_SOCKET_UNSUPPORTED_ADDRESS_FAMILY); + } + + if (pton_err != 1) { + AWS_LOGF_ERROR( + AWS_LS_IO_SOCKET, + "id=%p fd=%d: failed to parse address %s:%d.", + (void *)socket, + socket->io_handle.data.fd, + remote_endpoint->address, + (int)remote_endpoint->port); + return aws_raise_error(s_convert_pton_error(pton_err)); + } + + AWS_LOGF_DEBUG( + AWS_LS_IO_SOCKET, + "id=%p fd=%d: connecting to endpoint %s:%d.", + (void *)socket, + socket->io_handle.data.fd, + remote_endpoint->address, + (int)remote_endpoint->port); + + socket->state = CONNECTING; + socket->remote_endpoint = *remote_endpoint; + socket->connect_accept_user_data = user_data; + socket->connection_result_fn = on_connection_result; + + struct posix_socket *socket_impl = socket->impl; + + socket_impl->connect_args = aws_mem_calloc(socket->allocator, 1, sizeof(struct posix_socket_connect_args)); + if (!socket_impl->connect_args) { + return AWS_OP_ERR; + } + + socket_impl->connect_args->socket = socket; + socket_impl->connect_args->allocator = socket->allocator; + + socket_impl->connect_args->task.fn = s_handle_socket_timeout; + socket_impl->connect_args->task.arg = socket_impl->connect_args; + + int error_code = connect(socket->io_handle.data.fd, (struct sockaddr *)&address.sock_addr_types, sock_size); + socket->event_loop = event_loop; + + if (!error_code) { + AWS_LOGF_INFO( + AWS_LS_IO_SOCKET, + "id=%p fd=%d: connected immediately, not scheduling timeout.", + (void *)socket, + socket->io_handle.data.fd); + socket_impl->connect_args->task.fn = s_run_connect_success; + /* the subscription for IO will happen once we setup the connection in the task. Since we already + * know the connection succeeded, we don't need to register for events yet. */ + aws_event_loop_schedule_task_now(event_loop, &socket_impl->connect_args->task); + } + + if (error_code) { + error_code = errno; + if (error_code == EINPROGRESS || error_code == EALREADY) { + AWS_LOGF_TRACE( + AWS_LS_IO_SOCKET, + "id=%p fd=%d: connection pending waiting on event-loop notification or timeout.", + (void *)socket, + socket->io_handle.data.fd); + /* cache the timeout task; it is possible for the IO subscription to come back virtually immediately + * and null out the connect args */ + struct aws_task *timeout_task = &socket_impl->connect_args->task; + + socket_impl->currently_subscribed = true; + /* This event is for when the connection finishes. (the fd will flip writable). */ + if (aws_event_loop_subscribe_to_io_events( + event_loop, + &socket->io_handle, + AWS_IO_EVENT_TYPE_WRITABLE, + s_socket_connect_event, + socket_impl->connect_args)) { + AWS_LOGF_ERROR( + AWS_LS_IO_SOCKET, + "id=%p fd=%d: failed to register with event-loop %p.", + (void *)socket, + socket->io_handle.data.fd, + (void *)event_loop); + socket_impl->currently_subscribed = false; + socket->event_loop = NULL; + goto err_clean_up; + } + + /* schedule a task to run at the connect timeout interval, if this task runs before the connect + * happens, we consider that a timeout. */ + uint64_t timeout = 0; + aws_event_loop_current_clock_time(event_loop, &timeout); + timeout += aws_timestamp_convert( + socket->options.connect_timeout_ms, AWS_TIMESTAMP_MILLIS, AWS_TIMESTAMP_NANOS, NULL); + AWS_LOGF_TRACE( + AWS_LS_IO_SOCKET, + "id=%p fd=%d: scheduling timeout task for %llu.", + (void *)socket, + socket->io_handle.data.fd, + (unsigned long long)timeout); + aws_event_loop_schedule_task_future(event_loop, timeout_task, timeout); + } else { + AWS_LOGF_ERROR( + AWS_LS_IO_SOCKET, + "id=%p fd=%d: connect failed with error code %d.", + (void *)socket, + socket->io_handle.data.fd, + error_code); + int aws_error = s_determine_socket_error(error_code); + aws_raise_error(aws_error); + socket->event_loop = NULL; + socket_impl->currently_subscribed = false; + goto err_clean_up; + } + } + return AWS_OP_SUCCESS; + +err_clean_up: + aws_mem_release(socket->allocator, socket_impl->connect_args); + socket_impl->connect_args = NULL; + return AWS_OP_ERR; +} + +int aws_socket_bind(struct aws_socket *socket, const struct aws_socket_endpoint *local_endpoint) { + if (socket->state != INIT) { + AWS_LOGF_ERROR( + AWS_LS_IO_SOCKET, + "id=%p fd=%d: invalid state for bind operation.", + (void *)socket, + socket->io_handle.data.fd); + return aws_raise_error(AWS_IO_SOCKET_ILLEGAL_OPERATION_FOR_STATE); + } + + size_t address_strlen; + if (aws_secure_strlen(local_endpoint->address, AWS_ADDRESS_MAX_LEN, &address_strlen)) { + return AWS_OP_ERR; + } + + int error_code = -1; + + socket->local_endpoint = *local_endpoint; + AWS_LOGF_INFO( + AWS_LS_IO_SOCKET, + "id=%p fd=%d: binding to %s:%d.", + (void *)socket, + socket->io_handle.data.fd, + local_endpoint->address, + (int)local_endpoint->port); + + struct socket_address address; + AWS_ZERO_STRUCT(address); + socklen_t sock_size = 0; + int pton_err = 1; + if (socket->options.domain == AWS_SOCKET_IPV4) { + pton_err = inet_pton(AF_INET, local_endpoint->address, &address.sock_addr_types.addr_in.sin_addr); + address.sock_addr_types.addr_in.sin_port = htons(local_endpoint->port); + address.sock_addr_types.addr_in.sin_family = AF_INET; + sock_size = sizeof(address.sock_addr_types.addr_in); + } else if (socket->options.domain == AWS_SOCKET_IPV6) { + pton_err = inet_pton(AF_INET6, local_endpoint->address, &address.sock_addr_types.addr_in6.sin6_addr); + address.sock_addr_types.addr_in6.sin6_port = htons(local_endpoint->port); + address.sock_addr_types.addr_in6.sin6_family = AF_INET6; + sock_size = sizeof(address.sock_addr_types.addr_in6); + } else if (socket->options.domain == AWS_SOCKET_LOCAL) { + address.sock_addr_types.un_addr.sun_family = AF_UNIX; + strncpy(address.sock_addr_types.un_addr.sun_path, local_endpoint->address, AWS_ADDRESS_MAX_LEN); + sock_size = sizeof(address.sock_addr_types.un_addr); +#ifdef USE_VSOCK + } else if (socket->options.domain == AWS_SOCKET_VSOCK) { + pton_err = parse_cid(local_endpoint->address, &address.sock_addr_types.vm_addr.svm_cid); + address.sock_addr_types.vm_addr.svm_family = AF_VSOCK; + address.sock_addr_types.vm_addr.svm_port = (unsigned int)local_endpoint->port; + sock_size = sizeof(address.sock_addr_types.vm_addr); +#endif + } else { + AWS_ASSERT(0); + return aws_raise_error(AWS_IO_SOCKET_UNSUPPORTED_ADDRESS_FAMILY); + } + + if (pton_err != 1) { + AWS_LOGF_ERROR( + AWS_LS_IO_SOCKET, + "id=%p fd=%d: failed to parse address %s:%d.", + (void *)socket, + socket->io_handle.data.fd, + local_endpoint->address, + (int)local_endpoint->port); + return aws_raise_error(s_convert_pton_error(pton_err)); + } + + error_code = bind(socket->io_handle.data.fd, (struct sockaddr *)&address.sock_addr_types, sock_size); + + if (!error_code) { + if (socket->options.type == AWS_SOCKET_STREAM) { + socket->state = BOUND; + } else { + /* e.g. UDP is now readable */ + socket->state = CONNECTED_READ; + } + AWS_LOGF_DEBUG(AWS_LS_IO_SOCKET, "id=%p fd=%d: successfully bound", (void *)socket, socket->io_handle.data.fd); + + return AWS_OP_SUCCESS; + } + + socket->state = ERROR; + error_code = errno; + AWS_LOGF_ERROR( + AWS_LS_IO_SOCKET, + "id=%p fd=%d: bind failed with error code %d", + (void *)socket, + socket->io_handle.data.fd, + error_code); + + int aws_error = s_determine_socket_error(error_code); + return aws_raise_error(aws_error); +} + +int aws_socket_listen(struct aws_socket *socket, int backlog_size) { + if (socket->state != BOUND) { + AWS_LOGF_ERROR( + AWS_LS_IO_SOCKET, + "id=%p fd=%d: invalid state for listen operation. You must call bind first.", + (void *)socket, + socket->io_handle.data.fd); + return aws_raise_error(AWS_IO_SOCKET_ILLEGAL_OPERATION_FOR_STATE); + } + + int error_code = listen(socket->io_handle.data.fd, backlog_size); + + if (!error_code) { + AWS_LOGF_INFO( + AWS_LS_IO_SOCKET, "id=%p fd=%d: successfully listening", (void *)socket, socket->io_handle.data.fd); + socket->state = LISTENING; + return AWS_OP_SUCCESS; + } + + AWS_LOGF_ERROR( + AWS_LS_IO_SOCKET, + "id=%p fd=%d: listen failed with error code %d", + (void *)socket, + socket->io_handle.data.fd, + error_code); + error_code = errno; + socket->state = ERROR; + + return aws_raise_error(s_determine_socket_error(error_code)); +} + +/* this is called by the event loop handler that was installed in start_accept(). It runs once the FD goes readable, + * accepts as many as it can and then returns control to the event loop. */ +static void s_socket_accept_event( + struct aws_event_loop *event_loop, + struct aws_io_handle *handle, + int events, + void *user_data) { + + (void)event_loop; + + struct aws_socket *socket = user_data; + struct posix_socket *socket_impl = socket->impl; + + AWS_LOGF_DEBUG( + AWS_LS_IO_SOCKET, "id=%p fd=%d: listening event received", (void *)socket, socket->io_handle.data.fd); + + if (socket_impl->continue_accept && events & AWS_IO_EVENT_TYPE_READABLE) { + int in_fd = 0; + while (socket_impl->continue_accept && in_fd != -1) { + struct sockaddr_storage in_addr; + socklen_t in_len = sizeof(struct sockaddr_storage); + + in_fd = accept(handle->data.fd, (struct sockaddr *)&in_addr, &in_len); + if (in_fd == -1) { + int error = errno; + + if (error == EAGAIN || error == EWOULDBLOCK) { + break; + } + + int aws_error = aws_socket_get_error(socket); + aws_raise_error(aws_error); + s_on_connection_error(socket, aws_error); + break; + } + + AWS_LOGF_DEBUG( + AWS_LS_IO_SOCKET, "id=%p fd=%d: incoming connection", (void *)socket, socket->io_handle.data.fd); + + struct aws_socket *new_sock = aws_mem_acquire(socket->allocator, sizeof(struct aws_socket)); + + if (!new_sock) { + close(in_fd); + s_on_connection_error(socket, aws_last_error()); + continue; + } + + if (s_socket_init(new_sock, socket->allocator, &socket->options, in_fd)) { + aws_mem_release(socket->allocator, new_sock); + s_on_connection_error(socket, aws_last_error()); + continue; + } + + new_sock->local_endpoint = socket->local_endpoint; + new_sock->state = CONNECTED_READ | CONNECTED_WRITE; + uint16_t port = 0; + + /* get the info on the incoming socket's address */ + if (in_addr.ss_family == AF_INET) { + struct sockaddr_in *s = (struct sockaddr_in *)&in_addr; + port = ntohs(s->sin_port); + /* this came from the kernel, a.) it won't fail. b.) even if it does + * its not fatal. come back and add logging later. */ + if (!inet_ntop( + AF_INET, + &s->sin_addr, + new_sock->remote_endpoint.address, + sizeof(new_sock->remote_endpoint.address))) { + AWS_LOGF_WARN( + AWS_LS_IO_SOCKET, + "id=%p fd=%d:. Failed to determine remote address.", + (void *)socket, + socket->io_handle.data.fd) + } + new_sock->options.domain = AWS_SOCKET_IPV4; + } else if (in_addr.ss_family == AF_INET6) { + /* this came from the kernel, a.) it won't fail. b.) even if it does + * its not fatal. come back and add logging later. */ + struct sockaddr_in6 *s = (struct sockaddr_in6 *)&in_addr; + port = ntohs(s->sin6_port); + if (!inet_ntop( + AF_INET6, + &s->sin6_addr, + new_sock->remote_endpoint.address, + sizeof(new_sock->remote_endpoint.address))) { + AWS_LOGF_WARN( + AWS_LS_IO_SOCKET, + "id=%p fd=%d:. Failed to determine remote address.", + (void *)socket, + socket->io_handle.data.fd) + } + new_sock->options.domain = AWS_SOCKET_IPV6; + } else if (in_addr.ss_family == AF_UNIX) { + new_sock->remote_endpoint = socket->local_endpoint; + new_sock->options.domain = AWS_SOCKET_LOCAL; + } + + new_sock->remote_endpoint.port = port; + + AWS_LOGF_INFO( + AWS_LS_IO_SOCKET, + "id=%p fd=%d: connected to %s:%d, incoming fd %d", + (void *)socket, + socket->io_handle.data.fd, + new_sock->remote_endpoint.address, + new_sock->remote_endpoint.port, + in_fd); + + int flags = fcntl(in_fd, F_GETFL, 0); + + flags |= O_NONBLOCK | O_CLOEXEC; + fcntl(in_fd, F_SETFL, flags); + + bool close_occurred = false; + socket_impl->close_happened = &close_occurred; + socket->accept_result_fn(socket, AWS_ERROR_SUCCESS, new_sock, socket->connect_accept_user_data); + + if (close_occurred) { + return; + } + + socket_impl->close_happened = NULL; + } + } + + AWS_LOGF_TRACE( + AWS_LS_IO_SOCKET, + "id=%p fd=%d: finished processing incoming connections, " + "waiting on event-loop notification", + (void *)socket, + socket->io_handle.data.fd); +} + +int aws_socket_start_accept( + struct aws_socket *socket, + struct aws_event_loop *accept_loop, + aws_socket_on_accept_result_fn *on_accept_result, + void *user_data) { + AWS_ASSERT(on_accept_result); + AWS_ASSERT(accept_loop); + + if (socket->event_loop) { + AWS_LOGF_ERROR( + AWS_LS_IO_SOCKET, + "id=%p fd=%d: is already assigned to event-loop %p.", + (void *)socket, + socket->io_handle.data.fd, + (void *)socket->event_loop); + return aws_raise_error(AWS_IO_EVENT_LOOP_ALREADY_ASSIGNED); + } + + if (socket->state != LISTENING) { + AWS_LOGF_ERROR( + AWS_LS_IO_SOCKET, + "id=%p fd=%d: invalid state for start_accept operation. You must call listen first.", + (void *)socket, + socket->io_handle.data.fd); + return aws_raise_error(AWS_IO_SOCKET_ILLEGAL_OPERATION_FOR_STATE); + } + + socket->accept_result_fn = on_accept_result; + socket->connect_accept_user_data = user_data; + socket->event_loop = accept_loop; + struct posix_socket *socket_impl = socket->impl; + socket_impl->continue_accept = true; + socket_impl->currently_subscribed = true; + + if (aws_event_loop_subscribe_to_io_events( + socket->event_loop, &socket->io_handle, AWS_IO_EVENT_TYPE_READABLE, s_socket_accept_event, socket)) { + AWS_LOGF_ERROR( + AWS_LS_IO_SOCKET, + "id=%p fd=%d: failed to subscribe to event-loop %p.", + (void *)socket, + socket->io_handle.data.fd, + (void *)socket->event_loop); + socket_impl->continue_accept = false; + socket_impl->currently_subscribed = false; + socket->event_loop = NULL; + + return AWS_OP_ERR; + } + + return AWS_OP_SUCCESS; +} + +struct stop_accept_args { + struct aws_task task; + struct aws_mutex mutex; + struct aws_condition_variable condition_variable; + struct aws_socket *socket; + int ret_code; + bool invoked; +}; + +static bool s_stop_accept_pred(void *arg) { + struct stop_accept_args *stop_accept_args = arg; + return stop_accept_args->invoked; +} + +static void s_stop_accept_task(struct aws_task *task, void *arg, enum aws_task_status status) { + (void)task; + (void)status; + + struct stop_accept_args *stop_accept_args = arg; + aws_mutex_lock(&stop_accept_args->mutex); + stop_accept_args->ret_code = AWS_OP_SUCCESS; + if (aws_socket_stop_accept(stop_accept_args->socket)) { + stop_accept_args->ret_code = aws_last_error(); + } + stop_accept_args->invoked = true; + aws_condition_variable_notify_one(&stop_accept_args->condition_variable); + aws_mutex_unlock(&stop_accept_args->mutex); +} + +int aws_socket_stop_accept(struct aws_socket *socket) { + if (socket->state != LISTENING) { + AWS_LOGF_ERROR( + AWS_LS_IO_SOCKET, + "id=%p fd=%d: is not in a listening state, can't stop_accept.", + (void *)socket, + socket->io_handle.data.fd); + return aws_raise_error(AWS_IO_SOCKET_ILLEGAL_OPERATION_FOR_STATE); + } + + AWS_LOGF_INFO( + AWS_LS_IO_SOCKET, "id=%p fd=%d: stopping accepting new connections", (void *)socket, socket->io_handle.data.fd); + + if (!aws_event_loop_thread_is_callers_thread(socket->event_loop)) { + struct stop_accept_args args = {.mutex = AWS_MUTEX_INIT, + .condition_variable = AWS_CONDITION_VARIABLE_INIT, + .invoked = false, + .socket = socket, + .ret_code = AWS_OP_SUCCESS, + .task = {.fn = s_stop_accept_task}}; + AWS_LOGF_INFO( + AWS_LS_IO_SOCKET, + "id=%p fd=%d: stopping accepting new connections from a different thread than " + "the socket is running from. Blocking until it shuts down.", + (void *)socket, + socket->io_handle.data.fd); + /* Look.... I know what I'm doing.... trust me, I'm an engineer. + * We wait on the completion before 'args' goes out of scope. + * NOLINTNEXTLINE */ + args.task.arg = &args; + aws_mutex_lock(&args.mutex); + aws_event_loop_schedule_task_now(socket->event_loop, &args.task); + aws_condition_variable_wait_pred(&args.condition_variable, &args.mutex, s_stop_accept_pred, &args); + aws_mutex_unlock(&args.mutex); + AWS_LOGF_INFO( + AWS_LS_IO_SOCKET, + "id=%p fd=%d: stop accept task finished running.", + (void *)socket, + socket->io_handle.data.fd); + + if (args.ret_code) { + return aws_raise_error(args.ret_code); + } + return AWS_OP_SUCCESS; + } + + int ret_val = AWS_OP_SUCCESS; + struct posix_socket *socket_impl = socket->impl; + if (socket_impl->currently_subscribed) { + ret_val = aws_event_loop_unsubscribe_from_io_events(socket->event_loop, &socket->io_handle); + socket_impl->currently_subscribed = false; + socket_impl->continue_accept = false; + socket->event_loop = NULL; + } + + return ret_val; +} + +int aws_socket_set_options(struct aws_socket *socket, const struct aws_socket_options *options) { + if (socket->options.domain != options->domain || socket->options.type != options->type) { + return aws_raise_error(AWS_IO_SOCKET_INVALID_OPTIONS); + } + + AWS_LOGF_DEBUG( + AWS_LS_IO_SOCKET, + "id=%p fd=%d: setting socket options to: keep-alive %d, keep idle %d, keep-alive interval %d, keep-alive probe " + "count %d.", + (void *)socket, + socket->io_handle.data.fd, + (int)options->keepalive, + (int)options->keep_alive_timeout_sec, + (int)options->keep_alive_interval_sec, + (int)options->keep_alive_max_failed_probes); + + socket->options = *options; + + int option_value = 1; + if (AWS_UNLIKELY( + setsockopt(socket->io_handle.data.fd, SOL_SOCKET, NO_SIGNAL, &option_value, sizeof(option_value)))) { + AWS_LOGF_WARN( + AWS_LS_IO_SOCKET, + "id=%p fd=%d: setsockopt() for NO_SIGNAL failed with errno %d. If you are having SIGPIPE signals thrown, " + "you may" + " want to install a signal trap in your application layer.", + (void *)socket, + socket->io_handle.data.fd, + errno); + } + + int reuse = 1; + if (AWS_UNLIKELY(setsockopt(socket->io_handle.data.fd, SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof(int)))) { + AWS_LOGF_WARN( + AWS_LS_IO_SOCKET, + "id=%p fd=%d: setsockopt() for SO_REUSEADDR failed with errno %d.", + (void *)socket, + socket->io_handle.data.fd, + errno); + } + + if (options->type == AWS_SOCKET_STREAM && options->domain != AWS_SOCKET_LOCAL) { + if (socket->options.keepalive) { + int keep_alive = 1; + if (AWS_UNLIKELY( + setsockopt(socket->io_handle.data.fd, SOL_SOCKET, SO_KEEPALIVE, &keep_alive, sizeof(int)))) { + AWS_LOGF_WARN( + AWS_LS_IO_SOCKET, + "id=%p fd=%d: setsockopt() for enabling SO_KEEPALIVE failed with errno %d.", + (void *)socket, + socket->io_handle.data.fd, + errno); + } + } + + if (socket->options.keep_alive_interval_sec && socket->options.keep_alive_timeout_sec) { + int ival_in_secs = socket->options.keep_alive_interval_sec; + if (AWS_UNLIKELY(setsockopt( + socket->io_handle.data.fd, IPPROTO_TCP, TCP_KEEPIDLE, &ival_in_secs, sizeof(ival_in_secs)))) { + AWS_LOGF_WARN( + AWS_LS_IO_SOCKET, + "id=%p fd=%d: setsockopt() for enabling TCP_KEEPIDLE for TCP failed with errno %d.", + (void *)socket, + socket->io_handle.data.fd, + errno); + } + + ival_in_secs = socket->options.keep_alive_timeout_sec; + if (AWS_UNLIKELY(setsockopt( + socket->io_handle.data.fd, IPPROTO_TCP, TCP_KEEPINTVL, &ival_in_secs, sizeof(ival_in_secs)))) { + AWS_LOGF_WARN( + AWS_LS_IO_SOCKET, + "id=%p fd=%d: setsockopt() for enabling TCP_KEEPINTVL for TCP failed with errno %d.", + (void *)socket, + socket->io_handle.data.fd, + errno); + } + } + + if (socket->options.keep_alive_max_failed_probes) { + int max_probes = socket->options.keep_alive_max_failed_probes; + if (AWS_UNLIKELY( + setsockopt(socket->io_handle.data.fd, IPPROTO_TCP, TCP_KEEPCNT, &max_probes, sizeof(max_probes)))) { + AWS_LOGF_WARN( + AWS_LS_IO_SOCKET, + "id=%p fd=%d: setsockopt() for enabling TCP_KEEPCNT for TCP failed with errno %d.", + (void *)socket, + socket->io_handle.data.fd, + errno); + } + } + } + + return AWS_OP_SUCCESS; +} + +struct write_request { + struct aws_byte_cursor cursor_cpy; + aws_socket_on_write_completed_fn *written_fn; + void *write_user_data; + struct aws_linked_list_node node; + size_t original_buffer_len; +}; + +struct posix_socket_close_args { + struct aws_mutex mutex; + struct aws_condition_variable condition_variable; + struct aws_socket *socket; + bool invoked; + int ret_code; +}; + +static bool s_close_predicate(void *arg) { + struct posix_socket_close_args *close_args = arg; + return close_args->invoked; +} + +static void s_close_task(struct aws_task *task, void *arg, enum aws_task_status status) { + (void)task; + (void)status; + + struct posix_socket_close_args *close_args = arg; + aws_mutex_lock(&close_args->mutex); + close_args->ret_code = AWS_OP_SUCCESS; + + if (aws_socket_close(close_args->socket)) { + close_args->ret_code = aws_last_error(); + } + + close_args->invoked = true; + aws_condition_variable_notify_one(&close_args->condition_variable); + aws_mutex_unlock(&close_args->mutex); +} + +int aws_socket_close(struct aws_socket *socket) { + struct posix_socket *socket_impl = socket->impl; + AWS_LOGF_DEBUG(AWS_LS_IO_SOCKET, "id=%p fd=%d: closing", (void *)socket, socket->io_handle.data.fd); + if (socket->event_loop) { + /* don't freak out on me, this almost never happens, and never occurs inside a channel + * it only gets hit from a listening socket shutting down or from a unit test. */ + if (!aws_event_loop_thread_is_callers_thread(socket->event_loop)) { + AWS_LOGF_INFO( + AWS_LS_IO_SOCKET, + "id=%p fd=%d: closing from a different thread than " + "the socket is running from. Blocking until it closes down.", + (void *)socket, + socket->io_handle.data.fd); + /* the only time we allow this kind of thing is when you're a listener.*/ + if (socket->state != LISTENING) { + return aws_raise_error(AWS_IO_SOCKET_ILLEGAL_OPERATION_FOR_STATE); + } + + struct posix_socket_close_args args = { + .mutex = AWS_MUTEX_INIT, + .condition_variable = AWS_CONDITION_VARIABLE_INIT, + .socket = socket, + .ret_code = AWS_OP_SUCCESS, + .invoked = false, + }; + + struct aws_task close_task = { + .fn = s_close_task, + .arg = &args, + }; + + aws_mutex_lock(&args.mutex); + aws_event_loop_schedule_task_now(socket->event_loop, &close_task); + aws_condition_variable_wait_pred(&args.condition_variable, &args.mutex, s_close_predicate, &args); + aws_mutex_unlock(&args.mutex); + AWS_LOGF_INFO( + AWS_LS_IO_SOCKET, "id=%p fd=%d: close task completed.", (void *)socket, socket->io_handle.data.fd); + if (args.ret_code) { + return aws_raise_error(args.ret_code); + } + + return AWS_OP_SUCCESS; + } + + if (socket_impl->currently_subscribed) { + if (socket->state & LISTENING) { + aws_socket_stop_accept(socket); + } else { + int err_code = aws_event_loop_unsubscribe_from_io_events(socket->event_loop, &socket->io_handle); + + if (err_code) { + return AWS_OP_ERR; + } + } + socket_impl->currently_subscribed = false; + socket->event_loop = NULL; + } + } + + if (socket_impl->close_happened) { + *socket_impl->close_happened = true; + } + + if (socket_impl->connect_args) { + socket_impl->connect_args->socket = NULL; + socket_impl->connect_args = NULL; + } + + if (aws_socket_is_open(socket)) { + close(socket->io_handle.data.fd); + socket->io_handle.data.fd = -1; + socket->state = CLOSED; + + /* after close, just go ahead and clear out the pending writes queue + * and tell the user they were cancelled. */ + while (!aws_linked_list_empty(&socket_impl->write_queue)) { + struct aws_linked_list_node *node = aws_linked_list_pop_front(&socket_impl->write_queue); + struct write_request *write_request = AWS_CONTAINER_OF(node, struct write_request, node); + + write_request->written_fn( + socket, AWS_IO_SOCKET_CLOSED, write_request->original_buffer_len, write_request->write_user_data); + aws_mem_release(socket->allocator, write_request); + } + } + + return AWS_OP_SUCCESS; +} + +int aws_socket_shutdown_dir(struct aws_socket *socket, enum aws_channel_direction dir) { + int how = dir == AWS_CHANNEL_DIR_READ ? 0 : 1; + AWS_LOGF_DEBUG( + AWS_LS_IO_SOCKET, "id=%p fd=%d: shutting down in direction %d", (void *)socket, socket->io_handle.data.fd, dir); + if (shutdown(socket->io_handle.data.fd, how)) { + int aws_error = s_determine_socket_error(errno); + return aws_raise_error(aws_error); + } + + if (dir == AWS_CHANNEL_DIR_READ) { + socket->state &= ~CONNECTED_READ; + } else { + socket->state &= ~CONNECTED_WRITE; + } + + return AWS_OP_SUCCESS; +} + +/* this gets called in two scenarios. + * 1st scenario, someone called aws_socket_write() and we want to try writing now, so an error can be returned + * immediately if something bad has happened to the socket. In this case, `parent_request` is set. + * 2nd scenario, the event loop notified us that the socket went writable. In this case `parent_request` is NULL */ +static int s_process_write_requests(struct aws_socket *socket, struct write_request *parent_request) { + struct posix_socket *socket_impl = socket->impl; + struct aws_allocator *allocator = socket->allocator; + + AWS_LOGF_TRACE( + AWS_LS_IO_SOCKET, "id=%p fd=%d: processing write requests.", (void *)socket, socket->io_handle.data.fd); + + /* there's a potential deadlock where we notify the user that we wrote some data, the user + * says, "cool, now I can write more and then immediately calls aws_socket_write(). We need to make sure + * that we don't allow reentrancy in that case. */ + socket_impl->write_in_progress = true; + + if (parent_request) { + AWS_LOGF_TRACE( + AWS_LS_IO_SOCKET, + "id=%p fd=%d: processing write requests, called from aws_socket_write", + (void *)socket, + socket->io_handle.data.fd); + socket_impl->currently_in_event = true; + } else { + AWS_LOGF_TRACE( + AWS_LS_IO_SOCKET, + "id=%p fd=%d: processing write requests, invoked by the event-loop", + (void *)socket, + socket->io_handle.data.fd); + } + + bool purge = false; + int aws_error = AWS_OP_SUCCESS; + bool parent_request_failed = false; + + /* if a close call happens in the middle, this queue will have been cleaned out from under us. */ + while (!aws_linked_list_empty(&socket_impl->write_queue)) { + struct aws_linked_list_node *node = aws_linked_list_front(&socket_impl->write_queue); + struct write_request *write_request = AWS_CONTAINER_OF(node, struct write_request, node); + + AWS_LOGF_TRACE( + AWS_LS_IO_SOCKET, + "id=%p fd=%d: dequeued write request of size %llu, remaining to write %llu", + (void *)socket, + socket->io_handle.data.fd, + (unsigned long long)write_request->original_buffer_len, + (unsigned long long)write_request->cursor_cpy.len); + + ssize_t written = + send(socket->io_handle.data.fd, write_request->cursor_cpy.ptr, write_request->cursor_cpy.len, NO_SIGNAL); + + AWS_LOGF_TRACE( + AWS_LS_IO_SOCKET, + "id=%p fd=%d: send written size %d", + (void *)socket, + socket->io_handle.data.fd, + (int)written); + + if (written < 0) { + int error = errno; + if (error == EAGAIN) { + AWS_LOGF_TRACE( + AWS_LS_IO_SOCKET, "id=%p fd=%d: returned would block", (void *)socket, socket->io_handle.data.fd); + break; + } + + if (error == EPIPE) { + AWS_LOGF_DEBUG( + AWS_LS_IO_SOCKET, + "id=%p fd=%d: already closed before write", + (void *)socket, + socket->io_handle.data.fd); + aws_error = AWS_IO_SOCKET_CLOSED; + aws_raise_error(aws_error); + purge = true; + break; + } + + purge = true; + AWS_LOGF_DEBUG( + AWS_LS_IO_SOCKET, + "id=%p fd=%d: write error with error code %d", + (void *)socket, + socket->io_handle.data.fd, + error); + aws_error = s_determine_socket_error(error); + aws_raise_error(aws_error); + break; + } + + size_t remaining_to_write = write_request->cursor_cpy.len; + + aws_byte_cursor_advance(&write_request->cursor_cpy, (size_t)written); + AWS_LOGF_TRACE( + AWS_LS_IO_SOCKET, + "id=%p fd=%d: remaining write request to write %llu", + (void *)socket, + socket->io_handle.data.fd, + (unsigned long long)write_request->cursor_cpy.len); + + if ((size_t)written == remaining_to_write) { + AWS_LOGF_TRACE( + AWS_LS_IO_SOCKET, "id=%p fd=%d: write request completed", (void *)socket, socket->io_handle.data.fd); + + aws_linked_list_remove(node); + write_request->written_fn( + socket, AWS_OP_SUCCESS, write_request->original_buffer_len, write_request->write_user_data); + aws_mem_release(allocator, write_request); + } + } + + if (purge) { + while (!aws_linked_list_empty(&socket_impl->write_queue)) { + struct aws_linked_list_node *node = aws_linked_list_pop_front(&socket_impl->write_queue); + struct write_request *write_request = AWS_CONTAINER_OF(node, struct write_request, node); + + /* If this fn was invoked directly from aws_socket_write(), don't invoke the error callback + * as the user will be able to rely on the return value from aws_socket_write() */ + if (write_request == parent_request) { + parent_request_failed = true; + } else { + write_request->written_fn(socket, aws_error, 0, write_request->write_user_data); + } + + aws_mem_release(socket->allocator, write_request); + } + } + + socket_impl->write_in_progress = false; + + if (parent_request) { + socket_impl->currently_in_event = false; + } + + if (socket_impl->clean_yourself_up) { + aws_mem_release(allocator, socket_impl); + } + + /* Only report error if aws_socket_write() invoked this function and its write_request failed */ + if (!parent_request_failed) { + return AWS_OP_SUCCESS; + } + + aws_raise_error(aws_error); + return AWS_OP_ERR; +} + +static void s_on_socket_io_event( + struct aws_event_loop *event_loop, + struct aws_io_handle *handle, + int events, + void *user_data) { + (void)event_loop; + (void)handle; + /* this is to handle a race condition when an error kicks off a cleanup, or the user decides + * to close the socket based on something they read (SSL validation failed for example). + * if clean_up happens when currently_in_event is true, socket_impl is kept dangling but currently + * subscribed is set to false. */ + struct aws_socket *socket = user_data; + struct posix_socket *socket_impl = socket->impl; + struct aws_allocator *allocator = socket->allocator; + + socket_impl->currently_in_event = true; + + if (events & AWS_IO_EVENT_TYPE_REMOTE_HANG_UP || events & AWS_IO_EVENT_TYPE_CLOSED) { + aws_raise_error(AWS_IO_SOCKET_CLOSED); + AWS_LOGF_TRACE(AWS_LS_IO_SOCKET, "id=%p fd=%d: closed remotely", (void *)socket, socket->io_handle.data.fd); + if (socket->readable_fn) { + socket->readable_fn(socket, AWS_IO_SOCKET_CLOSED, socket->readable_user_data); + } + goto end_check; + } + + if (socket_impl->currently_subscribed && events & AWS_IO_EVENT_TYPE_ERROR) { + int aws_error = aws_socket_get_error(socket); + aws_raise_error(aws_error); + AWS_LOGF_TRACE( + AWS_LS_IO_SOCKET, "id=%p fd=%d: error event occurred", (void *)socket, socket->io_handle.data.fd); + if (socket->readable_fn) { + socket->readable_fn(socket, aws_error, socket->readable_user_data); + } + goto end_check; + } + + if (socket_impl->currently_subscribed && events & AWS_IO_EVENT_TYPE_READABLE) { + AWS_LOGF_TRACE(AWS_LS_IO_SOCKET, "id=%p fd=%d: is readable", (void *)socket, socket->io_handle.data.fd); + if (socket->readable_fn) { + socket->readable_fn(socket, AWS_OP_SUCCESS, socket->readable_user_data); + } + } + /* if socket closed in between these branches, the currently_subscribed will be false and socket_impl will not + * have been cleaned up, so this next branch is safe. */ + if (socket_impl->currently_subscribed && events & AWS_IO_EVENT_TYPE_WRITABLE) { + AWS_LOGF_TRACE(AWS_LS_IO_SOCKET, "id=%p fd=%d: is writable", (void *)socket, socket->io_handle.data.fd); + s_process_write_requests(socket, NULL); + } + +end_check: + socket_impl->currently_in_event = false; + + if (socket_impl->clean_yourself_up) { + aws_mem_release(allocator, socket_impl); + } +} + +int aws_socket_assign_to_event_loop(struct aws_socket *socket, struct aws_event_loop *event_loop) { + if (!socket->event_loop) { + AWS_LOGF_DEBUG( + AWS_LS_IO_SOCKET, + "id=%p fd=%d: assigning to event loop %p", + (void *)socket, + socket->io_handle.data.fd, + (void *)event_loop); + socket->event_loop = event_loop; + struct posix_socket *socket_impl = socket->impl; + socket_impl->currently_subscribed = true; + if (aws_event_loop_subscribe_to_io_events( + event_loop, + &socket->io_handle, + AWS_IO_EVENT_TYPE_WRITABLE | AWS_IO_EVENT_TYPE_READABLE, + s_on_socket_io_event, + socket)) { + AWS_LOGF_ERROR( + AWS_LS_IO_SOCKET, + "id=%p fd=%d: assigning to event loop %p failed with error %d", + (void *)socket, + socket->io_handle.data.fd, + (void *)event_loop, + aws_last_error()); + socket_impl->currently_subscribed = false; + socket->event_loop = NULL; + return AWS_OP_ERR; + } + + return AWS_OP_SUCCESS; + } + + return aws_raise_error(AWS_IO_EVENT_LOOP_ALREADY_ASSIGNED); +} + +struct aws_event_loop *aws_socket_get_event_loop(struct aws_socket *socket) { + return socket->event_loop; +} + +int aws_socket_subscribe_to_readable_events( + struct aws_socket *socket, + aws_socket_on_readable_fn *on_readable, + void *user_data) { + + AWS_LOGF_TRACE( + AWS_LS_IO_SOCKET, " id=%p fd=%d: subscribing to readable events", (void *)socket, socket->io_handle.data.fd); + if (!(socket->state & CONNECTED_READ)) { + AWS_LOGF_ERROR( + AWS_LS_IO_SOCKET, + "id=%p fd=%d: can't subscribe to readable events since the socket is not connected", + (void *)socket, + socket->io_handle.data.fd); + return aws_raise_error(AWS_IO_SOCKET_NOT_CONNECTED); + } + + if (socket->readable_fn) { + AWS_LOGF_ERROR( + AWS_LS_IO_SOCKET, + "id=%p fd=%d: can't subscribe to readable events since it is already subscribed", + (void *)socket, + socket->io_handle.data.fd); + return aws_raise_error(AWS_ERROR_IO_ALREADY_SUBSCRIBED); + } + + AWS_ASSERT(on_readable); + socket->readable_user_data = user_data; + socket->readable_fn = on_readable; + + return AWS_OP_SUCCESS; +} + +int aws_socket_read(struct aws_socket *socket, struct aws_byte_buf *buffer, size_t *amount_read) { + AWS_ASSERT(amount_read); + + if (!aws_event_loop_thread_is_callers_thread(socket->event_loop)) { + AWS_LOGF_ERROR( + AWS_LS_IO_SOCKET, + "id=%p fd=%d: cannot read from a different thread than event loop %p", + (void *)socket, + socket->io_handle.data.fd, + (void *)socket->event_loop); + return aws_raise_error(AWS_ERROR_IO_EVENT_LOOP_THREAD_ONLY); + } + + if (!(socket->state & CONNECTED_READ)) { + AWS_LOGF_ERROR( + AWS_LS_IO_SOCKET, + "id=%p fd=%d: cannot read because it is not connected", + (void *)socket, + socket->io_handle.data.fd); + return aws_raise_error(AWS_IO_SOCKET_NOT_CONNECTED); + } + + ssize_t read_val = read(socket->io_handle.data.fd, buffer->buffer + buffer->len, buffer->capacity - buffer->len); + AWS_LOGF_TRACE( + AWS_LS_IO_SOCKET, "id=%p fd=%d: read of %d", (void *)socket, socket->io_handle.data.fd, (int)read_val); + + if (read_val > 0) { + *amount_read = (size_t)read_val; + buffer->len += *amount_read; + return AWS_OP_SUCCESS; + } + + /* read_val of 0 means EOF which we'll treat as AWS_IO_SOCKET_CLOSED */ + if (read_val == 0) { + AWS_LOGF_INFO( + AWS_LS_IO_SOCKET, "id=%p fd=%d: zero read, socket is closed", (void *)socket, socket->io_handle.data.fd); + *amount_read = 0; + + if (buffer->capacity - buffer->len > 0) { + return aws_raise_error(AWS_IO_SOCKET_CLOSED); + } + + return AWS_OP_SUCCESS; + } + + int error = errno; +#if defined(EWOULDBLOCK) + if (error == EAGAIN || error == EWOULDBLOCK) { +#else + if (error == EAGAIN) { +#endif + AWS_LOGF_TRACE(AWS_LS_IO_SOCKET, "id=%p fd=%d: read would block", (void *)socket, socket->io_handle.data.fd); + return aws_raise_error(AWS_IO_READ_WOULD_BLOCK); + } + + if (error == EPIPE) { + AWS_LOGF_INFO(AWS_LS_IO_SOCKET, "id=%p fd=%d: socket is closed.", (void *)socket, socket->io_handle.data.fd); + return aws_raise_error(AWS_IO_SOCKET_CLOSED); + } + + if (error == ETIMEDOUT) { + AWS_LOGF_ERROR(AWS_LS_IO_SOCKET, "id=%p fd=%d: socket timed out.", (void *)socket, socket->io_handle.data.fd); + return aws_raise_error(AWS_IO_SOCKET_TIMEOUT); + } + + AWS_LOGF_ERROR( + AWS_LS_IO_SOCKET, + "id=%p fd=%d: read failed with error: %s", + (void *)socket, + socket->io_handle.data.fd, + strerror(error)); + return aws_raise_error(AWS_ERROR_SYS_CALL_FAILURE); +} + +int aws_socket_write( + struct aws_socket *socket, + const struct aws_byte_cursor *cursor, + aws_socket_on_write_completed_fn *written_fn, + void *user_data) { + if (!aws_event_loop_thread_is_callers_thread(socket->event_loop)) { + return aws_raise_error(AWS_ERROR_IO_EVENT_LOOP_THREAD_ONLY); + } + + if (!(socket->state & CONNECTED_WRITE)) { + AWS_LOGF_ERROR( + AWS_LS_IO_SOCKET, + "id=%p fd=%d: cannot write to because it is not connected", + (void *)socket, + socket->io_handle.data.fd); + return aws_raise_error(AWS_IO_SOCKET_NOT_CONNECTED); + } + + AWS_ASSERT(written_fn); + struct posix_socket *socket_impl = socket->impl; + struct write_request *write_request = aws_mem_calloc(socket->allocator, 1, sizeof(struct write_request)); + + if (!write_request) { + return AWS_OP_ERR; + } + + write_request->original_buffer_len = cursor->len; + write_request->written_fn = written_fn; + write_request->write_user_data = user_data; + write_request->cursor_cpy = *cursor; + aws_linked_list_push_back(&socket_impl->write_queue, &write_request->node); + + /* avoid reentrancy when a user calls write after receiving their completion callback. */ + if (!socket_impl->write_in_progress) { + return s_process_write_requests(socket, write_request); + } + + return AWS_OP_SUCCESS; +} + +int aws_socket_get_error(struct aws_socket *socket) { + int connect_result; + socklen_t result_length = sizeof(connect_result); + + if (getsockopt(socket->io_handle.data.fd, SOL_SOCKET, SO_ERROR, &connect_result, &result_length) < 0) { + return AWS_OP_ERR; + } + + if (connect_result) { + return s_determine_socket_error(connect_result); + } + + return AWS_OP_SUCCESS; +} + +bool aws_socket_is_open(struct aws_socket *socket) { + return socket->io_handle.data.fd >= 0; +} |