diff options
author | arcadia-devtools <arcadia-devtools@yandex-team.ru> | 2022-06-03 13:58:54 +0300 |
---|---|---|
committer | arcadia-devtools <arcadia-devtools@yandex-team.ru> | 2022-06-03 13:58:54 +0300 |
commit | 7736b27526bd7bd3fb159a4bccaae54361a0eee9 (patch) | |
tree | acd09e5fdbe9c4db95583c783d51a4bbaeee29d1 | |
parent | ba0318dede5aa2acb00e573e58487fa8c7494c4f (diff) | |
download | ydb-7736b27526bd7bd3fb159a4bccaae54361a0eee9.tar.gz |
intermediate changes
ref:c1c5fe3098d793da0dfa5cd1b0c8a2008301dbf7
4 files changed, 218 insertions, 111 deletions
diff --git a/contrib/restricted/aws/aws-c-io/include/aws/io/channel.h b/contrib/restricted/aws/aws-c-io/include/aws/io/channel.h index 59b7f0d686..50dc5cce26 100644 --- a/contrib/restricted/aws/aws-c-io/include/aws/io/channel.h +++ b/contrib/restricted/aws/aws-c-io/include/aws/io/channel.h @@ -279,12 +279,33 @@ struct aws_io_message *aws_channel_acquire_message_from_pool( * This is the ideal way to move a task into the correct thread. It's also handy for context switches. * This function is safe to call from any thread. * + * If called from the channel's event loop, the task will get directly added to the run-now list. + * If called from outside the channel's event loop, the task will go into a cross-thread task queue. + * + * If tasks must be serialized relative to some source synchronization, you may not want to use this API + * because tasks submitted from the event loop thread can "jump ahead" of tasks submitted from external threads + * due to this optimization. If this is a problem, you can either refactor your submission logic or use + * the aws_channel_schedule_task_now_serialized variant which does not perform this optimization. + * * The task should not be cleaned up or modified until its function is executed. */ AWS_IO_API void aws_channel_schedule_task_now(struct aws_channel *channel, struct aws_channel_task *task); /** + * Schedules a task to run on the event loop as soon as possible. + * + * This variant always uses the cross thread queue rather than conditionally skipping it when already in + * the destination event loop. While not "optimal", this allows us to serialize task execution no matter where + * the task was submitted from: if you are submitting tasks from a critical section, the serialized order that you + * submit is guaranteed to be the order that they execute on the event loop. + * + * The task should not be cleaned up or modified until its function is executed. + */ +AWS_IO_API +void aws_channel_schedule_task_now_serialized(struct aws_channel *channel, struct aws_channel_task *task); + +/** * Schedules a task to run on the event loop at the specified time. * This is the ideal way to move a task into the correct thread. It's also handy for context switches. * Use aws_channel_current_clock_time() to get the current time in nanoseconds. diff --git a/contrib/restricted/aws/aws-c-io/include/aws/io/socket.h b/contrib/restricted/aws/aws-c-io/include/aws/io/socket.h index e433dc6230..b4f3200803 100644 --- a/contrib/restricted/aws/aws-c-io/include/aws/io/socket.h +++ b/contrib/restricted/aws/aws-c-io/include/aws/io/socket.h @@ -177,6 +177,12 @@ AWS_IO_API int aws_socket_connect( AWS_IO_API int aws_socket_bind(struct aws_socket *socket, const struct aws_socket_endpoint *local_endpoint); /** + * Get the local address which the socket is bound to. + * Raises an error if no address is bound. + */ +AWS_IO_API int aws_socket_get_bound_address(const struct aws_socket *socket, struct aws_socket_endpoint *out_address); + +/** * TCP, LOCAL and VSOCK only. Sets up the socket to listen on the address bound to in `aws_socket_bind()`. */ AWS_IO_API int aws_socket_listen(struct aws_socket *socket, int backlog_size); diff --git a/contrib/restricted/aws/aws-c-io/source/channel.c b/contrib/restricted/aws/aws-c-io/source/channel.c index 5eb5c1e7a2..1cf6886fa1 100644 --- a/contrib/restricted/aws/aws-c-io/source/channel.c +++ b/contrib/restricted/aws/aws-c-io/source/channel.c @@ -540,46 +540,39 @@ void aws_channel_task_init( channel_task->type_tag = type_tag; } -/* Common functionality for scheduling "now" and "future" tasks. - * For "now" tasks, pass 0 for `run_at_nanos` */ -static void s_register_pending_task( +static void s_register_pending_task_in_event_loop( struct aws_channel *channel, struct aws_channel_task *channel_task, uint64_t run_at_nanos) { - /* Reset every property on channel task other than user's fn & arg.*/ - aws_task_init(&channel_task->wrapper_task, s_channel_task_run, channel, channel_task->type_tag); - channel_task->wrapper_task.timestamp = run_at_nanos; - aws_linked_list_node_reset(&channel_task->node); + AWS_LOGF_TRACE( + AWS_LS_IO_CHANNEL, + "id=%p: scheduling task with wrapper task id %p.", + (void *)channel, + (void *)&channel_task->wrapper_task); - if (aws_channel_thread_is_callers_thread(channel)) { - AWS_LOGF_TRACE( + /* If channel is shut down, run task immediately as canceled */ + if (channel->channel_state == AWS_CHANNEL_SHUT_DOWN) { + AWS_LOGF_DEBUG( AWS_LS_IO_CHANNEL, - "id=%p: scheduling task with wrapper task id %p.", + "id=%p: Running %s channel task immediately as canceled due to shut down channel", (void *)channel, - (void *)&channel_task->wrapper_task); - - /* If channel is shut down, run task immediately as canceled */ - if (channel->channel_state == AWS_CHANNEL_SHUT_DOWN) { - AWS_LOGF_DEBUG( - AWS_LS_IO_CHANNEL, - "id=%p: Running %s channel task immediately as canceled due to shut down channel", - (void *)channel, - channel_task->type_tag); - channel_task->task_fn(channel_task, channel_task->arg, AWS_TASK_STATUS_CANCELED); - return; - } - - aws_linked_list_push_back(&channel->channel_thread_tasks.list, &channel_task->node); - if (run_at_nanos == 0) { - aws_event_loop_schedule_task_now(channel->loop, &channel_task->wrapper_task); - } else { - aws_event_loop_schedule_task_future( - channel->loop, &channel_task->wrapper_task, channel_task->wrapper_task.timestamp); - } + channel_task->type_tag); + channel_task->task_fn(channel_task, channel_task->arg, AWS_TASK_STATUS_CANCELED); return; } + aws_linked_list_push_back(&channel->channel_thread_tasks.list, &channel_task->node); + if (run_at_nanos == 0) { + aws_event_loop_schedule_task_now(channel->loop, &channel_task->wrapper_task); + } else { + aws_event_loop_schedule_task_future( + channel->loop, &channel_task->wrapper_task, channel_task->wrapper_task.timestamp); + } +} + +static void s_register_pending_task_cross_thread(struct aws_channel *channel, struct aws_channel_task *channel_task) { + AWS_LOGF_TRACE( AWS_LS_IO_CHANNEL, "id=%p: scheduling task with wrapper task id %p from " @@ -609,10 +602,43 @@ static void s_register_pending_task( } } +static void s_reset_pending_channel_task( + struct aws_channel *channel, + struct aws_channel_task *channel_task, + uint64_t run_at_nanos) { + + /* Reset every property on channel task other than user's fn & arg.*/ + aws_task_init(&channel_task->wrapper_task, s_channel_task_run, channel, channel_task->type_tag); + channel_task->wrapper_task.timestamp = run_at_nanos; + aws_linked_list_node_reset(&channel_task->node); +} + +/* Common functionality for scheduling "now" and "future" tasks. + * For "now" tasks, pass 0 for `run_at_nanos` */ +static void s_register_pending_task( + struct aws_channel *channel, + struct aws_channel_task *channel_task, + uint64_t run_at_nanos) { + + s_reset_pending_channel_task(channel, channel_task, run_at_nanos); + + if (aws_channel_thread_is_callers_thread(channel)) { + s_register_pending_task_in_event_loop(channel, channel_task, run_at_nanos); + } else { + s_register_pending_task_cross_thread(channel, channel_task); + } +} + void aws_channel_schedule_task_now(struct aws_channel *channel, struct aws_channel_task *task) { s_register_pending_task(channel, task, 0); } +void aws_channel_schedule_task_now_serialized(struct aws_channel *channel, struct aws_channel_task *task) { + + s_reset_pending_channel_task(channel, task, 0); + s_register_pending_task_cross_thread(channel, task); +} + void aws_channel_schedule_task_future( struct aws_channel *channel, struct aws_channel_task *task, diff --git a/contrib/restricted/aws/aws-c-io/source/posix/socket.c b/contrib/restricted/aws/aws-c-io/source/posix/socket.c index 788ddaa986..95b03cee41 100644 --- a/contrib/restricted/aws/aws-c-io/source/posix/socket.c +++ b/contrib/restricted/aws/aws-c-io/source/posix/socket.c @@ -263,6 +263,98 @@ void aws_socket_clean_up(struct aws_socket *socket) { socket->io_handle.data.fd = -1; } +/* Update socket->local_endpoint based on the results of getsockname() */ +static int s_update_local_endpoint(struct aws_socket *socket) { + struct aws_socket_endpoint tmp_endpoint; + AWS_ZERO_STRUCT(tmp_endpoint); + + struct sockaddr_storage address; + AWS_ZERO_STRUCT(address); + socklen_t address_size = sizeof(address); + + if (getsockname(socket->io_handle.data.fd, (struct sockaddr *)&address, &address_size) != 0) { + AWS_LOGF_ERROR( + AWS_LS_IO_SOCKET, + "id=%p fd=%d: getsockname() failed with error %d", + (void *)socket, + socket->io_handle.data.fd, + errno); + int aws_error = s_determine_socket_error(errno); + return aws_raise_error(aws_error); + } + + if (address.ss_family == AF_INET) { + struct sockaddr_in *s = (struct sockaddr_in *)&address; + tmp_endpoint.port = ntohs(s->sin_port); + if (inet_ntop(AF_INET, &s->sin_addr, tmp_endpoint.address, sizeof(tmp_endpoint.address)) == NULL) { + AWS_LOGF_ERROR( + AWS_LS_IO_SOCKET, + "id=%p fd=%d: inet_ntop() failed with error %d", + (void *)socket, + socket->io_handle.data.fd, + errno); + int aws_error = s_determine_socket_error(errno); + return aws_raise_error(aws_error); + } + } else if (address.ss_family == AF_INET6) { + struct sockaddr_in6 *s = (struct sockaddr_in6 *)&address; + tmp_endpoint.port = ntohs(s->sin6_port); + if (inet_ntop(AF_INET6, &s->sin6_addr, tmp_endpoint.address, sizeof(tmp_endpoint.address)) == NULL) { + AWS_LOGF_ERROR( + AWS_LS_IO_SOCKET, + "id=%p fd=%d: inet_ntop() failed with error %d", + (void *)socket, + socket->io_handle.data.fd, + errno); + int aws_error = s_determine_socket_error(errno); + return aws_raise_error(aws_error); + } + } else if (address.ss_family == AF_UNIX) { + struct sockaddr_un *s = (struct sockaddr_un *)&address; + + /* Ensure there's a null-terminator. + * On some platforms it may be missing when the path gets very long. See: + * https://man7.org/linux/man-pages/man7/unix.7.html#BUGS + * But let's keep it simple, and not deal with that madness until someone demands it. */ + size_t sun_len; + if (aws_secure_strlen(s->sun_path, sizeof(tmp_endpoint.address), &sun_len)) { + AWS_LOGF_ERROR( + AWS_LS_IO_SOCKET, + "id=%p fd=%d: UNIX domain socket name is too long", + (void *)socket, + socket->io_handle.data.fd); + return aws_raise_error(AWS_IO_SOCKET_INVALID_ADDRESS); + } + memcpy(tmp_endpoint.address, s->sun_path, sun_len); +#if USE_VSOCK + } else if (address.ss_family == AF_VSOCK) { + struct sockaddr_vm *s = (struct sockaddr_vm *)&address; + + /* VSOCK port is 32bit, but aws_socket_endpoint.port is only 16bit. + * Hopefully this isn't an issue, since users can only pass in 16bit values. + * But if it becomes an issue, we'll need to make aws_socket_endpoint more flexible */ + if (s->svm_port > UINT16_MAX) { + AWS_LOGF_ERROR( + AWS_LS_IO_SOCKET, + "id=%p fd=%d: aws_socket_endpoint can't deal with VSOCK port > UINT16_MAX", + (void *)socket, + socket->io_handle.data.fd); + return aws_raise_error(AWS_IO_SOCKET_INVALID_ADDRESS); + } + tmp_endpoint.port = (uint16_t)s->svm_port; + + snprintf(tmp_endpoint.address, sizeof(tmp_endpoint.address), "%" PRIu32, s->svm_cid); + return AWS_OP_SUCCESS; +#endif /* USE_VSOCK */ + } else { + AWS_ASSERT(0); + return aws_raise_error(AWS_IO_SOCKET_UNSUPPORTED_ADDRESS_FAMILY); + } + + socket->local_endpoint = tmp_endpoint; + return AWS_OP_SUCCESS; +} + static void s_on_connection_error(struct aws_socket *socket, int error); static int s_on_connection_success(struct aws_socket *socket) { @@ -308,67 +400,8 @@ static int s_on_connection_success(struct aws_socket *socket) { AWS_LOGF_INFO(AWS_LS_IO_SOCKET, "id=%p fd=%d: connection success", (void *)socket, socket->io_handle.data.fd); - struct sockaddr_storage address; - AWS_ZERO_STRUCT(address); - socklen_t address_size = sizeof(address); - if (!getsockname(socket->io_handle.data.fd, (struct sockaddr *)&address, &address_size)) { - uint16_t port = 0; - - if (address.ss_family == AF_INET) { - struct sockaddr_in *s = (struct sockaddr_in *)&address; - port = ntohs(s->sin_port); - /* this comes straight from the kernal. a.) they won't fail. b.) even if they do, it's not fatal - * once we add logging, we can log this if it fails. */ - if (inet_ntop( - AF_INET, &s->sin_addr, socket->local_endpoint.address, sizeof(socket->local_endpoint.address))) { - AWS_LOGF_DEBUG( - AWS_LS_IO_SOCKET, - "id=%p fd=%d: local endpoint %s:%d", - (void *)socket, - socket->io_handle.data.fd, - socket->local_endpoint.address, - port); - } else { - AWS_LOGF_WARN( - AWS_LS_IO_SOCKET, - "id=%p fd=%d: determining local endpoint failed", - (void *)socket, - socket->io_handle.data.fd); - } - } else if (address.ss_family == AF_INET6) { - struct sockaddr_in6 *s = (struct sockaddr_in6 *)&address; - port = ntohs(s->sin6_port); - /* this comes straight from the kernal. a.) they won't fail. b.) even if they do, it's not fatal - * once we add logging, we can log this if it fails. */ - if (inet_ntop( - AF_INET6, &s->sin6_addr, socket->local_endpoint.address, sizeof(socket->local_endpoint.address))) { - AWS_LOGF_DEBUG( - AWS_LS_IO_SOCKET, - "id=%p fd %d: local endpoint %s:%d", - (void *)socket, - socket->io_handle.data.fd, - socket->local_endpoint.address, - port); - } else { - AWS_LOGF_WARN( - AWS_LS_IO_SOCKET, - "id=%p fd=%d: determining local endpoint failed", - (void *)socket, - socket->io_handle.data.fd); - } - } - - socket->local_endpoint.port = port; - } else { - AWS_LOGF_ERROR( - AWS_LS_IO_SOCKET, - "id=%p fd=%d: getsockname() failed with error %d", - (void *)socket, - socket->io_handle.data.fd, - errno); - int aws_error = s_determine_socket_error(errno); - aws_raise_error(aws_error); - s_on_connection_error(socket, aws_error); + if (s_update_local_endpoint(socket)) { + s_on_connection_error(socket, aws_last_error()); return AWS_OP_ERR; } @@ -761,9 +794,6 @@ int aws_socket_bind(struct aws_socket *socket, const struct aws_socket_endpoint return AWS_OP_ERR; } - int error_code = -1; - - socket->local_endpoint = *local_endpoint; AWS_LOGF_INFO( AWS_LS_IO_SOCKET, "id=%p fd=%d: binding to %s:%d.", @@ -813,31 +843,55 @@ int aws_socket_bind(struct aws_socket *socket, const struct aws_socket_endpoint return aws_raise_error(s_convert_pton_error(pton_err)); } - error_code = bind(socket->io_handle.data.fd, (struct sockaddr *)&address.sock_addr_types, sock_size); + if (bind(socket->io_handle.data.fd, (struct sockaddr *)&address.sock_addr_types, sock_size) != 0) { + AWS_LOGF_ERROR( + AWS_LS_IO_SOCKET, + "id=%p fd=%d: bind failed with error code %d", + (void *)socket, + socket->io_handle.data.fd, + errno); - if (!error_code) { - if (socket->options.type == AWS_SOCKET_STREAM) { - socket->state = BOUND; - } else { - /* e.g. UDP is now readable */ - socket->state = CONNECTED_READ; - } - AWS_LOGF_DEBUG(AWS_LS_IO_SOCKET, "id=%p fd=%d: successfully bound", (void *)socket, socket->io_handle.data.fd); + aws_raise_error(s_determine_socket_error(errno)); + goto error; + } - return AWS_OP_SUCCESS; + if (s_update_local_endpoint(socket)) { + goto error; } - socket->state = ERROR; - error_code = errno; - AWS_LOGF_ERROR( + if (socket->options.type == AWS_SOCKET_STREAM) { + socket->state = BOUND; + } else { + /* e.g. UDP is now readable */ + socket->state = CONNECTED_READ; + } + + AWS_LOGF_DEBUG( AWS_LS_IO_SOCKET, - "id=%p fd=%d: bind failed with error code %d", + "id=%p fd=%d: successfully bound to %s:%d", (void *)socket, socket->io_handle.data.fd, - error_code); + socket->local_endpoint.address, + socket->local_endpoint.port); - int aws_error = s_determine_socket_error(error_code); - return aws_raise_error(aws_error); + return AWS_OP_SUCCESS; + +error: + socket->state = ERROR; + return AWS_OP_ERR; +} + +int aws_socket_get_bound_address(const struct aws_socket *socket, struct aws_socket_endpoint *out_address) { + if (socket->local_endpoint.address[0] == 0) { + AWS_LOGF_ERROR( + AWS_LS_IO_SOCKET, + "id=%p fd=%d: Socket has no local address. Socket must be bound first.", + (void *)socket, + socket->io_handle.data.fd); + return aws_raise_error(AWS_IO_SOCKET_ILLEGAL_OPERATION_FOR_STATE); + } + *out_address = socket->local_endpoint; + return AWS_OP_SUCCESS; } int aws_socket_listen(struct aws_socket *socket, int backlog_size) { |