diff options
| author | robot-contrib <[email protected]> | 2025-07-05 09:07:48 +0300 |
|---|---|---|
| committer | robot-contrib <[email protected]> | 2025-07-05 09:32:43 +0300 |
| commit | d39c61aee0928bd6ff45b74c44a5703dae4e47ae (patch) | |
| tree | 80d2758d21e12acd823edd28d163f1836917e7cd | |
| parent | 8bd56b64a4c4a94d91c8a9518883513c9d6075f9 (diff) | |
Update contrib/restricted/aws/aws-c-event-stream to 0.5.5
commit_hash:8307bd2fe419c4785e2cf1b39f6127b9f71b54cf
7 files changed, 126 insertions, 35 deletions
diff --git a/contrib/restricted/aws/aws-c-event-stream/.yandex_meta/override.nix b/contrib/restricted/aws/aws-c-event-stream/.yandex_meta/override.nix index 5c5b7fb3926..f8278485a26 100644 --- a/contrib/restricted/aws/aws-c-event-stream/.yandex_meta/override.nix +++ b/contrib/restricted/aws/aws-c-event-stream/.yandex_meta/override.nix @@ -1,10 +1,10 @@ pkgs: attrs: with pkgs; with attrs; rec { - version = "0.5.4"; + version = "0.5.5"; src = fetchFromGitHub { owner = "awslabs"; repo = "aws-c-event-stream"; rev = "v${version}"; - hash = "sha256-Y4vyoNlYdIQg/NggBoYtX4CPiCzG24a4mKG8VGTdqy8="; + hash = "sha256-wVjpDKKwoksq5gFtvhH76c7ciP0XmMozhkWmzY6GwgU="; }; } diff --git a/contrib/restricted/aws/aws-c-event-stream/README.md b/contrib/restricted/aws/aws-c-event-stream/README.md index ad54b1bf29f..47a5360f76a 100644 --- a/contrib/restricted/aws/aws-c-event-stream/README.md +++ b/contrib/restricted/aws/aws-c-event-stream/README.md @@ -70,4 +70,4 @@ Total message overhead, including the prelude and both checksums, is 16 bytes. The following diagram shows the components that make up a message and a header. There are multiple headers per message. - + diff --git a/contrib/restricted/aws/aws-c-event-stream/include/aws/event-stream/event_stream_exports.h b/contrib/restricted/aws/aws-c-event-stream/include/aws/event-stream/event_stream_exports.h index 57c531db771..239f1cd7d86 100644 --- a/contrib/restricted/aws/aws-c-event-stream/include/aws/event-stream/event_stream_exports.h +++ b/contrib/restricted/aws/aws-c-event-stream/include/aws/event-stream/event_stream_exports.h @@ -4,7 +4,7 @@ * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. * SPDX-License-Identifier: Apache-2.0. */ -#if defined(USE_WINDOWS_DLL_SEMANTICS) || defined(WIN32) +#if defined(AWS_CRT_USE_WINDOWS_DLL_SEMANTICS) || defined(_WIN32) # ifdef AWS_EVENT_STREAM_USE_IMPORT_EXPORT # ifdef AWS_EVENT_STREAM_EXPORTS # define AWS_EVENT_STREAM_API __declspec(dllexport) @@ -15,15 +15,14 @@ # define AWS_EVENT_STREAM_API # endif /* AWS_EVENT_STREAM_USE_IMPORT_EXPORT */ -#else /* defined (USE_WINDOWS_DLL_SEMANTICS) || defined (WIN32) */ +#else /* defined (AWS_CRT_USE_WINDOWS_DLL_SEMANTICS) || defined (_WIN32) */ -# if ((__GNUC__ >= 4) || defined(__clang__)) && defined(AWS_EVENT_STREAM_USE_IMPORT_EXPORT) && \ - defined(AWS_EVENT_STREAM_EXPORTS) +# if defined(AWS_EVENT_STREAM_USE_IMPORT_EXPORT) && defined(AWS_EVENT_STREAM_EXPORTS) # define AWS_EVENT_STREAM_API __attribute__((visibility("default"))) # else # define AWS_EVENT_STREAM_API -# endif /* __GNUC__ >= 4 || defined(__clang__) */ +# endif -#endif /* defined (USE_WINDOWS_DLL_SEMANTICS) || defined (WIN32) */ +#endif /* defined (AWS_CRT_USE_WINDOWS_DLL_SEMANTICS) || defined (_WIN32) */ #endif /* AWS_EVENT_STREAM_EXPORTS_H */ diff --git a/contrib/restricted/aws/aws-c-event-stream/include/aws/event-stream/event_stream_rpc_client.h b/contrib/restricted/aws/aws-c-event-stream/include/aws/event-stream/event_stream_rpc_client.h index 24c7824d0f7..490e876cd79 100644 --- a/contrib/restricted/aws/aws-c-event-stream/include/aws/event-stream/event_stream_rpc_client.h +++ b/contrib/restricted/aws/aws-c-event-stream/include/aws/event-stream/event_stream_rpc_client.h @@ -10,6 +10,7 @@ AWS_PUSH_SANE_WARNING_LEVEL struct aws_channel; +struct aws_event_loop; struct aws_event_stream_rpc_client_connection; struct aws_event_stream_rpc_client_continuation_token; @@ -30,9 +31,15 @@ typedef void(aws_event_stream_rpc_client_stream_continuation_closed_fn)( struct aws_event_stream_rpc_client_continuation_token *token, void *user_data); +/** + * Invoked after a continuation has been fully destroyed. Listeners know that no further callbacks are possible. + */ +typedef void(aws_event_stream_rpc_client_stream_continuation_terminated_fn)(void *user_data); + struct aws_event_stream_rpc_client_stream_continuation_options { aws_event_stream_rpc_client_stream_continuation_fn *on_continuation; aws_event_stream_rpc_client_stream_continuation_closed_fn *on_continuation_closed; + aws_event_stream_rpc_client_stream_continuation_terminated_fn *on_continuation_terminated; void *user_data; }; @@ -74,6 +81,11 @@ typedef void(aws_event_stream_rpc_client_on_connection_setup_fn)( void *user_data); /** + * Invoked when a connection has been completely destroyed. + */ +typedef void(aws_event_stream_rpc_client_on_connection_terminated_fn)(void *user_data); + +/** * Invoked whenever a message has been flushed to the channel. */ typedef void(aws_event_stream_rpc_client_message_flush_fn)(int error_code, void *user_data); @@ -93,6 +105,7 @@ struct aws_event_stream_rpc_client_connection_options { aws_event_stream_rpc_client_on_connection_setup_fn *on_connection_setup; aws_event_stream_rpc_client_connection_protocol_message_fn *on_connection_protocol_message; aws_event_stream_rpc_client_on_connection_shutdown_fn *on_connection_shutdown; + aws_event_stream_rpc_client_on_connection_terminated_fn *on_connection_terminated; void *user_data; }; @@ -143,6 +156,12 @@ AWS_EVENT_STREAM_API int aws_event_stream_rpc_client_connection_send_protocol_me void *user_data); /** + * Returns the event loop that a connection is seated on. + */ +AWS_EVENT_STREAM_API struct aws_event_loop *aws_event_stream_rpc_client_connection_get_event_loop( + const struct aws_event_stream_rpc_client_connection *connection); + +/** * Create a new stream. continuation_option's callbacks will not be invoked, and nothing will be sent across the wire * until aws_event_stream_rpc_client_continuation_activate() is invoked. * diff --git a/contrib/restricted/aws/aws-c-event-stream/source/event_stream_rpc_client.c b/contrib/restricted/aws/aws-c-event-stream/source/event_stream_rpc_client.c index ab17c26ef25..0ccfa3574a6 100644 --- a/contrib/restricted/aws/aws-c-event-stream/source/event_stream_rpc_client.c +++ b/contrib/restricted/aws/aws-c-event-stream/source/event_stream_rpc_client.c @@ -14,6 +14,8 @@ #include <inttypes.h> +#include "aws/io/event_loop.h" + #ifdef _MSC_VER /* allow declared initializer using address of automatic variable */ # pragma warning(disable : 4221) @@ -28,6 +30,7 @@ struct aws_event_stream_rpc_client_connection { struct aws_allocator *allocator; struct aws_hash_table continuation_table; struct aws_client_bootstrap *bootstrap_ref; + struct aws_event_loop *event_loop; struct aws_atomic_var ref_count; struct aws_channel *channel; struct aws_channel_handler *event_stream_handler; @@ -39,6 +42,7 @@ struct aws_event_stream_rpc_client_connection { aws_event_stream_rpc_client_on_connection_setup_fn *on_connection_setup; aws_event_stream_rpc_client_connection_protocol_message_fn *on_connection_protocol_message; aws_event_stream_rpc_client_on_connection_shutdown_fn *on_connection_shutdown; + aws_event_stream_rpc_client_on_connection_terminated_fn *on_connection_terminated; void *user_data; bool bootstrap_owned; bool enable_read_back_pressure; @@ -49,6 +53,7 @@ struct aws_event_stream_rpc_client_continuation_token { struct aws_event_stream_rpc_client_connection *connection; aws_event_stream_rpc_client_stream_continuation_fn *continuation_fn; aws_event_stream_rpc_client_stream_continuation_closed_fn *closed_fn; + aws_event_stream_rpc_client_stream_continuation_terminated_fn *terminated_fn; void *user_data; struct aws_atomic_var ref_count; struct aws_atomic_var is_closed; @@ -187,9 +192,7 @@ static void s_on_channel_shutdown_fn( aws_event_stream_rpc_client_connection_release(connection); } -/* Set each continuation's is_closed=true. - * A lock MUST be held while calling this. - * For use with aws_hash_table_foreach(). */ +/* Set each continuation's is_closed=true. */ static int s_mark_each_continuation_closed(void *context, struct aws_hash_element *p_element) { (void)context; struct aws_event_stream_rpc_client_continuation_token *continuation = p_element->value; @@ -233,18 +236,31 @@ static int s_complete_and_clear_each_continuation(void *context, struct aws_hash static void s_clear_continuation_table(struct aws_event_stream_rpc_client_connection *connection) { AWS_ASSERT(!aws_event_stream_rpc_client_connection_is_open(connection)); + struct aws_hash_table temp_table; + aws_hash_table_init( + &temp_table, + connection->allocator, + 64, + aws_event_stream_rpc_hash_streamid, + aws_event_stream_rpc_streamid_eq, + NULL, + NULL); + /* Use lock to ensure synchronization with code that adds entries to table. * Since connection was just marked closed, no further entries will be - * added to table once we acquire the lock. */ + * added to table once we acquire the lock. + * + * While no further entries can be added, there are concurrent execution paths where things can be + * removed. So rather than iterating the connection's table, swap it out for an empty one and iterate + * the temporary table instead. Removing from an empty table will be harmless. + */ aws_mutex_lock(&connection->stream_lock); - aws_hash_table_foreach(&connection->continuation_table, s_mark_each_continuation_closed, NULL); + aws_hash_table_swap(&temp_table, &connection->continuation_table); aws_mutex_unlock(&connection->stream_lock); - /* Now release lock before invoking callbacks. - * It's safe to alter the table now without a lock, since no further - * entries can be added, and we've gone through the critical section - * above to ensure synchronization */ - aws_hash_table_foreach(&connection->continuation_table, s_complete_and_clear_each_continuation, NULL); + aws_hash_table_foreach(&temp_table, s_mark_each_continuation_closed, NULL); + aws_hash_table_foreach(&temp_table, s_complete_and_clear_each_continuation, NULL); + aws_hash_table_clean_up(&temp_table); } int aws_event_stream_rpc_client_connection_connect( @@ -268,6 +284,8 @@ int aws_event_stream_rpc_client_connection_connect( connection->allocator = allocator; aws_atomic_init_int(&connection->ref_count, 1); connection->bootstrap_ref = conn_options->bootstrap; + connection->event_loop = aws_event_loop_group_get_next_loop(connection->bootstrap_ref->event_loop_group); + /* this is released in the connection release which gets called regardless of if this function is successful or * not*/ aws_client_bootstrap_acquire(connection->bootstrap_ref); @@ -276,6 +294,7 @@ int aws_event_stream_rpc_client_connection_connect( aws_mutex_init(&connection->stream_lock); connection->on_connection_shutdown = conn_options->on_connection_shutdown; + connection->on_connection_terminated = conn_options->on_connection_terminated; connection->on_connection_protocol_message = conn_options->on_connection_protocol_message; connection->on_connection_setup = conn_options->on_connection_setup; connection->user_data = conn_options->user_data; @@ -307,6 +326,7 @@ int aws_event_stream_rpc_client_connection_connect( .enable_read_back_pressure = false, .setup_callback = s_on_channel_setup_fn, .shutdown_callback = s_on_channel_shutdown_fn, + .requested_event_loop = connection->event_loop, }; if (aws_client_bootstrap_new_socket_channel(&bootstrap_options)) { @@ -340,7 +360,15 @@ static void s_destroy_connection(struct aws_event_stream_rpc_client_connection * AWS_LOGF_DEBUG(AWS_LS_EVENT_STREAM_RPC_CLIENT, "id=%p: destroying connection.", (void *)connection); aws_hash_table_clean_up(&connection->continuation_table); aws_client_bootstrap_release(connection->bootstrap_ref); + + aws_event_stream_rpc_client_on_connection_terminated_fn *terminated_fn = connection->on_connection_terminated; + void *terminated_user_data = connection->user_data; + aws_mem_release(connection->allocator, connection); + + if (terminated_fn) { + terminated_fn(terminated_user_data); + } } void aws_event_stream_rpc_client_connection_release(const struct aws_event_stream_rpc_client_connection *connection) { @@ -434,13 +462,19 @@ static void s_on_protocol_message_written_fn( AWS_FATAL_ASSERT(message_args->continuation && "end stream flag was set but it wasn't on a continuation"); aws_atomic_store_int(&message_args->continuation->is_closed, 1U); + int was_present = 0; aws_mutex_lock(&message_args->connection->stream_lock); aws_hash_table_remove( - &message_args->connection->continuation_table, &message_args->continuation->stream_id, NULL, NULL); + &message_args->connection->continuation_table, &message_args->continuation->stream_id, NULL, &was_present); aws_mutex_unlock(&message_args->connection->stream_lock); - /* Lock must NOT be held while invoking callback */ - s_complete_continuation(message_args->continuation); + /* + * Whoever successfully removes the continuation from the table gets to complete it. + * Lock must NOT be held while invoking callback + */ + if (was_present) { + s_complete_continuation(message_args->continuation); + } } message_args->flush_fn(error_code, message_args->user_data); @@ -770,7 +804,6 @@ static void s_route_message_by_type( aws_mutex_unlock(&connection->stream_lock); continuation->continuation_fn(continuation, &message_args, continuation->user_data); - aws_event_stream_rpc_client_continuation_release(continuation); /* if it was a terminal stream message purge it from the hash table. The delete will decref the continuation. */ if (message_flags & AWS_EVENT_STREAM_RPC_MESSAGE_FLAG_TERMINATE_STREAM) { @@ -780,13 +813,21 @@ static void s_route_message_by_type( (void *)connection, (void *)continuation); aws_atomic_store_int(&continuation->is_closed, 1U); + int was_present = 0; aws_mutex_lock(&connection->stream_lock); - aws_hash_table_remove(&connection->continuation_table, &stream_id, NULL, NULL); + aws_hash_table_remove(&connection->continuation_table, &stream_id, NULL, &was_present); aws_mutex_unlock(&connection->stream_lock); - /* Note that we do not invoke callback while holding lock */ - s_complete_continuation(continuation); + /* + * Whoever successfully removes the continuation from the table gets to complete it. + * Lock must NOT be held while invoking callback + */ + if (was_present) { + s_complete_continuation(continuation); + } } + + aws_event_stream_rpc_client_continuation_release(continuation); } else { if (message_type <= AWS_EVENT_STREAM_RPC_MESSAGE_TYPE_APPLICATION_ERROR || message_type >= AWS_EVENT_STREAM_RPC_MESSAGE_TYPE_COUNT) { @@ -915,6 +956,7 @@ struct aws_event_stream_rpc_client_continuation_token *aws_event_stream_rpc_clie aws_atomic_init_int(&continuation->is_complete, 0); continuation->continuation_fn = continuation_options->on_continuation; continuation->closed_fn = continuation_options->on_continuation_closed; + continuation->terminated_fn = continuation_options->on_continuation_terminated; continuation->user_data = continuation_options->user_data; return continuation; @@ -959,7 +1001,15 @@ void aws_event_stream_rpc_client_continuation_release( if (ref_count == 1) { struct aws_allocator *allocator = continuation_mut->connection->allocator; aws_event_stream_rpc_client_connection_release(continuation_mut->connection); + + aws_event_stream_rpc_client_stream_continuation_terminated_fn *terminated_fn = continuation_mut->terminated_fn; + void *terminated_user_data = continuation_mut->user_data; + aws_mem_release(allocator, continuation_mut); + + if (terminated_fn) { + terminated_fn(terminated_user_data); + } } } @@ -1062,3 +1112,13 @@ int aws_event_stream_rpc_client_continuation_send_message( return s_send_protocol_message( continuation->connection, continuation, NULL, message_args, continuation->stream_id, flush_fn, user_data); } + +struct aws_event_loop *aws_event_stream_rpc_client_connection_get_event_loop( + const struct aws_event_stream_rpc_client_connection *connection) { + + if (!connection) { + return NULL; + } + + return connection->event_loop; +} diff --git a/contrib/restricted/aws/aws-c-event-stream/source/event_stream_rpc_server.c b/contrib/restricted/aws/aws-c-event-stream/source/event_stream_rpc_server.c index e8af15e7da0..0a9ad31c345 100644 --- a/contrib/restricted/aws/aws-c-event-stream/source/event_stream_rpc_server.c +++ b/contrib/restricted/aws/aws-c-event-stream/source/event_stream_rpc_server.c @@ -901,13 +901,26 @@ static void s_route_message_by_type( struct aws_hash_element *continuation_element = NULL; if (aws_hash_table_find(&connection->continuation_table, &stream_id, &continuation_element) || !continuation_element) { - AWS_LOGF_ERROR( - AWS_LS_EVENT_STREAM_RPC_SERVER, - "id=%p: stream_id does not have a corresponding continuation", - (void *)connection); - aws_raise_error(AWS_ERROR_EVENT_STREAM_RPC_PROTOCOL_ERROR); - s_send_connection_level_error( - connection, AWS_EVENT_STREAM_RPC_MESSAGE_TYPE_PROTOCOL_ERROR, 0, &s_invalid_client_stream_id_error); + if ((message_flags & AWS_EVENT_STREAM_RPC_MESSAGE_FLAG_TERMINATE_STREAM) == 0) { + AWS_LOGF_ERROR( + AWS_LS_EVENT_STREAM_RPC_SERVER, + "id=%p: stream_id does not have a corresponding continuation", + (void *)connection); + aws_raise_error(AWS_ERROR_EVENT_STREAM_RPC_PROTOCOL_ERROR); + s_send_connection_level_error( + connection, + AWS_EVENT_STREAM_RPC_MESSAGE_TYPE_PROTOCOL_ERROR, + 0, + &s_invalid_client_stream_id_error); + } else { + /* Simultaneous close can trip this condition */ + AWS_LOGF_DEBUG( + AWS_LS_EVENT_STREAM_RPC_SERVER, + "id=%p: received a terminate stream message for stream_id %d, which no longer has a " + "corresponding continuation", + (void *)connection, + (int)stream_id); + } return; } diff --git a/contrib/restricted/aws/aws-c-event-stream/ya.make b/contrib/restricted/aws/aws-c-event-stream/ya.make index 0d927c94e79..ebdac4df658 100644 --- a/contrib/restricted/aws/aws-c-event-stream/ya.make +++ b/contrib/restricted/aws/aws-c-event-stream/ya.make @@ -6,9 +6,9 @@ LICENSE(Apache-2.0) LICENSE_TEXTS(.yandex_meta/licenses.list.txt) -VERSION(0.5.4) +VERSION(0.5.5) -ORIGINAL_SOURCE(https://github.com/awslabs/aws-c-event-stream/archive/v0.5.4.tar.gz) +ORIGINAL_SOURCE(https://github.com/awslabs/aws-c-event-stream/archive/v0.5.5.tar.gz) PEERDIR( contrib/restricted/aws/aws-c-common |
