aboutsummaryrefslogtreecommitdiffstats
path: root/contrib/restricted/aws/aws-c-io/source/bsd
diff options
context:
space:
mode:
authororivej <orivej@yandex-team.ru>2022-02-10 16:44:49 +0300
committerDaniil Cherednik <dcherednik@yandex-team.ru>2022-02-10 16:44:49 +0300
commit718c552901d703c502ccbefdfc3c9028d608b947 (patch)
tree46534a98bbefcd7b1f3faa5b52c138ab27db75b7 /contrib/restricted/aws/aws-c-io/source/bsd
parente9656aae26e0358d5378e5b63dcac5c8dbe0e4d0 (diff)
downloadydb-718c552901d703c502ccbefdfc3c9028d608b947.tar.gz
Restoring authorship annotation for <orivej@yandex-team.ru>. Commit 1 of 2.
Diffstat (limited to 'contrib/restricted/aws/aws-c-io/source/bsd')
-rw-r--r--contrib/restricted/aws/aws-c-io/source/bsd/kqueue_event_loop.c1920
1 files changed, 960 insertions, 960 deletions
diff --git a/contrib/restricted/aws/aws-c-io/source/bsd/kqueue_event_loop.c b/contrib/restricted/aws/aws-c-io/source/bsd/kqueue_event_loop.c
index 3de882d045..949549b941 100644
--- a/contrib/restricted/aws/aws-c-io/source/bsd/kqueue_event_loop.c
+++ b/contrib/restricted/aws/aws-c-io/source/bsd/kqueue_event_loop.c
@@ -1,960 +1,960 @@
-/**
- * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
- * SPDX-License-Identifier: Apache-2.0.
- */
-
-#include <aws/io/event_loop.h>
-
-#include <aws/io/logging.h>
-
-#include <aws/common/atomics.h>
-#include <aws/common/clock.h>
-#include <aws/common/mutex.h>
-#include <aws/common/task_scheduler.h>
-#include <aws/common/thread.h>
-
-#if defined(__FreeBSD__) || defined(__NetBSD__)
-# define __BSD_VISIBLE 1
-# include <sys/types.h>
-#endif
-
-#include <sys/event.h>
-
-#include <aws/io/io.h>
-#include <limits.h>
-#include <unistd.h>
-
-static void s_destroy(struct aws_event_loop *event_loop);
-static int s_run(struct aws_event_loop *event_loop);
-static int s_stop(struct aws_event_loop *event_loop);
-static int s_wait_for_stop_completion(struct aws_event_loop *event_loop);
-static void s_schedule_task_now(struct aws_event_loop *event_loop, struct aws_task *task);
-static void s_schedule_task_future(struct aws_event_loop *event_loop, struct aws_task *task, uint64_t run_at_nanos);
-static void s_cancel_task(struct aws_event_loop *event_loop, struct aws_task *task);
-static int s_subscribe_to_io_events(
- struct aws_event_loop *event_loop,
- struct aws_io_handle *handle,
- int events,
- aws_event_loop_on_event_fn *on_event,
- void *user_data);
-static int s_unsubscribe_from_io_events(struct aws_event_loop *event_loop, struct aws_io_handle *handle);
-static void s_free_io_event_resources(void *user_data);
-static bool s_is_event_thread(struct aws_event_loop *event_loop);
-
-static void s_event_thread_main(void *user_data);
-
-int aws_open_nonblocking_posix_pipe(int pipe_fds[2]);
-
-enum event_thread_state {
- EVENT_THREAD_STATE_READY_TO_RUN,
- EVENT_THREAD_STATE_RUNNING,
- EVENT_THREAD_STATE_STOPPING,
-};
-
-enum pipe_fd_index {
- READ_FD,
- WRITE_FD,
-};
-
-struct kqueue_loop {
- /* thread_created_on is the handle to the event loop thread. */
- struct aws_thread thread_created_on;
- /* thread_joined_to is used by the thread destroying the event loop. */
- aws_thread_id_t thread_joined_to;
- /* running_thread_id is NULL if the event loop thread is stopped or points-to the thread_id of the thread running
- * the event loop (either thread_created_on or thread_joined_to). Atomic because of concurrent writes (e.g.,
- * run/stop) and reads (e.g., is_event_loop_thread).
- * An aws_thread_id_t variable itself cannot be atomic because it is an opaque type that is platform-dependent. */
- struct aws_atomic_var running_thread_id;
- int kq_fd; /* kqueue file descriptor */
-
- /* Pipe for signaling to event-thread that cross_thread_data has changed. */
- int cross_thread_signal_pipe[2];
-
- /* cross_thread_data holds things that must be communicated across threads.
- * When the event-thread is running, the mutex must be locked while anyone touches anything in cross_thread_data.
- * If this data is modified outside the thread, the thread is signaled via activity on a pipe. */
- struct {
- struct aws_mutex mutex;
- bool thread_signaled; /* whether thread has been signaled about changes to cross_thread_data */
- struct aws_linked_list tasks_to_schedule;
- enum event_thread_state state;
- } cross_thread_data;
-
- /* thread_data holds things which, when the event-thread is running, may only be touched by the thread */
- struct {
- struct aws_task_scheduler scheduler;
-
- int connected_handle_count;
-
- /* These variables duplicate ones in cross_thread_data. We move values out while holding the mutex and operate
- * on them later */
- enum event_thread_state state;
- } thread_data;
-};
-
-/* Data attached to aws_io_handle while the handle is subscribed to io events */
-struct handle_data {
- struct aws_io_handle *owner;
- struct aws_event_loop *event_loop;
- aws_event_loop_on_event_fn *on_event;
- void *on_event_user_data;
-
- int events_subscribed; /* aws_io_event_types this handle should be subscribed to */
- int events_this_loop; /* aws_io_event_types received during current loop of the event-thread */
-
- enum { HANDLE_STATE_SUBSCRIBING, HANDLE_STATE_SUBSCRIBED, HANDLE_STATE_UNSUBSCRIBED } state;
-
- struct aws_task subscribe_task;
- struct aws_task cleanup_task;
-};
-
-enum {
- DEFAULT_TIMEOUT_SEC = 100, /* Max kevent() timeout per loop of the event-thread */
- MAX_EVENTS = 100, /* Max kevents to process per loop of the event-thread */
-};
-
-struct aws_event_loop_vtable s_kqueue_vtable = {
- .destroy = s_destroy,
- .run = s_run,
- .stop = s_stop,
- .wait_for_stop_completion = s_wait_for_stop_completion,
- .schedule_task_now = s_schedule_task_now,
- .schedule_task_future = s_schedule_task_future,
- .subscribe_to_io_events = s_subscribe_to_io_events,
- .cancel_task = s_cancel_task,
- .unsubscribe_from_io_events = s_unsubscribe_from_io_events,
- .free_io_event_resources = s_free_io_event_resources,
- .is_on_callers_thread = s_is_event_thread,
-};
-
-struct aws_event_loop *aws_event_loop_new_default(struct aws_allocator *alloc, aws_io_clock_fn *clock) {
- AWS_ASSERT(alloc);
- AWS_ASSERT(clock);
-
- bool clean_up_event_loop_mem = false;
- bool clean_up_event_loop_base = false;
- bool clean_up_impl_mem = false;
- bool clean_up_thread = false;
- bool clean_up_kqueue = false;
- bool clean_up_signal_pipe = false;
- bool clean_up_signal_kevent = false;
- bool clean_up_mutex = false;
-
- struct aws_event_loop *event_loop = aws_mem_acquire(alloc, sizeof(struct aws_event_loop));
- if (!event_loop) {
- return NULL;
- }
-
- AWS_LOGF_INFO(AWS_LS_IO_EVENT_LOOP, "id=%p: Initializing edge-triggered kqueue", (void *)event_loop);
- clean_up_event_loop_mem = true;
-
- int err = aws_event_loop_init_base(event_loop, alloc, clock);
- if (err) {
- goto clean_up;
- }
- clean_up_event_loop_base = true;
-
- struct kqueue_loop *impl = aws_mem_calloc(alloc, 1, sizeof(struct kqueue_loop));
- if (!impl) {
- goto clean_up;
- }
- /* intialize thread id to NULL. It will be set when the event loop thread starts. */
- aws_atomic_init_ptr(&impl->running_thread_id, NULL);
- clean_up_impl_mem = true;
-
- err = aws_thread_init(&impl->thread_created_on, alloc);
- if (err) {
- goto clean_up;
- }
- clean_up_thread = true;
-
- impl->kq_fd = kqueue();
- if (impl->kq_fd == -1) {
- AWS_LOGF_FATAL(AWS_LS_IO_EVENT_LOOP, "id=%p: Failed to open kqueue handle.", (void *)event_loop);
- aws_raise_error(AWS_ERROR_SYS_CALL_FAILURE);
- goto clean_up;
- }
- clean_up_kqueue = true;
-
- err = aws_open_nonblocking_posix_pipe(impl->cross_thread_signal_pipe);
- if (err) {
- AWS_LOGF_FATAL(AWS_LS_IO_EVENT_LOOP, "id=%p: failed to open pipe handle.", (void *)event_loop);
- goto clean_up;
- }
- AWS_LOGF_TRACE(
- AWS_LS_IO_EVENT_LOOP,
- "id=%p: pipe descriptors read %d, write %d.",
- (void *)event_loop,
- impl->cross_thread_signal_pipe[READ_FD],
- impl->cross_thread_signal_pipe[WRITE_FD]);
- clean_up_signal_pipe = true;
-
- /* Set up kevent to handle activity on the cross_thread_signal_pipe */
- struct kevent thread_signal_kevent;
- EV_SET(
- &thread_signal_kevent,
- impl->cross_thread_signal_pipe[READ_FD],
- EVFILT_READ /*filter*/,
- EV_ADD | EV_CLEAR /*flags*/,
- 0 /*fflags*/,
- 0 /*data*/,
- NULL /*udata*/);
-
- int res = kevent(
- impl->kq_fd,
- &thread_signal_kevent /*changelist*/,
- 1 /*nchanges*/,
- NULL /*eventlist*/,
- 0 /*nevents*/,
- NULL /*timeout*/);
-
- if (res == -1) {
- AWS_LOGF_FATAL(AWS_LS_IO_EVENT_LOOP, "id=%p: failed to create cross-thread signal kevent.", (void *)event_loop);
- aws_raise_error(AWS_ERROR_SYS_CALL_FAILURE);
- goto clean_up;
- }
- clean_up_signal_kevent = true;
-
- err = aws_mutex_init(&impl->cross_thread_data.mutex);
- if (err) {
- goto clean_up;
- }
- clean_up_mutex = true;
-
- impl->cross_thread_data.thread_signaled = false;
-
- aws_linked_list_init(&impl->cross_thread_data.tasks_to_schedule);
-
- impl->cross_thread_data.state = EVENT_THREAD_STATE_READY_TO_RUN;
-
- err = aws_task_scheduler_init(&impl->thread_data.scheduler, alloc);
- if (err) {
- goto clean_up;
- }
-
- impl->thread_data.state = EVENT_THREAD_STATE_READY_TO_RUN;
-
- event_loop->impl_data = impl;
-
- event_loop->vtable = &s_kqueue_vtable;
-
- /* success */
- return event_loop;
-
-clean_up:
- if (clean_up_mutex) {
- aws_mutex_clean_up(&impl->cross_thread_data.mutex);
- }
- if (clean_up_signal_kevent) {
- thread_signal_kevent.flags = EV_DELETE;
- kevent(
- impl->kq_fd,
- &thread_signal_kevent /*changelist*/,
- 1 /*nchanges*/,
- NULL /*eventlist*/,
- 0 /*nevents*/,
- NULL /*timeout*/);
- }
- if (clean_up_signal_pipe) {
- close(impl->cross_thread_signal_pipe[READ_FD]);
- close(impl->cross_thread_signal_pipe[WRITE_FD]);
- }
- if (clean_up_kqueue) {
- close(impl->kq_fd);
- }
- if (clean_up_thread) {
- aws_thread_clean_up(&impl->thread_created_on);
- }
- if (clean_up_impl_mem) {
- aws_mem_release(alloc, impl);
- }
- if (clean_up_event_loop_base) {
- aws_event_loop_clean_up_base(event_loop);
- }
- if (clean_up_event_loop_mem) {
- aws_mem_release(alloc, event_loop);
- }
- return NULL;
-}
-
-static void s_destroy(struct aws_event_loop *event_loop) {
- AWS_LOGF_INFO(AWS_LS_IO_EVENT_LOOP, "id=%p: destroying event_loop", (void *)event_loop);
- struct kqueue_loop *impl = event_loop->impl_data;
-
- /* Stop the event-thread. This might have already happened. It's safe to call multiple times. */
- s_stop(event_loop);
- int err = s_wait_for_stop_completion(event_loop);
- if (err) {
- AWS_LOGF_WARN(
- AWS_LS_IO_EVENT_LOOP,
- "id=%p: failed to destroy event-thread, resources have been leaked",
- (void *)event_loop);
- AWS_ASSERT("Failed to destroy event-thread, resources have been leaked." == NULL);
- return;
- }
- /* setting this so that canceled tasks don't blow up when asking if they're on the event-loop thread. */
- impl->thread_joined_to = aws_thread_current_thread_id();
- aws_atomic_store_ptr(&impl->running_thread_id, &impl->thread_joined_to);
-
- /* Clean up task-related stuff first. It's possible the a cancelled task adds further tasks to this event_loop.
- * Tasks added in this way will be in cross_thread_data.tasks_to_schedule, so we clean that up last */
-
- aws_task_scheduler_clean_up(&impl->thread_data.scheduler); /* Tasks in scheduler get cancelled*/
-
- while (!aws_linked_list_empty(&impl->cross_thread_data.tasks_to_schedule)) {
- struct aws_linked_list_node *node = aws_linked_list_pop_front(&impl->cross_thread_data.tasks_to_schedule);
- struct aws_task *task = AWS_CONTAINER_OF(node, struct aws_task, node);
- task->fn(task, task->arg, AWS_TASK_STATUS_CANCELED);
- }
-
- /* Warn user if aws_io_handle was subscribed, but never unsubscribed. This would cause memory leaks. */
- AWS_ASSERT(impl->thread_data.connected_handle_count == 0);
-
- /* Clean up everything else */
- aws_mutex_clean_up(&impl->cross_thread_data.mutex);
-
- struct kevent thread_signal_kevent;
- EV_SET(
- &thread_signal_kevent,
- impl->cross_thread_signal_pipe[READ_FD],
- EVFILT_READ /*filter*/,
- EV_DELETE /*flags*/,
- 0 /*fflags*/,
- 0 /*data*/,
- NULL /*udata*/);
-
- kevent(
- impl->kq_fd,
- &thread_signal_kevent /*changelist*/,
- 1 /*nchanges*/,
- NULL /*eventlist*/,
- 0 /*nevents*/,
- NULL /*timeout*/);
-
- close(impl->cross_thread_signal_pipe[READ_FD]);
- close(impl->cross_thread_signal_pipe[WRITE_FD]);
- close(impl->kq_fd);
- aws_thread_clean_up(&impl->thread_created_on);
- aws_mem_release(event_loop->alloc, impl);
- aws_event_loop_clean_up_base(event_loop);
- aws_mem_release(event_loop->alloc, event_loop);
-}
-
-static int s_run(struct aws_event_loop *event_loop) {
- struct kqueue_loop *impl = event_loop->impl_data;
-
- AWS_LOGF_INFO(AWS_LS_IO_EVENT_LOOP, "id=%p: starting event-loop thread.", (void *)event_loop);
- /* to re-run, call stop() and wait_for_stop_completion() */
- AWS_ASSERT(impl->cross_thread_data.state == EVENT_THREAD_STATE_READY_TO_RUN);
- AWS_ASSERT(impl->thread_data.state == EVENT_THREAD_STATE_READY_TO_RUN);
-
- /* Since thread isn't running it's ok to touch thread_data,
- * and it's ok to touch cross_thread_data without locking the mutex */
- impl->cross_thread_data.state = EVENT_THREAD_STATE_RUNNING;
-
- int err = aws_thread_launch(&impl->thread_created_on, s_event_thread_main, (void *)event_loop, NULL);
- if (err) {
- AWS_LOGF_FATAL(AWS_LS_IO_EVENT_LOOP, "id=%p: thread creation failed.", (void *)event_loop);
- goto clean_up;
- }
-
- return AWS_OP_SUCCESS;
-
-clean_up:
- impl->cross_thread_data.state = EVENT_THREAD_STATE_READY_TO_RUN;
- return AWS_OP_ERR;
-}
-
-/* This function can't fail, we're relying on the thread responding to critical messages (ex: stop thread) */
-void signal_cross_thread_data_changed(struct aws_event_loop *event_loop) {
- struct kqueue_loop *impl = event_loop->impl_data;
-
- AWS_LOGF_TRACE(
- AWS_LS_IO_EVENT_LOOP,
- "id=%p: signaling event-loop that cross-thread tasks need to be scheduled.",
- (void *)event_loop);
- /* Doesn't actually matter what we write, any activity on pipe signals that cross_thread_data has changed,
- * If the pipe is full and the write fails, that's fine, the event-thread will get the signal from some previous
- * write */
- uint32_t write_whatever = 0xC0FFEE;
- write(impl->cross_thread_signal_pipe[WRITE_FD], &write_whatever, sizeof(write_whatever));
-}
-
-static int s_stop(struct aws_event_loop *event_loop) {
- struct kqueue_loop *impl = event_loop->impl_data;
-
- bool signal_thread = false;
-
- { /* Begin critical section */
- aws_mutex_lock(&impl->cross_thread_data.mutex);
- if (impl->cross_thread_data.state == EVENT_THREAD_STATE_RUNNING) {
- impl->cross_thread_data.state = EVENT_THREAD_STATE_STOPPING;
- signal_thread = !impl->cross_thread_data.thread_signaled;
- impl->cross_thread_data.thread_signaled = true;
- }
- aws_mutex_unlock(&impl->cross_thread_data.mutex);
- } /* End critical section */
-
- if (signal_thread) {
- signal_cross_thread_data_changed(event_loop);
- }
-
- return AWS_OP_SUCCESS;
-}
-
-static int s_wait_for_stop_completion(struct aws_event_loop *event_loop) {
- struct kqueue_loop *impl = event_loop->impl_data;
-
-#ifdef DEBUG_BUILD
- aws_mutex_lock(&impl->cross_thread_data.mutex);
- /* call stop() before wait_for_stop_completion() or you'll wait forever */
- AWS_ASSERT(impl->cross_thread_data.state != EVENT_THREAD_STATE_RUNNING);
- aws_mutex_unlock(&impl->cross_thread_data.mutex);
-#endif
-
- int err = aws_thread_join(&impl->thread_created_on);
- if (err) {
- return AWS_OP_ERR;
- }
-
- /* Since thread is no longer running it's ok to touch thread_data,
- * and it's ok to touch cross_thread_data without locking the mutex */
- impl->cross_thread_data.state = EVENT_THREAD_STATE_READY_TO_RUN;
- impl->thread_data.state = EVENT_THREAD_STATE_READY_TO_RUN;
-
- return AWS_OP_SUCCESS;
-}
-
-/* Common functionality for "now" and "future" task scheduling.
- * If `run_at_nanos` is zero then the task is scheduled as a "now" task. */
-static void s_schedule_task_common(struct aws_event_loop *event_loop, struct aws_task *task, uint64_t run_at_nanos) {
- AWS_ASSERT(task);
- struct kqueue_loop *impl = event_loop->impl_data;
-
- /* If we're on the event-thread, just schedule it directly */
- if (s_is_event_thread(event_loop)) {
- AWS_LOGF_TRACE(
- AWS_LS_IO_EVENT_LOOP,
- "id=%p: scheduling task %p in-thread for timestamp %llu",
- (void *)event_loop,
- (void *)task,
- (unsigned long long)run_at_nanos);
- if (run_at_nanos == 0) {
- aws_task_scheduler_schedule_now(&impl->thread_data.scheduler, task);
- } else {
- aws_task_scheduler_schedule_future(&impl->thread_data.scheduler, task, run_at_nanos);
- }
- return;
- }
-
- /* Otherwise, add it to cross_thread_data.tasks_to_schedule and signal the event-thread to process it */
- AWS_LOGF_TRACE(
- AWS_LS_IO_EVENT_LOOP,
- "id=%p: scheduling task %p cross-thread for timestamp %llu",
- (void *)event_loop,
- (void *)task,
- (unsigned long long)run_at_nanos);
- task->timestamp = run_at_nanos;
- bool should_signal_thread = false;
-
- /* Begin critical section */
- aws_mutex_lock(&impl->cross_thread_data.mutex);
- aws_linked_list_push_back(&impl->cross_thread_data.tasks_to_schedule, &task->node);
-
- /* Signal thread that cross_thread_data has changed (unless it's been signaled already) */
- if (!impl->cross_thread_data.thread_signaled) {
- should_signal_thread = true;
- impl->cross_thread_data.thread_signaled = true;
- }
-
- aws_mutex_unlock(&impl->cross_thread_data.mutex);
- /* End critical section */
-
- if (should_signal_thread) {
- signal_cross_thread_data_changed(event_loop);
- }
-}
-
-static void s_schedule_task_now(struct aws_event_loop *event_loop, struct aws_task *task) {
- s_schedule_task_common(event_loop, task, 0); /* Zero is used to denote "now" tasks */
-}
-
-static void s_schedule_task_future(struct aws_event_loop *event_loop, struct aws_task *task, uint64_t run_at_nanos) {
- s_schedule_task_common(event_loop, task, run_at_nanos);
-}
-
-static void s_cancel_task(struct aws_event_loop *event_loop, struct aws_task *task) {
- struct kqueue_loop *kqueue_loop = event_loop->impl_data;
- AWS_LOGF_TRACE(AWS_LS_IO_EVENT_LOOP, "id=%p: cancelling task %p", (void *)event_loop, (void *)task);
- aws_task_scheduler_cancel_task(&kqueue_loop->thread_data.scheduler, task);
-}
-
-/* Scheduled task that connects aws_io_handle with the kqueue */
-static void s_subscribe_task(struct aws_task *task, void *user_data, enum aws_task_status status) {
- (void)task;
- struct handle_data *handle_data = user_data;
- struct aws_event_loop *event_loop = handle_data->event_loop;
- struct kqueue_loop *impl = handle_data->event_loop->impl_data;
-
- impl->thread_data.connected_handle_count++;
-
- /* if task was cancelled, nothing to do */
- if (status == AWS_TASK_STATUS_CANCELED) {
- return;
- }
- AWS_LOGF_TRACE(
- AWS_LS_IO_EVENT_LOOP, "id=%p: subscribing to events on fd %d", (void *)event_loop, handle_data->owner->data.fd);
-
- /* If handle was unsubscribed before this task could execute, nothing to do */
- if (handle_data->state == HANDLE_STATE_UNSUBSCRIBED) {
- return;
- }
-
- AWS_ASSERT(handle_data->state == HANDLE_STATE_SUBSCRIBING);
-
- /* In order to monitor both reads and writes, kqueue requires you to add two separate kevents.
- * If we're adding two separate kevents, but one of those fails, we need to remove the other kevent.
- * Therefore we use the EV_RECEIPT flag. This causes kevent() to tell whether each EV_ADD succeeded,
- * rather than the usual behavior of telling us about recent events. */
- struct kevent changelist[2];
- AWS_ZERO_ARRAY(changelist);
-
- int changelist_size = 0;
-
- if (handle_data->events_subscribed & AWS_IO_EVENT_TYPE_READABLE) {
- EV_SET(
- &changelist[changelist_size++],
- handle_data->owner->data.fd,
- EVFILT_READ /*filter*/,
- EV_ADD | EV_RECEIPT | EV_CLEAR /*flags*/,
- 0 /*fflags*/,
- 0 /*data*/,
- handle_data /*udata*/);
- }
- if (handle_data->events_subscribed & AWS_IO_EVENT_TYPE_WRITABLE) {
- EV_SET(
- &changelist[changelist_size++],
- handle_data->owner->data.fd,
- EVFILT_WRITE /*filter*/,
- EV_ADD | EV_RECEIPT | EV_CLEAR /*flags*/,
- 0 /*fflags*/,
- 0 /*data*/,
- handle_data /*udata*/);
- }
-
- int num_events = kevent(
- impl->kq_fd,
- changelist /*changelist*/,
- changelist_size /*nchanges*/,
- changelist /*eventlist. It's OK to re-use the same memory for changelist input and eventlist output*/,
- changelist_size /*nevents*/,
- NULL /*timeout*/);
- if (num_events == -1) {
- goto subscribe_failed;
- }
-
- /* Look through results to see if any failed */
- for (int i = 0; i < num_events; ++i) {
- /* Every result should be flagged as error, that's just how EV_RECEIPT works */
- AWS_ASSERT(changelist[i].flags & EV_ERROR);
-
- /* If a real error occurred, .data contains the error code */
- if (changelist[i].data != 0) {
- goto subscribe_failed;
- }
- }
-
- /* Success */
- handle_data->state = HANDLE_STATE_SUBSCRIBED;
- return;
-
-subscribe_failed:
- AWS_LOGF_ERROR(
- AWS_LS_IO_EVENT_LOOP,
- "id=%p: failed to subscribe to events on fd %d",
- (void *)event_loop,
- handle_data->owner->data.fd);
- /* Remove any related kevents that succeeded */
- for (int i = 0; i < num_events; ++i) {
- if (changelist[i].data == 0) {
- changelist[i].flags = EV_DELETE;
- kevent(
- impl->kq_fd,
- &changelist[i] /*changelist*/,
- 1 /*nchanges*/,
- NULL /*eventlist*/,
- 0 /*nevents*/,
- NULL /*timeout*/);
- }
- }
-
- /* We can't return an error code because this was a scheduled task.
- * Notify the user of the failed subscription by passing AWS_IO_EVENT_TYPE_ERROR to the callback. */
- handle_data->on_event(event_loop, handle_data->owner, AWS_IO_EVENT_TYPE_ERROR, handle_data->on_event_user_data);
-}
-
-static int s_subscribe_to_io_events(
- struct aws_event_loop *event_loop,
- struct aws_io_handle *handle,
- int events,
- aws_event_loop_on_event_fn *on_event,
- void *user_data) {
-
- AWS_ASSERT(event_loop);
- AWS_ASSERT(handle->data.fd != -1);
- AWS_ASSERT(handle->additional_data == NULL);
- AWS_ASSERT(on_event);
- /* Must subscribe for read, write, or both */
- AWS_ASSERT(events & (AWS_IO_EVENT_TYPE_READABLE | AWS_IO_EVENT_TYPE_WRITABLE));
-
- struct handle_data *handle_data = aws_mem_calloc(event_loop->alloc, 1, sizeof(struct handle_data));
- if (!handle_data) {
- return AWS_OP_ERR;
- }
-
- handle_data->owner = handle;
- handle_data->event_loop = event_loop;
- handle_data->on_event = on_event;
- handle_data->on_event_user_data = user_data;
- handle_data->events_subscribed = events;
- handle_data->state = HANDLE_STATE_SUBSCRIBING;
-
- handle->additional_data = handle_data;
-
- /* We schedule a task to perform the actual changes to the kqueue, read on for an explanation why...
- *
- * kqueue requires separate registrations for read and write events.
- * If the user wants to know about both read and write, we need register once for read and once for write.
- * If the first registration succeeds, but the second registration fails, we need to delete the first registration.
- * If this all happened outside the event-thread, the successful registration's events could begin processing
- * in the brief window of time before the registration is deleted. */
-
- aws_task_init(&handle_data->subscribe_task, s_subscribe_task, handle_data, "kqueue_event_loop_subscribe");
- s_schedule_task_now(event_loop, &handle_data->subscribe_task);
-
- return AWS_OP_SUCCESS;
-}
-
-static void s_free_io_event_resources(void *user_data) {
- struct handle_data *handle_data = user_data;
- struct kqueue_loop *impl = handle_data->event_loop->impl_data;
-
- impl->thread_data.connected_handle_count--;
-
- aws_mem_release(handle_data->event_loop->alloc, handle_data);
-}
-
-static void s_clean_up_handle_data_task(struct aws_task *task, void *user_data, enum aws_task_status status) {
- (void)task;
- (void)status;
-
- struct handle_data *handle_data = user_data;
- s_free_io_event_resources(handle_data);
-}
-
-static int s_unsubscribe_from_io_events(struct aws_event_loop *event_loop, struct aws_io_handle *handle) {
- AWS_LOGF_TRACE(
- AWS_LS_IO_EVENT_LOOP, "id=%p: un-subscribing from events on fd %d", (void *)event_loop, handle->data.fd);
- AWS_ASSERT(handle->additional_data);
- struct handle_data *handle_data = handle->additional_data;
- struct kqueue_loop *impl = event_loop->impl_data;
-
- AWS_ASSERT(event_loop == handle_data->event_loop);
-
- /* If the handle was successfully subscribed to kqueue, then remove it. */
- if (handle_data->state == HANDLE_STATE_SUBSCRIBED) {
- struct kevent changelist[2];
- int changelist_size = 0;
-
- if (handle_data->events_subscribed & AWS_IO_EVENT_TYPE_READABLE) {
- EV_SET(
- &changelist[changelist_size++],
- handle_data->owner->data.fd,
- EVFILT_READ /*filter*/,
- EV_DELETE /*flags*/,
- 0 /*fflags*/,
- 0 /*data*/,
- handle_data /*udata*/);
- }
- if (handle_data->events_subscribed & AWS_IO_EVENT_TYPE_WRITABLE) {
- EV_SET(
- &changelist[changelist_size++],
- handle_data->owner->data.fd,
- EVFILT_WRITE /*filter*/,
- EV_DELETE /*flags*/,
- 0 /*fflags*/,
- 0 /*data*/,
- handle_data /*udata*/);
- }
-
- kevent(impl->kq_fd, changelist, changelist_size, NULL /*eventlist*/, 0 /*nevents*/, NULL /*timeout*/);
- }
-
- /* Schedule a task to clean up the memory. This is done in a task to prevent the following scenario:
- * - While processing a batch of events, some callback unsubscribes another aws_io_handle.
- * - One of the other events in this batch belongs to that other aws_io_handle.
- * - If the handle_data were already deleted, there would be an access invalid memory. */
-
- aws_task_init(
- &handle_data->cleanup_task, s_clean_up_handle_data_task, handle_data, "kqueue_event_loop_clean_up_handle_data");
- aws_event_loop_schedule_task_now(event_loop, &handle_data->cleanup_task);
-
- handle_data->state = HANDLE_STATE_UNSUBSCRIBED;
- handle->additional_data = NULL;
-
- return AWS_OP_SUCCESS;
-}
-
-static bool s_is_event_thread(struct aws_event_loop *event_loop) {
- struct kqueue_loop *impl = event_loop->impl_data;
-
- aws_thread_id_t *thread_id = aws_atomic_load_ptr(&impl->running_thread_id);
- return thread_id && aws_thread_thread_id_equal(*thread_id, aws_thread_current_thread_id());
-}
-
-/* Called from thread.
- * Takes tasks from tasks_to_schedule and adds them to the scheduler. */
-static void s_process_tasks_to_schedule(struct aws_event_loop *event_loop, struct aws_linked_list *tasks_to_schedule) {
- struct kqueue_loop *impl = event_loop->impl_data;
- AWS_LOGF_TRACE(AWS_LS_IO_EVENT_LOOP, "id=%p: processing cross-thread tasks", (void *)event_loop);
-
- while (!aws_linked_list_empty(tasks_to_schedule)) {
- struct aws_linked_list_node *node = aws_linked_list_pop_front(tasks_to_schedule);
- struct aws_task *task = AWS_CONTAINER_OF(node, struct aws_task, node);
-
- AWS_LOGF_TRACE(
- AWS_LS_IO_EVENT_LOOP,
- "id=%p: task %p pulled to event-loop, scheduling now.",
- (void *)event_loop,
- (void *)task);
- /* Timestamp 0 is used to denote "now" tasks */
- if (task->timestamp == 0) {
- aws_task_scheduler_schedule_now(&impl->thread_data.scheduler, task);
- } else {
- aws_task_scheduler_schedule_future(&impl->thread_data.scheduler, task, task->timestamp);
- }
- }
-}
-
-static void s_process_cross_thread_data(struct aws_event_loop *event_loop) {
- struct kqueue_loop *impl = event_loop->impl_data;
-
- AWS_LOGF_TRACE(AWS_LS_IO_EVENT_LOOP, "id=%p: notified of cross-thread data to process", (void *)event_loop);
- /* If there are tasks to schedule, grab them all out of synced_data.tasks_to_schedule.
- * We'll process them later, so that we minimize time spent holding the mutex. */
- struct aws_linked_list tasks_to_schedule;
- aws_linked_list_init(&tasks_to_schedule);
-
- { /* Begin critical section */
- aws_mutex_lock(&impl->cross_thread_data.mutex);
- impl->cross_thread_data.thread_signaled = false;
-
- bool initiate_stop = (impl->cross_thread_data.state == EVENT_THREAD_STATE_STOPPING) &&
- (impl->thread_data.state == EVENT_THREAD_STATE_RUNNING);
- if (AWS_UNLIKELY(initiate_stop)) {
- impl->thread_data.state = EVENT_THREAD_STATE_STOPPING;
- }
-
- aws_linked_list_swap_contents(&impl->cross_thread_data.tasks_to_schedule, &tasks_to_schedule);
-
- aws_mutex_unlock(&impl->cross_thread_data.mutex);
- } /* End critical section */
-
- s_process_tasks_to_schedule(event_loop, &tasks_to_schedule);
-}
-
-static int s_aws_event_flags_from_kevent(struct kevent *kevent) {
- int event_flags = 0;
-
- if (kevent->flags & EV_ERROR) {
- event_flags |= AWS_IO_EVENT_TYPE_ERROR;
- } else if (kevent->filter == EVFILT_READ) {
- if (kevent->data != 0) {
- event_flags |= AWS_IO_EVENT_TYPE_READABLE;
- }
-
- if (kevent->flags & EV_EOF) {
- event_flags |= AWS_IO_EVENT_TYPE_CLOSED;
- }
- } else if (kevent->filter == EVFILT_WRITE) {
- if (kevent->data != 0) {
- event_flags |= AWS_IO_EVENT_TYPE_WRITABLE;
- }
-
- if (kevent->flags & EV_EOF) {
- event_flags |= AWS_IO_EVENT_TYPE_CLOSED;
- }
- }
-
- return event_flags;
-}
-
-static void s_event_thread_main(void *user_data) {
- struct aws_event_loop *event_loop = user_data;
- AWS_LOGF_INFO(AWS_LS_IO_EVENT_LOOP, "id=%p: main loop started", (void *)event_loop);
- struct kqueue_loop *impl = event_loop->impl_data;
-
- /* set thread id to the event-loop's thread. */
- aws_atomic_store_ptr(&impl->running_thread_id, &impl->thread_created_on.thread_id);
-
- AWS_ASSERT(impl->thread_data.state == EVENT_THREAD_STATE_READY_TO_RUN);
- impl->thread_data.state = EVENT_THREAD_STATE_RUNNING;
-
- struct kevent kevents[MAX_EVENTS];
-
- /* A single aws_io_handle could have two separate kevents if subscribed for both read and write.
- * If both the read and write kevents fire in the same loop of the event-thread,
- * combine the event-flags and deliver them in a single callback.
- * This makes the kqueue_event_loop behave more like the other platform implementations. */
- struct handle_data *io_handle_events[MAX_EVENTS];
-
- struct timespec timeout = {
- .tv_sec = DEFAULT_TIMEOUT_SEC,
- .tv_nsec = 0,
- };
-
- AWS_LOGF_INFO(
- AWS_LS_IO_EVENT_LOOP,
- "id=%p: default timeout %ds, and max events to process per tick %d",
- (void *)event_loop,
- DEFAULT_TIMEOUT_SEC,
- MAX_EVENTS);
-
- while (impl->thread_data.state == EVENT_THREAD_STATE_RUNNING) {
- int num_io_handle_events = 0;
- bool should_process_cross_thread_data = false;
-
- AWS_LOGF_TRACE(
- AWS_LS_IO_EVENT_LOOP,
- "id=%p: waiting for a maximum of %ds %lluns",
- (void *)event_loop,
- (int)timeout.tv_sec,
- (unsigned long long)timeout.tv_nsec);
-
- /* Process kqueue events */
- int num_kevents = kevent(
- impl->kq_fd, NULL /*changelist*/, 0 /*nchanges*/, kevents /*eventlist*/, MAX_EVENTS /*nevents*/, &timeout);
-
- AWS_LOGF_TRACE(
- AWS_LS_IO_EVENT_LOOP, "id=%p: wake up with %d events to process.", (void *)event_loop, num_kevents);
- if (num_kevents == -1) {
- /* Raise an error, in case this is interesting to anyone monitoring,
- * and continue on with this loop. We can't process events,
- * but we can still process scheduled tasks */
- aws_raise_error(AWS_ERROR_SYS_CALL_FAILURE);
-
- /* Force the cross_thread_data to be processed.
- * There might be valuable info in there, like the message to stop the thread.
- * It's fine to do this even if nothing has changed, it just costs a mutex lock/unlock. */
- should_process_cross_thread_data = true;
- }
-
- for (int i = 0; i < num_kevents; ++i) {
- struct kevent *kevent = &kevents[i];
-
- /* Was this event to signal that cross_thread_data has changed? */
- if ((int)kevent->ident == impl->cross_thread_signal_pipe[READ_FD]) {
- should_process_cross_thread_data = true;
-
- /* Drain whatever data was written to the signaling pipe */
- uint32_t read_whatever;
- while (read((int)kevent->ident, &read_whatever, sizeof(read_whatever)) > 0) {
- }
-
- continue;
- }
-
- /* Otherwise this was a normal event on a subscribed handle. Figure out which flags to report. */
- int event_flags = s_aws_event_flags_from_kevent(kevent);
- if (event_flags == 0) {
- continue;
- }
-
- /* Combine flags, in case multiple kevents correspond to one handle. (see notes at top of function) */
- struct handle_data *handle_data = kevent->udata;
- if (handle_data->events_this_loop == 0) {
- io_handle_events[num_io_handle_events++] = handle_data;
- }
- handle_data->events_this_loop |= event_flags;
- }
-
- /* Invoke each handle's event callback (unless the handle has been unsubscribed) */
- for (int i = 0; i < num_io_handle_events; ++i) {
- struct handle_data *handle_data = io_handle_events[i];
-
- if (handle_data->state == HANDLE_STATE_SUBSCRIBED) {
- AWS_LOGF_TRACE(
- AWS_LS_IO_EVENT_LOOP,
- "id=%p: activity on fd %d, invoking handler.",
- (void *)event_loop,
- handle_data->owner->data.fd);
- handle_data->on_event(
- event_loop, handle_data->owner, handle_data->events_this_loop, handle_data->on_event_user_data);
- }
-
- handle_data->events_this_loop = 0;
- }
-
- /* Process cross_thread_data */
- if (should_process_cross_thread_data) {
- s_process_cross_thread_data(event_loop);
- }
-
- /* Run scheduled tasks */
- uint64_t now_ns = 0;
- event_loop->clock(&now_ns); /* If clock fails, now_ns will be 0 and tasks scheduled for a specific time
- will not be run. That's ok, we'll handle them next time around. */
- AWS_LOGF_TRACE(AWS_LS_IO_EVENT_LOOP, "id=%p: running scheduled tasks.", (void *)event_loop);
- aws_task_scheduler_run_all(&impl->thread_data.scheduler, now_ns);
-
- /* Set timeout for next kevent() call.
- * If clock fails, or scheduler has no tasks, use default timeout */
- bool use_default_timeout = false;
-
- int err = event_loop->clock(&now_ns);
- if (err) {
- use_default_timeout = true;
- }
-
- uint64_t next_run_time_ns;
- if (!aws_task_scheduler_has_tasks(&impl->thread_data.scheduler, &next_run_time_ns)) {
-
- use_default_timeout = true;
- }
-
- if (use_default_timeout) {
- AWS_LOGF_TRACE(
- AWS_LS_IO_EVENT_LOOP, "id=%p: no more scheduled tasks using default timeout.", (void *)event_loop);
- timeout.tv_sec = DEFAULT_TIMEOUT_SEC;
- timeout.tv_nsec = 0;
- } else {
- /* Convert from timestamp in nanoseconds, to timeout in seconds with nanosecond remainder */
- uint64_t timeout_ns = next_run_time_ns > now_ns ? next_run_time_ns - now_ns : 0;
-
- uint64_t timeout_remainder_ns = 0;
- uint64_t timeout_sec =
- aws_timestamp_convert(timeout_ns, AWS_TIMESTAMP_NANOS, AWS_TIMESTAMP_SECS, &timeout_remainder_ns);
-
- if (timeout_sec > LONG_MAX) { /* Check for overflow. On Darwin, these values are stored as longs */
- timeout_sec = LONG_MAX;
- timeout_remainder_ns = 0;
- }
-
- AWS_LOGF_TRACE(
- AWS_LS_IO_EVENT_LOOP,
- "id=%p: detected more scheduled tasks with the next occurring at "
- "%llu using timeout of %ds %lluns.",
- (void *)event_loop,
- (unsigned long long)timeout_ns,
- (int)timeout_sec,
- (unsigned long long)timeout_remainder_ns);
- timeout.tv_sec = (time_t)(timeout_sec);
- timeout.tv_nsec = (long)(timeout_remainder_ns);
- }
- }
-
- AWS_LOGF_INFO(AWS_LS_IO_EVENT_LOOP, "id=%p: exiting main loop", (void *)event_loop);
- /* reset to NULL. This should be updated again during destroy before tasks are canceled. */
- aws_atomic_store_ptr(&impl->running_thread_id, NULL);
-}
+/**
+ * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
+ * SPDX-License-Identifier: Apache-2.0.
+ */
+
+#include <aws/io/event_loop.h>
+
+#include <aws/io/logging.h>
+
+#include <aws/common/atomics.h>
+#include <aws/common/clock.h>
+#include <aws/common/mutex.h>
+#include <aws/common/task_scheduler.h>
+#include <aws/common/thread.h>
+
+#if defined(__FreeBSD__) || defined(__NetBSD__)
+# define __BSD_VISIBLE 1
+# include <sys/types.h>
+#endif
+
+#include <sys/event.h>
+
+#include <aws/io/io.h>
+#include <limits.h>
+#include <unistd.h>
+
+static void s_destroy(struct aws_event_loop *event_loop);
+static int s_run(struct aws_event_loop *event_loop);
+static int s_stop(struct aws_event_loop *event_loop);
+static int s_wait_for_stop_completion(struct aws_event_loop *event_loop);
+static void s_schedule_task_now(struct aws_event_loop *event_loop, struct aws_task *task);
+static void s_schedule_task_future(struct aws_event_loop *event_loop, struct aws_task *task, uint64_t run_at_nanos);
+static void s_cancel_task(struct aws_event_loop *event_loop, struct aws_task *task);
+static int s_subscribe_to_io_events(
+ struct aws_event_loop *event_loop,
+ struct aws_io_handle *handle,
+ int events,
+ aws_event_loop_on_event_fn *on_event,
+ void *user_data);
+static int s_unsubscribe_from_io_events(struct aws_event_loop *event_loop, struct aws_io_handle *handle);
+static void s_free_io_event_resources(void *user_data);
+static bool s_is_event_thread(struct aws_event_loop *event_loop);
+
+static void s_event_thread_main(void *user_data);
+
+int aws_open_nonblocking_posix_pipe(int pipe_fds[2]);
+
+enum event_thread_state {
+ EVENT_THREAD_STATE_READY_TO_RUN,
+ EVENT_THREAD_STATE_RUNNING,
+ EVENT_THREAD_STATE_STOPPING,
+};
+
+enum pipe_fd_index {
+ READ_FD,
+ WRITE_FD,
+};
+
+struct kqueue_loop {
+ /* thread_created_on is the handle to the event loop thread. */
+ struct aws_thread thread_created_on;
+ /* thread_joined_to is used by the thread destroying the event loop. */
+ aws_thread_id_t thread_joined_to;
+ /* running_thread_id is NULL if the event loop thread is stopped or points-to the thread_id of the thread running
+ * the event loop (either thread_created_on or thread_joined_to). Atomic because of concurrent writes (e.g.,
+ * run/stop) and reads (e.g., is_event_loop_thread).
+ * An aws_thread_id_t variable itself cannot be atomic because it is an opaque type that is platform-dependent. */
+ struct aws_atomic_var running_thread_id;
+ int kq_fd; /* kqueue file descriptor */
+
+ /* Pipe for signaling to event-thread that cross_thread_data has changed. */
+ int cross_thread_signal_pipe[2];
+
+ /* cross_thread_data holds things that must be communicated across threads.
+ * When the event-thread is running, the mutex must be locked while anyone touches anything in cross_thread_data.
+ * If this data is modified outside the thread, the thread is signaled via activity on a pipe. */
+ struct {
+ struct aws_mutex mutex;
+ bool thread_signaled; /* whether thread has been signaled about changes to cross_thread_data */
+ struct aws_linked_list tasks_to_schedule;
+ enum event_thread_state state;
+ } cross_thread_data;
+
+ /* thread_data holds things which, when the event-thread is running, may only be touched by the thread */
+ struct {
+ struct aws_task_scheduler scheduler;
+
+ int connected_handle_count;
+
+ /* These variables duplicate ones in cross_thread_data. We move values out while holding the mutex and operate
+ * on them later */
+ enum event_thread_state state;
+ } thread_data;
+};
+
+/* Data attached to aws_io_handle while the handle is subscribed to io events */
+struct handle_data {
+ struct aws_io_handle *owner;
+ struct aws_event_loop *event_loop;
+ aws_event_loop_on_event_fn *on_event;
+ void *on_event_user_data;
+
+ int events_subscribed; /* aws_io_event_types this handle should be subscribed to */
+ int events_this_loop; /* aws_io_event_types received during current loop of the event-thread */
+
+ enum { HANDLE_STATE_SUBSCRIBING, HANDLE_STATE_SUBSCRIBED, HANDLE_STATE_UNSUBSCRIBED } state;
+
+ struct aws_task subscribe_task;
+ struct aws_task cleanup_task;
+};
+
+enum {
+ DEFAULT_TIMEOUT_SEC = 100, /* Max kevent() timeout per loop of the event-thread */
+ MAX_EVENTS = 100, /* Max kevents to process per loop of the event-thread */
+};
+
+struct aws_event_loop_vtable s_kqueue_vtable = {
+ .destroy = s_destroy,
+ .run = s_run,
+ .stop = s_stop,
+ .wait_for_stop_completion = s_wait_for_stop_completion,
+ .schedule_task_now = s_schedule_task_now,
+ .schedule_task_future = s_schedule_task_future,
+ .subscribe_to_io_events = s_subscribe_to_io_events,
+ .cancel_task = s_cancel_task,
+ .unsubscribe_from_io_events = s_unsubscribe_from_io_events,
+ .free_io_event_resources = s_free_io_event_resources,
+ .is_on_callers_thread = s_is_event_thread,
+};
+
+struct aws_event_loop *aws_event_loop_new_default(struct aws_allocator *alloc, aws_io_clock_fn *clock) {
+ AWS_ASSERT(alloc);
+ AWS_ASSERT(clock);
+
+ bool clean_up_event_loop_mem = false;
+ bool clean_up_event_loop_base = false;
+ bool clean_up_impl_mem = false;
+ bool clean_up_thread = false;
+ bool clean_up_kqueue = false;
+ bool clean_up_signal_pipe = false;
+ bool clean_up_signal_kevent = false;
+ bool clean_up_mutex = false;
+
+ struct aws_event_loop *event_loop = aws_mem_acquire(alloc, sizeof(struct aws_event_loop));
+ if (!event_loop) {
+ return NULL;
+ }
+
+ AWS_LOGF_INFO(AWS_LS_IO_EVENT_LOOP, "id=%p: Initializing edge-triggered kqueue", (void *)event_loop);
+ clean_up_event_loop_mem = true;
+
+ int err = aws_event_loop_init_base(event_loop, alloc, clock);
+ if (err) {
+ goto clean_up;
+ }
+ clean_up_event_loop_base = true;
+
+ struct kqueue_loop *impl = aws_mem_calloc(alloc, 1, sizeof(struct kqueue_loop));
+ if (!impl) {
+ goto clean_up;
+ }
+ /* intialize thread id to NULL. It will be set when the event loop thread starts. */
+ aws_atomic_init_ptr(&impl->running_thread_id, NULL);
+ clean_up_impl_mem = true;
+
+ err = aws_thread_init(&impl->thread_created_on, alloc);
+ if (err) {
+ goto clean_up;
+ }
+ clean_up_thread = true;
+
+ impl->kq_fd = kqueue();
+ if (impl->kq_fd == -1) {
+ AWS_LOGF_FATAL(AWS_LS_IO_EVENT_LOOP, "id=%p: Failed to open kqueue handle.", (void *)event_loop);
+ aws_raise_error(AWS_ERROR_SYS_CALL_FAILURE);
+ goto clean_up;
+ }
+ clean_up_kqueue = true;
+
+ err = aws_open_nonblocking_posix_pipe(impl->cross_thread_signal_pipe);
+ if (err) {
+ AWS_LOGF_FATAL(AWS_LS_IO_EVENT_LOOP, "id=%p: failed to open pipe handle.", (void *)event_loop);
+ goto clean_up;
+ }
+ AWS_LOGF_TRACE(
+ AWS_LS_IO_EVENT_LOOP,
+ "id=%p: pipe descriptors read %d, write %d.",
+ (void *)event_loop,
+ impl->cross_thread_signal_pipe[READ_FD],
+ impl->cross_thread_signal_pipe[WRITE_FD]);
+ clean_up_signal_pipe = true;
+
+ /* Set up kevent to handle activity on the cross_thread_signal_pipe */
+ struct kevent thread_signal_kevent;
+ EV_SET(
+ &thread_signal_kevent,
+ impl->cross_thread_signal_pipe[READ_FD],
+ EVFILT_READ /*filter*/,
+ EV_ADD | EV_CLEAR /*flags*/,
+ 0 /*fflags*/,
+ 0 /*data*/,
+ NULL /*udata*/);
+
+ int res = kevent(
+ impl->kq_fd,
+ &thread_signal_kevent /*changelist*/,
+ 1 /*nchanges*/,
+ NULL /*eventlist*/,
+ 0 /*nevents*/,
+ NULL /*timeout*/);
+
+ if (res == -1) {
+ AWS_LOGF_FATAL(AWS_LS_IO_EVENT_LOOP, "id=%p: failed to create cross-thread signal kevent.", (void *)event_loop);
+ aws_raise_error(AWS_ERROR_SYS_CALL_FAILURE);
+ goto clean_up;
+ }
+ clean_up_signal_kevent = true;
+
+ err = aws_mutex_init(&impl->cross_thread_data.mutex);
+ if (err) {
+ goto clean_up;
+ }
+ clean_up_mutex = true;
+
+ impl->cross_thread_data.thread_signaled = false;
+
+ aws_linked_list_init(&impl->cross_thread_data.tasks_to_schedule);
+
+ impl->cross_thread_data.state = EVENT_THREAD_STATE_READY_TO_RUN;
+
+ err = aws_task_scheduler_init(&impl->thread_data.scheduler, alloc);
+ if (err) {
+ goto clean_up;
+ }
+
+ impl->thread_data.state = EVENT_THREAD_STATE_READY_TO_RUN;
+
+ event_loop->impl_data = impl;
+
+ event_loop->vtable = &s_kqueue_vtable;
+
+ /* success */
+ return event_loop;
+
+clean_up:
+ if (clean_up_mutex) {
+ aws_mutex_clean_up(&impl->cross_thread_data.mutex);
+ }
+ if (clean_up_signal_kevent) {
+ thread_signal_kevent.flags = EV_DELETE;
+ kevent(
+ impl->kq_fd,
+ &thread_signal_kevent /*changelist*/,
+ 1 /*nchanges*/,
+ NULL /*eventlist*/,
+ 0 /*nevents*/,
+ NULL /*timeout*/);
+ }
+ if (clean_up_signal_pipe) {
+ close(impl->cross_thread_signal_pipe[READ_FD]);
+ close(impl->cross_thread_signal_pipe[WRITE_FD]);
+ }
+ if (clean_up_kqueue) {
+ close(impl->kq_fd);
+ }
+ if (clean_up_thread) {
+ aws_thread_clean_up(&impl->thread_created_on);
+ }
+ if (clean_up_impl_mem) {
+ aws_mem_release(alloc, impl);
+ }
+ if (clean_up_event_loop_base) {
+ aws_event_loop_clean_up_base(event_loop);
+ }
+ if (clean_up_event_loop_mem) {
+ aws_mem_release(alloc, event_loop);
+ }
+ return NULL;
+}
+
+static void s_destroy(struct aws_event_loop *event_loop) {
+ AWS_LOGF_INFO(AWS_LS_IO_EVENT_LOOP, "id=%p: destroying event_loop", (void *)event_loop);
+ struct kqueue_loop *impl = event_loop->impl_data;
+
+ /* Stop the event-thread. This might have already happened. It's safe to call multiple times. */
+ s_stop(event_loop);
+ int err = s_wait_for_stop_completion(event_loop);
+ if (err) {
+ AWS_LOGF_WARN(
+ AWS_LS_IO_EVENT_LOOP,
+ "id=%p: failed to destroy event-thread, resources have been leaked",
+ (void *)event_loop);
+ AWS_ASSERT("Failed to destroy event-thread, resources have been leaked." == NULL);
+ return;
+ }
+ /* setting this so that canceled tasks don't blow up when asking if they're on the event-loop thread. */
+ impl->thread_joined_to = aws_thread_current_thread_id();
+ aws_atomic_store_ptr(&impl->running_thread_id, &impl->thread_joined_to);
+
+ /* Clean up task-related stuff first. It's possible the a cancelled task adds further tasks to this event_loop.
+ * Tasks added in this way will be in cross_thread_data.tasks_to_schedule, so we clean that up last */
+
+ aws_task_scheduler_clean_up(&impl->thread_data.scheduler); /* Tasks in scheduler get cancelled*/
+
+ while (!aws_linked_list_empty(&impl->cross_thread_data.tasks_to_schedule)) {
+ struct aws_linked_list_node *node = aws_linked_list_pop_front(&impl->cross_thread_data.tasks_to_schedule);
+ struct aws_task *task = AWS_CONTAINER_OF(node, struct aws_task, node);
+ task->fn(task, task->arg, AWS_TASK_STATUS_CANCELED);
+ }
+
+ /* Warn user if aws_io_handle was subscribed, but never unsubscribed. This would cause memory leaks. */
+ AWS_ASSERT(impl->thread_data.connected_handle_count == 0);
+
+ /* Clean up everything else */
+ aws_mutex_clean_up(&impl->cross_thread_data.mutex);
+
+ struct kevent thread_signal_kevent;
+ EV_SET(
+ &thread_signal_kevent,
+ impl->cross_thread_signal_pipe[READ_FD],
+ EVFILT_READ /*filter*/,
+ EV_DELETE /*flags*/,
+ 0 /*fflags*/,
+ 0 /*data*/,
+ NULL /*udata*/);
+
+ kevent(
+ impl->kq_fd,
+ &thread_signal_kevent /*changelist*/,
+ 1 /*nchanges*/,
+ NULL /*eventlist*/,
+ 0 /*nevents*/,
+ NULL /*timeout*/);
+
+ close(impl->cross_thread_signal_pipe[READ_FD]);
+ close(impl->cross_thread_signal_pipe[WRITE_FD]);
+ close(impl->kq_fd);
+ aws_thread_clean_up(&impl->thread_created_on);
+ aws_mem_release(event_loop->alloc, impl);
+ aws_event_loop_clean_up_base(event_loop);
+ aws_mem_release(event_loop->alloc, event_loop);
+}
+
+static int s_run(struct aws_event_loop *event_loop) {
+ struct kqueue_loop *impl = event_loop->impl_data;
+
+ AWS_LOGF_INFO(AWS_LS_IO_EVENT_LOOP, "id=%p: starting event-loop thread.", (void *)event_loop);
+ /* to re-run, call stop() and wait_for_stop_completion() */
+ AWS_ASSERT(impl->cross_thread_data.state == EVENT_THREAD_STATE_READY_TO_RUN);
+ AWS_ASSERT(impl->thread_data.state == EVENT_THREAD_STATE_READY_TO_RUN);
+
+ /* Since thread isn't running it's ok to touch thread_data,
+ * and it's ok to touch cross_thread_data without locking the mutex */
+ impl->cross_thread_data.state = EVENT_THREAD_STATE_RUNNING;
+
+ int err = aws_thread_launch(&impl->thread_created_on, s_event_thread_main, (void *)event_loop, NULL);
+ if (err) {
+ AWS_LOGF_FATAL(AWS_LS_IO_EVENT_LOOP, "id=%p: thread creation failed.", (void *)event_loop);
+ goto clean_up;
+ }
+
+ return AWS_OP_SUCCESS;
+
+clean_up:
+ impl->cross_thread_data.state = EVENT_THREAD_STATE_READY_TO_RUN;
+ return AWS_OP_ERR;
+}
+
+/* This function can't fail, we're relying on the thread responding to critical messages (ex: stop thread) */
+void signal_cross_thread_data_changed(struct aws_event_loop *event_loop) {
+ struct kqueue_loop *impl = event_loop->impl_data;
+
+ AWS_LOGF_TRACE(
+ AWS_LS_IO_EVENT_LOOP,
+ "id=%p: signaling event-loop that cross-thread tasks need to be scheduled.",
+ (void *)event_loop);
+ /* Doesn't actually matter what we write, any activity on pipe signals that cross_thread_data has changed,
+ * If the pipe is full and the write fails, that's fine, the event-thread will get the signal from some previous
+ * write */
+ uint32_t write_whatever = 0xC0FFEE;
+ write(impl->cross_thread_signal_pipe[WRITE_FD], &write_whatever, sizeof(write_whatever));
+}
+
+static int s_stop(struct aws_event_loop *event_loop) {
+ struct kqueue_loop *impl = event_loop->impl_data;
+
+ bool signal_thread = false;
+
+ { /* Begin critical section */
+ aws_mutex_lock(&impl->cross_thread_data.mutex);
+ if (impl->cross_thread_data.state == EVENT_THREAD_STATE_RUNNING) {
+ impl->cross_thread_data.state = EVENT_THREAD_STATE_STOPPING;
+ signal_thread = !impl->cross_thread_data.thread_signaled;
+ impl->cross_thread_data.thread_signaled = true;
+ }
+ aws_mutex_unlock(&impl->cross_thread_data.mutex);
+ } /* End critical section */
+
+ if (signal_thread) {
+ signal_cross_thread_data_changed(event_loop);
+ }
+
+ return AWS_OP_SUCCESS;
+}
+
+static int s_wait_for_stop_completion(struct aws_event_loop *event_loop) {
+ struct kqueue_loop *impl = event_loop->impl_data;
+
+#ifdef DEBUG_BUILD
+ aws_mutex_lock(&impl->cross_thread_data.mutex);
+ /* call stop() before wait_for_stop_completion() or you'll wait forever */
+ AWS_ASSERT(impl->cross_thread_data.state != EVENT_THREAD_STATE_RUNNING);
+ aws_mutex_unlock(&impl->cross_thread_data.mutex);
+#endif
+
+ int err = aws_thread_join(&impl->thread_created_on);
+ if (err) {
+ return AWS_OP_ERR;
+ }
+
+ /* Since thread is no longer running it's ok to touch thread_data,
+ * and it's ok to touch cross_thread_data without locking the mutex */
+ impl->cross_thread_data.state = EVENT_THREAD_STATE_READY_TO_RUN;
+ impl->thread_data.state = EVENT_THREAD_STATE_READY_TO_RUN;
+
+ return AWS_OP_SUCCESS;
+}
+
+/* Common functionality for "now" and "future" task scheduling.
+ * If `run_at_nanos` is zero then the task is scheduled as a "now" task. */
+static void s_schedule_task_common(struct aws_event_loop *event_loop, struct aws_task *task, uint64_t run_at_nanos) {
+ AWS_ASSERT(task);
+ struct kqueue_loop *impl = event_loop->impl_data;
+
+ /* If we're on the event-thread, just schedule it directly */
+ if (s_is_event_thread(event_loop)) {
+ AWS_LOGF_TRACE(
+ AWS_LS_IO_EVENT_LOOP,
+ "id=%p: scheduling task %p in-thread for timestamp %llu",
+ (void *)event_loop,
+ (void *)task,
+ (unsigned long long)run_at_nanos);
+ if (run_at_nanos == 0) {
+ aws_task_scheduler_schedule_now(&impl->thread_data.scheduler, task);
+ } else {
+ aws_task_scheduler_schedule_future(&impl->thread_data.scheduler, task, run_at_nanos);
+ }
+ return;
+ }
+
+ /* Otherwise, add it to cross_thread_data.tasks_to_schedule and signal the event-thread to process it */
+ AWS_LOGF_TRACE(
+ AWS_LS_IO_EVENT_LOOP,
+ "id=%p: scheduling task %p cross-thread for timestamp %llu",
+ (void *)event_loop,
+ (void *)task,
+ (unsigned long long)run_at_nanos);
+ task->timestamp = run_at_nanos;
+ bool should_signal_thread = false;
+
+ /* Begin critical section */
+ aws_mutex_lock(&impl->cross_thread_data.mutex);
+ aws_linked_list_push_back(&impl->cross_thread_data.tasks_to_schedule, &task->node);
+
+ /* Signal thread that cross_thread_data has changed (unless it's been signaled already) */
+ if (!impl->cross_thread_data.thread_signaled) {
+ should_signal_thread = true;
+ impl->cross_thread_data.thread_signaled = true;
+ }
+
+ aws_mutex_unlock(&impl->cross_thread_data.mutex);
+ /* End critical section */
+
+ if (should_signal_thread) {
+ signal_cross_thread_data_changed(event_loop);
+ }
+}
+
+static void s_schedule_task_now(struct aws_event_loop *event_loop, struct aws_task *task) {
+ s_schedule_task_common(event_loop, task, 0); /* Zero is used to denote "now" tasks */
+}
+
+static void s_schedule_task_future(struct aws_event_loop *event_loop, struct aws_task *task, uint64_t run_at_nanos) {
+ s_schedule_task_common(event_loop, task, run_at_nanos);
+}
+
+static void s_cancel_task(struct aws_event_loop *event_loop, struct aws_task *task) {
+ struct kqueue_loop *kqueue_loop = event_loop->impl_data;
+ AWS_LOGF_TRACE(AWS_LS_IO_EVENT_LOOP, "id=%p: cancelling task %p", (void *)event_loop, (void *)task);
+ aws_task_scheduler_cancel_task(&kqueue_loop->thread_data.scheduler, task);
+}
+
+/* Scheduled task that connects aws_io_handle with the kqueue */
+static void s_subscribe_task(struct aws_task *task, void *user_data, enum aws_task_status status) {
+ (void)task;
+ struct handle_data *handle_data = user_data;
+ struct aws_event_loop *event_loop = handle_data->event_loop;
+ struct kqueue_loop *impl = handle_data->event_loop->impl_data;
+
+ impl->thread_data.connected_handle_count++;
+
+ /* if task was cancelled, nothing to do */
+ if (status == AWS_TASK_STATUS_CANCELED) {
+ return;
+ }
+ AWS_LOGF_TRACE(
+ AWS_LS_IO_EVENT_LOOP, "id=%p: subscribing to events on fd %d", (void *)event_loop, handle_data->owner->data.fd);
+
+ /* If handle was unsubscribed before this task could execute, nothing to do */
+ if (handle_data->state == HANDLE_STATE_UNSUBSCRIBED) {
+ return;
+ }
+
+ AWS_ASSERT(handle_data->state == HANDLE_STATE_SUBSCRIBING);
+
+ /* In order to monitor both reads and writes, kqueue requires you to add two separate kevents.
+ * If we're adding two separate kevents, but one of those fails, we need to remove the other kevent.
+ * Therefore we use the EV_RECEIPT flag. This causes kevent() to tell whether each EV_ADD succeeded,
+ * rather than the usual behavior of telling us about recent events. */
+ struct kevent changelist[2];
+ AWS_ZERO_ARRAY(changelist);
+
+ int changelist_size = 0;
+
+ if (handle_data->events_subscribed & AWS_IO_EVENT_TYPE_READABLE) {
+ EV_SET(
+ &changelist[changelist_size++],
+ handle_data->owner->data.fd,
+ EVFILT_READ /*filter*/,
+ EV_ADD | EV_RECEIPT | EV_CLEAR /*flags*/,
+ 0 /*fflags*/,
+ 0 /*data*/,
+ handle_data /*udata*/);
+ }
+ if (handle_data->events_subscribed & AWS_IO_EVENT_TYPE_WRITABLE) {
+ EV_SET(
+ &changelist[changelist_size++],
+ handle_data->owner->data.fd,
+ EVFILT_WRITE /*filter*/,
+ EV_ADD | EV_RECEIPT | EV_CLEAR /*flags*/,
+ 0 /*fflags*/,
+ 0 /*data*/,
+ handle_data /*udata*/);
+ }
+
+ int num_events = kevent(
+ impl->kq_fd,
+ changelist /*changelist*/,
+ changelist_size /*nchanges*/,
+ changelist /*eventlist. It's OK to re-use the same memory for changelist input and eventlist output*/,
+ changelist_size /*nevents*/,
+ NULL /*timeout*/);
+ if (num_events == -1) {
+ goto subscribe_failed;
+ }
+
+ /* Look through results to see if any failed */
+ for (int i = 0; i < num_events; ++i) {
+ /* Every result should be flagged as error, that's just how EV_RECEIPT works */
+ AWS_ASSERT(changelist[i].flags & EV_ERROR);
+
+ /* If a real error occurred, .data contains the error code */
+ if (changelist[i].data != 0) {
+ goto subscribe_failed;
+ }
+ }
+
+ /* Success */
+ handle_data->state = HANDLE_STATE_SUBSCRIBED;
+ return;
+
+subscribe_failed:
+ AWS_LOGF_ERROR(
+ AWS_LS_IO_EVENT_LOOP,
+ "id=%p: failed to subscribe to events on fd %d",
+ (void *)event_loop,
+ handle_data->owner->data.fd);
+ /* Remove any related kevents that succeeded */
+ for (int i = 0; i < num_events; ++i) {
+ if (changelist[i].data == 0) {
+ changelist[i].flags = EV_DELETE;
+ kevent(
+ impl->kq_fd,
+ &changelist[i] /*changelist*/,
+ 1 /*nchanges*/,
+ NULL /*eventlist*/,
+ 0 /*nevents*/,
+ NULL /*timeout*/);
+ }
+ }
+
+ /* We can't return an error code because this was a scheduled task.
+ * Notify the user of the failed subscription by passing AWS_IO_EVENT_TYPE_ERROR to the callback. */
+ handle_data->on_event(event_loop, handle_data->owner, AWS_IO_EVENT_TYPE_ERROR, handle_data->on_event_user_data);
+}
+
+static int s_subscribe_to_io_events(
+ struct aws_event_loop *event_loop,
+ struct aws_io_handle *handle,
+ int events,
+ aws_event_loop_on_event_fn *on_event,
+ void *user_data) {
+
+ AWS_ASSERT(event_loop);
+ AWS_ASSERT(handle->data.fd != -1);
+ AWS_ASSERT(handle->additional_data == NULL);
+ AWS_ASSERT(on_event);
+ /* Must subscribe for read, write, or both */
+ AWS_ASSERT(events & (AWS_IO_EVENT_TYPE_READABLE | AWS_IO_EVENT_TYPE_WRITABLE));
+
+ struct handle_data *handle_data = aws_mem_calloc(event_loop->alloc, 1, sizeof(struct handle_data));
+ if (!handle_data) {
+ return AWS_OP_ERR;
+ }
+
+ handle_data->owner = handle;
+ handle_data->event_loop = event_loop;
+ handle_data->on_event = on_event;
+ handle_data->on_event_user_data = user_data;
+ handle_data->events_subscribed = events;
+ handle_data->state = HANDLE_STATE_SUBSCRIBING;
+
+ handle->additional_data = handle_data;
+
+ /* We schedule a task to perform the actual changes to the kqueue, read on for an explanation why...
+ *
+ * kqueue requires separate registrations for read and write events.
+ * If the user wants to know about both read and write, we need register once for read and once for write.
+ * If the first registration succeeds, but the second registration fails, we need to delete the first registration.
+ * If this all happened outside the event-thread, the successful registration's events could begin processing
+ * in the brief window of time before the registration is deleted. */
+
+ aws_task_init(&handle_data->subscribe_task, s_subscribe_task, handle_data, "kqueue_event_loop_subscribe");
+ s_schedule_task_now(event_loop, &handle_data->subscribe_task);
+
+ return AWS_OP_SUCCESS;
+}
+
+static void s_free_io_event_resources(void *user_data) {
+ struct handle_data *handle_data = user_data;
+ struct kqueue_loop *impl = handle_data->event_loop->impl_data;
+
+ impl->thread_data.connected_handle_count--;
+
+ aws_mem_release(handle_data->event_loop->alloc, handle_data);
+}
+
+static void s_clean_up_handle_data_task(struct aws_task *task, void *user_data, enum aws_task_status status) {
+ (void)task;
+ (void)status;
+
+ struct handle_data *handle_data = user_data;
+ s_free_io_event_resources(handle_data);
+}
+
+static int s_unsubscribe_from_io_events(struct aws_event_loop *event_loop, struct aws_io_handle *handle) {
+ AWS_LOGF_TRACE(
+ AWS_LS_IO_EVENT_LOOP, "id=%p: un-subscribing from events on fd %d", (void *)event_loop, handle->data.fd);
+ AWS_ASSERT(handle->additional_data);
+ struct handle_data *handle_data = handle->additional_data;
+ struct kqueue_loop *impl = event_loop->impl_data;
+
+ AWS_ASSERT(event_loop == handle_data->event_loop);
+
+ /* If the handle was successfully subscribed to kqueue, then remove it. */
+ if (handle_data->state == HANDLE_STATE_SUBSCRIBED) {
+ struct kevent changelist[2];
+ int changelist_size = 0;
+
+ if (handle_data->events_subscribed & AWS_IO_EVENT_TYPE_READABLE) {
+ EV_SET(
+ &changelist[changelist_size++],
+ handle_data->owner->data.fd,
+ EVFILT_READ /*filter*/,
+ EV_DELETE /*flags*/,
+ 0 /*fflags*/,
+ 0 /*data*/,
+ handle_data /*udata*/);
+ }
+ if (handle_data->events_subscribed & AWS_IO_EVENT_TYPE_WRITABLE) {
+ EV_SET(
+ &changelist[changelist_size++],
+ handle_data->owner->data.fd,
+ EVFILT_WRITE /*filter*/,
+ EV_DELETE /*flags*/,
+ 0 /*fflags*/,
+ 0 /*data*/,
+ handle_data /*udata*/);
+ }
+
+ kevent(impl->kq_fd, changelist, changelist_size, NULL /*eventlist*/, 0 /*nevents*/, NULL /*timeout*/);
+ }
+
+ /* Schedule a task to clean up the memory. This is done in a task to prevent the following scenario:
+ * - While processing a batch of events, some callback unsubscribes another aws_io_handle.
+ * - One of the other events in this batch belongs to that other aws_io_handle.
+ * - If the handle_data were already deleted, there would be an access invalid memory. */
+
+ aws_task_init(
+ &handle_data->cleanup_task, s_clean_up_handle_data_task, handle_data, "kqueue_event_loop_clean_up_handle_data");
+ aws_event_loop_schedule_task_now(event_loop, &handle_data->cleanup_task);
+
+ handle_data->state = HANDLE_STATE_UNSUBSCRIBED;
+ handle->additional_data = NULL;
+
+ return AWS_OP_SUCCESS;
+}
+
+static bool s_is_event_thread(struct aws_event_loop *event_loop) {
+ struct kqueue_loop *impl = event_loop->impl_data;
+
+ aws_thread_id_t *thread_id = aws_atomic_load_ptr(&impl->running_thread_id);
+ return thread_id && aws_thread_thread_id_equal(*thread_id, aws_thread_current_thread_id());
+}
+
+/* Called from thread.
+ * Takes tasks from tasks_to_schedule and adds them to the scheduler. */
+static void s_process_tasks_to_schedule(struct aws_event_loop *event_loop, struct aws_linked_list *tasks_to_schedule) {
+ struct kqueue_loop *impl = event_loop->impl_data;
+ AWS_LOGF_TRACE(AWS_LS_IO_EVENT_LOOP, "id=%p: processing cross-thread tasks", (void *)event_loop);
+
+ while (!aws_linked_list_empty(tasks_to_schedule)) {
+ struct aws_linked_list_node *node = aws_linked_list_pop_front(tasks_to_schedule);
+ struct aws_task *task = AWS_CONTAINER_OF(node, struct aws_task, node);
+
+ AWS_LOGF_TRACE(
+ AWS_LS_IO_EVENT_LOOP,
+ "id=%p: task %p pulled to event-loop, scheduling now.",
+ (void *)event_loop,
+ (void *)task);
+ /* Timestamp 0 is used to denote "now" tasks */
+ if (task->timestamp == 0) {
+ aws_task_scheduler_schedule_now(&impl->thread_data.scheduler, task);
+ } else {
+ aws_task_scheduler_schedule_future(&impl->thread_data.scheduler, task, task->timestamp);
+ }
+ }
+}
+
+static void s_process_cross_thread_data(struct aws_event_loop *event_loop) {
+ struct kqueue_loop *impl = event_loop->impl_data;
+
+ AWS_LOGF_TRACE(AWS_LS_IO_EVENT_LOOP, "id=%p: notified of cross-thread data to process", (void *)event_loop);
+ /* If there are tasks to schedule, grab them all out of synced_data.tasks_to_schedule.
+ * We'll process them later, so that we minimize time spent holding the mutex. */
+ struct aws_linked_list tasks_to_schedule;
+ aws_linked_list_init(&tasks_to_schedule);
+
+ { /* Begin critical section */
+ aws_mutex_lock(&impl->cross_thread_data.mutex);
+ impl->cross_thread_data.thread_signaled = false;
+
+ bool initiate_stop = (impl->cross_thread_data.state == EVENT_THREAD_STATE_STOPPING) &&
+ (impl->thread_data.state == EVENT_THREAD_STATE_RUNNING);
+ if (AWS_UNLIKELY(initiate_stop)) {
+ impl->thread_data.state = EVENT_THREAD_STATE_STOPPING;
+ }
+
+ aws_linked_list_swap_contents(&impl->cross_thread_data.tasks_to_schedule, &tasks_to_schedule);
+
+ aws_mutex_unlock(&impl->cross_thread_data.mutex);
+ } /* End critical section */
+
+ s_process_tasks_to_schedule(event_loop, &tasks_to_schedule);
+}
+
+static int s_aws_event_flags_from_kevent(struct kevent *kevent) {
+ int event_flags = 0;
+
+ if (kevent->flags & EV_ERROR) {
+ event_flags |= AWS_IO_EVENT_TYPE_ERROR;
+ } else if (kevent->filter == EVFILT_READ) {
+ if (kevent->data != 0) {
+ event_flags |= AWS_IO_EVENT_TYPE_READABLE;
+ }
+
+ if (kevent->flags & EV_EOF) {
+ event_flags |= AWS_IO_EVENT_TYPE_CLOSED;
+ }
+ } else if (kevent->filter == EVFILT_WRITE) {
+ if (kevent->data != 0) {
+ event_flags |= AWS_IO_EVENT_TYPE_WRITABLE;
+ }
+
+ if (kevent->flags & EV_EOF) {
+ event_flags |= AWS_IO_EVENT_TYPE_CLOSED;
+ }
+ }
+
+ return event_flags;
+}
+
+static void s_event_thread_main(void *user_data) {
+ struct aws_event_loop *event_loop = user_data;
+ AWS_LOGF_INFO(AWS_LS_IO_EVENT_LOOP, "id=%p: main loop started", (void *)event_loop);
+ struct kqueue_loop *impl = event_loop->impl_data;
+
+ /* set thread id to the event-loop's thread. */
+ aws_atomic_store_ptr(&impl->running_thread_id, &impl->thread_created_on.thread_id);
+
+ AWS_ASSERT(impl->thread_data.state == EVENT_THREAD_STATE_READY_TO_RUN);
+ impl->thread_data.state = EVENT_THREAD_STATE_RUNNING;
+
+ struct kevent kevents[MAX_EVENTS];
+
+ /* A single aws_io_handle could have two separate kevents if subscribed for both read and write.
+ * If both the read and write kevents fire in the same loop of the event-thread,
+ * combine the event-flags and deliver them in a single callback.
+ * This makes the kqueue_event_loop behave more like the other platform implementations. */
+ struct handle_data *io_handle_events[MAX_EVENTS];
+
+ struct timespec timeout = {
+ .tv_sec = DEFAULT_TIMEOUT_SEC,
+ .tv_nsec = 0,
+ };
+
+ AWS_LOGF_INFO(
+ AWS_LS_IO_EVENT_LOOP,
+ "id=%p: default timeout %ds, and max events to process per tick %d",
+ (void *)event_loop,
+ DEFAULT_TIMEOUT_SEC,
+ MAX_EVENTS);
+
+ while (impl->thread_data.state == EVENT_THREAD_STATE_RUNNING) {
+ int num_io_handle_events = 0;
+ bool should_process_cross_thread_data = false;
+
+ AWS_LOGF_TRACE(
+ AWS_LS_IO_EVENT_LOOP,
+ "id=%p: waiting for a maximum of %ds %lluns",
+ (void *)event_loop,
+ (int)timeout.tv_sec,
+ (unsigned long long)timeout.tv_nsec);
+
+ /* Process kqueue events */
+ int num_kevents = kevent(
+ impl->kq_fd, NULL /*changelist*/, 0 /*nchanges*/, kevents /*eventlist*/, MAX_EVENTS /*nevents*/, &timeout);
+
+ AWS_LOGF_TRACE(
+ AWS_LS_IO_EVENT_LOOP, "id=%p: wake up with %d events to process.", (void *)event_loop, num_kevents);
+ if (num_kevents == -1) {
+ /* Raise an error, in case this is interesting to anyone monitoring,
+ * and continue on with this loop. We can't process events,
+ * but we can still process scheduled tasks */
+ aws_raise_error(AWS_ERROR_SYS_CALL_FAILURE);
+
+ /* Force the cross_thread_data to be processed.
+ * There might be valuable info in there, like the message to stop the thread.
+ * It's fine to do this even if nothing has changed, it just costs a mutex lock/unlock. */
+ should_process_cross_thread_data = true;
+ }
+
+ for (int i = 0; i < num_kevents; ++i) {
+ struct kevent *kevent = &kevents[i];
+
+ /* Was this event to signal that cross_thread_data has changed? */
+ if ((int)kevent->ident == impl->cross_thread_signal_pipe[READ_FD]) {
+ should_process_cross_thread_data = true;
+
+ /* Drain whatever data was written to the signaling pipe */
+ uint32_t read_whatever;
+ while (read((int)kevent->ident, &read_whatever, sizeof(read_whatever)) > 0) {
+ }
+
+ continue;
+ }
+
+ /* Otherwise this was a normal event on a subscribed handle. Figure out which flags to report. */
+ int event_flags = s_aws_event_flags_from_kevent(kevent);
+ if (event_flags == 0) {
+ continue;
+ }
+
+ /* Combine flags, in case multiple kevents correspond to one handle. (see notes at top of function) */
+ struct handle_data *handle_data = kevent->udata;
+ if (handle_data->events_this_loop == 0) {
+ io_handle_events[num_io_handle_events++] = handle_data;
+ }
+ handle_data->events_this_loop |= event_flags;
+ }
+
+ /* Invoke each handle's event callback (unless the handle has been unsubscribed) */
+ for (int i = 0; i < num_io_handle_events; ++i) {
+ struct handle_data *handle_data = io_handle_events[i];
+
+ if (handle_data->state == HANDLE_STATE_SUBSCRIBED) {
+ AWS_LOGF_TRACE(
+ AWS_LS_IO_EVENT_LOOP,
+ "id=%p: activity on fd %d, invoking handler.",
+ (void *)event_loop,
+ handle_data->owner->data.fd);
+ handle_data->on_event(
+ event_loop, handle_data->owner, handle_data->events_this_loop, handle_data->on_event_user_data);
+ }
+
+ handle_data->events_this_loop = 0;
+ }
+
+ /* Process cross_thread_data */
+ if (should_process_cross_thread_data) {
+ s_process_cross_thread_data(event_loop);
+ }
+
+ /* Run scheduled tasks */
+ uint64_t now_ns = 0;
+ event_loop->clock(&now_ns); /* If clock fails, now_ns will be 0 and tasks scheduled for a specific time
+ will not be run. That's ok, we'll handle them next time around. */
+ AWS_LOGF_TRACE(AWS_LS_IO_EVENT_LOOP, "id=%p: running scheduled tasks.", (void *)event_loop);
+ aws_task_scheduler_run_all(&impl->thread_data.scheduler, now_ns);
+
+ /* Set timeout for next kevent() call.
+ * If clock fails, or scheduler has no tasks, use default timeout */
+ bool use_default_timeout = false;
+
+ int err = event_loop->clock(&now_ns);
+ if (err) {
+ use_default_timeout = true;
+ }
+
+ uint64_t next_run_time_ns;
+ if (!aws_task_scheduler_has_tasks(&impl->thread_data.scheduler, &next_run_time_ns)) {
+
+ use_default_timeout = true;
+ }
+
+ if (use_default_timeout) {
+ AWS_LOGF_TRACE(
+ AWS_LS_IO_EVENT_LOOP, "id=%p: no more scheduled tasks using default timeout.", (void *)event_loop);
+ timeout.tv_sec = DEFAULT_TIMEOUT_SEC;
+ timeout.tv_nsec = 0;
+ } else {
+ /* Convert from timestamp in nanoseconds, to timeout in seconds with nanosecond remainder */
+ uint64_t timeout_ns = next_run_time_ns > now_ns ? next_run_time_ns - now_ns : 0;
+
+ uint64_t timeout_remainder_ns = 0;
+ uint64_t timeout_sec =
+ aws_timestamp_convert(timeout_ns, AWS_TIMESTAMP_NANOS, AWS_TIMESTAMP_SECS, &timeout_remainder_ns);
+
+ if (timeout_sec > LONG_MAX) { /* Check for overflow. On Darwin, these values are stored as longs */
+ timeout_sec = LONG_MAX;
+ timeout_remainder_ns = 0;
+ }
+
+ AWS_LOGF_TRACE(
+ AWS_LS_IO_EVENT_LOOP,
+ "id=%p: detected more scheduled tasks with the next occurring at "
+ "%llu using timeout of %ds %lluns.",
+ (void *)event_loop,
+ (unsigned long long)timeout_ns,
+ (int)timeout_sec,
+ (unsigned long long)timeout_remainder_ns);
+ timeout.tv_sec = (time_t)(timeout_sec);
+ timeout.tv_nsec = (long)(timeout_remainder_ns);
+ }
+ }
+
+ AWS_LOGF_INFO(AWS_LS_IO_EVENT_LOOP, "id=%p: exiting main loop", (void *)event_loop);
+ /* reset to NULL. This should be updated again during destroy before tasks are canceled. */
+ aws_atomic_store_ptr(&impl->running_thread_id, NULL);
+}