aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorarcadia-devtools <arcadia-devtools@yandex-team.ru>2022-06-03 13:58:54 +0300
committerarcadia-devtools <arcadia-devtools@yandex-team.ru>2022-06-03 13:58:54 +0300
commit7736b27526bd7bd3fb159a4bccaae54361a0eee9 (patch)
treeacd09e5fdbe9c4db95583c783d51a4bbaeee29d1
parentba0318dede5aa2acb00e573e58487fa8c7494c4f (diff)
downloadydb-7736b27526bd7bd3fb159a4bccaae54361a0eee9.tar.gz
intermediate changes
ref:c1c5fe3098d793da0dfa5cd1b0c8a2008301dbf7
-rw-r--r--contrib/restricted/aws/aws-c-io/include/aws/io/channel.h21
-rw-r--r--contrib/restricted/aws/aws-c-io/include/aws/io/socket.h6
-rw-r--r--contrib/restricted/aws/aws-c-io/source/channel.c86
-rw-r--r--contrib/restricted/aws/aws-c-io/source/posix/socket.c216
4 files changed, 218 insertions, 111 deletions
diff --git a/contrib/restricted/aws/aws-c-io/include/aws/io/channel.h b/contrib/restricted/aws/aws-c-io/include/aws/io/channel.h
index 59b7f0d686..50dc5cce26 100644
--- a/contrib/restricted/aws/aws-c-io/include/aws/io/channel.h
+++ b/contrib/restricted/aws/aws-c-io/include/aws/io/channel.h
@@ -279,12 +279,33 @@ struct aws_io_message *aws_channel_acquire_message_from_pool(
* This is the ideal way to move a task into the correct thread. It's also handy for context switches.
* This function is safe to call from any thread.
*
+ * If called from the channel's event loop, the task will get directly added to the run-now list.
+ * If called from outside the channel's event loop, the task will go into a cross-thread task queue.
+ *
+ * If tasks must be serialized relative to some source synchronization, you may not want to use this API
+ * because tasks submitted from the event loop thread can "jump ahead" of tasks submitted from external threads
+ * due to this optimization. If this is a problem, you can either refactor your submission logic or use
+ * the aws_channel_schedule_task_now_serialized variant which does not perform this optimization.
+ *
* The task should not be cleaned up or modified until its function is executed.
*/
AWS_IO_API
void aws_channel_schedule_task_now(struct aws_channel *channel, struct aws_channel_task *task);
/**
+ * Schedules a task to run on the event loop as soon as possible.
+ *
+ * This variant always uses the cross thread queue rather than conditionally skipping it when already in
+ * the destination event loop. While not "optimal", this allows us to serialize task execution no matter where
+ * the task was submitted from: if you are submitting tasks from a critical section, the serialized order that you
+ * submit is guaranteed to be the order that they execute on the event loop.
+ *
+ * The task should not be cleaned up or modified until its function is executed.
+ */
+AWS_IO_API
+void aws_channel_schedule_task_now_serialized(struct aws_channel *channel, struct aws_channel_task *task);
+
+/**
* Schedules a task to run on the event loop at the specified time.
* This is the ideal way to move a task into the correct thread. It's also handy for context switches.
* Use aws_channel_current_clock_time() to get the current time in nanoseconds.
diff --git a/contrib/restricted/aws/aws-c-io/include/aws/io/socket.h b/contrib/restricted/aws/aws-c-io/include/aws/io/socket.h
index e433dc6230..b4f3200803 100644
--- a/contrib/restricted/aws/aws-c-io/include/aws/io/socket.h
+++ b/contrib/restricted/aws/aws-c-io/include/aws/io/socket.h
@@ -177,6 +177,12 @@ AWS_IO_API int aws_socket_connect(
AWS_IO_API int aws_socket_bind(struct aws_socket *socket, const struct aws_socket_endpoint *local_endpoint);
/**
+ * Get the local address which the socket is bound to.
+ * Raises an error if no address is bound.
+ */
+AWS_IO_API int aws_socket_get_bound_address(const struct aws_socket *socket, struct aws_socket_endpoint *out_address);
+
+/**
* TCP, LOCAL and VSOCK only. Sets up the socket to listen on the address bound to in `aws_socket_bind()`.
*/
AWS_IO_API int aws_socket_listen(struct aws_socket *socket, int backlog_size);
diff --git a/contrib/restricted/aws/aws-c-io/source/channel.c b/contrib/restricted/aws/aws-c-io/source/channel.c
index 5eb5c1e7a2..1cf6886fa1 100644
--- a/contrib/restricted/aws/aws-c-io/source/channel.c
+++ b/contrib/restricted/aws/aws-c-io/source/channel.c
@@ -540,46 +540,39 @@ void aws_channel_task_init(
channel_task->type_tag = type_tag;
}
-/* Common functionality for scheduling "now" and "future" tasks.
- * For "now" tasks, pass 0 for `run_at_nanos` */
-static void s_register_pending_task(
+static void s_register_pending_task_in_event_loop(
struct aws_channel *channel,
struct aws_channel_task *channel_task,
uint64_t run_at_nanos) {
- /* Reset every property on channel task other than user's fn & arg.*/
- aws_task_init(&channel_task->wrapper_task, s_channel_task_run, channel, channel_task->type_tag);
- channel_task->wrapper_task.timestamp = run_at_nanos;
- aws_linked_list_node_reset(&channel_task->node);
+ AWS_LOGF_TRACE(
+ AWS_LS_IO_CHANNEL,
+ "id=%p: scheduling task with wrapper task id %p.",
+ (void *)channel,
+ (void *)&channel_task->wrapper_task);
- if (aws_channel_thread_is_callers_thread(channel)) {
- AWS_LOGF_TRACE(
+ /* If channel is shut down, run task immediately as canceled */
+ if (channel->channel_state == AWS_CHANNEL_SHUT_DOWN) {
+ AWS_LOGF_DEBUG(
AWS_LS_IO_CHANNEL,
- "id=%p: scheduling task with wrapper task id %p.",
+ "id=%p: Running %s channel task immediately as canceled due to shut down channel",
(void *)channel,
- (void *)&channel_task->wrapper_task);
-
- /* If channel is shut down, run task immediately as canceled */
- if (channel->channel_state == AWS_CHANNEL_SHUT_DOWN) {
- AWS_LOGF_DEBUG(
- AWS_LS_IO_CHANNEL,
- "id=%p: Running %s channel task immediately as canceled due to shut down channel",
- (void *)channel,
- channel_task->type_tag);
- channel_task->task_fn(channel_task, channel_task->arg, AWS_TASK_STATUS_CANCELED);
- return;
- }
-
- aws_linked_list_push_back(&channel->channel_thread_tasks.list, &channel_task->node);
- if (run_at_nanos == 0) {
- aws_event_loop_schedule_task_now(channel->loop, &channel_task->wrapper_task);
- } else {
- aws_event_loop_schedule_task_future(
- channel->loop, &channel_task->wrapper_task, channel_task->wrapper_task.timestamp);
- }
+ channel_task->type_tag);
+ channel_task->task_fn(channel_task, channel_task->arg, AWS_TASK_STATUS_CANCELED);
return;
}
+ aws_linked_list_push_back(&channel->channel_thread_tasks.list, &channel_task->node);
+ if (run_at_nanos == 0) {
+ aws_event_loop_schedule_task_now(channel->loop, &channel_task->wrapper_task);
+ } else {
+ aws_event_loop_schedule_task_future(
+ channel->loop, &channel_task->wrapper_task, channel_task->wrapper_task.timestamp);
+ }
+}
+
+static void s_register_pending_task_cross_thread(struct aws_channel *channel, struct aws_channel_task *channel_task) {
+
AWS_LOGF_TRACE(
AWS_LS_IO_CHANNEL,
"id=%p: scheduling task with wrapper task id %p from "
@@ -609,10 +602,43 @@ static void s_register_pending_task(
}
}
+static void s_reset_pending_channel_task(
+ struct aws_channel *channel,
+ struct aws_channel_task *channel_task,
+ uint64_t run_at_nanos) {
+
+ /* Reset every property on channel task other than user's fn & arg.*/
+ aws_task_init(&channel_task->wrapper_task, s_channel_task_run, channel, channel_task->type_tag);
+ channel_task->wrapper_task.timestamp = run_at_nanos;
+ aws_linked_list_node_reset(&channel_task->node);
+}
+
+/* Common functionality for scheduling "now" and "future" tasks.
+ * For "now" tasks, pass 0 for `run_at_nanos` */
+static void s_register_pending_task(
+ struct aws_channel *channel,
+ struct aws_channel_task *channel_task,
+ uint64_t run_at_nanos) {
+
+ s_reset_pending_channel_task(channel, channel_task, run_at_nanos);
+
+ if (aws_channel_thread_is_callers_thread(channel)) {
+ s_register_pending_task_in_event_loop(channel, channel_task, run_at_nanos);
+ } else {
+ s_register_pending_task_cross_thread(channel, channel_task);
+ }
+}
+
void aws_channel_schedule_task_now(struct aws_channel *channel, struct aws_channel_task *task) {
s_register_pending_task(channel, task, 0);
}
+void aws_channel_schedule_task_now_serialized(struct aws_channel *channel, struct aws_channel_task *task) {
+
+ s_reset_pending_channel_task(channel, task, 0);
+ s_register_pending_task_cross_thread(channel, task);
+}
+
void aws_channel_schedule_task_future(
struct aws_channel *channel,
struct aws_channel_task *task,
diff --git a/contrib/restricted/aws/aws-c-io/source/posix/socket.c b/contrib/restricted/aws/aws-c-io/source/posix/socket.c
index 788ddaa986..95b03cee41 100644
--- a/contrib/restricted/aws/aws-c-io/source/posix/socket.c
+++ b/contrib/restricted/aws/aws-c-io/source/posix/socket.c
@@ -263,6 +263,98 @@ void aws_socket_clean_up(struct aws_socket *socket) {
socket->io_handle.data.fd = -1;
}
+/* Update socket->local_endpoint based on the results of getsockname() */
+static int s_update_local_endpoint(struct aws_socket *socket) {
+ struct aws_socket_endpoint tmp_endpoint;
+ AWS_ZERO_STRUCT(tmp_endpoint);
+
+ struct sockaddr_storage address;
+ AWS_ZERO_STRUCT(address);
+ socklen_t address_size = sizeof(address);
+
+ if (getsockname(socket->io_handle.data.fd, (struct sockaddr *)&address, &address_size) != 0) {
+ AWS_LOGF_ERROR(
+ AWS_LS_IO_SOCKET,
+ "id=%p fd=%d: getsockname() failed with error %d",
+ (void *)socket,
+ socket->io_handle.data.fd,
+ errno);
+ int aws_error = s_determine_socket_error(errno);
+ return aws_raise_error(aws_error);
+ }
+
+ if (address.ss_family == AF_INET) {
+ struct sockaddr_in *s = (struct sockaddr_in *)&address;
+ tmp_endpoint.port = ntohs(s->sin_port);
+ if (inet_ntop(AF_INET, &s->sin_addr, tmp_endpoint.address, sizeof(tmp_endpoint.address)) == NULL) {
+ AWS_LOGF_ERROR(
+ AWS_LS_IO_SOCKET,
+ "id=%p fd=%d: inet_ntop() failed with error %d",
+ (void *)socket,
+ socket->io_handle.data.fd,
+ errno);
+ int aws_error = s_determine_socket_error(errno);
+ return aws_raise_error(aws_error);
+ }
+ } else if (address.ss_family == AF_INET6) {
+ struct sockaddr_in6 *s = (struct sockaddr_in6 *)&address;
+ tmp_endpoint.port = ntohs(s->sin6_port);
+ if (inet_ntop(AF_INET6, &s->sin6_addr, tmp_endpoint.address, sizeof(tmp_endpoint.address)) == NULL) {
+ AWS_LOGF_ERROR(
+ AWS_LS_IO_SOCKET,
+ "id=%p fd=%d: inet_ntop() failed with error %d",
+ (void *)socket,
+ socket->io_handle.data.fd,
+ errno);
+ int aws_error = s_determine_socket_error(errno);
+ return aws_raise_error(aws_error);
+ }
+ } else if (address.ss_family == AF_UNIX) {
+ struct sockaddr_un *s = (struct sockaddr_un *)&address;
+
+ /* Ensure there's a null-terminator.
+ * On some platforms it may be missing when the path gets very long. See:
+ * https://man7.org/linux/man-pages/man7/unix.7.html#BUGS
+ * But let's keep it simple, and not deal with that madness until someone demands it. */
+ size_t sun_len;
+ if (aws_secure_strlen(s->sun_path, sizeof(tmp_endpoint.address), &sun_len)) {
+ AWS_LOGF_ERROR(
+ AWS_LS_IO_SOCKET,
+ "id=%p fd=%d: UNIX domain socket name is too long",
+ (void *)socket,
+ socket->io_handle.data.fd);
+ return aws_raise_error(AWS_IO_SOCKET_INVALID_ADDRESS);
+ }
+ memcpy(tmp_endpoint.address, s->sun_path, sun_len);
+#if USE_VSOCK
+ } else if (address.ss_family == AF_VSOCK) {
+ struct sockaddr_vm *s = (struct sockaddr_vm *)&address;
+
+ /* VSOCK port is 32bit, but aws_socket_endpoint.port is only 16bit.
+ * Hopefully this isn't an issue, since users can only pass in 16bit values.
+ * But if it becomes an issue, we'll need to make aws_socket_endpoint more flexible */
+ if (s->svm_port > UINT16_MAX) {
+ AWS_LOGF_ERROR(
+ AWS_LS_IO_SOCKET,
+ "id=%p fd=%d: aws_socket_endpoint can't deal with VSOCK port > UINT16_MAX",
+ (void *)socket,
+ socket->io_handle.data.fd);
+ return aws_raise_error(AWS_IO_SOCKET_INVALID_ADDRESS);
+ }
+ tmp_endpoint.port = (uint16_t)s->svm_port;
+
+ snprintf(tmp_endpoint.address, sizeof(tmp_endpoint.address), "%" PRIu32, s->svm_cid);
+ return AWS_OP_SUCCESS;
+#endif /* USE_VSOCK */
+ } else {
+ AWS_ASSERT(0);
+ return aws_raise_error(AWS_IO_SOCKET_UNSUPPORTED_ADDRESS_FAMILY);
+ }
+
+ socket->local_endpoint = tmp_endpoint;
+ return AWS_OP_SUCCESS;
+}
+
static void s_on_connection_error(struct aws_socket *socket, int error);
static int s_on_connection_success(struct aws_socket *socket) {
@@ -308,67 +400,8 @@ static int s_on_connection_success(struct aws_socket *socket) {
AWS_LOGF_INFO(AWS_LS_IO_SOCKET, "id=%p fd=%d: connection success", (void *)socket, socket->io_handle.data.fd);
- struct sockaddr_storage address;
- AWS_ZERO_STRUCT(address);
- socklen_t address_size = sizeof(address);
- if (!getsockname(socket->io_handle.data.fd, (struct sockaddr *)&address, &address_size)) {
- uint16_t port = 0;
-
- if (address.ss_family == AF_INET) {
- struct sockaddr_in *s = (struct sockaddr_in *)&address;
- port = ntohs(s->sin_port);
- /* this comes straight from the kernal. a.) they won't fail. b.) even if they do, it's not fatal
- * once we add logging, we can log this if it fails. */
- if (inet_ntop(
- AF_INET, &s->sin_addr, socket->local_endpoint.address, sizeof(socket->local_endpoint.address))) {
- AWS_LOGF_DEBUG(
- AWS_LS_IO_SOCKET,
- "id=%p fd=%d: local endpoint %s:%d",
- (void *)socket,
- socket->io_handle.data.fd,
- socket->local_endpoint.address,
- port);
- } else {
- AWS_LOGF_WARN(
- AWS_LS_IO_SOCKET,
- "id=%p fd=%d: determining local endpoint failed",
- (void *)socket,
- socket->io_handle.data.fd);
- }
- } else if (address.ss_family == AF_INET6) {
- struct sockaddr_in6 *s = (struct sockaddr_in6 *)&address;
- port = ntohs(s->sin6_port);
- /* this comes straight from the kernal. a.) they won't fail. b.) even if they do, it's not fatal
- * once we add logging, we can log this if it fails. */
- if (inet_ntop(
- AF_INET6, &s->sin6_addr, socket->local_endpoint.address, sizeof(socket->local_endpoint.address))) {
- AWS_LOGF_DEBUG(
- AWS_LS_IO_SOCKET,
- "id=%p fd %d: local endpoint %s:%d",
- (void *)socket,
- socket->io_handle.data.fd,
- socket->local_endpoint.address,
- port);
- } else {
- AWS_LOGF_WARN(
- AWS_LS_IO_SOCKET,
- "id=%p fd=%d: determining local endpoint failed",
- (void *)socket,
- socket->io_handle.data.fd);
- }
- }
-
- socket->local_endpoint.port = port;
- } else {
- AWS_LOGF_ERROR(
- AWS_LS_IO_SOCKET,
- "id=%p fd=%d: getsockname() failed with error %d",
- (void *)socket,
- socket->io_handle.data.fd,
- errno);
- int aws_error = s_determine_socket_error(errno);
- aws_raise_error(aws_error);
- s_on_connection_error(socket, aws_error);
+ if (s_update_local_endpoint(socket)) {
+ s_on_connection_error(socket, aws_last_error());
return AWS_OP_ERR;
}
@@ -761,9 +794,6 @@ int aws_socket_bind(struct aws_socket *socket, const struct aws_socket_endpoint
return AWS_OP_ERR;
}
- int error_code = -1;
-
- socket->local_endpoint = *local_endpoint;
AWS_LOGF_INFO(
AWS_LS_IO_SOCKET,
"id=%p fd=%d: binding to %s:%d.",
@@ -813,31 +843,55 @@ int aws_socket_bind(struct aws_socket *socket, const struct aws_socket_endpoint
return aws_raise_error(s_convert_pton_error(pton_err));
}
- error_code = bind(socket->io_handle.data.fd, (struct sockaddr *)&address.sock_addr_types, sock_size);
+ if (bind(socket->io_handle.data.fd, (struct sockaddr *)&address.sock_addr_types, sock_size) != 0) {
+ AWS_LOGF_ERROR(
+ AWS_LS_IO_SOCKET,
+ "id=%p fd=%d: bind failed with error code %d",
+ (void *)socket,
+ socket->io_handle.data.fd,
+ errno);
- if (!error_code) {
- if (socket->options.type == AWS_SOCKET_STREAM) {
- socket->state = BOUND;
- } else {
- /* e.g. UDP is now readable */
- socket->state = CONNECTED_READ;
- }
- AWS_LOGF_DEBUG(AWS_LS_IO_SOCKET, "id=%p fd=%d: successfully bound", (void *)socket, socket->io_handle.data.fd);
+ aws_raise_error(s_determine_socket_error(errno));
+ goto error;
+ }
- return AWS_OP_SUCCESS;
+ if (s_update_local_endpoint(socket)) {
+ goto error;
}
- socket->state = ERROR;
- error_code = errno;
- AWS_LOGF_ERROR(
+ if (socket->options.type == AWS_SOCKET_STREAM) {
+ socket->state = BOUND;
+ } else {
+ /* e.g. UDP is now readable */
+ socket->state = CONNECTED_READ;
+ }
+
+ AWS_LOGF_DEBUG(
AWS_LS_IO_SOCKET,
- "id=%p fd=%d: bind failed with error code %d",
+ "id=%p fd=%d: successfully bound to %s:%d",
(void *)socket,
socket->io_handle.data.fd,
- error_code);
+ socket->local_endpoint.address,
+ socket->local_endpoint.port);
- int aws_error = s_determine_socket_error(error_code);
- return aws_raise_error(aws_error);
+ return AWS_OP_SUCCESS;
+
+error:
+ socket->state = ERROR;
+ return AWS_OP_ERR;
+}
+
+int aws_socket_get_bound_address(const struct aws_socket *socket, struct aws_socket_endpoint *out_address) {
+ if (socket->local_endpoint.address[0] == 0) {
+ AWS_LOGF_ERROR(
+ AWS_LS_IO_SOCKET,
+ "id=%p fd=%d: Socket has no local address. Socket must be bound first.",
+ (void *)socket,
+ socket->io_handle.data.fd);
+ return aws_raise_error(AWS_IO_SOCKET_ILLEGAL_OPERATION_FOR_STATE);
+ }
+ *out_address = socket->local_endpoint;
+ return AWS_OP_SUCCESS;
}
int aws_socket_listen(struct aws_socket *socket, int backlog_size) {