aboutsummaryrefslogtreecommitdiffstats
path: root/contrib/restricted/aws/aws-c-io/source/async_stream.c
diff options
context:
space:
mode:
authorthegeorg <thegeorg@yandex-team.com>2025-05-12 15:51:24 +0300
committerthegeorg <thegeorg@yandex-team.com>2025-05-12 16:06:27 +0300
commitd629bb70c8773d2c0c43f5088ddbb5a86d8c37ea (patch)
tree4f678e0d65ad08c800db21c657d3b0f71fafed06 /contrib/restricted/aws/aws-c-io/source/async_stream.c
parent92c4b696d7a1c03d54e13aff7a7c20a078d90dd7 (diff)
downloadydb-d629bb70c8773d2c0c43f5088ddbb5a86d8c37ea.tar.gz
Update contrib/restricted/aws libraries to nixpkgs 24.05
commit_hash:f8083acb039e6005e820cdee77b84e0a6b6c6d6d
Diffstat (limited to 'contrib/restricted/aws/aws-c-io/source/async_stream.c')
-rw-r--r--contrib/restricted/aws/aws-c-io/source/async_stream.c153
1 files changed, 153 insertions, 0 deletions
diff --git a/contrib/restricted/aws/aws-c-io/source/async_stream.c b/contrib/restricted/aws/aws-c-io/source/async_stream.c
new file mode 100644
index 00000000000..6422bb84705
--- /dev/null
+++ b/contrib/restricted/aws/aws-c-io/source/async_stream.c
@@ -0,0 +1,153 @@
+/**
+ * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
+ * SPDX-License-Identifier: Apache-2.0.
+ */
+
+#include <aws/io/async_stream.h>
+
+#include <aws/common/byte_buf.h>
+#include <aws/io/future.h>
+#include <aws/io/stream.h>
+
+void aws_async_input_stream_init_base(
+ struct aws_async_input_stream *stream,
+ struct aws_allocator *alloc,
+ const struct aws_async_input_stream_vtable *vtable,
+ void *impl) {
+
+ AWS_PRECONDITION(stream);
+ AWS_PRECONDITION(alloc);
+ AWS_PRECONDITION(vtable);
+ AWS_PRECONDITION(vtable->read);
+ AWS_PRECONDITION(vtable->destroy);
+
+ AWS_ZERO_STRUCT(*stream);
+ stream->alloc = alloc;
+ stream->vtable = vtable;
+ stream->impl = impl;
+ aws_ref_count_init(&stream->ref_count, stream, (aws_simple_completion_callback *)vtable->destroy);
+}
+
+struct aws_async_input_stream *aws_async_input_stream_acquire(struct aws_async_input_stream *stream) {
+ if (stream != NULL) {
+ aws_ref_count_acquire(&stream->ref_count);
+ }
+ return stream;
+}
+
+struct aws_async_input_stream *aws_async_input_stream_release(struct aws_async_input_stream *stream) {
+ if (stream) {
+ aws_ref_count_release(&stream->ref_count);
+ }
+ return NULL;
+}
+
+struct aws_future_bool *aws_async_input_stream_read(struct aws_async_input_stream *stream, struct aws_byte_buf *dest) {
+ AWS_PRECONDITION(stream);
+ AWS_PRECONDITION(dest);
+
+ /* Ensure the buffer has space available */
+ if (dest->len == dest->capacity) {
+ struct aws_future_bool *future = aws_future_bool_new(stream->alloc);
+ aws_future_bool_set_error(future, AWS_ERROR_SHORT_BUFFER);
+ return future;
+ }
+
+ struct aws_future_bool *future = stream->vtable->read(stream, dest);
+ AWS_POSTCONDITION(future != NULL);
+ return future;
+}
+
+/* Data to perform the aws_async_input_stream_read_to_fill() job */
+struct aws_async_input_stream_fill_job {
+ struct aws_allocator *alloc;
+ struct aws_async_input_stream *stream;
+ struct aws_byte_buf *dest;
+ /* Future for each read() step */
+ struct aws_future_bool *read_step_future;
+ /* Future to set when this fill job completes */
+ struct aws_future_bool *on_complete_future;
+};
+
+static void s_async_stream_fill_job_complete(
+ struct aws_async_input_stream_fill_job *fill_job,
+ bool eof,
+ int error_code) {
+
+ if (error_code) {
+ aws_future_bool_set_error(fill_job->on_complete_future, error_code);
+ } else {
+ aws_future_bool_set_result(fill_job->on_complete_future, eof);
+ }
+ aws_future_bool_release(fill_job->on_complete_future);
+ aws_async_input_stream_release(fill_job->stream);
+ aws_mem_release(fill_job->alloc, fill_job);
+}
+
+/* Call read() in a loop.
+ * It would be simpler to set a completion callback for each read() call,
+ * but this risks our call stack growing large if there are many small, synchronous, reads.
+ * So be complicated and loop until a read() ) call is actually async,
+ * and only then set the completion callback (which is this same function, where we resume looping). */
+static void s_async_stream_fill_job_loop(void *user_data) {
+ struct aws_async_input_stream_fill_job *fill_job = user_data;
+
+ while (true) {
+ /* Process read_step_future from previous iteration of loop.
+ * It's NULL the first time the job ever enters the loop.
+ * But it's set in subsequent runs of the loop,
+ * and when this is a read_step_future completion callback. */
+ if (fill_job->read_step_future) {
+ if (aws_future_bool_register_callback_if_not_done(
+ fill_job->read_step_future, s_async_stream_fill_job_loop, fill_job)) {
+
+ /* not done, we'll resume this loop when callback fires */
+ return;
+ }
+
+ /* read_step_future is done */
+ int error_code = aws_future_bool_get_error(fill_job->read_step_future);
+ bool eof = error_code ? false : aws_future_bool_get_result(fill_job->read_step_future);
+ bool reached_capacity = fill_job->dest->len == fill_job->dest->capacity;
+ fill_job->read_step_future = aws_future_bool_release(fill_job->read_step_future); /* release and NULL */
+
+ if (error_code || eof || reached_capacity) {
+ /* job complete! */
+ s_async_stream_fill_job_complete(fill_job, eof, error_code);
+ return;
+ }
+ }
+
+ /* Kick off a read, which may or may not complete async */
+ fill_job->read_step_future = aws_async_input_stream_read(fill_job->stream, fill_job->dest);
+ }
+}
+
+struct aws_future_bool *aws_async_input_stream_read_to_fill(
+ struct aws_async_input_stream *stream,
+ struct aws_byte_buf *dest) {
+
+ AWS_PRECONDITION(stream);
+ AWS_PRECONDITION(dest);
+
+ struct aws_future_bool *future = aws_future_bool_new(stream->alloc);
+
+ /* Ensure the buffer has space available */
+ if (dest->len == dest->capacity) {
+ aws_future_bool_set_error(future, AWS_ERROR_SHORT_BUFFER);
+ return future;
+ }
+
+ /* Prepare for async job */
+ struct aws_async_input_stream_fill_job *fill_job =
+ aws_mem_calloc(stream->alloc, 1, sizeof(struct aws_async_input_stream_fill_job));
+ fill_job->alloc = stream->alloc;
+ fill_job->stream = aws_async_input_stream_acquire(stream);
+ fill_job->dest = dest;
+ fill_job->on_complete_future = aws_future_bool_acquire(future);
+
+ /* Kick off work */
+ s_async_stream_fill_job_loop(fill_job);
+
+ return future;
+}