diff options
author | Devtools Arcadia <[email protected]> | 2022-02-07 18:08:42 +0300 |
---|---|---|
committer | Devtools Arcadia <[email protected]> | 2022-02-07 18:08:42 +0300 |
commit | 1110808a9d39d4b808aef724c861a2e1a38d2a69 (patch) | |
tree | e26c9fed0de5d9873cce7e00bc214573dc2195b7 /contrib/restricted/aws/aws-c-io/source/bsd |
intermediate changes
ref:cde9a383711a11544ce7e107a78147fb96cc4029
Diffstat (limited to 'contrib/restricted/aws/aws-c-io/source/bsd')
-rw-r--r-- | contrib/restricted/aws/aws-c-io/source/bsd/kqueue_event_loop.c | 960 |
1 files changed, 960 insertions, 0 deletions
diff --git a/contrib/restricted/aws/aws-c-io/source/bsd/kqueue_event_loop.c b/contrib/restricted/aws/aws-c-io/source/bsd/kqueue_event_loop.c new file mode 100644 index 00000000000..3de882d0451 --- /dev/null +++ b/contrib/restricted/aws/aws-c-io/source/bsd/kqueue_event_loop.c @@ -0,0 +1,960 @@ +/** + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0. + */ + +#include <aws/io/event_loop.h> + +#include <aws/io/logging.h> + +#include <aws/common/atomics.h> +#include <aws/common/clock.h> +#include <aws/common/mutex.h> +#include <aws/common/task_scheduler.h> +#include <aws/common/thread.h> + +#if defined(__FreeBSD__) || defined(__NetBSD__) +# define __BSD_VISIBLE 1 +# include <sys/types.h> +#endif + +#include <sys/event.h> + +#include <aws/io/io.h> +#include <limits.h> +#include <unistd.h> + +static void s_destroy(struct aws_event_loop *event_loop); +static int s_run(struct aws_event_loop *event_loop); +static int s_stop(struct aws_event_loop *event_loop); +static int s_wait_for_stop_completion(struct aws_event_loop *event_loop); +static void s_schedule_task_now(struct aws_event_loop *event_loop, struct aws_task *task); +static void s_schedule_task_future(struct aws_event_loop *event_loop, struct aws_task *task, uint64_t run_at_nanos); +static void s_cancel_task(struct aws_event_loop *event_loop, struct aws_task *task); +static int s_subscribe_to_io_events( + struct aws_event_loop *event_loop, + struct aws_io_handle *handle, + int events, + aws_event_loop_on_event_fn *on_event, + void *user_data); +static int s_unsubscribe_from_io_events(struct aws_event_loop *event_loop, struct aws_io_handle *handle); +static void s_free_io_event_resources(void *user_data); +static bool s_is_event_thread(struct aws_event_loop *event_loop); + +static void s_event_thread_main(void *user_data); + +int aws_open_nonblocking_posix_pipe(int pipe_fds[2]); + +enum event_thread_state { + EVENT_THREAD_STATE_READY_TO_RUN, + EVENT_THREAD_STATE_RUNNING, + EVENT_THREAD_STATE_STOPPING, +}; + +enum pipe_fd_index { + READ_FD, + WRITE_FD, +}; + +struct kqueue_loop { + /* thread_created_on is the handle to the event loop thread. */ + struct aws_thread thread_created_on; + /* thread_joined_to is used by the thread destroying the event loop. */ + aws_thread_id_t thread_joined_to; + /* running_thread_id is NULL if the event loop thread is stopped or points-to the thread_id of the thread running + * the event loop (either thread_created_on or thread_joined_to). Atomic because of concurrent writes (e.g., + * run/stop) and reads (e.g., is_event_loop_thread). + * An aws_thread_id_t variable itself cannot be atomic because it is an opaque type that is platform-dependent. */ + struct aws_atomic_var running_thread_id; + int kq_fd; /* kqueue file descriptor */ + + /* Pipe for signaling to event-thread that cross_thread_data has changed. */ + int cross_thread_signal_pipe[2]; + + /* cross_thread_data holds things that must be communicated across threads. + * When the event-thread is running, the mutex must be locked while anyone touches anything in cross_thread_data. + * If this data is modified outside the thread, the thread is signaled via activity on a pipe. */ + struct { + struct aws_mutex mutex; + bool thread_signaled; /* whether thread has been signaled about changes to cross_thread_data */ + struct aws_linked_list tasks_to_schedule; + enum event_thread_state state; + } cross_thread_data; + + /* thread_data holds things which, when the event-thread is running, may only be touched by the thread */ + struct { + struct aws_task_scheduler scheduler; + + int connected_handle_count; + + /* These variables duplicate ones in cross_thread_data. We move values out while holding the mutex and operate + * on them later */ + enum event_thread_state state; + } thread_data; +}; + +/* Data attached to aws_io_handle while the handle is subscribed to io events */ +struct handle_data { + struct aws_io_handle *owner; + struct aws_event_loop *event_loop; + aws_event_loop_on_event_fn *on_event; + void *on_event_user_data; + + int events_subscribed; /* aws_io_event_types this handle should be subscribed to */ + int events_this_loop; /* aws_io_event_types received during current loop of the event-thread */ + + enum { HANDLE_STATE_SUBSCRIBING, HANDLE_STATE_SUBSCRIBED, HANDLE_STATE_UNSUBSCRIBED } state; + + struct aws_task subscribe_task; + struct aws_task cleanup_task; +}; + +enum { + DEFAULT_TIMEOUT_SEC = 100, /* Max kevent() timeout per loop of the event-thread */ + MAX_EVENTS = 100, /* Max kevents to process per loop of the event-thread */ +}; + +struct aws_event_loop_vtable s_kqueue_vtable = { + .destroy = s_destroy, + .run = s_run, + .stop = s_stop, + .wait_for_stop_completion = s_wait_for_stop_completion, + .schedule_task_now = s_schedule_task_now, + .schedule_task_future = s_schedule_task_future, + .subscribe_to_io_events = s_subscribe_to_io_events, + .cancel_task = s_cancel_task, + .unsubscribe_from_io_events = s_unsubscribe_from_io_events, + .free_io_event_resources = s_free_io_event_resources, + .is_on_callers_thread = s_is_event_thread, +}; + +struct aws_event_loop *aws_event_loop_new_default(struct aws_allocator *alloc, aws_io_clock_fn *clock) { + AWS_ASSERT(alloc); + AWS_ASSERT(clock); + + bool clean_up_event_loop_mem = false; + bool clean_up_event_loop_base = false; + bool clean_up_impl_mem = false; + bool clean_up_thread = false; + bool clean_up_kqueue = false; + bool clean_up_signal_pipe = false; + bool clean_up_signal_kevent = false; + bool clean_up_mutex = false; + + struct aws_event_loop *event_loop = aws_mem_acquire(alloc, sizeof(struct aws_event_loop)); + if (!event_loop) { + return NULL; + } + + AWS_LOGF_INFO(AWS_LS_IO_EVENT_LOOP, "id=%p: Initializing edge-triggered kqueue", (void *)event_loop); + clean_up_event_loop_mem = true; + + int err = aws_event_loop_init_base(event_loop, alloc, clock); + if (err) { + goto clean_up; + } + clean_up_event_loop_base = true; + + struct kqueue_loop *impl = aws_mem_calloc(alloc, 1, sizeof(struct kqueue_loop)); + if (!impl) { + goto clean_up; + } + /* intialize thread id to NULL. It will be set when the event loop thread starts. */ + aws_atomic_init_ptr(&impl->running_thread_id, NULL); + clean_up_impl_mem = true; + + err = aws_thread_init(&impl->thread_created_on, alloc); + if (err) { + goto clean_up; + } + clean_up_thread = true; + + impl->kq_fd = kqueue(); + if (impl->kq_fd == -1) { + AWS_LOGF_FATAL(AWS_LS_IO_EVENT_LOOP, "id=%p: Failed to open kqueue handle.", (void *)event_loop); + aws_raise_error(AWS_ERROR_SYS_CALL_FAILURE); + goto clean_up; + } + clean_up_kqueue = true; + + err = aws_open_nonblocking_posix_pipe(impl->cross_thread_signal_pipe); + if (err) { + AWS_LOGF_FATAL(AWS_LS_IO_EVENT_LOOP, "id=%p: failed to open pipe handle.", (void *)event_loop); + goto clean_up; + } + AWS_LOGF_TRACE( + AWS_LS_IO_EVENT_LOOP, + "id=%p: pipe descriptors read %d, write %d.", + (void *)event_loop, + impl->cross_thread_signal_pipe[READ_FD], + impl->cross_thread_signal_pipe[WRITE_FD]); + clean_up_signal_pipe = true; + + /* Set up kevent to handle activity on the cross_thread_signal_pipe */ + struct kevent thread_signal_kevent; + EV_SET( + &thread_signal_kevent, + impl->cross_thread_signal_pipe[READ_FD], + EVFILT_READ /*filter*/, + EV_ADD | EV_CLEAR /*flags*/, + 0 /*fflags*/, + 0 /*data*/, + NULL /*udata*/); + + int res = kevent( + impl->kq_fd, + &thread_signal_kevent /*changelist*/, + 1 /*nchanges*/, + NULL /*eventlist*/, + 0 /*nevents*/, + NULL /*timeout*/); + + if (res == -1) { + AWS_LOGF_FATAL(AWS_LS_IO_EVENT_LOOP, "id=%p: failed to create cross-thread signal kevent.", (void *)event_loop); + aws_raise_error(AWS_ERROR_SYS_CALL_FAILURE); + goto clean_up; + } + clean_up_signal_kevent = true; + + err = aws_mutex_init(&impl->cross_thread_data.mutex); + if (err) { + goto clean_up; + } + clean_up_mutex = true; + + impl->cross_thread_data.thread_signaled = false; + + aws_linked_list_init(&impl->cross_thread_data.tasks_to_schedule); + + impl->cross_thread_data.state = EVENT_THREAD_STATE_READY_TO_RUN; + + err = aws_task_scheduler_init(&impl->thread_data.scheduler, alloc); + if (err) { + goto clean_up; + } + + impl->thread_data.state = EVENT_THREAD_STATE_READY_TO_RUN; + + event_loop->impl_data = impl; + + event_loop->vtable = &s_kqueue_vtable; + + /* success */ + return event_loop; + +clean_up: + if (clean_up_mutex) { + aws_mutex_clean_up(&impl->cross_thread_data.mutex); + } + if (clean_up_signal_kevent) { + thread_signal_kevent.flags = EV_DELETE; + kevent( + impl->kq_fd, + &thread_signal_kevent /*changelist*/, + 1 /*nchanges*/, + NULL /*eventlist*/, + 0 /*nevents*/, + NULL /*timeout*/); + } + if (clean_up_signal_pipe) { + close(impl->cross_thread_signal_pipe[READ_FD]); + close(impl->cross_thread_signal_pipe[WRITE_FD]); + } + if (clean_up_kqueue) { + close(impl->kq_fd); + } + if (clean_up_thread) { + aws_thread_clean_up(&impl->thread_created_on); + } + if (clean_up_impl_mem) { + aws_mem_release(alloc, impl); + } + if (clean_up_event_loop_base) { + aws_event_loop_clean_up_base(event_loop); + } + if (clean_up_event_loop_mem) { + aws_mem_release(alloc, event_loop); + } + return NULL; +} + +static void s_destroy(struct aws_event_loop *event_loop) { + AWS_LOGF_INFO(AWS_LS_IO_EVENT_LOOP, "id=%p: destroying event_loop", (void *)event_loop); + struct kqueue_loop *impl = event_loop->impl_data; + + /* Stop the event-thread. This might have already happened. It's safe to call multiple times. */ + s_stop(event_loop); + int err = s_wait_for_stop_completion(event_loop); + if (err) { + AWS_LOGF_WARN( + AWS_LS_IO_EVENT_LOOP, + "id=%p: failed to destroy event-thread, resources have been leaked", + (void *)event_loop); + AWS_ASSERT("Failed to destroy event-thread, resources have been leaked." == NULL); + return; + } + /* setting this so that canceled tasks don't blow up when asking if they're on the event-loop thread. */ + impl->thread_joined_to = aws_thread_current_thread_id(); + aws_atomic_store_ptr(&impl->running_thread_id, &impl->thread_joined_to); + + /* Clean up task-related stuff first. It's possible the a cancelled task adds further tasks to this event_loop. + * Tasks added in this way will be in cross_thread_data.tasks_to_schedule, so we clean that up last */ + + aws_task_scheduler_clean_up(&impl->thread_data.scheduler); /* Tasks in scheduler get cancelled*/ + + while (!aws_linked_list_empty(&impl->cross_thread_data.tasks_to_schedule)) { + struct aws_linked_list_node *node = aws_linked_list_pop_front(&impl->cross_thread_data.tasks_to_schedule); + struct aws_task *task = AWS_CONTAINER_OF(node, struct aws_task, node); + task->fn(task, task->arg, AWS_TASK_STATUS_CANCELED); + } + + /* Warn user if aws_io_handle was subscribed, but never unsubscribed. This would cause memory leaks. */ + AWS_ASSERT(impl->thread_data.connected_handle_count == 0); + + /* Clean up everything else */ + aws_mutex_clean_up(&impl->cross_thread_data.mutex); + + struct kevent thread_signal_kevent; + EV_SET( + &thread_signal_kevent, + impl->cross_thread_signal_pipe[READ_FD], + EVFILT_READ /*filter*/, + EV_DELETE /*flags*/, + 0 /*fflags*/, + 0 /*data*/, + NULL /*udata*/); + + kevent( + impl->kq_fd, + &thread_signal_kevent /*changelist*/, + 1 /*nchanges*/, + NULL /*eventlist*/, + 0 /*nevents*/, + NULL /*timeout*/); + + close(impl->cross_thread_signal_pipe[READ_FD]); + close(impl->cross_thread_signal_pipe[WRITE_FD]); + close(impl->kq_fd); + aws_thread_clean_up(&impl->thread_created_on); + aws_mem_release(event_loop->alloc, impl); + aws_event_loop_clean_up_base(event_loop); + aws_mem_release(event_loop->alloc, event_loop); +} + +static int s_run(struct aws_event_loop *event_loop) { + struct kqueue_loop *impl = event_loop->impl_data; + + AWS_LOGF_INFO(AWS_LS_IO_EVENT_LOOP, "id=%p: starting event-loop thread.", (void *)event_loop); + /* to re-run, call stop() and wait_for_stop_completion() */ + AWS_ASSERT(impl->cross_thread_data.state == EVENT_THREAD_STATE_READY_TO_RUN); + AWS_ASSERT(impl->thread_data.state == EVENT_THREAD_STATE_READY_TO_RUN); + + /* Since thread isn't running it's ok to touch thread_data, + * and it's ok to touch cross_thread_data without locking the mutex */ + impl->cross_thread_data.state = EVENT_THREAD_STATE_RUNNING; + + int err = aws_thread_launch(&impl->thread_created_on, s_event_thread_main, (void *)event_loop, NULL); + if (err) { + AWS_LOGF_FATAL(AWS_LS_IO_EVENT_LOOP, "id=%p: thread creation failed.", (void *)event_loop); + goto clean_up; + } + + return AWS_OP_SUCCESS; + +clean_up: + impl->cross_thread_data.state = EVENT_THREAD_STATE_READY_TO_RUN; + return AWS_OP_ERR; +} + +/* This function can't fail, we're relying on the thread responding to critical messages (ex: stop thread) */ +void signal_cross_thread_data_changed(struct aws_event_loop *event_loop) { + struct kqueue_loop *impl = event_loop->impl_data; + + AWS_LOGF_TRACE( + AWS_LS_IO_EVENT_LOOP, + "id=%p: signaling event-loop that cross-thread tasks need to be scheduled.", + (void *)event_loop); + /* Doesn't actually matter what we write, any activity on pipe signals that cross_thread_data has changed, + * If the pipe is full and the write fails, that's fine, the event-thread will get the signal from some previous + * write */ + uint32_t write_whatever = 0xC0FFEE; + write(impl->cross_thread_signal_pipe[WRITE_FD], &write_whatever, sizeof(write_whatever)); +} + +static int s_stop(struct aws_event_loop *event_loop) { + struct kqueue_loop *impl = event_loop->impl_data; + + bool signal_thread = false; + + { /* Begin critical section */ + aws_mutex_lock(&impl->cross_thread_data.mutex); + if (impl->cross_thread_data.state == EVENT_THREAD_STATE_RUNNING) { + impl->cross_thread_data.state = EVENT_THREAD_STATE_STOPPING; + signal_thread = !impl->cross_thread_data.thread_signaled; + impl->cross_thread_data.thread_signaled = true; + } + aws_mutex_unlock(&impl->cross_thread_data.mutex); + } /* End critical section */ + + if (signal_thread) { + signal_cross_thread_data_changed(event_loop); + } + + return AWS_OP_SUCCESS; +} + +static int s_wait_for_stop_completion(struct aws_event_loop *event_loop) { + struct kqueue_loop *impl = event_loop->impl_data; + +#ifdef DEBUG_BUILD + aws_mutex_lock(&impl->cross_thread_data.mutex); + /* call stop() before wait_for_stop_completion() or you'll wait forever */ + AWS_ASSERT(impl->cross_thread_data.state != EVENT_THREAD_STATE_RUNNING); + aws_mutex_unlock(&impl->cross_thread_data.mutex); +#endif + + int err = aws_thread_join(&impl->thread_created_on); + if (err) { + return AWS_OP_ERR; + } + + /* Since thread is no longer running it's ok to touch thread_data, + * and it's ok to touch cross_thread_data without locking the mutex */ + impl->cross_thread_data.state = EVENT_THREAD_STATE_READY_TO_RUN; + impl->thread_data.state = EVENT_THREAD_STATE_READY_TO_RUN; + + return AWS_OP_SUCCESS; +} + +/* Common functionality for "now" and "future" task scheduling. + * If `run_at_nanos` is zero then the task is scheduled as a "now" task. */ +static void s_schedule_task_common(struct aws_event_loop *event_loop, struct aws_task *task, uint64_t run_at_nanos) { + AWS_ASSERT(task); + struct kqueue_loop *impl = event_loop->impl_data; + + /* If we're on the event-thread, just schedule it directly */ + if (s_is_event_thread(event_loop)) { + AWS_LOGF_TRACE( + AWS_LS_IO_EVENT_LOOP, + "id=%p: scheduling task %p in-thread for timestamp %llu", + (void *)event_loop, + (void *)task, + (unsigned long long)run_at_nanos); + if (run_at_nanos == 0) { + aws_task_scheduler_schedule_now(&impl->thread_data.scheduler, task); + } else { + aws_task_scheduler_schedule_future(&impl->thread_data.scheduler, task, run_at_nanos); + } + return; + } + + /* Otherwise, add it to cross_thread_data.tasks_to_schedule and signal the event-thread to process it */ + AWS_LOGF_TRACE( + AWS_LS_IO_EVENT_LOOP, + "id=%p: scheduling task %p cross-thread for timestamp %llu", + (void *)event_loop, + (void *)task, + (unsigned long long)run_at_nanos); + task->timestamp = run_at_nanos; + bool should_signal_thread = false; + + /* Begin critical section */ + aws_mutex_lock(&impl->cross_thread_data.mutex); + aws_linked_list_push_back(&impl->cross_thread_data.tasks_to_schedule, &task->node); + + /* Signal thread that cross_thread_data has changed (unless it's been signaled already) */ + if (!impl->cross_thread_data.thread_signaled) { + should_signal_thread = true; + impl->cross_thread_data.thread_signaled = true; + } + + aws_mutex_unlock(&impl->cross_thread_data.mutex); + /* End critical section */ + + if (should_signal_thread) { + signal_cross_thread_data_changed(event_loop); + } +} + +static void s_schedule_task_now(struct aws_event_loop *event_loop, struct aws_task *task) { + s_schedule_task_common(event_loop, task, 0); /* Zero is used to denote "now" tasks */ +} + +static void s_schedule_task_future(struct aws_event_loop *event_loop, struct aws_task *task, uint64_t run_at_nanos) { + s_schedule_task_common(event_loop, task, run_at_nanos); +} + +static void s_cancel_task(struct aws_event_loop *event_loop, struct aws_task *task) { + struct kqueue_loop *kqueue_loop = event_loop->impl_data; + AWS_LOGF_TRACE(AWS_LS_IO_EVENT_LOOP, "id=%p: cancelling task %p", (void *)event_loop, (void *)task); + aws_task_scheduler_cancel_task(&kqueue_loop->thread_data.scheduler, task); +} + +/* Scheduled task that connects aws_io_handle with the kqueue */ +static void s_subscribe_task(struct aws_task *task, void *user_data, enum aws_task_status status) { + (void)task; + struct handle_data *handle_data = user_data; + struct aws_event_loop *event_loop = handle_data->event_loop; + struct kqueue_loop *impl = handle_data->event_loop->impl_data; + + impl->thread_data.connected_handle_count++; + + /* if task was cancelled, nothing to do */ + if (status == AWS_TASK_STATUS_CANCELED) { + return; + } + AWS_LOGF_TRACE( + AWS_LS_IO_EVENT_LOOP, "id=%p: subscribing to events on fd %d", (void *)event_loop, handle_data->owner->data.fd); + + /* If handle was unsubscribed before this task could execute, nothing to do */ + if (handle_data->state == HANDLE_STATE_UNSUBSCRIBED) { + return; + } + + AWS_ASSERT(handle_data->state == HANDLE_STATE_SUBSCRIBING); + + /* In order to monitor both reads and writes, kqueue requires you to add two separate kevents. + * If we're adding two separate kevents, but one of those fails, we need to remove the other kevent. + * Therefore we use the EV_RECEIPT flag. This causes kevent() to tell whether each EV_ADD succeeded, + * rather than the usual behavior of telling us about recent events. */ + struct kevent changelist[2]; + AWS_ZERO_ARRAY(changelist); + + int changelist_size = 0; + + if (handle_data->events_subscribed & AWS_IO_EVENT_TYPE_READABLE) { + EV_SET( + &changelist[changelist_size++], + handle_data->owner->data.fd, + EVFILT_READ /*filter*/, + EV_ADD | EV_RECEIPT | EV_CLEAR /*flags*/, + 0 /*fflags*/, + 0 /*data*/, + handle_data /*udata*/); + } + if (handle_data->events_subscribed & AWS_IO_EVENT_TYPE_WRITABLE) { + EV_SET( + &changelist[changelist_size++], + handle_data->owner->data.fd, + EVFILT_WRITE /*filter*/, + EV_ADD | EV_RECEIPT | EV_CLEAR /*flags*/, + 0 /*fflags*/, + 0 /*data*/, + handle_data /*udata*/); + } + + int num_events = kevent( + impl->kq_fd, + changelist /*changelist*/, + changelist_size /*nchanges*/, + changelist /*eventlist. It's OK to re-use the same memory for changelist input and eventlist output*/, + changelist_size /*nevents*/, + NULL /*timeout*/); + if (num_events == -1) { + goto subscribe_failed; + } + + /* Look through results to see if any failed */ + for (int i = 0; i < num_events; ++i) { + /* Every result should be flagged as error, that's just how EV_RECEIPT works */ + AWS_ASSERT(changelist[i].flags & EV_ERROR); + + /* If a real error occurred, .data contains the error code */ + if (changelist[i].data != 0) { + goto subscribe_failed; + } + } + + /* Success */ + handle_data->state = HANDLE_STATE_SUBSCRIBED; + return; + +subscribe_failed: + AWS_LOGF_ERROR( + AWS_LS_IO_EVENT_LOOP, + "id=%p: failed to subscribe to events on fd %d", + (void *)event_loop, + handle_data->owner->data.fd); + /* Remove any related kevents that succeeded */ + for (int i = 0; i < num_events; ++i) { + if (changelist[i].data == 0) { + changelist[i].flags = EV_DELETE; + kevent( + impl->kq_fd, + &changelist[i] /*changelist*/, + 1 /*nchanges*/, + NULL /*eventlist*/, + 0 /*nevents*/, + NULL /*timeout*/); + } + } + + /* We can't return an error code because this was a scheduled task. + * Notify the user of the failed subscription by passing AWS_IO_EVENT_TYPE_ERROR to the callback. */ + handle_data->on_event(event_loop, handle_data->owner, AWS_IO_EVENT_TYPE_ERROR, handle_data->on_event_user_data); +} + +static int s_subscribe_to_io_events( + struct aws_event_loop *event_loop, + struct aws_io_handle *handle, + int events, + aws_event_loop_on_event_fn *on_event, + void *user_data) { + + AWS_ASSERT(event_loop); + AWS_ASSERT(handle->data.fd != -1); + AWS_ASSERT(handle->additional_data == NULL); + AWS_ASSERT(on_event); + /* Must subscribe for read, write, or both */ + AWS_ASSERT(events & (AWS_IO_EVENT_TYPE_READABLE | AWS_IO_EVENT_TYPE_WRITABLE)); + + struct handle_data *handle_data = aws_mem_calloc(event_loop->alloc, 1, sizeof(struct handle_data)); + if (!handle_data) { + return AWS_OP_ERR; + } + + handle_data->owner = handle; + handle_data->event_loop = event_loop; + handle_data->on_event = on_event; + handle_data->on_event_user_data = user_data; + handle_data->events_subscribed = events; + handle_data->state = HANDLE_STATE_SUBSCRIBING; + + handle->additional_data = handle_data; + + /* We schedule a task to perform the actual changes to the kqueue, read on for an explanation why... + * + * kqueue requires separate registrations for read and write events. + * If the user wants to know about both read and write, we need register once for read and once for write. + * If the first registration succeeds, but the second registration fails, we need to delete the first registration. + * If this all happened outside the event-thread, the successful registration's events could begin processing + * in the brief window of time before the registration is deleted. */ + + aws_task_init(&handle_data->subscribe_task, s_subscribe_task, handle_data, "kqueue_event_loop_subscribe"); + s_schedule_task_now(event_loop, &handle_data->subscribe_task); + + return AWS_OP_SUCCESS; +} + +static void s_free_io_event_resources(void *user_data) { + struct handle_data *handle_data = user_data; + struct kqueue_loop *impl = handle_data->event_loop->impl_data; + + impl->thread_data.connected_handle_count--; + + aws_mem_release(handle_data->event_loop->alloc, handle_data); +} + +static void s_clean_up_handle_data_task(struct aws_task *task, void *user_data, enum aws_task_status status) { + (void)task; + (void)status; + + struct handle_data *handle_data = user_data; + s_free_io_event_resources(handle_data); +} + +static int s_unsubscribe_from_io_events(struct aws_event_loop *event_loop, struct aws_io_handle *handle) { + AWS_LOGF_TRACE( + AWS_LS_IO_EVENT_LOOP, "id=%p: un-subscribing from events on fd %d", (void *)event_loop, handle->data.fd); + AWS_ASSERT(handle->additional_data); + struct handle_data *handle_data = handle->additional_data; + struct kqueue_loop *impl = event_loop->impl_data; + + AWS_ASSERT(event_loop == handle_data->event_loop); + + /* If the handle was successfully subscribed to kqueue, then remove it. */ + if (handle_data->state == HANDLE_STATE_SUBSCRIBED) { + struct kevent changelist[2]; + int changelist_size = 0; + + if (handle_data->events_subscribed & AWS_IO_EVENT_TYPE_READABLE) { + EV_SET( + &changelist[changelist_size++], + handle_data->owner->data.fd, + EVFILT_READ /*filter*/, + EV_DELETE /*flags*/, + 0 /*fflags*/, + 0 /*data*/, + handle_data /*udata*/); + } + if (handle_data->events_subscribed & AWS_IO_EVENT_TYPE_WRITABLE) { + EV_SET( + &changelist[changelist_size++], + handle_data->owner->data.fd, + EVFILT_WRITE /*filter*/, + EV_DELETE /*flags*/, + 0 /*fflags*/, + 0 /*data*/, + handle_data /*udata*/); + } + + kevent(impl->kq_fd, changelist, changelist_size, NULL /*eventlist*/, 0 /*nevents*/, NULL /*timeout*/); + } + + /* Schedule a task to clean up the memory. This is done in a task to prevent the following scenario: + * - While processing a batch of events, some callback unsubscribes another aws_io_handle. + * - One of the other events in this batch belongs to that other aws_io_handle. + * - If the handle_data were already deleted, there would be an access invalid memory. */ + + aws_task_init( + &handle_data->cleanup_task, s_clean_up_handle_data_task, handle_data, "kqueue_event_loop_clean_up_handle_data"); + aws_event_loop_schedule_task_now(event_loop, &handle_data->cleanup_task); + + handle_data->state = HANDLE_STATE_UNSUBSCRIBED; + handle->additional_data = NULL; + + return AWS_OP_SUCCESS; +} + +static bool s_is_event_thread(struct aws_event_loop *event_loop) { + struct kqueue_loop *impl = event_loop->impl_data; + + aws_thread_id_t *thread_id = aws_atomic_load_ptr(&impl->running_thread_id); + return thread_id && aws_thread_thread_id_equal(*thread_id, aws_thread_current_thread_id()); +} + +/* Called from thread. + * Takes tasks from tasks_to_schedule and adds them to the scheduler. */ +static void s_process_tasks_to_schedule(struct aws_event_loop *event_loop, struct aws_linked_list *tasks_to_schedule) { + struct kqueue_loop *impl = event_loop->impl_data; + AWS_LOGF_TRACE(AWS_LS_IO_EVENT_LOOP, "id=%p: processing cross-thread tasks", (void *)event_loop); + + while (!aws_linked_list_empty(tasks_to_schedule)) { + struct aws_linked_list_node *node = aws_linked_list_pop_front(tasks_to_schedule); + struct aws_task *task = AWS_CONTAINER_OF(node, struct aws_task, node); + + AWS_LOGF_TRACE( + AWS_LS_IO_EVENT_LOOP, + "id=%p: task %p pulled to event-loop, scheduling now.", + (void *)event_loop, + (void *)task); + /* Timestamp 0 is used to denote "now" tasks */ + if (task->timestamp == 0) { + aws_task_scheduler_schedule_now(&impl->thread_data.scheduler, task); + } else { + aws_task_scheduler_schedule_future(&impl->thread_data.scheduler, task, task->timestamp); + } + } +} + +static void s_process_cross_thread_data(struct aws_event_loop *event_loop) { + struct kqueue_loop *impl = event_loop->impl_data; + + AWS_LOGF_TRACE(AWS_LS_IO_EVENT_LOOP, "id=%p: notified of cross-thread data to process", (void *)event_loop); + /* If there are tasks to schedule, grab them all out of synced_data.tasks_to_schedule. + * We'll process them later, so that we minimize time spent holding the mutex. */ + struct aws_linked_list tasks_to_schedule; + aws_linked_list_init(&tasks_to_schedule); + + { /* Begin critical section */ + aws_mutex_lock(&impl->cross_thread_data.mutex); + impl->cross_thread_data.thread_signaled = false; + + bool initiate_stop = (impl->cross_thread_data.state == EVENT_THREAD_STATE_STOPPING) && + (impl->thread_data.state == EVENT_THREAD_STATE_RUNNING); + if (AWS_UNLIKELY(initiate_stop)) { + impl->thread_data.state = EVENT_THREAD_STATE_STOPPING; + } + + aws_linked_list_swap_contents(&impl->cross_thread_data.tasks_to_schedule, &tasks_to_schedule); + + aws_mutex_unlock(&impl->cross_thread_data.mutex); + } /* End critical section */ + + s_process_tasks_to_schedule(event_loop, &tasks_to_schedule); +} + +static int s_aws_event_flags_from_kevent(struct kevent *kevent) { + int event_flags = 0; + + if (kevent->flags & EV_ERROR) { + event_flags |= AWS_IO_EVENT_TYPE_ERROR; + } else if (kevent->filter == EVFILT_READ) { + if (kevent->data != 0) { + event_flags |= AWS_IO_EVENT_TYPE_READABLE; + } + + if (kevent->flags & EV_EOF) { + event_flags |= AWS_IO_EVENT_TYPE_CLOSED; + } + } else if (kevent->filter == EVFILT_WRITE) { + if (kevent->data != 0) { + event_flags |= AWS_IO_EVENT_TYPE_WRITABLE; + } + + if (kevent->flags & EV_EOF) { + event_flags |= AWS_IO_EVENT_TYPE_CLOSED; + } + } + + return event_flags; +} + +static void s_event_thread_main(void *user_data) { + struct aws_event_loop *event_loop = user_data; + AWS_LOGF_INFO(AWS_LS_IO_EVENT_LOOP, "id=%p: main loop started", (void *)event_loop); + struct kqueue_loop *impl = event_loop->impl_data; + + /* set thread id to the event-loop's thread. */ + aws_atomic_store_ptr(&impl->running_thread_id, &impl->thread_created_on.thread_id); + + AWS_ASSERT(impl->thread_data.state == EVENT_THREAD_STATE_READY_TO_RUN); + impl->thread_data.state = EVENT_THREAD_STATE_RUNNING; + + struct kevent kevents[MAX_EVENTS]; + + /* A single aws_io_handle could have two separate kevents if subscribed for both read and write. + * If both the read and write kevents fire in the same loop of the event-thread, + * combine the event-flags and deliver them in a single callback. + * This makes the kqueue_event_loop behave more like the other platform implementations. */ + struct handle_data *io_handle_events[MAX_EVENTS]; + + struct timespec timeout = { + .tv_sec = DEFAULT_TIMEOUT_SEC, + .tv_nsec = 0, + }; + + AWS_LOGF_INFO( + AWS_LS_IO_EVENT_LOOP, + "id=%p: default timeout %ds, and max events to process per tick %d", + (void *)event_loop, + DEFAULT_TIMEOUT_SEC, + MAX_EVENTS); + + while (impl->thread_data.state == EVENT_THREAD_STATE_RUNNING) { + int num_io_handle_events = 0; + bool should_process_cross_thread_data = false; + + AWS_LOGF_TRACE( + AWS_LS_IO_EVENT_LOOP, + "id=%p: waiting for a maximum of %ds %lluns", + (void *)event_loop, + (int)timeout.tv_sec, + (unsigned long long)timeout.tv_nsec); + + /* Process kqueue events */ + int num_kevents = kevent( + impl->kq_fd, NULL /*changelist*/, 0 /*nchanges*/, kevents /*eventlist*/, MAX_EVENTS /*nevents*/, &timeout); + + AWS_LOGF_TRACE( + AWS_LS_IO_EVENT_LOOP, "id=%p: wake up with %d events to process.", (void *)event_loop, num_kevents); + if (num_kevents == -1) { + /* Raise an error, in case this is interesting to anyone monitoring, + * and continue on with this loop. We can't process events, + * but we can still process scheduled tasks */ + aws_raise_error(AWS_ERROR_SYS_CALL_FAILURE); + + /* Force the cross_thread_data to be processed. + * There might be valuable info in there, like the message to stop the thread. + * It's fine to do this even if nothing has changed, it just costs a mutex lock/unlock. */ + should_process_cross_thread_data = true; + } + + for (int i = 0; i < num_kevents; ++i) { + struct kevent *kevent = &kevents[i]; + + /* Was this event to signal that cross_thread_data has changed? */ + if ((int)kevent->ident == impl->cross_thread_signal_pipe[READ_FD]) { + should_process_cross_thread_data = true; + + /* Drain whatever data was written to the signaling pipe */ + uint32_t read_whatever; + while (read((int)kevent->ident, &read_whatever, sizeof(read_whatever)) > 0) { + } + + continue; + } + + /* Otherwise this was a normal event on a subscribed handle. Figure out which flags to report. */ + int event_flags = s_aws_event_flags_from_kevent(kevent); + if (event_flags == 0) { + continue; + } + + /* Combine flags, in case multiple kevents correspond to one handle. (see notes at top of function) */ + struct handle_data *handle_data = kevent->udata; + if (handle_data->events_this_loop == 0) { + io_handle_events[num_io_handle_events++] = handle_data; + } + handle_data->events_this_loop |= event_flags; + } + + /* Invoke each handle's event callback (unless the handle has been unsubscribed) */ + for (int i = 0; i < num_io_handle_events; ++i) { + struct handle_data *handle_data = io_handle_events[i]; + + if (handle_data->state == HANDLE_STATE_SUBSCRIBED) { + AWS_LOGF_TRACE( + AWS_LS_IO_EVENT_LOOP, + "id=%p: activity on fd %d, invoking handler.", + (void *)event_loop, + handle_data->owner->data.fd); + handle_data->on_event( + event_loop, handle_data->owner, handle_data->events_this_loop, handle_data->on_event_user_data); + } + + handle_data->events_this_loop = 0; + } + + /* Process cross_thread_data */ + if (should_process_cross_thread_data) { + s_process_cross_thread_data(event_loop); + } + + /* Run scheduled tasks */ + uint64_t now_ns = 0; + event_loop->clock(&now_ns); /* If clock fails, now_ns will be 0 and tasks scheduled for a specific time + will not be run. That's ok, we'll handle them next time around. */ + AWS_LOGF_TRACE(AWS_LS_IO_EVENT_LOOP, "id=%p: running scheduled tasks.", (void *)event_loop); + aws_task_scheduler_run_all(&impl->thread_data.scheduler, now_ns); + + /* Set timeout for next kevent() call. + * If clock fails, or scheduler has no tasks, use default timeout */ + bool use_default_timeout = false; + + int err = event_loop->clock(&now_ns); + if (err) { + use_default_timeout = true; + } + + uint64_t next_run_time_ns; + if (!aws_task_scheduler_has_tasks(&impl->thread_data.scheduler, &next_run_time_ns)) { + + use_default_timeout = true; + } + + if (use_default_timeout) { + AWS_LOGF_TRACE( + AWS_LS_IO_EVENT_LOOP, "id=%p: no more scheduled tasks using default timeout.", (void *)event_loop); + timeout.tv_sec = DEFAULT_TIMEOUT_SEC; + timeout.tv_nsec = 0; + } else { + /* Convert from timestamp in nanoseconds, to timeout in seconds with nanosecond remainder */ + uint64_t timeout_ns = next_run_time_ns > now_ns ? next_run_time_ns - now_ns : 0; + + uint64_t timeout_remainder_ns = 0; + uint64_t timeout_sec = + aws_timestamp_convert(timeout_ns, AWS_TIMESTAMP_NANOS, AWS_TIMESTAMP_SECS, &timeout_remainder_ns); + + if (timeout_sec > LONG_MAX) { /* Check for overflow. On Darwin, these values are stored as longs */ + timeout_sec = LONG_MAX; + timeout_remainder_ns = 0; + } + + AWS_LOGF_TRACE( + AWS_LS_IO_EVENT_LOOP, + "id=%p: detected more scheduled tasks with the next occurring at " + "%llu using timeout of %ds %lluns.", + (void *)event_loop, + (unsigned long long)timeout_ns, + (int)timeout_sec, + (unsigned long long)timeout_remainder_ns); + timeout.tv_sec = (time_t)(timeout_sec); + timeout.tv_nsec = (long)(timeout_remainder_ns); + } + } + + AWS_LOGF_INFO(AWS_LS_IO_EVENT_LOOP, "id=%p: exiting main loop", (void *)event_loop); + /* reset to NULL. This should be updated again during destroy before tasks are canceled. */ + aws_atomic_store_ptr(&impl->running_thread_id, NULL); +} |