author      thegeorg <[email protected]>  2025-05-12 15:51:24 +0300
committer   thegeorg <[email protected]>  2025-05-12 16:06:27 +0300
commit      d629bb70c8773d2c0c43f5088ddbb5a86d8c37ea (patch)
tree        4f678e0d65ad08c800db21c657d3b0f71fafed06 /contrib/restricted/aws/aws-c-io/source/future.c
parent      92c4b696d7a1c03d54e13aff7a7c20a078d90dd7 (diff)
Update contrib/restricted/aws libraries to nixpkgs 24.05
commit_hash:f8083acb039e6005e820cdee77b84e0a6b6c6d6d
Diffstat (limited to 'contrib/restricted/aws/aws-c-io/source/future.c')
-rw-r--r--   contrib/restricted/aws/aws-c-io/source/future.c   543
1 files changed, 543 insertions, 0 deletions
diff --git a/contrib/restricted/aws/aws-c-io/source/future.c b/contrib/restricted/aws/aws-c-io/source/future.c
new file mode 100644
index 00000000000..be213184be6
--- /dev/null
+++ b/contrib/restricted/aws/aws-c-io/source/future.c
@@ -0,0 +1,543 @@
+/**
+ * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
+ * SPDX-License-Identifier: Apache-2.0.
+ */
+#include <aws/io/future.h>
+
+#include <aws/common/condition_variable.h>
+#include <aws/common/mutex.h>
+#include <aws/common/ref_count.h>
+#include <aws/common/task_scheduler.h>
+#include <aws/io/channel.h>
+#include <aws/io/event_loop.h>
+
+enum aws_future_type {
+    AWS_FUTURE_T_BY_VALUE,
+    AWS_FUTURE_T_BY_VALUE_WITH_CLEAN_UP,
+    AWS_FUTURE_T_POINTER,
+    AWS_FUTURE_T_POINTER_WITH_DESTROY,
+    AWS_FUTURE_T_POINTER_WITH_RELEASE,
+};
+
+struct aws_future_callback_data {
+    aws_future_callback_fn *fn;
+    void *user_data;
+    union aws_future_callback_union {
+        struct aws_event_loop *event_loop;
+        struct aws_channel *channel;
+    } u;
+    enum aws_future_callback_type {
+        AWS_FUTURE_IMMEDIATE_CALLBACK,
+        AWS_FUTURE_EVENT_LOOP_CALLBACK,
+        AWS_FUTURE_CHANNEL_CALLBACK,
+    } type;
+};
+
+/* When allocating aws_future<T> on the heap, we make 1 allocation containing:
+ * aws_future_impl followed by T */
+struct aws_future_impl {
+    struct aws_allocator *alloc;
+    struct aws_ref_count ref_count;
+    struct aws_mutex lock;
+    struct aws_condition_variable wait_cvar;
+    struct aws_future_callback_data callback;
+    union {
+        aws_future_impl_result_clean_up_fn *clean_up;
+        aws_future_impl_result_destroy_fn *destroy;
+        aws_future_impl_result_release_fn *release;
+    } result_dtor;
+    int error_code;
+    /* sum of bit fields should be 32 */
+#define BIT_COUNT_FOR_SIZEOF_RESULT 27
+    unsigned int sizeof_result : BIT_COUNT_FOR_SIZEOF_RESULT;
+    unsigned int type : 3; /* aws_future_type */
+    unsigned int is_done : 1;
+    unsigned int owns_result : 1;
+};
+
+static void s_future_impl_result_dtor(struct aws_future_impl *future, void *result_addr) {
+    switch (future->type) {
+        case AWS_FUTURE_T_BY_VALUE_WITH_CLEAN_UP: {
+            future->result_dtor.clean_up(result_addr);
+        } break;
+
+        case AWS_FUTURE_T_POINTER_WITH_DESTROY: {
+            void *result = *(void **)result_addr;
+            if (result) {
+                future->result_dtor.destroy(result);
+            }
+        } break;
+
+        case AWS_FUTURE_T_POINTER_WITH_RELEASE: {
+            void *result = *(void **)result_addr;
+            if (result) {
+                future->result_dtor.release(result);
+            }
+        } break;
+
+        default:
+            break;
+    }
+}
+
+static void s_future_impl_destroy(void *user_data) {
+    struct aws_future_impl *future = user_data;
+    if (future->owns_result && !future->error_code) {
+        s_future_impl_result_dtor(future, aws_future_impl_get_result_address(future));
+    }
+    aws_condition_variable_clean_up(&future->wait_cvar);
+    aws_mutex_clean_up(&future->lock);
+    aws_mem_release(future->alloc, future);
+}
+
+static struct aws_future_impl *s_future_impl_new(struct aws_allocator *alloc, size_t sizeof_result) {
+    size_t total_size = sizeof(struct aws_future_impl) + sizeof_result;
+    struct aws_future_impl *future = aws_mem_calloc(alloc, 1, total_size);
+    future->alloc = alloc;
+
+    /* we store sizeof_result in a bit field, ensure the number will fit */
+    AWS_ASSERT(sizeof_result <= (UINT_MAX >> (32 - BIT_COUNT_FOR_SIZEOF_RESULT)));
+    future->sizeof_result = (unsigned int)sizeof_result;
+
+    aws_ref_count_init(&future->ref_count, future, s_future_impl_destroy);
+    aws_mutex_init(&future->lock);
+    aws_condition_variable_init(&future->wait_cvar);
+    return future;
+}
+
+struct aws_future_impl *aws_future_impl_new_by_value(struct aws_allocator *alloc, size_t sizeof_result) {
+    struct aws_future_impl *future = s_future_impl_new(alloc, sizeof_result);
+    future->type = AWS_FUTURE_T_BY_VALUE;
+    return future;
+}
+
+struct aws_future_impl *aws_future_impl_new_by_value_with_clean_up(
+    struct aws_allocator *alloc,
+    size_t sizeof_result,
+    aws_future_impl_result_clean_up_fn *result_clean_up) {
+
+    AWS_ASSERT(result_clean_up);
+    struct aws_future_impl *future = s_future_impl_new(alloc, sizeof_result);
+    future->type = AWS_FUTURE_T_BY_VALUE_WITH_CLEAN_UP;
+    future->result_dtor.clean_up = result_clean_up;
+    return future;
+}
+
+struct aws_future_impl *aws_future_impl_new_pointer(struct aws_allocator *alloc) {
+    struct aws_future_impl *future = s_future_impl_new(alloc, sizeof(void *));
+    future->type = AWS_FUTURE_T_POINTER;
+    return future;
+}
+
+struct aws_future_impl *aws_future_impl_new_pointer_with_destroy(
+    struct aws_allocator *alloc,
+    aws_future_impl_result_destroy_fn *result_destroy) {
+
+    AWS_ASSERT(result_destroy);
+    struct aws_future_impl *future = s_future_impl_new(alloc, sizeof(void *));
+    future->type = AWS_FUTURE_T_POINTER_WITH_DESTROY;
+    future->result_dtor.destroy = result_destroy;
+    return future;
+}
+
+struct aws_future_impl *aws_future_impl_new_pointer_with_release(
+    struct aws_allocator *alloc,
+    aws_future_impl_result_release_fn *result_release) {
+
+    AWS_ASSERT(result_release);
+    struct aws_future_impl *future = s_future_impl_new(alloc, sizeof(void *));
+    future->type = AWS_FUTURE_T_POINTER_WITH_RELEASE;
+    future->result_dtor.release = result_release;
+    return future;
+}
+
+struct aws_future_impl *aws_future_impl_release(struct aws_future_impl *future) {
+    if (future != NULL) {
+        aws_ref_count_release(&future->ref_count);
+    }
+    return NULL;
+}
+
+struct aws_future_impl *aws_future_impl_acquire(struct aws_future_impl *future) {
+    if (future != NULL) {
+        aws_ref_count_acquire(&future->ref_count);
+    }
+    return future;
+}
+
+bool aws_future_impl_is_done(const struct aws_future_impl *future) {
+    AWS_ASSERT(future);
+
+    /* this function is conceptually const, but we need to hold the lock a moment */
+    struct aws_mutex *mutable_lock = (struct aws_mutex *)&future->lock;
+
+    /* BEGIN CRITICAL SECTION */
+    aws_mutex_lock(mutable_lock);
+    bool is_done = future->is_done != 0;
+    aws_mutex_unlock(mutable_lock);
+    /* END CRITICAL SECTION */
+
+    return is_done;
+}
+
+int aws_future_impl_get_error(const struct aws_future_impl *future) {
+    AWS_ASSERT(future != NULL);
+    /* not bothering with lock, none of this can change after future is done */
+    AWS_FATAL_ASSERT(future->is_done && "Cannot get error before future is done");
+    return future->error_code;
+}
+
+void *aws_future_impl_get_result_address(const struct aws_future_impl *future) {
+    AWS_ASSERT(future != NULL);
+    /* not bothering with lock, none of this can change after future is done */
+    AWS_FATAL_ASSERT(future->is_done && "Cannot get result before future is done");
+    AWS_FATAL_ASSERT(!future->error_code && "Cannot get result from future that failed with an error");
+    AWS_FATAL_ASSERT(future->owns_result && "Result was already moved from future");
+
+    const struct aws_future_impl *address_of_memory_after_this_struct = future + 1;
+    void *result_addr = (void *)address_of_memory_after_this_struct;
+    return result_addr;
+}
+
+void aws_future_impl_get_result_by_move(struct aws_future_impl *future, void *dst_address) {
+    void *result_addr = aws_future_impl_get_result_address(future);
+    memcpy(dst_address, result_addr, future->sizeof_result);
+    memset(result_addr, 0, future->sizeof_result);
+    future->owns_result = false;
+}
+
+/* Data for invoking callback as a task on an event-loop */
+struct aws_future_event_loop_callback_job {
+    struct aws_allocator *alloc;
+    struct aws_task task;
+    struct aws_event_loop *event_loop;
+    aws_future_callback_fn *callback;
+    void *user_data;
+};
+
+static void s_future_impl_event_loop_callback_task(struct aws_task *task, void *arg, enum aws_task_status status) {
+    (void)task;
+    (void)status;
+    struct aws_future_event_loop_callback_job *job = arg;
+    job->callback(job->user_data);
+    // TODO: aws_event_loop_release(job->event_loop);
+    aws_mem_release(job->alloc, job);
+}
+
+/* Data for invoking callback as a task on an aws_channel */
+struct aws_future_channel_callback_job {
+    struct aws_allocator *alloc;
+    struct aws_channel_task task;
+    struct aws_channel *channel;
+    aws_future_callback_fn *callback;
+    void *user_data;
+};
+
+static void s_future_impl_channel_callback_task(struct aws_channel_task *task, void *arg, enum aws_task_status status) {
+    (void)task;
+    (void)status;
+    struct aws_future_channel_callback_job *job = arg;
+    job->callback(job->user_data);
+    aws_channel_release_hold(job->channel);
+    aws_mem_release(job->alloc, job);
+}
+
+static void s_future_impl_invoke_callback(struct aws_future_callback_data *callback, struct aws_allocator *alloc) {
+    AWS_ASSERT(callback->fn);
+
+    switch (callback->type) {
+        case AWS_FUTURE_IMMEDIATE_CALLBACK: {
+            callback->fn(callback->user_data);
+        } break;
+
+        case AWS_FUTURE_EVENT_LOOP_CALLBACK: {
+            /* Schedule the callback as a task on the event-loop */
+            struct aws_future_event_loop_callback_job *job =
+                aws_mem_calloc(alloc, 1, sizeof(struct aws_future_event_loop_callback_job));
+            job->alloc = alloc;
+            aws_task_init(&job->task, s_future_impl_event_loop_callback_task, job, "aws_future_event_loop_callback");
+            job->event_loop = callback->u.event_loop;
+            job->callback = callback->fn;
+            job->user_data = callback->user_data;
+
+            aws_event_loop_schedule_task_now(callback->u.event_loop, &job->task);
+        } break;
+
+        case AWS_FUTURE_CHANNEL_CALLBACK: {
+            /* Schedule the callback as a task on the channel */
+            struct aws_future_channel_callback_job *job =
+                aws_mem_calloc(alloc, 1, sizeof(struct aws_future_channel_callback_job));
+            job->alloc = alloc;
+            aws_channel_task_init(&job->task, s_future_impl_channel_callback_task, job, "aws_future_channel_callback");
+            job->channel = callback->u.channel;
+            job->callback = callback->fn;
+            job->user_data = callback->user_data;
+
+            aws_channel_schedule_task_now(callback->u.channel, &job->task);
+        } break;
+    }
+}
+
+static void s_future_impl_set_done(struct aws_future_impl *future, void *src_address, int error_code) {
+    bool is_error = error_code != 0;
+
+    /* BEGIN CRITICAL SECTION */
+    aws_mutex_lock(&future->lock);
+
+    struct aws_future_callback_data callback = future->callback;
+
+    bool first_time = !future->is_done;
+    if (first_time) {
+        future->is_done = true;
+        AWS_ZERO_STRUCT(future->callback);
+        if (is_error) {
+            future->error_code = error_code;
+        } else {
+            future->owns_result = true;
+            AWS_FATAL_ASSERT(src_address != NULL);
+            memcpy(aws_future_impl_get_result_address(future), src_address, future->sizeof_result);
+        }
+
+        aws_condition_variable_notify_all(&future->wait_cvar);
+    }
+
+    aws_mutex_unlock(&future->lock);
+    /* END CRITICAL SECTION */
+
+    if (first_time) {
+        /* if callback was registered, invoke it now, outside of critical section to avoid deadlock */
+        if (callback.fn != NULL) {
+            s_future_impl_invoke_callback(&callback, future->alloc);
+        }
+    } else if (!error_code) {
+        /* future was already done, so just destroy this newer result */
+        s_future_impl_result_dtor(future, src_address);
+    }
+}
+
+void aws_future_impl_set_error(struct aws_future_impl *future, int error_code) {
+    AWS_ASSERT(future);
+
+    /* handle recoverable usage error */
+    AWS_ASSERT(error_code != 0);
+    if (AWS_UNLIKELY(error_code == 0)) {
+        error_code = AWS_ERROR_UNKNOWN;
+    }
+
+    s_future_impl_set_done(future, NULL /*src_address*/, error_code);
+}
+
+void aws_future_impl_set_result_by_move(struct aws_future_impl *future, void *src_address) {
+    AWS_ASSERT(future);
+    AWS_ASSERT(src_address);
+    s_future_impl_set_done(future, src_address, 0 /*error_code*/);
+
+    /* the future takes ownership of the result.
+     * zero out memory at the src_address to reinforce this transfer of ownership. */
+    memset(src_address, 0, future->sizeof_result);
+}
+
+/* Returns true if callback was registered, or false if callback was ignored
+ * because the future is already done and invoke_if_already_done==false */
+static bool s_future_impl_register_callback(
+    struct aws_future_impl *future,
+    struct aws_future_callback_data *callback,
+    bool invoke_if_already_done) {
+
+    /* BEGIN CRITICAL SECTION */
+    aws_mutex_lock(&future->lock);
+
+    AWS_FATAL_ASSERT(future->callback.fn == NULL && "Future done callback must only be set once");
+
+    bool already_done = future->is_done != 0;
+
+    /* if not done, store callback for later */
+    if (!already_done) {
+        future->callback = *callback;
+    }
+
+    aws_mutex_unlock(&future->lock);
+    /* END CRITICAL SECTION */
+
+    /* if already done, invoke callback now */
+    if (already_done && invoke_if_already_done) {
+        s_future_impl_invoke_callback(callback, future->alloc);
+    }
+
+    return !already_done || invoke_if_already_done;
+}
+
+void aws_future_impl_register_callback(
+    struct aws_future_impl *future,
+    aws_future_callback_fn *on_done,
+    void *user_data) {
+
+    AWS_ASSERT(future);
+    AWS_ASSERT(on_done);
+
+    struct aws_future_callback_data callback = {
+        .fn = on_done,
+        .user_data = user_data,
+        .type = AWS_FUTURE_IMMEDIATE_CALLBACK,
+    };
+    s_future_impl_register_callback(future, &callback, true /*invoke_if_already_done*/);
+}
+
+bool aws_future_impl_register_callback_if_not_done(
+    struct aws_future_impl *future,
+    aws_future_callback_fn *on_done,
+    void *user_data) {
+
+    AWS_ASSERT(future);
+    AWS_ASSERT(on_done);
+
+    struct aws_future_callback_data callback = {
+        .fn = on_done,
+        .user_data = user_data,
+        .type = AWS_FUTURE_IMMEDIATE_CALLBACK,
+    };
+    return s_future_impl_register_callback(future, &callback, false /*invoke_if_already_done*/);
+}
+
+void aws_future_impl_register_event_loop_callback(
+    struct aws_future_impl *future,
+    struct aws_event_loop *event_loop,
+    aws_future_callback_fn *on_done,
+    void *user_data) {
+
+    AWS_ASSERT(future);
+    AWS_ASSERT(event_loop);
+    AWS_ASSERT(on_done);
+
+    // TODO: aws_event_loop_acquire(event_loop);
+
+    struct aws_future_callback_data callback = {
+        .fn = on_done,
+        .user_data = user_data,
+        .type = AWS_FUTURE_EVENT_LOOP_CALLBACK,
+        .u = {.event_loop = event_loop},
+    };
+    s_future_impl_register_callback(future, &callback, true /*invoke_if_already_done*/);
+}
+
+void aws_future_impl_register_channel_callback(
+    struct aws_future_impl *future,
+    struct aws_channel *channel,
+    aws_future_callback_fn *on_done,
+    void *user_data) {
+
+    AWS_ASSERT(future);
+    AWS_ASSERT(channel);
+    AWS_ASSERT(on_done);
+
+    aws_channel_acquire_hold(channel);
+
+    struct aws_future_callback_data callback = {
+        .fn = on_done,
+        .user_data = user_data,
+        .type = AWS_FUTURE_CHANNEL_CALLBACK,
+        .u = {.channel = channel},
+    };
+    s_future_impl_register_callback(future, &callback, true /*invoke_if_already_done*/);
+}
+
+static bool s_future_impl_is_done_pred(void *user_data) {
+    struct aws_future_impl *future = user_data;
+    return future->is_done != 0;
+}
+
+bool aws_future_impl_wait(const struct aws_future_impl *future, uint64_t timeout_ns) {
+    AWS_ASSERT(future);
+
+    /* this function is conceptually const, but we need to use synchronization primitives */
+    struct aws_future_impl *mutable_future = (struct aws_future_impl *)future;
+
+    /* BEGIN CRITICAL SECTION */
+    aws_mutex_lock(&mutable_future->lock);
+
+    bool is_done = aws_condition_variable_wait_for_pred(
+                       &mutable_future->wait_cvar,
+                       &mutable_future->lock,
+                       (int64_t)timeout_ns,
+                       s_future_impl_is_done_pred,
+                       mutable_future) == AWS_OP_SUCCESS;
+
+    aws_mutex_unlock(&mutable_future->lock);
+    /* END CRITICAL SECTION */
+
+    return is_done;
+}
+
+// AWS_FUTURE_T_BY_VALUE_IMPLEMENTATION(aws_future_bool, bool)
+struct aws_future_bool *aws_future_bool_new(struct aws_allocator *alloc) {
+    return (struct aws_future_bool *)aws_future_impl_new_by_value(alloc, sizeof(_Bool));
+}
+void aws_future_bool_set_result(struct aws_future_bool *future, _Bool result) {
+    aws_future_impl_set_result_by_move((struct aws_future_impl *)future, &result);
+}
+_Bool aws_future_bool_get_result(const struct aws_future_bool *future) {
+    return *(_Bool *)aws_future_impl_get_result_address((const struct aws_future_impl *)future);
+}
+struct aws_future_bool *aws_future_bool_acquire(struct aws_future_bool *future) {
+    return (struct aws_future_bool *)aws_future_impl_acquire((struct aws_future_impl *)future);
+}
+struct aws_future_bool *aws_future_bool_release(struct aws_future_bool *future) {
+    return (struct aws_future_bool *)aws_future_impl_release((struct aws_future_impl *)future);
+}
+void aws_future_bool_set_error(struct aws_future_bool *future, int error_code) {
+    aws_future_impl_set_error((struct aws_future_impl *)future, error_code);
+}
+_Bool aws_future_bool_is_done(const struct aws_future_bool *future) {
+    return aws_future_impl_is_done((const struct aws_future_impl *)future);
+}
+int aws_future_bool_get_error(const struct aws_future_bool *future) {
+    return aws_future_impl_get_error((const struct aws_future_impl *)future);
+}
+void aws_future_bool_register_callback(
+    struct aws_future_bool *future,
+    aws_future_callback_fn *on_done,
+    void *user_data) {
+    aws_future_impl_register_callback((struct aws_future_impl *)future, on_done, user_data);
+}
+_Bool aws_future_bool_register_callback_if_not_done(
+    struct aws_future_bool *future,
+    aws_future_callback_fn *on_done,
+    void *user_data) {
+    return aws_future_impl_register_callback_if_not_done((struct aws_future_impl *)future, on_done, user_data);
+}
+void aws_future_bool_register_event_loop_callback(
+    struct aws_future_bool *future,
+    struct aws_event_loop *event_loop,
+    aws_future_callback_fn *on_done,
+    void *user_data) {
+    aws_future_impl_register_event_loop_callback((struct aws_future_impl *)future, event_loop, on_done, user_data);
+}
+void aws_future_bool_register_channel_callback(
+    struct aws_future_bool *future,
+    struct aws_channel *channel,
+    aws_future_callback_fn *on_done,
+    void *user_data) {
+    aws_future_impl_register_channel_callback((struct aws_future_impl *)future, channel, on_done, user_data);
+}
+_Bool aws_future_bool_wait(struct aws_future_bool *future, uint64_t timeout_ns) {
+    return aws_future_impl_wait((struct aws_future_impl *)future, timeout_ns);
+}
+
+AWS_FUTURE_T_BY_VALUE_IMPLEMENTATION(aws_future_size, size_t)
+
+/**
+ * aws_future<void>
+ */
+AWS_FUTURE_T_IMPLEMENTATION_BEGIN(aws_future_void)
+
+struct aws_future_void *aws_future_void_new(struct aws_allocator *alloc) {
+    /* Use aws_future<bool> under the hood, to avoid edge-cases with 0-sized result */
+    return (struct aws_future_void *)aws_future_bool_new(alloc);
+}
+
+void aws_future_void_set_result(struct aws_future_void *future) {
+    aws_future_bool_set_result((struct aws_future_bool *)future, false);
+}
+
+AWS_FUTURE_T_IMPLEMENTATION_END(aws_future_void)
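
For orientation, the sketch below shows how the aws_future_bool API added by this file is typically driven from client code: create the future, register a completion callback (which fires immediately if the future is already done), complete it once with a result or an error, then read the result and drop the reference. This is an illustrative, untested example written only against the public functions defined in the diff above; it is not part of the commit, the callback name s_on_done and the single-threaded flow are hypothetical, and aws_default_allocator() is assumed from aws-c-common.

/* Hypothetical usage sketch of aws_future<bool>; not part of the vendored file. */
#include <aws/common/allocator.h>
#include <aws/io/future.h>

#include <stdbool.h>
#include <stdio.h>

static void s_on_done(void *user_data) {
    struct aws_future_bool *future = user_data;
    int error_code = aws_future_bool_get_error(future);
    if (error_code == 0) {
        /* the result is only valid when the future completed without an error */
        printf("future completed, result=%d\n", (int)aws_future_bool_get_result(future));
    } else {
        printf("future completed with error %d\n", error_code);
    }
}

int main(void) {
    struct aws_allocator *alloc = aws_default_allocator();

    /* producer side: create the future */
    struct aws_future_bool *future = aws_future_bool_new(alloc);

    /* consumer side: register the completion callback before (or after) completion */
    aws_future_bool_register_callback(future, s_on_done, future);

    /* producer side: complete the future exactly once; the callback runs here,
     * outside the future's internal lock. aws_future_bool_set_error() would be
     * the failure path, and a blocking consumer could instead call
     * aws_future_bool_wait(future, timeout_ns). */
    aws_future_bool_set_result(future, true);

    aws_future_bool_release(future);
    return 0;
}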