summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorrobot-contrib <[email protected]>2025-07-05 09:07:48 +0300
committerrobot-contrib <[email protected]>2025-07-05 09:32:43 +0300
commitd39c61aee0928bd6ff45b74c44a5703dae4e47ae (patch)
tree80d2758d21e12acd823edd28d163f1836917e7cd
parent8bd56b64a4c4a94d91c8a9518883513c9d6075f9 (diff)
Update contrib/restricted/aws/aws-c-event-stream to 0.5.5
commit_hash:8307bd2fe419c4785e2cf1b39f6127b9f71b54cf
-rw-r--r--contrib/restricted/aws/aws-c-event-stream/.yandex_meta/override.nix4
-rw-r--r--contrib/restricted/aws/aws-c-event-stream/README.md2
-rw-r--r--contrib/restricted/aws/aws-c-event-stream/include/aws/event-stream/event_stream_exports.h11
-rw-r--r--contrib/restricted/aws/aws-c-event-stream/include/aws/event-stream/event_stream_rpc_client.h19
-rw-r--r--contrib/restricted/aws/aws-c-event-stream/source/event_stream_rpc_client.c94
-rw-r--r--contrib/restricted/aws/aws-c-event-stream/source/event_stream_rpc_server.c27
-rw-r--r--contrib/restricted/aws/aws-c-event-stream/ya.make4
7 files changed, 126 insertions, 35 deletions
diff --git a/contrib/restricted/aws/aws-c-event-stream/.yandex_meta/override.nix b/contrib/restricted/aws/aws-c-event-stream/.yandex_meta/override.nix
index 5c5b7fb3926..f8278485a26 100644
--- a/contrib/restricted/aws/aws-c-event-stream/.yandex_meta/override.nix
+++ b/contrib/restricted/aws/aws-c-event-stream/.yandex_meta/override.nix
@@ -1,10 +1,10 @@
pkgs: attrs: with pkgs; with attrs; rec {
- version = "0.5.4";
+ version = "0.5.5";
src = fetchFromGitHub {
owner = "awslabs";
repo = "aws-c-event-stream";
rev = "v${version}";
- hash = "sha256-Y4vyoNlYdIQg/NggBoYtX4CPiCzG24a4mKG8VGTdqy8=";
+ hash = "sha256-wVjpDKKwoksq5gFtvhH76c7ciP0XmMozhkWmzY6GwgU=";
};
}
diff --git a/contrib/restricted/aws/aws-c-event-stream/README.md b/contrib/restricted/aws/aws-c-event-stream/README.md
index ad54b1bf29f..47a5360f76a 100644
--- a/contrib/restricted/aws/aws-c-event-stream/README.md
+++ b/contrib/restricted/aws/aws-c-event-stream/README.md
@@ -70,4 +70,4 @@ Total message overhead, including the prelude and both checksums, is 16 bytes.
The following diagram shows the components that make up a message and a header. There are multiple headers per message.
-![Encoding Diagram](docs/encoding.png)
+![Encoding Diagram](docs/images/encoding.png)
diff --git a/contrib/restricted/aws/aws-c-event-stream/include/aws/event-stream/event_stream_exports.h b/contrib/restricted/aws/aws-c-event-stream/include/aws/event-stream/event_stream_exports.h
index 57c531db771..239f1cd7d86 100644
--- a/contrib/restricted/aws/aws-c-event-stream/include/aws/event-stream/event_stream_exports.h
+++ b/contrib/restricted/aws/aws-c-event-stream/include/aws/event-stream/event_stream_exports.h
@@ -4,7 +4,7 @@
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
* SPDX-License-Identifier: Apache-2.0.
*/
-#if defined(USE_WINDOWS_DLL_SEMANTICS) || defined(WIN32)
+#if defined(AWS_CRT_USE_WINDOWS_DLL_SEMANTICS) || defined(_WIN32)
# ifdef AWS_EVENT_STREAM_USE_IMPORT_EXPORT
# ifdef AWS_EVENT_STREAM_EXPORTS
# define AWS_EVENT_STREAM_API __declspec(dllexport)
@@ -15,15 +15,14 @@
# define AWS_EVENT_STREAM_API
# endif /* AWS_EVENT_STREAM_USE_IMPORT_EXPORT */
-#else /* defined (USE_WINDOWS_DLL_SEMANTICS) || defined (WIN32) */
+#else /* defined (AWS_CRT_USE_WINDOWS_DLL_SEMANTICS) || defined (_WIN32) */
-# if ((__GNUC__ >= 4) || defined(__clang__)) && defined(AWS_EVENT_STREAM_USE_IMPORT_EXPORT) && \
- defined(AWS_EVENT_STREAM_EXPORTS)
+# if defined(AWS_EVENT_STREAM_USE_IMPORT_EXPORT) && defined(AWS_EVENT_STREAM_EXPORTS)
# define AWS_EVENT_STREAM_API __attribute__((visibility("default")))
# else
# define AWS_EVENT_STREAM_API
-# endif /* __GNUC__ >= 4 || defined(__clang__) */
+# endif
-#endif /* defined (USE_WINDOWS_DLL_SEMANTICS) || defined (WIN32) */
+#endif /* defined (AWS_CRT_USE_WINDOWS_DLL_SEMANTICS) || defined (_WIN32) */
#endif /* AWS_EVENT_STREAM_EXPORTS_H */
diff --git a/contrib/restricted/aws/aws-c-event-stream/include/aws/event-stream/event_stream_rpc_client.h b/contrib/restricted/aws/aws-c-event-stream/include/aws/event-stream/event_stream_rpc_client.h
index 24c7824d0f7..490e876cd79 100644
--- a/contrib/restricted/aws/aws-c-event-stream/include/aws/event-stream/event_stream_rpc_client.h
+++ b/contrib/restricted/aws/aws-c-event-stream/include/aws/event-stream/event_stream_rpc_client.h
@@ -10,6 +10,7 @@
AWS_PUSH_SANE_WARNING_LEVEL
struct aws_channel;
+struct aws_event_loop;
struct aws_event_stream_rpc_client_connection;
struct aws_event_stream_rpc_client_continuation_token;
@@ -30,9 +31,15 @@ typedef void(aws_event_stream_rpc_client_stream_continuation_closed_fn)(
struct aws_event_stream_rpc_client_continuation_token *token,
void *user_data);
+/**
+ * Invoked after a continuation has been fully destroyed. Listeners know that no further callbacks are possible.
+ */
+typedef void(aws_event_stream_rpc_client_stream_continuation_terminated_fn)(void *user_data);
+
struct aws_event_stream_rpc_client_stream_continuation_options {
aws_event_stream_rpc_client_stream_continuation_fn *on_continuation;
aws_event_stream_rpc_client_stream_continuation_closed_fn *on_continuation_closed;
+ aws_event_stream_rpc_client_stream_continuation_terminated_fn *on_continuation_terminated;
void *user_data;
};
@@ -74,6 +81,11 @@ typedef void(aws_event_stream_rpc_client_on_connection_setup_fn)(
void *user_data);
/**
+ * Invoked when a connection has been completely destroyed.
+ */
+typedef void(aws_event_stream_rpc_client_on_connection_terminated_fn)(void *user_data);
+
+/**
* Invoked whenever a message has been flushed to the channel.
*/
typedef void(aws_event_stream_rpc_client_message_flush_fn)(int error_code, void *user_data);
@@ -93,6 +105,7 @@ struct aws_event_stream_rpc_client_connection_options {
aws_event_stream_rpc_client_on_connection_setup_fn *on_connection_setup;
aws_event_stream_rpc_client_connection_protocol_message_fn *on_connection_protocol_message;
aws_event_stream_rpc_client_on_connection_shutdown_fn *on_connection_shutdown;
+ aws_event_stream_rpc_client_on_connection_terminated_fn *on_connection_terminated;
void *user_data;
};
@@ -143,6 +156,12 @@ AWS_EVENT_STREAM_API int aws_event_stream_rpc_client_connection_send_protocol_me
void *user_data);
/**
+ * Returns the event loop that a connection is seated on.
+ */
+AWS_EVENT_STREAM_API struct aws_event_loop *aws_event_stream_rpc_client_connection_get_event_loop(
+ const struct aws_event_stream_rpc_client_connection *connection);
+
+/**
* Create a new stream. continuation_option's callbacks will not be invoked, and nothing will be sent across the wire
* until aws_event_stream_rpc_client_continuation_activate() is invoked.
*
diff --git a/contrib/restricted/aws/aws-c-event-stream/source/event_stream_rpc_client.c b/contrib/restricted/aws/aws-c-event-stream/source/event_stream_rpc_client.c
index ab17c26ef25..0ccfa3574a6 100644
--- a/contrib/restricted/aws/aws-c-event-stream/source/event_stream_rpc_client.c
+++ b/contrib/restricted/aws/aws-c-event-stream/source/event_stream_rpc_client.c
@@ -14,6 +14,8 @@
#include <inttypes.h>
+#include "aws/io/event_loop.h"
+
#ifdef _MSC_VER
/* allow declared initializer using address of automatic variable */
# pragma warning(disable : 4221)
@@ -28,6 +30,7 @@ struct aws_event_stream_rpc_client_connection {
struct aws_allocator *allocator;
struct aws_hash_table continuation_table;
struct aws_client_bootstrap *bootstrap_ref;
+ struct aws_event_loop *event_loop;
struct aws_atomic_var ref_count;
struct aws_channel *channel;
struct aws_channel_handler *event_stream_handler;
@@ -39,6 +42,7 @@ struct aws_event_stream_rpc_client_connection {
aws_event_stream_rpc_client_on_connection_setup_fn *on_connection_setup;
aws_event_stream_rpc_client_connection_protocol_message_fn *on_connection_protocol_message;
aws_event_stream_rpc_client_on_connection_shutdown_fn *on_connection_shutdown;
+ aws_event_stream_rpc_client_on_connection_terminated_fn *on_connection_terminated;
void *user_data;
bool bootstrap_owned;
bool enable_read_back_pressure;
@@ -49,6 +53,7 @@ struct aws_event_stream_rpc_client_continuation_token {
struct aws_event_stream_rpc_client_connection *connection;
aws_event_stream_rpc_client_stream_continuation_fn *continuation_fn;
aws_event_stream_rpc_client_stream_continuation_closed_fn *closed_fn;
+ aws_event_stream_rpc_client_stream_continuation_terminated_fn *terminated_fn;
void *user_data;
struct aws_atomic_var ref_count;
struct aws_atomic_var is_closed;
@@ -187,9 +192,7 @@ static void s_on_channel_shutdown_fn(
aws_event_stream_rpc_client_connection_release(connection);
}
-/* Set each continuation's is_closed=true.
- * A lock MUST be held while calling this.
- * For use with aws_hash_table_foreach(). */
+/* Set each continuation's is_closed=true. */
static int s_mark_each_continuation_closed(void *context, struct aws_hash_element *p_element) {
(void)context;
struct aws_event_stream_rpc_client_continuation_token *continuation = p_element->value;
@@ -233,18 +236,31 @@ static int s_complete_and_clear_each_continuation(void *context, struct aws_hash
static void s_clear_continuation_table(struct aws_event_stream_rpc_client_connection *connection) {
AWS_ASSERT(!aws_event_stream_rpc_client_connection_is_open(connection));
+ struct aws_hash_table temp_table;
+ aws_hash_table_init(
+ &temp_table,
+ connection->allocator,
+ 64,
+ aws_event_stream_rpc_hash_streamid,
+ aws_event_stream_rpc_streamid_eq,
+ NULL,
+ NULL);
+
/* Use lock to ensure synchronization with code that adds entries to table.
* Since connection was just marked closed, no further entries will be
- * added to table once we acquire the lock. */
+ * added to table once we acquire the lock.
+ *
+ * While no further entries can be added, there are concurrent execution paths where things can be
+ * removed. So rather than iterating the connection's table, swap it out for an empty one and iterate
+ * the temporary table instead. Removing from an empty table will be harmless.
+ */
aws_mutex_lock(&connection->stream_lock);
- aws_hash_table_foreach(&connection->continuation_table, s_mark_each_continuation_closed, NULL);
+ aws_hash_table_swap(&temp_table, &connection->continuation_table);
aws_mutex_unlock(&connection->stream_lock);
- /* Now release lock before invoking callbacks.
- * It's safe to alter the table now without a lock, since no further
- * entries can be added, and we've gone through the critical section
- * above to ensure synchronization */
- aws_hash_table_foreach(&connection->continuation_table, s_complete_and_clear_each_continuation, NULL);
+ aws_hash_table_foreach(&temp_table, s_mark_each_continuation_closed, NULL);
+ aws_hash_table_foreach(&temp_table, s_complete_and_clear_each_continuation, NULL);
+ aws_hash_table_clean_up(&temp_table);
}
int aws_event_stream_rpc_client_connection_connect(
@@ -268,6 +284,8 @@ int aws_event_stream_rpc_client_connection_connect(
connection->allocator = allocator;
aws_atomic_init_int(&connection->ref_count, 1);
connection->bootstrap_ref = conn_options->bootstrap;
+ connection->event_loop = aws_event_loop_group_get_next_loop(connection->bootstrap_ref->event_loop_group);
+
/* this is released in the connection release which gets called regardless of if this function is successful or
* not*/
aws_client_bootstrap_acquire(connection->bootstrap_ref);
@@ -276,6 +294,7 @@ int aws_event_stream_rpc_client_connection_connect(
aws_mutex_init(&connection->stream_lock);
connection->on_connection_shutdown = conn_options->on_connection_shutdown;
+ connection->on_connection_terminated = conn_options->on_connection_terminated;
connection->on_connection_protocol_message = conn_options->on_connection_protocol_message;
connection->on_connection_setup = conn_options->on_connection_setup;
connection->user_data = conn_options->user_data;
@@ -307,6 +326,7 @@ int aws_event_stream_rpc_client_connection_connect(
.enable_read_back_pressure = false,
.setup_callback = s_on_channel_setup_fn,
.shutdown_callback = s_on_channel_shutdown_fn,
+ .requested_event_loop = connection->event_loop,
};
if (aws_client_bootstrap_new_socket_channel(&bootstrap_options)) {
@@ -340,7 +360,15 @@ static void s_destroy_connection(struct aws_event_stream_rpc_client_connection *
AWS_LOGF_DEBUG(AWS_LS_EVENT_STREAM_RPC_CLIENT, "id=%p: destroying connection.", (void *)connection);
aws_hash_table_clean_up(&connection->continuation_table);
aws_client_bootstrap_release(connection->bootstrap_ref);
+
+ aws_event_stream_rpc_client_on_connection_terminated_fn *terminated_fn = connection->on_connection_terminated;
+ void *terminated_user_data = connection->user_data;
+
aws_mem_release(connection->allocator, connection);
+
+ if (terminated_fn) {
+ terminated_fn(terminated_user_data);
+ }
}
void aws_event_stream_rpc_client_connection_release(const struct aws_event_stream_rpc_client_connection *connection) {
@@ -434,13 +462,19 @@ static void s_on_protocol_message_written_fn(
AWS_FATAL_ASSERT(message_args->continuation && "end stream flag was set but it wasn't on a continuation");
aws_atomic_store_int(&message_args->continuation->is_closed, 1U);
+ int was_present = 0;
aws_mutex_lock(&message_args->connection->stream_lock);
aws_hash_table_remove(
- &message_args->connection->continuation_table, &message_args->continuation->stream_id, NULL, NULL);
+ &message_args->connection->continuation_table, &message_args->continuation->stream_id, NULL, &was_present);
aws_mutex_unlock(&message_args->connection->stream_lock);
- /* Lock must NOT be held while invoking callback */
- s_complete_continuation(message_args->continuation);
+ /*
+ * Whoever successfully removes the continuation from the table gets to complete it.
+ * Lock must NOT be held while invoking callback
+ */
+ if (was_present) {
+ s_complete_continuation(message_args->continuation);
+ }
}
message_args->flush_fn(error_code, message_args->user_data);
@@ -770,7 +804,6 @@ static void s_route_message_by_type(
aws_mutex_unlock(&connection->stream_lock);
continuation->continuation_fn(continuation, &message_args, continuation->user_data);
- aws_event_stream_rpc_client_continuation_release(continuation);
/* if it was a terminal stream message purge it from the hash table. The delete will decref the continuation. */
if (message_flags & AWS_EVENT_STREAM_RPC_MESSAGE_FLAG_TERMINATE_STREAM) {
@@ -780,13 +813,21 @@ static void s_route_message_by_type(
(void *)connection,
(void *)continuation);
aws_atomic_store_int(&continuation->is_closed, 1U);
+ int was_present = 0;
aws_mutex_lock(&connection->stream_lock);
- aws_hash_table_remove(&connection->continuation_table, &stream_id, NULL, NULL);
+ aws_hash_table_remove(&connection->continuation_table, &stream_id, NULL, &was_present);
aws_mutex_unlock(&connection->stream_lock);
- /* Note that we do not invoke callback while holding lock */
- s_complete_continuation(continuation);
+ /*
+ * Whoever successfully removes the continuation from the table gets to complete it.
+ * Lock must NOT be held while invoking callback
+ */
+ if (was_present) {
+ s_complete_continuation(continuation);
+ }
}
+
+ aws_event_stream_rpc_client_continuation_release(continuation);
} else {
if (message_type <= AWS_EVENT_STREAM_RPC_MESSAGE_TYPE_APPLICATION_ERROR ||
message_type >= AWS_EVENT_STREAM_RPC_MESSAGE_TYPE_COUNT) {
@@ -915,6 +956,7 @@ struct aws_event_stream_rpc_client_continuation_token *aws_event_stream_rpc_clie
aws_atomic_init_int(&continuation->is_complete, 0);
continuation->continuation_fn = continuation_options->on_continuation;
continuation->closed_fn = continuation_options->on_continuation_closed;
+ continuation->terminated_fn = continuation_options->on_continuation_terminated;
continuation->user_data = continuation_options->user_data;
return continuation;
@@ -959,7 +1001,15 @@ void aws_event_stream_rpc_client_continuation_release(
if (ref_count == 1) {
struct aws_allocator *allocator = continuation_mut->connection->allocator;
aws_event_stream_rpc_client_connection_release(continuation_mut->connection);
+
+ aws_event_stream_rpc_client_stream_continuation_terminated_fn *terminated_fn = continuation_mut->terminated_fn;
+ void *terminated_user_data = continuation_mut->user_data;
+
aws_mem_release(allocator, continuation_mut);
+
+ if (terminated_fn) {
+ terminated_fn(terminated_user_data);
+ }
}
}
@@ -1062,3 +1112,13 @@ int aws_event_stream_rpc_client_continuation_send_message(
return s_send_protocol_message(
continuation->connection, continuation, NULL, message_args, continuation->stream_id, flush_fn, user_data);
}
+
+struct aws_event_loop *aws_event_stream_rpc_client_connection_get_event_loop(
+ const struct aws_event_stream_rpc_client_connection *connection) {
+
+ if (!connection) {
+ return NULL;
+ }
+
+ return connection->event_loop;
+}
diff --git a/contrib/restricted/aws/aws-c-event-stream/source/event_stream_rpc_server.c b/contrib/restricted/aws/aws-c-event-stream/source/event_stream_rpc_server.c
index e8af15e7da0..0a9ad31c345 100644
--- a/contrib/restricted/aws/aws-c-event-stream/source/event_stream_rpc_server.c
+++ b/contrib/restricted/aws/aws-c-event-stream/source/event_stream_rpc_server.c
@@ -901,13 +901,26 @@ static void s_route_message_by_type(
struct aws_hash_element *continuation_element = NULL;
if (aws_hash_table_find(&connection->continuation_table, &stream_id, &continuation_element) ||
!continuation_element) {
- AWS_LOGF_ERROR(
- AWS_LS_EVENT_STREAM_RPC_SERVER,
- "id=%p: stream_id does not have a corresponding continuation",
- (void *)connection);
- aws_raise_error(AWS_ERROR_EVENT_STREAM_RPC_PROTOCOL_ERROR);
- s_send_connection_level_error(
- connection, AWS_EVENT_STREAM_RPC_MESSAGE_TYPE_PROTOCOL_ERROR, 0, &s_invalid_client_stream_id_error);
+ if ((message_flags & AWS_EVENT_STREAM_RPC_MESSAGE_FLAG_TERMINATE_STREAM) == 0) {
+ AWS_LOGF_ERROR(
+ AWS_LS_EVENT_STREAM_RPC_SERVER,
+ "id=%p: stream_id does not have a corresponding continuation",
+ (void *)connection);
+ aws_raise_error(AWS_ERROR_EVENT_STREAM_RPC_PROTOCOL_ERROR);
+ s_send_connection_level_error(
+ connection,
+ AWS_EVENT_STREAM_RPC_MESSAGE_TYPE_PROTOCOL_ERROR,
+ 0,
+ &s_invalid_client_stream_id_error);
+ } else {
+ /* Simultaneous close can trip this condition */
+ AWS_LOGF_DEBUG(
+ AWS_LS_EVENT_STREAM_RPC_SERVER,
+ "id=%p: received a terminate stream message for stream_id %d, which no longer has a "
+ "corresponding continuation",
+ (void *)connection,
+ (int)stream_id);
+ }
return;
}
diff --git a/contrib/restricted/aws/aws-c-event-stream/ya.make b/contrib/restricted/aws/aws-c-event-stream/ya.make
index 0d927c94e79..ebdac4df658 100644
--- a/contrib/restricted/aws/aws-c-event-stream/ya.make
+++ b/contrib/restricted/aws/aws-c-event-stream/ya.make
@@ -6,9 +6,9 @@ LICENSE(Apache-2.0)
LICENSE_TEXTS(.yandex_meta/licenses.list.txt)
-VERSION(0.5.4)
+VERSION(0.5.5)
-ORIGINAL_SOURCE(https://github.com/awslabs/aws-c-event-stream/archive/v0.5.4.tar.gz)
+ORIGINAL_SOURCE(https://github.com/awslabs/aws-c-event-stream/archive/v0.5.5.tar.gz)
PEERDIR(
contrib/restricted/aws/aws-c-common