summaryrefslogtreecommitdiffstats
path: root/contrib/restricted/aws/aws-c-http/source/h1_connection.c
diff options
context:
space:
mode:
authorthegeorg <[email protected]>2025-05-12 15:51:24 +0300
committerthegeorg <[email protected]>2025-05-12 16:06:27 +0300
commitd629bb70c8773d2c0c43f5088ddbb5a86d8c37ea (patch)
tree4f678e0d65ad08c800db21c657d3b0f71fafed06 /contrib/restricted/aws/aws-c-http/source/h1_connection.c
parent92c4b696d7a1c03d54e13aff7a7c20a078d90dd7 (diff)
Update contrib/restricted/aws libraries to nixpkgs 24.05
commit_hash:f8083acb039e6005e820cdee77b84e0a6b6c6d6d
Diffstat (limited to 'contrib/restricted/aws/aws-c-http/source/h1_connection.c')
-rw-r--r--contrib/restricted/aws/aws-c-http/source/h1_connection.c153
1 files changed, 151 insertions, 2 deletions
diff --git a/contrib/restricted/aws/aws-c-http/source/h1_connection.c b/contrib/restricted/aws/aws-c-http/source/h1_connection.c
index 3532bb80d94..903cf038144 100644
--- a/contrib/restricted/aws/aws-c-http/source/h1_connection.c
+++ b/contrib/restricted/aws/aws-c-http/source/h1_connection.c
@@ -11,6 +11,7 @@
#include <aws/http/private/h1_stream.h>
#include <aws/http/private/request_response_impl.h>
#include <aws/http/status_code.h>
+#include <aws/io/event_loop.h>
#include <aws/io/logging.h>
#include <inttypes.h>
@@ -371,6 +372,7 @@ int aws_h1_stream_activate(struct aws_http_stream *stream) {
/* connection keeps activated stream alive until stream completes */
aws_atomic_fetch_add(&stream->refcount, 1);
+ stream->metrics.stream_id = stream->id;
if (should_schedule_task) {
AWS_LOGF_TRACE(
@@ -386,6 +388,34 @@ int aws_h1_stream_activate(struct aws_http_stream *stream) {
return AWS_OP_SUCCESS;
}
+void aws_h1_stream_cancel(struct aws_http_stream *stream, int error_code) {
+ struct aws_h1_stream *h1_stream = AWS_CONTAINER_OF(stream, struct aws_h1_stream, base);
+ struct aws_http_connection *base_connection = stream->owning_connection;
+ struct aws_h1_connection *connection = AWS_CONTAINER_OF(base_connection, struct aws_h1_connection, base);
+
+ { /* BEGIN CRITICAL SECTION */
+ aws_h1_connection_lock_synced_data(connection);
+ if (h1_stream->synced_data.api_state != AWS_H1_STREAM_API_STATE_ACTIVE ||
+ connection->synced_data.is_open == false) {
+ /* Not active, nothing to cancel. */
+ aws_h1_connection_unlock_synced_data(connection);
+ AWS_LOGF_DEBUG(AWS_LS_HTTP_STREAM, "id=%p: Stream not active, nothing to cancel.", (void *)stream);
+ return;
+ }
+
+ aws_h1_connection_unlock_synced_data(connection);
+ } /* END CRITICAL SECTION */
+ AWS_LOGF_INFO(
+ AWS_LS_HTTP_CONNECTION,
+ "id=%p: Connection shutting down due to stream=%p cancelled with error code %d (%s).",
+ (void *)&connection->base,
+ (void *)stream,
+ error_code,
+ aws_error_name(error_code));
+
+ s_stop(connection, false /*stop_reading*/, false /*stop_writing*/, true /*schedule_shutdown*/, error_code);
+}
+
struct aws_http_stream *s_make_request(
struct aws_http_connection *client_connection,
const struct aws_http_make_request_options *options) {
@@ -534,6 +564,7 @@ static int s_aws_http1_switch_protocols(struct aws_h1_connection *connection) {
static void s_stream_complete(struct aws_h1_stream *stream, int error_code) {
struct aws_h1_connection *connection =
AWS_CONTAINER_OF(stream->base.owning_connection, struct aws_h1_connection, base);
+ AWS_ASSERT(aws_channel_thread_is_callers_thread(connection->base.channel_slot->channel));
/*
* If this is the end of a successful CONNECT request, mark ourselves as pass-through since the proxy layer
@@ -546,6 +577,14 @@ static void s_stream_complete(struct aws_h1_stream *stream, int error_code) {
}
}
+ if (stream->base.client_data && stream->base.client_data->response_first_byte_timeout_task.fn != NULL) {
+ /* There is an outstanding response timeout task, but stream completed, we can cancel it now. We are
+ * safe to do it as we always on connection thread to schedule the task or cancel it */
+ struct aws_event_loop *connection_loop = aws_channel_get_event_loop(connection->base.channel_slot->channel);
+ /* The task will be zeroed out within the call */
+ aws_event_loop_cancel_task(connection_loop, &stream->base.client_data->response_first_byte_timeout_task);
+ }
+
if (error_code != AWS_ERROR_SUCCESS) {
if (stream->base.client_data && stream->is_incoming_message_done) {
/* As a request that finished receiving the response, we ignore error and
@@ -633,6 +672,10 @@ static void s_stream_complete(struct aws_h1_stream *stream, int error_code) {
aws_h1_chunk_complete_and_destroy(chunk, &stream->base, AWS_ERROR_HTTP_STREAM_HAS_COMPLETED);
}
+ if (stream->base.on_metrics) {
+ stream->base.on_metrics(&stream->base, &stream->base.metrics, stream->base.user_data);
+ }
+
/* Invoke callback and clean up stream. */
if (stream->base.on_complete) {
stream->base.on_complete(&stream->base, error_code, stream->base.user_data);
@@ -716,6 +759,87 @@ static void s_client_update_incoming_stream_ptr(struct aws_h1_connection *connec
s_set_incoming_stream_ptr(connection, desired);
}
+static void s_http_stream_response_first_byte_timeout_task(
+ struct aws_task *task,
+ void *arg,
+ enum aws_task_status status) {
+ (void)task;
+ struct aws_h1_stream *stream = arg;
+ struct aws_http_connection *connection_base = stream->base.owning_connection;
+ /* zero-out task to indicate that it's no longer scheduled */
+ AWS_ZERO_STRUCT(stream->base.client_data->response_first_byte_timeout_task);
+
+ if (status == AWS_TASK_STATUS_CANCELED) {
+ return;
+ }
+
+ struct aws_h1_connection *connection = AWS_CONTAINER_OF(connection_base, struct aws_h1_connection, base);
+ /* Timeout happened, close the connection */
+ uint64_t response_first_byte_timeout_ms = stream->base.client_data->response_first_byte_timeout_ms == 0
+ ? connection_base->client_data->response_first_byte_timeout_ms
+ : stream->base.client_data->response_first_byte_timeout_ms;
+ AWS_LOGF_INFO(
+ AWS_LS_HTTP_CONNECTION,
+ "id=%p: Closing connection as timeout after request sent to the first byte received happened. "
+ "response_first_byte_timeout_ms is %" PRIu64 ".",
+ (void *)connection_base,
+ response_first_byte_timeout_ms);
+
+ /* Don't stop reading/writing immediately, let that happen naturally during the channel shutdown process. */
+ s_stop(
+ connection,
+ false /*stop_reading*/,
+ false /*stop_writing*/,
+ true /*schedule_shutdown*/,
+ AWS_ERROR_HTTP_RESPONSE_FIRST_BYTE_TIMEOUT);
+}
+
+static void s_set_outgoing_message_done(struct aws_h1_stream *stream) {
+ struct aws_http_connection *connection = stream->base.owning_connection;
+ struct aws_channel *channel = aws_http_connection_get_channel(connection);
+ AWS_ASSERT(aws_channel_thread_is_callers_thread(channel));
+
+ if (stream->is_outgoing_message_done) {
+ /* Already did the job */
+ return;
+ }
+
+ stream->is_outgoing_message_done = true;
+ AWS_ASSERT(stream->base.metrics.send_end_timestamp_ns == -1);
+ aws_high_res_clock_get_ticks((uint64_t *)&stream->base.metrics.send_end_timestamp_ns);
+ AWS_ASSERT(stream->base.metrics.send_start_timestamp_ns != -1);
+ AWS_ASSERT(stream->base.metrics.send_end_timestamp_ns >= stream->base.metrics.send_start_timestamp_ns);
+ stream->base.metrics.sending_duration_ns =
+ stream->base.metrics.send_end_timestamp_ns - stream->base.metrics.send_start_timestamp_ns;
+ if (stream->base.metrics.receive_start_timestamp_ns == -1) {
+ /* We haven't receive any message, schedule the response timeout task */
+
+ uint64_t response_first_byte_timeout_ms = 0;
+ if (stream->base.client_data != NULL && connection->client_data != NULL) {
+ response_first_byte_timeout_ms = stream->base.client_data->response_first_byte_timeout_ms == 0
+ ? connection->client_data->response_first_byte_timeout_ms
+ : stream->base.client_data->response_first_byte_timeout_ms;
+ }
+ if (response_first_byte_timeout_ms != 0) {
+ /* The task should not be initialized before. */
+ AWS_ASSERT(stream->base.client_data->response_first_byte_timeout_task.fn == NULL);
+ aws_task_init(
+ &stream->base.client_data->response_first_byte_timeout_task,
+ s_http_stream_response_first_byte_timeout_task,
+ stream,
+ "http_stream_response_first_byte_timeout_task");
+ uint64_t now_ns = 0;
+ aws_channel_current_clock_time(channel, &now_ns);
+ struct aws_event_loop *connection_loop = aws_channel_get_event_loop(channel);
+ aws_event_loop_schedule_task_future(
+ connection_loop,
+ &stream->base.client_data->response_first_byte_timeout_task,
+ now_ns + aws_timestamp_convert(
+ response_first_byte_timeout_ms, AWS_TIMESTAMP_MILLIS, AWS_TIMESTAMP_NANOS, NULL));
+ }
+ }
+}
+
/**
* If necessary, update `outgoing_stream` so it is pointing at a stream
* with data to send, or NULL if all streams are done sending data.
@@ -730,7 +854,7 @@ static struct aws_h1_stream *s_update_outgoing_stream_ptr(struct aws_h1_connecti
/* If current stream is done sending data... */
if (current && !aws_h1_encoder_is_message_in_progress(&connection->thread_data.encoder)) {
- current->is_outgoing_message_done = true;
+ s_set_outgoing_message_done(current);
/* RFC-7230 section 6.6: Tear-down.
* If this was the final stream, don't allows any further streams to be sent */
@@ -801,9 +925,13 @@ static struct aws_h1_stream *s_update_outgoing_stream_ptr(struct aws_h1_connecti
s_set_outgoing_stream_ptr(connection, current);
if (current) {
+ AWS_ASSERT(current->base.metrics.send_start_timestamp_ns == -1);
+ aws_high_res_clock_get_ticks((uint64_t *)&current->base.metrics.send_start_timestamp_ns);
+
err = aws_h1_encoder_start_message(
&connection->thread_data.encoder, &current->encoder_message, &current->base);
(void)err;
+ AWS_ASSERT(connection->thread_data.encoder.state == AWS_H1_ENCODER_STATE_INIT);
AWS_ASSERT(!err);
}
@@ -1109,7 +1237,7 @@ static int s_decoder_on_header(const struct aws_h1_decoded_header *header, void
AWS_LS_HTTP_STREAM,
"id=%p: Received 'Connection: close' header, no more request data will be sent.",
(void *)&incoming_stream->base);
- incoming_stream->is_outgoing_message_done = true;
+ s_set_outgoing_message_done(incoming_stream);
}
/* Stop writing right now.
* Shutdown will be scheduled after we finishing parsing the response */
@@ -1270,6 +1398,13 @@ static int s_decoder_on_done(void *user_data) {
/* Otherwise the incoming stream is finished decoding and we will update it if needed */
incoming_stream->is_incoming_message_done = true;
+ aws_high_res_clock_get_ticks((uint64_t *)&incoming_stream->base.metrics.receive_end_timestamp_ns);
+ AWS_ASSERT(incoming_stream->base.metrics.receive_start_timestamp_ns != -1);
+ AWS_ASSERT(
+ incoming_stream->base.metrics.receive_end_timestamp_ns >=
+ incoming_stream->base.metrics.receive_start_timestamp_ns);
+ incoming_stream->base.metrics.receiving_duration_ns = incoming_stream->base.metrics.receive_end_timestamp_ns -
+ incoming_stream->base.metrics.receive_start_timestamp_ns;
/* RFC-7230 section 6.6
* After reading the final message, the connection must not read any more */
@@ -1822,6 +1957,20 @@ static int s_try_process_next_stream_read_message(struct aws_h1_connection *conn
bool body_headers_ignored = incoming_stream->base.request_method == AWS_HTTP_METHOD_HEAD;
aws_h1_decoder_set_body_headers_ignored(connection->thread_data.incoming_stream_decoder, body_headers_ignored);
+ if (incoming_stream->base.metrics.receive_start_timestamp_ns == -1) {
+ /* That's the first time for the stream receives any message */
+ aws_high_res_clock_get_ticks((uint64_t *)&incoming_stream->base.metrics.receive_start_timestamp_ns);
+ if (incoming_stream->base.client_data &&
+ incoming_stream->base.client_data->response_first_byte_timeout_task.fn != NULL) {
+ /* There is an outstanding response timeout task, as we already received the data, we can cancel it now. We
+ * are safe to do it as we always on connection thread to schedule the task or cancel it */
+ struct aws_event_loop *connection_loop = aws_channel_get_event_loop(connection->base.channel_slot->channel);
+ /* The task will be zeroed out within the call */
+ aws_event_loop_cancel_task(
+ connection_loop, &incoming_stream->base.client_data->response_first_byte_timeout_task);
+ }
+ }
+
/* As decoder runs, it invokes the internal s_decoder_X callbacks, which in turn invoke user callbacks.
* The decoder will stop once it hits the end of the request/response OR the end of the message data. */
if (aws_h1_decode(connection->thread_data.incoming_stream_decoder, &message_cursor)) {