diff options
author | thegeorg <[email protected]> | 2025-05-12 15:51:24 +0300 |
---|---|---|
committer | thegeorg <[email protected]> | 2025-05-12 16:06:27 +0300 |
commit | d629bb70c8773d2c0c43f5088ddbb5a86d8c37ea (patch) | |
tree | 4f678e0d65ad08c800db21c657d3b0f71fafed06 /contrib/restricted/aws/aws-c-http/source/h1_connection.c | |
parent | 92c4b696d7a1c03d54e13aff7a7c20a078d90dd7 (diff) |
Update contrib/restricted/aws libraries to nixpkgs 24.05
commit_hash:f8083acb039e6005e820cdee77b84e0a6b6c6d6d
Diffstat (limited to 'contrib/restricted/aws/aws-c-http/source/h1_connection.c')
-rw-r--r-- | contrib/restricted/aws/aws-c-http/source/h1_connection.c | 153 |
1 files changed, 151 insertions, 2 deletions
diff --git a/contrib/restricted/aws/aws-c-http/source/h1_connection.c b/contrib/restricted/aws/aws-c-http/source/h1_connection.c index 3532bb80d94..903cf038144 100644 --- a/contrib/restricted/aws/aws-c-http/source/h1_connection.c +++ b/contrib/restricted/aws/aws-c-http/source/h1_connection.c @@ -11,6 +11,7 @@ #include <aws/http/private/h1_stream.h> #include <aws/http/private/request_response_impl.h> #include <aws/http/status_code.h> +#include <aws/io/event_loop.h> #include <aws/io/logging.h> #include <inttypes.h> @@ -371,6 +372,7 @@ int aws_h1_stream_activate(struct aws_http_stream *stream) { /* connection keeps activated stream alive until stream completes */ aws_atomic_fetch_add(&stream->refcount, 1); + stream->metrics.stream_id = stream->id; if (should_schedule_task) { AWS_LOGF_TRACE( @@ -386,6 +388,34 @@ int aws_h1_stream_activate(struct aws_http_stream *stream) { return AWS_OP_SUCCESS; } +void aws_h1_stream_cancel(struct aws_http_stream *stream, int error_code) { + struct aws_h1_stream *h1_stream = AWS_CONTAINER_OF(stream, struct aws_h1_stream, base); + struct aws_http_connection *base_connection = stream->owning_connection; + struct aws_h1_connection *connection = AWS_CONTAINER_OF(base_connection, struct aws_h1_connection, base); + + { /* BEGIN CRITICAL SECTION */ + aws_h1_connection_lock_synced_data(connection); + if (h1_stream->synced_data.api_state != AWS_H1_STREAM_API_STATE_ACTIVE || + connection->synced_data.is_open == false) { + /* Not active, nothing to cancel. */ + aws_h1_connection_unlock_synced_data(connection); + AWS_LOGF_DEBUG(AWS_LS_HTTP_STREAM, "id=%p: Stream not active, nothing to cancel.", (void *)stream); + return; + } + + aws_h1_connection_unlock_synced_data(connection); + } /* END CRITICAL SECTION */ + AWS_LOGF_INFO( + AWS_LS_HTTP_CONNECTION, + "id=%p: Connection shutting down due to stream=%p cancelled with error code %d (%s).", + (void *)&connection->base, + (void *)stream, + error_code, + aws_error_name(error_code)); + + s_stop(connection, false /*stop_reading*/, false /*stop_writing*/, true /*schedule_shutdown*/, error_code); +} + struct aws_http_stream *s_make_request( struct aws_http_connection *client_connection, const struct aws_http_make_request_options *options) { @@ -534,6 +564,7 @@ static int s_aws_http1_switch_protocols(struct aws_h1_connection *connection) { static void s_stream_complete(struct aws_h1_stream *stream, int error_code) { struct aws_h1_connection *connection = AWS_CONTAINER_OF(stream->base.owning_connection, struct aws_h1_connection, base); + AWS_ASSERT(aws_channel_thread_is_callers_thread(connection->base.channel_slot->channel)); /* * If this is the end of a successful CONNECT request, mark ourselves as pass-through since the proxy layer @@ -546,6 +577,14 @@ static void s_stream_complete(struct aws_h1_stream *stream, int error_code) { } } + if (stream->base.client_data && stream->base.client_data->response_first_byte_timeout_task.fn != NULL) { + /* There is an outstanding response timeout task, but stream completed, we can cancel it now. We are + * safe to do it as we always on connection thread to schedule the task or cancel it */ + struct aws_event_loop *connection_loop = aws_channel_get_event_loop(connection->base.channel_slot->channel); + /* The task will be zeroed out within the call */ + aws_event_loop_cancel_task(connection_loop, &stream->base.client_data->response_first_byte_timeout_task); + } + if (error_code != AWS_ERROR_SUCCESS) { if (stream->base.client_data && stream->is_incoming_message_done) { /* As a request that finished receiving the response, we ignore error and @@ -633,6 +672,10 @@ static void s_stream_complete(struct aws_h1_stream *stream, int error_code) { aws_h1_chunk_complete_and_destroy(chunk, &stream->base, AWS_ERROR_HTTP_STREAM_HAS_COMPLETED); } + if (stream->base.on_metrics) { + stream->base.on_metrics(&stream->base, &stream->base.metrics, stream->base.user_data); + } + /* Invoke callback and clean up stream. */ if (stream->base.on_complete) { stream->base.on_complete(&stream->base, error_code, stream->base.user_data); @@ -716,6 +759,87 @@ static void s_client_update_incoming_stream_ptr(struct aws_h1_connection *connec s_set_incoming_stream_ptr(connection, desired); } +static void s_http_stream_response_first_byte_timeout_task( + struct aws_task *task, + void *arg, + enum aws_task_status status) { + (void)task; + struct aws_h1_stream *stream = arg; + struct aws_http_connection *connection_base = stream->base.owning_connection; + /* zero-out task to indicate that it's no longer scheduled */ + AWS_ZERO_STRUCT(stream->base.client_data->response_first_byte_timeout_task); + + if (status == AWS_TASK_STATUS_CANCELED) { + return; + } + + struct aws_h1_connection *connection = AWS_CONTAINER_OF(connection_base, struct aws_h1_connection, base); + /* Timeout happened, close the connection */ + uint64_t response_first_byte_timeout_ms = stream->base.client_data->response_first_byte_timeout_ms == 0 + ? connection_base->client_data->response_first_byte_timeout_ms + : stream->base.client_data->response_first_byte_timeout_ms; + AWS_LOGF_INFO( + AWS_LS_HTTP_CONNECTION, + "id=%p: Closing connection as timeout after request sent to the first byte received happened. " + "response_first_byte_timeout_ms is %" PRIu64 ".", + (void *)connection_base, + response_first_byte_timeout_ms); + + /* Don't stop reading/writing immediately, let that happen naturally during the channel shutdown process. */ + s_stop( + connection, + false /*stop_reading*/, + false /*stop_writing*/, + true /*schedule_shutdown*/, + AWS_ERROR_HTTP_RESPONSE_FIRST_BYTE_TIMEOUT); +} + +static void s_set_outgoing_message_done(struct aws_h1_stream *stream) { + struct aws_http_connection *connection = stream->base.owning_connection; + struct aws_channel *channel = aws_http_connection_get_channel(connection); + AWS_ASSERT(aws_channel_thread_is_callers_thread(channel)); + + if (stream->is_outgoing_message_done) { + /* Already did the job */ + return; + } + + stream->is_outgoing_message_done = true; + AWS_ASSERT(stream->base.metrics.send_end_timestamp_ns == -1); + aws_high_res_clock_get_ticks((uint64_t *)&stream->base.metrics.send_end_timestamp_ns); + AWS_ASSERT(stream->base.metrics.send_start_timestamp_ns != -1); + AWS_ASSERT(stream->base.metrics.send_end_timestamp_ns >= stream->base.metrics.send_start_timestamp_ns); + stream->base.metrics.sending_duration_ns = + stream->base.metrics.send_end_timestamp_ns - stream->base.metrics.send_start_timestamp_ns; + if (stream->base.metrics.receive_start_timestamp_ns == -1) { + /* We haven't receive any message, schedule the response timeout task */ + + uint64_t response_first_byte_timeout_ms = 0; + if (stream->base.client_data != NULL && connection->client_data != NULL) { + response_first_byte_timeout_ms = stream->base.client_data->response_first_byte_timeout_ms == 0 + ? connection->client_data->response_first_byte_timeout_ms + : stream->base.client_data->response_first_byte_timeout_ms; + } + if (response_first_byte_timeout_ms != 0) { + /* The task should not be initialized before. */ + AWS_ASSERT(stream->base.client_data->response_first_byte_timeout_task.fn == NULL); + aws_task_init( + &stream->base.client_data->response_first_byte_timeout_task, + s_http_stream_response_first_byte_timeout_task, + stream, + "http_stream_response_first_byte_timeout_task"); + uint64_t now_ns = 0; + aws_channel_current_clock_time(channel, &now_ns); + struct aws_event_loop *connection_loop = aws_channel_get_event_loop(channel); + aws_event_loop_schedule_task_future( + connection_loop, + &stream->base.client_data->response_first_byte_timeout_task, + now_ns + aws_timestamp_convert( + response_first_byte_timeout_ms, AWS_TIMESTAMP_MILLIS, AWS_TIMESTAMP_NANOS, NULL)); + } + } +} + /** * If necessary, update `outgoing_stream` so it is pointing at a stream * with data to send, or NULL if all streams are done sending data. @@ -730,7 +854,7 @@ static struct aws_h1_stream *s_update_outgoing_stream_ptr(struct aws_h1_connecti /* If current stream is done sending data... */ if (current && !aws_h1_encoder_is_message_in_progress(&connection->thread_data.encoder)) { - current->is_outgoing_message_done = true; + s_set_outgoing_message_done(current); /* RFC-7230 section 6.6: Tear-down. * If this was the final stream, don't allows any further streams to be sent */ @@ -801,9 +925,13 @@ static struct aws_h1_stream *s_update_outgoing_stream_ptr(struct aws_h1_connecti s_set_outgoing_stream_ptr(connection, current); if (current) { + AWS_ASSERT(current->base.metrics.send_start_timestamp_ns == -1); + aws_high_res_clock_get_ticks((uint64_t *)¤t->base.metrics.send_start_timestamp_ns); + err = aws_h1_encoder_start_message( &connection->thread_data.encoder, ¤t->encoder_message, ¤t->base); (void)err; + AWS_ASSERT(connection->thread_data.encoder.state == AWS_H1_ENCODER_STATE_INIT); AWS_ASSERT(!err); } @@ -1109,7 +1237,7 @@ static int s_decoder_on_header(const struct aws_h1_decoded_header *header, void AWS_LS_HTTP_STREAM, "id=%p: Received 'Connection: close' header, no more request data will be sent.", (void *)&incoming_stream->base); - incoming_stream->is_outgoing_message_done = true; + s_set_outgoing_message_done(incoming_stream); } /* Stop writing right now. * Shutdown will be scheduled after we finishing parsing the response */ @@ -1270,6 +1398,13 @@ static int s_decoder_on_done(void *user_data) { /* Otherwise the incoming stream is finished decoding and we will update it if needed */ incoming_stream->is_incoming_message_done = true; + aws_high_res_clock_get_ticks((uint64_t *)&incoming_stream->base.metrics.receive_end_timestamp_ns); + AWS_ASSERT(incoming_stream->base.metrics.receive_start_timestamp_ns != -1); + AWS_ASSERT( + incoming_stream->base.metrics.receive_end_timestamp_ns >= + incoming_stream->base.metrics.receive_start_timestamp_ns); + incoming_stream->base.metrics.receiving_duration_ns = incoming_stream->base.metrics.receive_end_timestamp_ns - + incoming_stream->base.metrics.receive_start_timestamp_ns; /* RFC-7230 section 6.6 * After reading the final message, the connection must not read any more */ @@ -1822,6 +1957,20 @@ static int s_try_process_next_stream_read_message(struct aws_h1_connection *conn bool body_headers_ignored = incoming_stream->base.request_method == AWS_HTTP_METHOD_HEAD; aws_h1_decoder_set_body_headers_ignored(connection->thread_data.incoming_stream_decoder, body_headers_ignored); + if (incoming_stream->base.metrics.receive_start_timestamp_ns == -1) { + /* That's the first time for the stream receives any message */ + aws_high_res_clock_get_ticks((uint64_t *)&incoming_stream->base.metrics.receive_start_timestamp_ns); + if (incoming_stream->base.client_data && + incoming_stream->base.client_data->response_first_byte_timeout_task.fn != NULL) { + /* There is an outstanding response timeout task, as we already received the data, we can cancel it now. We + * are safe to do it as we always on connection thread to schedule the task or cancel it */ + struct aws_event_loop *connection_loop = aws_channel_get_event_loop(connection->base.channel_slot->channel); + /* The task will be zeroed out within the call */ + aws_event_loop_cancel_task( + connection_loop, &incoming_stream->base.client_data->response_first_byte_timeout_task); + } + } + /* As decoder runs, it invokes the internal s_decoder_X callbacks, which in turn invoke user callbacks. * The decoder will stop once it hits the end of the request/response OR the end of the message data. */ if (aws_h1_decode(connection->thread_data.incoming_stream_decoder, &message_cursor)) { |