aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp
diff options
context:
space:
mode:
authorsnaury <snaury@ydb.tech>2023-08-07 13:35:52 +0300
committersnaury <snaury@ydb.tech>2023-08-07 14:26:11 +0300
commit884279adc343b71eaadb4d75d088c1338aeda378 (patch)
tree483c1ecc0788d8d597dd60cee1e1c2adce5b6345 /library/cpp
parentd79a2a297724679b1adf2ab348defdbdae3e73be (diff)
downloadydb-884279adc343b71eaadb4d75d088c1338aeda378.tar.gz
Support running C++ coroutines as actors KIKIMR-18962
Diffstat (limited to 'library/cpp')
-rw-r--r--library/cpp/actors/CMakeLists.txt1
-rw-r--r--library/cpp/actors/cppcoro/CMakeLists.darwin-x86_64.txt22
-rw-r--r--library/cpp/actors/cppcoro/CMakeLists.linux-aarch64.txt23
-rw-r--r--library/cpp/actors/cppcoro/CMakeLists.linux-x86_64.txt23
-rw-r--r--library/cpp/actors/cppcoro/CMakeLists.txt17
-rw-r--r--library/cpp/actors/cppcoro/CMakeLists.windows-x86_64.txt22
-rw-r--r--library/cpp/actors/cppcoro/await_callback.cpp1
-rw-r--r--library/cpp/actors/cppcoro/await_callback.h98
-rw-r--r--library/cpp/actors/cppcoro/task.cpp1
-rw-r--r--library/cpp/actors/cppcoro/task.h312
-rw-r--r--library/cpp/actors/cppcoro/task_actor.cpp157
-rw-r--r--library/cpp/actors/cppcoro/task_actor.h89
-rw-r--r--library/cpp/actors/cppcoro/task_actor_ut.cpp93
-rw-r--r--library/cpp/actors/cppcoro/task_group.cpp1
-rw-r--r--library/cpp/actors/cppcoro/task_group.h333
-rw-r--r--library/cpp/actors/cppcoro/task_ut.cpp261
-rw-r--r--library/cpp/actors/cppcoro/ut/CMakeLists.darwin-x86_64.txt69
-rw-r--r--library/cpp/actors/cppcoro/ut/CMakeLists.linux-aarch64.txt72
-rw-r--r--library/cpp/actors/cppcoro/ut/CMakeLists.linux-x86_64.txt74
-rw-r--r--library/cpp/actors/cppcoro/ut/CMakeLists.txt17
-rw-r--r--library/cpp/actors/cppcoro/ut/CMakeLists.windows-x86_64.txt62
-rw-r--r--library/cpp/actors/cppcoro/ut/ya.make12
-rw-r--r--library/cpp/actors/cppcoro/ya.make22
-rw-r--r--library/cpp/actors/ya.make1
24 files changed, 1783 insertions, 0 deletions
diff --git a/library/cpp/actors/CMakeLists.txt b/library/cpp/actors/CMakeLists.txt
index 45fe46a94d..becd73cd24 100644
--- a/library/cpp/actors/CMakeLists.txt
+++ b/library/cpp/actors/CMakeLists.txt
@@ -8,6 +8,7 @@
add_subdirectory(actor_type)
add_subdirectory(core)
+add_subdirectory(cppcoro)
add_subdirectory(dnscachelib)
add_subdirectory(dnsresolver)
add_subdirectory(examples)
diff --git a/library/cpp/actors/cppcoro/CMakeLists.darwin-x86_64.txt b/library/cpp/actors/cppcoro/CMakeLists.darwin-x86_64.txt
new file mode 100644
index 0000000000..ecac0aa784
--- /dev/null
+++ b/library/cpp/actors/cppcoro/CMakeLists.darwin-x86_64.txt
@@ -0,0 +1,22 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+add_subdirectory(ut)
+
+add_library(cpp-actors-cppcoro)
+target_link_libraries(cpp-actors-cppcoro PUBLIC
+ contrib-libs-cxxsupp
+ yutil
+ cpp-actors-core
+)
+target_sources(cpp-actors-cppcoro PRIVATE
+ ${CMAKE_SOURCE_DIR}/library/cpp/actors/cppcoro/await_callback.cpp
+ ${CMAKE_SOURCE_DIR}/library/cpp/actors/cppcoro/task_actor.cpp
+ ${CMAKE_SOURCE_DIR}/library/cpp/actors/cppcoro/task_group.cpp
+ ${CMAKE_SOURCE_DIR}/library/cpp/actors/cppcoro/task.cpp
+)
diff --git a/library/cpp/actors/cppcoro/CMakeLists.linux-aarch64.txt b/library/cpp/actors/cppcoro/CMakeLists.linux-aarch64.txt
new file mode 100644
index 0000000000..ff385af6fa
--- /dev/null
+++ b/library/cpp/actors/cppcoro/CMakeLists.linux-aarch64.txt
@@ -0,0 +1,23 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+add_subdirectory(ut)
+
+add_library(cpp-actors-cppcoro)
+target_link_libraries(cpp-actors-cppcoro PUBLIC
+ contrib-libs-linux-headers
+ contrib-libs-cxxsupp
+ yutil
+ cpp-actors-core
+)
+target_sources(cpp-actors-cppcoro PRIVATE
+ ${CMAKE_SOURCE_DIR}/library/cpp/actors/cppcoro/await_callback.cpp
+ ${CMAKE_SOURCE_DIR}/library/cpp/actors/cppcoro/task_actor.cpp
+ ${CMAKE_SOURCE_DIR}/library/cpp/actors/cppcoro/task_group.cpp
+ ${CMAKE_SOURCE_DIR}/library/cpp/actors/cppcoro/task.cpp
+)
diff --git a/library/cpp/actors/cppcoro/CMakeLists.linux-x86_64.txt b/library/cpp/actors/cppcoro/CMakeLists.linux-x86_64.txt
new file mode 100644
index 0000000000..ff385af6fa
--- /dev/null
+++ b/library/cpp/actors/cppcoro/CMakeLists.linux-x86_64.txt
@@ -0,0 +1,23 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+add_subdirectory(ut)
+
+add_library(cpp-actors-cppcoro)
+target_link_libraries(cpp-actors-cppcoro PUBLIC
+ contrib-libs-linux-headers
+ contrib-libs-cxxsupp
+ yutil
+ cpp-actors-core
+)
+target_sources(cpp-actors-cppcoro PRIVATE
+ ${CMAKE_SOURCE_DIR}/library/cpp/actors/cppcoro/await_callback.cpp
+ ${CMAKE_SOURCE_DIR}/library/cpp/actors/cppcoro/task_actor.cpp
+ ${CMAKE_SOURCE_DIR}/library/cpp/actors/cppcoro/task_group.cpp
+ ${CMAKE_SOURCE_DIR}/library/cpp/actors/cppcoro/task.cpp
+)
diff --git a/library/cpp/actors/cppcoro/CMakeLists.txt b/library/cpp/actors/cppcoro/CMakeLists.txt
new file mode 100644
index 0000000000..f8b31df0c1
--- /dev/null
+++ b/library/cpp/actors/cppcoro/CMakeLists.txt
@@ -0,0 +1,17 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+if (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "aarch64" AND NOT HAVE_CUDA)
+ include(CMakeLists.linux-aarch64.txt)
+elseif (CMAKE_SYSTEM_NAME STREQUAL "Darwin" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64")
+ include(CMakeLists.darwin-x86_64.txt)
+elseif (WIN32 AND CMAKE_SYSTEM_PROCESSOR STREQUAL "AMD64" AND NOT HAVE_CUDA)
+ include(CMakeLists.windows-x86_64.txt)
+elseif (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" AND NOT HAVE_CUDA)
+ include(CMakeLists.linux-x86_64.txt)
+endif()
diff --git a/library/cpp/actors/cppcoro/CMakeLists.windows-x86_64.txt b/library/cpp/actors/cppcoro/CMakeLists.windows-x86_64.txt
new file mode 100644
index 0000000000..ecac0aa784
--- /dev/null
+++ b/library/cpp/actors/cppcoro/CMakeLists.windows-x86_64.txt
@@ -0,0 +1,22 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+add_subdirectory(ut)
+
+add_library(cpp-actors-cppcoro)
+target_link_libraries(cpp-actors-cppcoro PUBLIC
+ contrib-libs-cxxsupp
+ yutil
+ cpp-actors-core
+)
+target_sources(cpp-actors-cppcoro PRIVATE
+ ${CMAKE_SOURCE_DIR}/library/cpp/actors/cppcoro/await_callback.cpp
+ ${CMAKE_SOURCE_DIR}/library/cpp/actors/cppcoro/task_actor.cpp
+ ${CMAKE_SOURCE_DIR}/library/cpp/actors/cppcoro/task_group.cpp
+ ${CMAKE_SOURCE_DIR}/library/cpp/actors/cppcoro/task.cpp
+)
diff --git a/library/cpp/actors/cppcoro/await_callback.cpp b/library/cpp/actors/cppcoro/await_callback.cpp
new file mode 100644
index 0000000000..5132131a8e
--- /dev/null
+++ b/library/cpp/actors/cppcoro/await_callback.cpp
@@ -0,0 +1 @@
+#include "await_callback.h"
diff --git a/library/cpp/actors/cppcoro/await_callback.h b/library/cpp/actors/cppcoro/await_callback.h
new file mode 100644
index 0000000000..9f23d5e0db
--- /dev/null
+++ b/library/cpp/actors/cppcoro/await_callback.h
@@ -0,0 +1,98 @@
+#include <coroutine>
+#include <exception>
+#include <concepts>
+
+namespace NActors {
+
+ namespace NDetail {
+ template<class TAwaitable>
+ decltype(auto) GetAwaiter(TAwaitable&& awaitable) {
+ if constexpr (requires { ((TAwaitable&&) awaitable).operator co_await(); }) {
+ return ((TAwaitable&&) awaitable).operator co_await();
+ } else if constexpr (requires { operator co_await((TAwaitable&&) awaitable); }) {
+ return operator co_await((TAwaitable&&) awaitable);
+ } else {
+ return ((TAwaitable&&) awaitable);
+ }
+ }
+
+ template<class TAwaitable>
+ using TAwaitResult = decltype(GetAwaiter(std::declval<TAwaitable>()).await_resume());
+
+ template<class TCallback, class TResult>
+ class TCallbackResult {
+ public:
+ TCallbackResult(TCallback& callback)
+ : Callback_(callback)
+ {}
+
+ template<class TRealResult>
+ void return_value(TRealResult&& result) noexcept {
+ Callback_(std::forward<TRealResult>(result));
+ }
+
+ private:
+ TCallback& Callback_;
+ };
+
+ template<class TCallback>
+ class TCallbackResult<TCallback, void> {
+ public:
+ TCallbackResult(TCallback& callback)
+ : Callback_(callback)
+ {}
+
+ void return_void() noexcept {
+ Callback_();
+ }
+
+ private:
+ TCallback& Callback_;
+ };
+
+ template<class TAwaitable, class TCallback>
+ class TAwaitThenCallbackPromise
+ : public TCallbackResult<TCallback, TAwaitResult<TAwaitable>>
+ {
+ public:
+ using THandle = std::coroutine_handle<TAwaitThenCallbackPromise<TAwaitable, TCallback>>;
+
+ TAwaitThenCallbackPromise(TAwaitable&, TCallback& callback)
+ : TCallbackResult<TCallback, TAwaitResult<TAwaitable>>(callback)
+ {}
+
+ THandle get_return_object() noexcept {
+ return THandle::from_promise(*this);
+ }
+
+ static auto initial_suspend() noexcept { return std::suspend_never{}; }
+ static auto final_suspend() noexcept { return std::suspend_never{}; }
+
+ void unhandled_exception() noexcept {
+ std::terminate();
+ }
+ };
+
+ template<class TAwaitable, class TCallback>
+ class TAwaitThenCallback {
+ public:
+ using promise_type = TAwaitThenCallbackPromise<TAwaitable, TCallback>;
+
+ using THandle = typename promise_type::THandle;
+
+ TAwaitThenCallback(THandle) noexcept {}
+ };
+ }
+
+ /**
+ * Awaits the awaitable and calls callback with the result.
+ *
+ * Note: program terminates if awaitable or callback throw an exception.
+ */
+ template<class TAwaitable, class TCallback>
+ NDetail::TAwaitThenCallback<TAwaitable, TCallback> AwaitThenCallback(TAwaitable awaitable, TCallback) {
+ // Note: underlying promise takes callback argument address and calls it when we return
+ co_return co_await std::move(awaitable);
+ }
+
+} // namespace NActors
diff --git a/library/cpp/actors/cppcoro/task.cpp b/library/cpp/actors/cppcoro/task.cpp
new file mode 100644
index 0000000000..204c27c573
--- /dev/null
+++ b/library/cpp/actors/cppcoro/task.cpp
@@ -0,0 +1 @@
+#include "task.h"
diff --git a/library/cpp/actors/cppcoro/task.h b/library/cpp/actors/cppcoro/task.h
new file mode 100644
index 0000000000..dade638ddb
--- /dev/null
+++ b/library/cpp/actors/cppcoro/task.h
@@ -0,0 +1,312 @@
+#pragma once
+#include <util/system/compiler.h>
+#include <util/system/yassert.h>
+#include <coroutine>
+#include <exception>
+#include <variant>
+
+namespace NActors {
+
+ template<class T>
+ class TTask;
+
+ namespace NDetail {
+
+ class TTaskPromiseBase {
+ public:
+ static auto initial_suspend() noexcept {
+ return std::suspend_always{};
+ }
+
+ struct TFinalSuspend {
+ static bool await_ready() noexcept { return false; }
+ static void await_resume() noexcept { std::terminate(); }
+
+ template<class TPromise>
+ static std::coroutine_handle<> await_suspend(std::coroutine_handle<TPromise> h) noexcept {
+ TTaskPromiseBase& promise = h.promise();
+ return std::exchange(promise.Continuation_, {});
+ }
+ };
+
+ static auto final_suspend() noexcept {
+ return TFinalSuspend{};
+ }
+
+ bool HasStarted() const noexcept {
+ return Flags_ & 1;
+ }
+
+ void SetStarted() noexcept {
+ Flags_ |= 1;
+ }
+
+ bool HasContinuation() const noexcept {
+ return Flags_ & 2;
+ }
+
+ void SetContinuation(std::coroutine_handle<> continuation) noexcept {
+ Y_VERIFY_DEBUG(continuation, "Attempt to set an invalid continuation");
+ Y_VERIFY_DEBUG(!HasContinuation(), "Attempt to set multiple continuations");
+ Continuation_ = continuation;
+ Flags_ |= 2;
+ }
+
+ private:
+ // Default is used when task is resumed without a continuation
+ std::coroutine_handle<> Continuation_ = std::noop_coroutine();
+ unsigned char Flags_ = 0;
+ };
+
+ template<class T>
+ class TTaskPromise;
+
+ template<class T>
+ using TTaskHandle = std::coroutine_handle<TTaskPromise<T>>;
+
+ template<class T>
+ class TTaskPromise final : public TTaskPromiseBase {
+ public:
+ TTask<T> get_return_object() noexcept;
+
+ std::coroutine_handle<> Start() noexcept {
+ if (Y_LIKELY(!HasStarted())) {
+ SetStarted();
+ return TTaskHandle<T>::from_promise(*this);
+ } else {
+ // After coroutine starts is cannot be safely resumed, because
+ // it is waiting for something and must be resumed via its
+ // continuation.
+ return std::noop_coroutine();
+ }
+ }
+
+ void unhandled_exception() noexcept {
+ Result_.template emplace<std::exception_ptr>(std::current_exception());
+ }
+
+ template<class TResult>
+ void return_value(TResult&& result) {
+ Result_.template emplace<T>(std::forward<TResult>(result));
+ }
+
+ T ExtractResult() {
+ switch (Result_.index()) {
+ case 0: {
+ std::rethrow_exception(std::get<0>(std::move(Result_)));
+ }
+ case 1: {
+ return std::get<1>(std::move(Result_));
+ }
+ }
+ std::terminate();
+ }
+
+ private:
+ std::variant<std::exception_ptr, T> Result_;
+ };
+
+ template<>
+ class TTaskPromise<void> final : public TTaskPromiseBase {
+ public:
+ TTask<void> get_return_object() noexcept;
+
+ std::coroutine_handle<> Start() noexcept {
+ if (Y_LIKELY(!HasStarted())) {
+ SetStarted();
+ return TTaskHandle<void>::from_promise(*this);
+ } else {
+ // After coroutine starts is cannot be safely resumed, because
+ // it is waiting for something and must be resumed via its
+ // continuation.
+ return std::noop_coroutine();
+ }
+ }
+
+ void unhandled_exception() noexcept {
+ Exception_ = std::current_exception();
+ }
+
+ void return_void() noexcept {
+ Exception_ = nullptr;
+ }
+
+ void ExtractResult() {
+ if (Exception_) {
+ std::rethrow_exception(std::move(Exception_));
+ }
+ }
+
+ private:
+ std::exception_ptr Exception_;
+ };
+
+ } // namespace NDetail
+
+ /**
+ * A bare-bones lazy task implementation
+ *
+ * This task is not thread safe and assumes external synchronization, e.g.
+ * races between destructor and await resume are not allowed and not safe.
+ */
+ template<class T>
+ class TTask final {
+ public:
+ using promise_type = NDetail::TTaskPromise<T>;
+ using value_type = T;
+
+ public:
+ TTask() noexcept = default;
+
+ explicit TTask(NDetail::TTaskHandle<T> handle) noexcept
+ : Handle_(handle)
+ {}
+
+ TTask(TTask&& rhs) noexcept
+ : Handle_(std::exchange(rhs.Handle_, {}))
+ {}
+
+ ~TTask() {
+ if (Handle_) {
+ Handle_.destroy();
+ }
+ }
+
+ TTask& operator=(TTask&& rhs) noexcept {
+ if (Y_LIKELY(this != &rhs)) {
+ auto handle = std::exchange(Handle_, {});
+ Handle_ = std::exchange(rhs.Handle_, {});
+ if (handle) {
+ handle.destroy();
+ }
+ }
+ return *this;
+ }
+
+ /**
+ * Returns true for a valid task object
+ */
+ explicit operator bool() const noexcept {
+ return bool(Handle_);
+ }
+
+ /**
+ * Returns true if the task finished executing (produced a result)
+ */
+ bool Done() const {
+ Y_VERIFY_DEBUG(Handle_);
+ return Handle_.done();
+ }
+
+ /**
+ * Manually start the task, only possible once
+ */
+ void Start() const {
+ Y_VERIFY_DEBUG(!Done());
+ Y_VERIFY_DEBUG(!Promise().HasStarted());
+ Y_VERIFY_DEBUG(!Promise().HasContinuation());
+ Promise().Start().resume();
+ }
+
+ /**
+ * Implementation of awaiter for WhenDone
+ */
+ class TWhenDoneAwaiter {
+ public:
+ TWhenDoneAwaiter(NDetail::TTaskHandle<T> handle) noexcept
+ : Handle_(handle)
+ {}
+
+ bool await_ready() const noexcept {
+ return Handle_.done();
+ }
+
+ std::coroutine_handle<> await_suspend(std::coroutine_handle<> continuation) const noexcept {
+ Handle_.promise().SetContinuation(continuation);
+ return Handle_.promise().Start();
+ }
+
+ void await_resume() const noexcept {
+ // nothing
+ }
+
+ private:
+ NDetail::TTaskHandle<T> Handle_;
+ };
+
+ /**
+ * Returns an awaitable that completes when task finishes executing
+ *
+ * Note the result of the task is not consumed.
+ */
+ auto WhenDone() const noexcept {
+ return TWhenDoneAwaiter(Handle_);
+ }
+
+ /**
+ * Extracts result of the task
+ */
+ T ExtractResult() {
+ Y_VERIFY_DEBUG(Done());
+ return Promise().ExtractResult();
+ }
+
+ /**
+ * Implementation of awaiter for co_await
+ */
+ class TAwaiter {
+ public:
+ TAwaiter(TTask&& task) noexcept
+ : Task_(std::move(task))
+ {}
+
+ bool await_ready() const noexcept {
+ return Task_.Done();
+ }
+
+ std::coroutine_handle<> await_suspend(std::coroutine_handle<> continuation) const noexcept {
+ Task_.Promise().SetContinuation(continuation);
+ return Task_.Promise().Start();
+ }
+
+ T await_resume() {
+ // We destroy task state before we return
+ TTask task(std::move(Task_));
+ return task.ExtractResult();
+ }
+
+ private:
+ TTask Task_;
+ };
+
+ /**
+ * Returns the task result when it finishes
+ */
+ auto operator co_await() && noexcept {
+ return TAwaiter(std::move(*this));
+ }
+
+ private:
+ NDetail::TTaskPromise<T>& Promise() const noexcept {
+ Y_VERIFY_DEBUG(Handle_);
+ return Handle_.promise();
+ }
+
+ private:
+ NDetail::TTaskHandle<T> Handle_;
+ };
+
+ namespace NDetail {
+
+ template<class T>
+ inline TTask<T> TTaskPromise<T>::get_return_object() noexcept {
+ return TTask<T>(TTaskHandle<T>::from_promise(*this));
+ }
+
+ inline TTask<void> TTaskPromise<void>::get_return_object() noexcept {
+ return TTask<void>(TTaskHandle<void>::from_promise(*this));
+ }
+
+ } // namespace NDetail
+
+} // namespace NActors
diff --git a/library/cpp/actors/cppcoro/task_actor.cpp b/library/cpp/actors/cppcoro/task_actor.cpp
new file mode 100644
index 0000000000..756d7e2f32
--- /dev/null
+++ b/library/cpp/actors/cppcoro/task_actor.cpp
@@ -0,0 +1,157 @@
+#include "task_actor.h"
+#include <library/cpp/actors/core/actor.h>
+#include <library/cpp/actors/core/hfunc.h>
+
+namespace NActors {
+
+ class TTaskActorImpl;
+
+ static Y_POD_THREAD(TTaskActorImpl*) TlsCurrentTaskActor{nullptr};
+
+ struct TCurrentTaskActorGuard {
+ TCurrentTaskActorGuard(TTaskActorImpl* current) noexcept {
+ Y_VERIFY(TlsCurrentTaskActor == nullptr);
+ TlsCurrentTaskActor = current;
+ }
+
+ ~TCurrentTaskActorGuard() noexcept {
+ TlsCurrentTaskActor = nullptr;
+ }
+ };
+
+ enum : ui32 {
+ EvResumeTask = EventSpaceBegin(TEvents::ES_SYSTEM) + 256,
+ };
+
+ struct TEvResumeTask : public TEventLocal<TEvResumeTask, EvResumeTask> {
+ std::coroutine_handle<> Handle;
+
+ explicit TEvResumeTask(std::coroutine_handle<> handle) noexcept
+ : Handle(handle)
+ {}
+
+ ~TEvResumeTask() noexcept {
+ // TODO: actor may be dead already
+ }
+ };
+
+ class TTaskActorImpl : public TActor<TTaskActorImpl> {
+ friend class TTaskActor;
+ friend struct TAfterAwaiter;
+ friend struct TBindAwaiter;
+
+ public:
+ TTaskActorImpl(TTask<void>&& task)
+ : TActor(&TThis::StateBoot)
+ , Task(std::move(task))
+ {
+ Y_VERIFY(Task);
+ }
+
+ void Registered(TActorSystem* sys, const TActorId& parent) override {
+ ParentId = parent;
+ sys->Send(new IEventHandle(TEvents::TSystem::Bootstrap, 0, SelfId(), SelfId(), {}, 0));
+ }
+
+ STATEFN(StateBoot) {
+ Y_VERIFY(ev->GetTypeRewrite() == TEvents::TSystem::Bootstrap, "Expected bootstrap event");
+ TCurrentTaskActorGuard guard(this);
+ Become(&TThis::StateWork);
+ Task.Start();
+ Check();
+ }
+
+ STATEFN(StateWork) {
+ TCurrentTaskActorGuard guard(this);
+ switch (ev->GetTypeRewrite()) {
+ hFunc(TEvResumeTask, Handle);
+ default:
+ Y_VERIFY(EventWaiter);
+ Event.reset(ev.Release());
+ std::exchange(EventWaiter, {}).resume();
+ }
+ Check();
+ }
+
+ void Handle(TEvResumeTask::TPtr& ev) {
+ auto* msg = ev->Get();
+ std::exchange(msg->Handle, {}).resume();
+ }
+
+ bool Check() {
+ if (Task.Done()) {
+ Y_VERIFY(!EventWaiter, "Task terminated while waiting for the next event");
+ Task.ExtractResult();
+ PassAway();
+ return false;
+ }
+
+ Y_VERIFY(EventWaiter, "Task suspended without waiting for the next event");
+ Event.reset();
+ return true;
+ }
+
+ void WaitForEvent(std::coroutine_handle<> h) noexcept {
+ Y_VERIFY(!EventWaiter, "Task cannot have multiple waiters for the next event");
+ EventWaiter = h;
+ }
+
+ std::unique_ptr<IEventHandle> FinishWaitForEvent() noexcept {
+ Y_VERIFY(Event, "Task does not have current event");
+ return std::move(Event);
+ }
+
+ private:
+ TTask<void> Task;
+ TActorId ParentId;
+ std::coroutine_handle<> EventWaiter;
+ std::unique_ptr<IEventHandle> Event;
+ };
+
+ void TTaskActorNextEvent::await_suspend(std::coroutine_handle<> h) noexcept {
+ Y_VERIFY(TlsCurrentTaskActor, "Not in a task actor context");
+ TlsCurrentTaskActor->WaitForEvent(h);
+ }
+
+ std::unique_ptr<IEventHandle> TTaskActorNextEvent::await_resume() noexcept {
+ Y_VERIFY(TlsCurrentTaskActor, "Not in a task actor context");
+ return TlsCurrentTaskActor->FinishWaitForEvent();
+ }
+
+ IActor* TTaskActor::Create(TTask<void>&& task) {
+ return new TTaskActorImpl(std::move(task));
+ }
+
+ TActorIdentity TTaskActor::SelfId() noexcept {
+ Y_VERIFY(TlsCurrentTaskActor, "Not in a task actor context");
+ return TlsCurrentTaskActor->SelfId();
+ }
+
+ TActorId TTaskActor::ParentId() noexcept {
+ Y_VERIFY(TlsCurrentTaskActor, "Not in a task actor context");
+ return TlsCurrentTaskActor->ParentId;
+ }
+
+ void TAfterAwaiter::await_suspend(std::coroutine_handle<> h) noexcept {
+ Y_VERIFY(TlsCurrentTaskActor, "Not in a task actor context");
+ TlsCurrentTaskActor->Schedule(Duration, new TEvResumeTask(h));
+ }
+
+ void TAfterAwaiter::await_resume() {
+ }
+
+ bool TBindAwaiter::await_ready() noexcept {
+ if (TlsCurrentTaskActor && TlsCurrentTaskActor->SelfId() == ActorId) {
+ return true;
+ }
+ return false;
+ }
+
+ void TBindAwaiter::await_suspend(std::coroutine_handle<> h) noexcept {
+ Sys->Send(new IEventHandle(ActorId, ActorId, new TEvResumeTask(h)));
+ }
+
+ void TBindAwaiter::await_resume() {
+ }
+
+} // namespace NActors
diff --git a/library/cpp/actors/cppcoro/task_actor.h b/library/cpp/actors/cppcoro/task_actor.h
new file mode 100644
index 0000000000..e4a1c9df3e
--- /dev/null
+++ b/library/cpp/actors/cppcoro/task_actor.h
@@ -0,0 +1,89 @@
+#include <library/cpp/actors/core/actor.h>
+#include "task.h"
+
+namespace NActors {
+
+ struct TTaskActorNextEvent {
+ static constexpr bool await_ready() noexcept { return false; }
+
+ static void await_suspend(std::coroutine_handle<> h) noexcept;
+
+ static std::unique_ptr<IEventHandle> await_resume() noexcept;
+ };
+
+ struct TAfterAwaiter {
+ TDuration Duration;
+
+ static constexpr bool await_ready() noexcept { return false; }
+
+ void await_suspend(std::coroutine_handle<> h) noexcept;
+
+ void await_resume();
+ };
+
+ struct TBindAwaiter {
+ TActorSystem* Sys;
+ TActorId ActorId;
+
+ bool await_ready() noexcept;
+
+ void await_suspend(std::coroutine_handle<> h) noexcept;
+
+ void await_resume();
+ };
+
+ class TTaskActor {
+ public:
+ /**
+ * Creates a new actor that will run the specified task.
+ */
+ static IActor* Create(TTask<void>&& task);
+
+ /**
+ * Returns the next actor event when awaited
+ */
+ static constexpr TTaskActorNextEvent NextEvent{};
+
+ /**
+ * Returns the identity of current task actor.
+ */
+ static TActorIdentity SelfId() noexcept;
+
+ /**
+ * Returns an actor id of the actor that registered current task actor.
+ */
+ static TActorId ParentId() noexcept;
+
+ /**
+ * Returns awaiter that completes after the specified timeout.
+ */
+ static TAfterAwaiter After(TDuration duration) noexcept {
+ return TAfterAwaiter{ duration };
+ }
+
+ /**
+ * Returns awaiter that completes on actor thread when awaited.
+ */
+ static TBindAwaiter Bind() noexcept {
+ TActorId actorId = SelfId();
+ TActorSystem* sys = TActivationContext::ActorSystem();
+ return TBindAwaiter{ sys, actorId };
+ }
+
+ /**
+ * Returns a task that runs the specified task, but binds the result
+ * back to the actor thread. Useful when the specified task may be
+ * working with non-actor coroutines.
+ */
+ template<class T>
+ static TTask<T> Bind(TTask<T>&& task) {
+ // TODO: may run on non-actor thread, protect from unwind
+ return [](TTask<T> task, TBindAwaiter bindTask) -> TTask<T> {
+ co_await task.WhenDone();
+ co_await bindTask;
+ co_return task.ExtractResult();
+ }(std::move(task), Bind());
+ }
+ };
+
+} // namespace NActors
diff --git a/library/cpp/actors/cppcoro/task_actor_ut.cpp b/library/cpp/actors/cppcoro/task_actor_ut.cpp
new file mode 100644
index 0000000000..8037dd418d
--- /dev/null
+++ b/library/cpp/actors/cppcoro/task_actor_ut.cpp
@@ -0,0 +1,93 @@
+#include "task_actor.h"
+#include <library/cpp/actors/core/executor_pool_basic.h>
+#include <library/cpp/actors/core/scheduler_basic.h>
+
+#include <library/cpp/testing/unittest/registar.h>
+
+Y_UNIT_TEST_SUITE(TaskActor) {
+
+ using namespace NActors;
+
+ enum : ui32 {
+ EvBegin = EventSpaceBegin(TEvents::ES_USERSPACE),
+ EvRequest,
+ EvResponse,
+ EvStop,
+ };
+
+ struct TEvRequest: public TEventLocal<TEvRequest, EvRequest> {
+ };
+
+ struct TEvResponse: public TEventLocal<TEvResponse, EvResponse> {
+ };
+
+ struct TEvStop: public TEventLocal<TEvStop, EvStop> {
+ };
+
+ TTask<void> SimpleResponder() {
+ for (;;) {
+ auto ev = co_await TTaskActor::NextEvent;
+ Y_VERIFY(ev->GetTypeRewrite() == TEvRequest::EventType);
+ auto* msg = ev->Get<TEvRequest>();
+ Y_UNUSED(msg);
+ TTaskActor::SelfId().Send(ev->Sender, new TEvResponse);
+ }
+ }
+
+ TTask<void> SimpleRequester(TActorId responder, TManualEvent& doneEvent, std::atomic<int>& itemsProcessed) {
+ // Note: it's ok to use lambda capture because captures outlive this coroutine
+ auto singleRequest = [&]() -> TTask<bool> {
+ TTaskActor::SelfId().Send(responder, new TEvRequest);
+ auto ev = co_await TTaskActor::NextEvent;
+ switch (ev->GetTypeRewrite()) {
+ case TEvResponse::EventType:
+ co_return true;
+ case TEvStop::EventType:
+ co_return false;
+ default:
+ Y_FAIL("Unexpected event");
+ }
+ };
+ while (co_await singleRequest()) {
+ ++itemsProcessed;
+ }
+ doneEvent.Signal();
+ }
+
+ void Check(TDuration duration, std::unique_ptr<IEventBase> stopEvent) {
+ THolder<TActorSystemSetup> setup = MakeHolder<TActorSystemSetup>();
+ setup->NodeId = 0;
+ setup->ExecutorsCount = 1;
+ setup->Executors.Reset(new TAutoPtr<IExecutorPool>[setup->ExecutorsCount]);
+ for (ui32 i = 0; i < setup->ExecutorsCount; ++i) {
+ setup->Executors[i] = new TBasicExecutorPool(i, 5, 10, "basic");
+ }
+ setup->Scheduler = new TBasicSchedulerThread;
+
+ TActorSystem actorSystem(setup);
+
+ actorSystem.Start();
+
+ TManualEvent doneEvent;
+ std::atomic<int> itemsProcessed{0};
+
+ auto responder = actorSystem.Register(TTaskActor::Create(SimpleResponder()));
+ auto requester = actorSystem.Register(TTaskActor::Create(SimpleRequester(responder, doneEvent, itemsProcessed)));
+ auto deadline = TMonotonic::Now() + duration;
+ while (itemsProcessed.load() < 10) {
+ UNIT_ASSERT_C(TMonotonic::Now() < deadline, "cannot observe 10 responses in " << duration);
+ Sleep(TDuration::MilliSeconds(100));
+ }
+ actorSystem.Send(requester, stopEvent.release());
+ doneEvent.WaitI();
+
+ UNIT_ASSERT_GE(itemsProcessed.load(), 10);
+
+ actorSystem.Stop();
+ }
+
+ Y_UNIT_TEST(Basic) {
+ Check(TDuration::Seconds(10), std::make_unique<TEvStop>());
+ }
+
+} // Y_UNIT_TEST_SUITE(TaskActor)
diff --git a/library/cpp/actors/cppcoro/task_group.cpp b/library/cpp/actors/cppcoro/task_group.cpp
new file mode 100644
index 0000000000..9ddd30d707
--- /dev/null
+++ b/library/cpp/actors/cppcoro/task_group.cpp
@@ -0,0 +1 @@
+#include "task_group.h"
diff --git a/library/cpp/actors/cppcoro/task_group.h b/library/cpp/actors/cppcoro/task_group.h
new file mode 100644
index 0000000000..b57cf59529
--- /dev/null
+++ b/library/cpp/actors/cppcoro/task_group.h
@@ -0,0 +1,333 @@
+#pragma once
+#include <util/generic/ptr.h>
+#include <util/system/compiler.h>
+#include <util/system/yassert.h>
+#include <coroutine>
+#include <exception>
+#include <variant>
+#include <atomic>
+#include <memory>
+
+namespace NActors {
+
+ namespace NDetail {
+
+ template<class T>
+ struct TTaskGroupResult final {
+ TTaskGroupResult* Next;
+ std::variant<std::exception_ptr, T> Result_;
+
+ void SetException() {
+ Result_.template emplace<0>(std::current_exception());
+ }
+
+ template<class TResult>
+ void SetValue(TResult&& result) {
+ Result_.template emplace<1>(std::forward<TResult>(result));
+ }
+
+ T Extract() {
+ switch (Result_.index()) {
+ case 0: {
+ std::rethrow_exception(std::get<0>(std::move(Result_)));
+ }
+ case 1: {
+ return std::get<1>(std::move(Result_));
+ }
+ }
+ std::terminate();
+ }
+ };
+
+ template<>
+ struct TTaskGroupResult<void> final {
+ TTaskGroupResult* Next;
+ std::exception_ptr Exception_;
+
+ void SetException() {
+ Exception_ = std::current_exception();
+ }
+
+ void SetValue() {
+ // nothing
+ }
+
+ void Extract() {
+ if (Exception_) {
+ std::rethrow_exception(std::move(Exception_));
+ }
+ }
+ };
+
+ template<class T>
+ struct TTaskGroupSink final
+ : public TAtomicRefCount<TTaskGroupSink<T>>
+ {
+ std::atomic<void*> LastReady{ nullptr };
+ TTaskGroupResult<T>* ReadyQueue = nullptr;
+ std::coroutine_handle<> Continuation;
+
+ static constexpr uintptr_t MarkerAwaiting = 1;
+ static constexpr uintptr_t MarkerDetached = 2;
+
+ std::coroutine_handle<> Push(std::unique_ptr<TTaskGroupResult<T>>&& result) noexcept {
+ void* currentValue = LastReady.load(std::memory_order_acquire);
+ for (;;) {
+ if (currentValue == (void*)MarkerAwaiting) {
+ if (Y_UNLIKELY(!LastReady.compare_exchange_weak(currentValue, nullptr, std::memory_order_acquire))) {
+ continue;
+ }
+ // We consume the awaiter
+ Y_VERIFY(ReadyQueue == nullptr, "TaskGroup is awaiting with non-empty ready queue");
+ result->Next = nullptr;
+ ReadyQueue = result.release();
+ return std::exchange(Continuation, {});
+ }
+ if (currentValue == (void*)MarkerDetached) {
+ // Task group is detached, discard the result
+ return std::noop_coroutine();
+ }
+ TTaskGroupResult<T>* current = reinterpret_cast<TTaskGroupResult<T>*>(currentValue);
+ result->Next = current;
+ void* nextValue = result.get();
+ if (Y_LIKELY(LastReady.compare_exchange_weak(currentValue, nextValue, std::memory_order_acq_rel))) {
+ // Result successfully added
+ result.release();
+ return std::noop_coroutine();
+ }
+ }
+ }
+
+ bool Ready() const noexcept {
+ return ReadyQueue != nullptr || LastReady.load(std::memory_order_acquire) != nullptr;
+ }
+
+ Y_NO_INLINE std::coroutine_handle<> Suspend(std::coroutine_handle<> h) noexcept {
+ Y_VERIFY(ReadyQueue == nullptr, "Caller suspending with non-empty ready queue");
+ Continuation = h;
+ void* currentValue = LastReady.load(std::memory_order_acquire);
+ for (;;) {
+ if (currentValue == nullptr) {
+ if (Y_UNLIKELY(!LastReady.compare_exchange_weak(currentValue, (void*)MarkerAwaiting, std::memory_order_release))) {
+ continue;
+ }
+ // Continuation may wake up on another thread
+ return std::noop_coroutine();
+ }
+ Y_VERIFY(currentValue != (void*)MarkerAwaiting, "TaskGroup is suspending with an awaiting marker");
+ Y_VERIFY(currentValue != (void*)MarkerDetached, "TaskGroup is suspending with a detached marker");
+ // Race: ready queue is not actually empty
+ Continuation = {};
+ return h;
+ }
+ }
+
+ std::unique_ptr<TTaskGroupResult<T>> Resume() noexcept {
+ std::unique_ptr<TTaskGroupResult<T>> result;
+ if (ReadyQueue == nullptr) {
+ void* headValue = LastReady.exchange(nullptr, std::memory_order_acq_rel);
+ Y_VERIFY(headValue != (void*)MarkerAwaiting, "TaskGroup is resuming with an awaiting marker");
+ Y_VERIFY(headValue != (void*)MarkerDetached, "TaskGroup is resuming with a detached marker");
+ Y_VERIFY(headValue, "TaskGroup is resuming with an empty queue");
+ TTaskGroupResult<T>* head = reinterpret_cast<TTaskGroupResult<T>*>(headValue);
+ while (head) {
+ auto* next = std::exchange(head->Next, nullptr);
+ head->Next = ReadyQueue;
+ ReadyQueue = head;
+ head = next;
+ }
+ }
+ Y_VERIFY(ReadyQueue != nullptr);
+ result.reset(ReadyQueue);
+ ReadyQueue = std::exchange(result->Next, nullptr);
+ return result;
+ }
+
+ static void Dispose(TTaskGroupResult<T>* head) noexcept {
+ while (head) {
+ auto* next = std::exchange(head->Next, nullptr);
+ std::unique_ptr<TTaskGroupResult<T>> ptr(head);
+ head = next;
+ }
+ }
+
+ void Detach() noexcept {
+ // After this exchange all new results will be discarded
+ void* headValue = LastReady.exchange((void*)MarkerDetached, std::memory_order_acq_rel);
+ Y_VERIFY(headValue != (void*)MarkerAwaiting, "TaskGroup is detaching with an awaiting marker");
+ Y_VERIFY(headValue != (void*)MarkerDetached, "TaskGroup is detaching with a detached marker");
+ if (headValue) {
+ Dispose(reinterpret_cast<TTaskGroupResult<T>*>(headValue));
+ }
+ if (ReadyQueue) {
+ Dispose(std::exchange(ReadyQueue, nullptr));
+ }
+ }
+ };
+
+ template<class T>
+ class TTaskGroupPromiseBase {
+ public:
+ static auto initial_suspend() noexcept { return std::suspend_always{}; }
+
+ class TFinalSuspend {
+ public:
+ TFinalSuspend(TTaskGroupPromiseBase& promise)
+ : Promise_(promise)
+ {}
+
+ static bool await_ready() noexcept { return false; }
+
+ Y_NO_INLINE std::coroutine_handle<> await_suspend(std::coroutine_handle<> h) noexcept {
+ auto next = Promise_.Sink_->Push(std::move(Promise_.Result_));
+ h.destroy();
+ return next;
+ }
+
+ static void await_resume() noexcept { std::terminate(); }
+
+ private:
+ TTaskGroupPromiseBase& Promise_;
+ };
+
+ auto final_suspend() noexcept { return TFinalSuspend(*this); }
+
+ void unhandled_exception() noexcept {
+ Result_->SetException();
+ Sink_->Push(std::move(Result_));
+ }
+
+ void SetSink(const TIntrusivePtr<TTaskGroupSink<T>>& sink) {
+ Sink_ = sink;
+ }
+
+ protected:
+ std::unique_ptr<TTaskGroupResult<T>> Result_ = std::make_unique<TTaskGroupResult<T>>();
+ TIntrusivePtr<TTaskGroupSink<T>> Sink_;
+ };
+
+ template<class T>
+ class TTaskGroupPromise final : public TTaskGroupPromiseBase<T> {
+ public:
+ using THandle = std::coroutine_handle<TTaskGroupPromise<T>>;
+
+ THandle get_return_object() noexcept {
+ return THandle::from_promise(*this);
+ }
+
+ template<class TResult>
+ void return_value(TResult&& result) {
+ this->Result_->SetValue(std::forward<TResult>(result));
+ }
+ };
+
+ template<>
+ class TTaskGroupPromise<void> final : public TTaskGroupPromiseBase<void> {
+ public:
+ using THandle = std::coroutine_handle<TTaskGroupPromise<void>>;
+
+ THandle get_return_object() noexcept {
+ return THandle::from_promise(*this);
+ }
+
+ void return_void() {
+ this->Result_->SetValue();
+ }
+ };
+
+ template<class T>
+ class TTaskGroupTask final {
+ public:
+ using THandle = std::coroutine_handle<TTaskGroupPromise<T>>;
+ using promise_type = TTaskGroupPromise<T>;
+ using value_type = T;
+
+ public:
+ TTaskGroupTask(THandle handle)
+ : Handle_(handle)
+ {}
+
+ void Start(const TIntrusivePtr<TTaskGroupSink<T>>& sink) {
+ Handle_.promise().SetSink(sink);
+ Handle_.resume();
+ }
+
+ private:
+ THandle Handle_;
+ };
+
+ template<class T, class TAwaitable>
+ TTaskGroupTask<T> CreateTaskGroupTask(TAwaitable awaitable) {
+ co_return co_await std::move(awaitable);
+ }
+
+ } // namespace NDetail
+
+ /**
+ * A task group allows starting multiple subtasks of the same result type
+ * and awaiting them in a structured way. When task group is destroyed
+ * all subtasks are detached in a thread-safe way.
+ */
+ template<class T>
+ class TTaskGroup {
+ public:
+ TTaskGroup() = default;
+
+ ~TTaskGroup() {
+ Sink_->Detach();
+ }
+
+ /**
+ * Add task to the group that will await the result of awaitable
+ */
+ template<class TAwaitable>
+ void AddTask(TAwaitable&& awaitable) {
+ auto task = NDetail::CreateTaskGroupTask<T>(std::forward<TAwaitable>(awaitable));
+ task.Start(Sink_);
+ ++TaskCount_;
+ }
+
+ /**
+ * Returns the number of tasks left unawaited
+ */
+ size_t TaskCount() const {
+ return TaskCount_;
+ }
+
+ class TAwaiter {
+ public:
+ explicit TAwaiter(TTaskGroup& taskGroup) noexcept
+ : TaskGroup_(taskGroup)
+ {}
+
+ bool await_ready() const noexcept {
+ Y_VERIFY(TaskGroup_.TaskCount_ > 0, "Not enough tasks to await");
+ --TaskGroup_.TaskCount_;
+ return TaskGroup_.Sink_->Ready();
+ }
+
+ std::coroutine_handle<> await_suspend(std::coroutine_handle<> h) noexcept {
+ return TaskGroup_.Sink_->Suspend(h);
+ }
+
+ T await_resume() {
+ return TaskGroup_.Sink_->Resume()->Extract();
+ }
+
+ private:
+ TTaskGroup& TaskGroup_;
+ };
+
+ /**
+ * Await result of the next task in the task group
+ */
+ TAwaiter operator co_await() noexcept {
+ return TAwaiter(*this);
+ }
+
+ private:
+ TIntrusivePtr<NDetail::TTaskGroupSink<T>> Sink_ = MakeIntrusive<NDetail::TTaskGroupSink<T>>();
+ size_t TaskCount_ = 0;
+ };
+
+} // namespace NActors
diff --git a/library/cpp/actors/cppcoro/task_ut.cpp b/library/cpp/actors/cppcoro/task_ut.cpp
new file mode 100644
index 0000000000..52c1b0e591
--- /dev/null
+++ b/library/cpp/actors/cppcoro/task_ut.cpp
@@ -0,0 +1,261 @@
+#include "task.h"
+#include "task_group.h"
+#include "await_callback.h"
+#include <library/cpp/testing/unittest/registar.h>
+
+using namespace NActors;
+
+Y_UNIT_TEST_SUITE(Task) {
+
+ TTask<void> SimpleReturnVoid() {
+ co_return;
+ }
+
+ TTask<int> SimpleReturn42() {
+ co_return 42;
+ }
+
+ Y_UNIT_TEST(SimpleVoidCoroutine) {
+ bool finished = false;
+ AwaitThenCallback(SimpleReturnVoid(), [&]() {
+ finished = true;
+ });
+ UNIT_ASSERT(finished);
+ }
+
+ Y_UNIT_TEST(SimpleIntCoroutine) {
+ std::optional<int> result;
+ AwaitThenCallback(SimpleReturn42(), [&](int value) {
+ result = value;
+ });
+ UNIT_ASSERT(result);
+ UNIT_ASSERT_VALUES_EQUAL(*result, 42);
+ }
+
+ Y_UNIT_TEST(DoneAndWhenDone) {
+ auto task = SimpleReturn42();
+ UNIT_ASSERT(task);
+ UNIT_ASSERT(!task.Done());
+
+ bool whenDoneFinished = false;
+ AwaitThenCallback(task.WhenDone(), [&]() {
+ whenDoneFinished = true;
+ });
+ UNIT_ASSERT(whenDoneFinished);
+ UNIT_ASSERT(task.Done());
+
+ // WhenDone can be used even when task is already done
+ whenDoneFinished = false;
+ AwaitThenCallback(task.WhenDone(), [&]() {
+ whenDoneFinished = true;
+ });
+ UNIT_ASSERT(whenDoneFinished);
+
+ std::optional<int> result;
+ AwaitThenCallback(std::move(task), [&](int value) {
+ result = value;
+ });
+ UNIT_ASSERT(result);
+ UNIT_ASSERT_VALUES_EQUAL(*result, 42);
+ UNIT_ASSERT(!task);
+ }
+
+ Y_UNIT_TEST(ManualStart) {
+ auto task = SimpleReturn42();
+ UNIT_ASSERT(task && !task.Done());
+ task.Start();
+ UNIT_ASSERT(task.Done());
+ UNIT_ASSERT_VALUES_EQUAL(task.ExtractResult(), 42);
+ }
+
+ template<class TCallback>
+ TTask<int> CallTwice(TCallback&& callback) {
+ int a = co_await callback();
+ int b = co_await callback();
+ co_return a + b;
+ }
+
+ Y_UNIT_TEST(NestedAwait) {
+ auto task = CallTwice([]{
+ return SimpleReturn42();
+ });
+ std::optional<int> result;
+ AwaitThenCallback(std::move(task), [&](int value) {
+ result = value;
+ });
+ UNIT_ASSERT(result);
+ UNIT_ASSERT_VALUES_EQUAL(*result, 84);
+ }
+
+ struct TPauseState {
+ std::coroutine_handle<> Next;
+ int NextResult;
+
+ auto Wait() {
+ struct TAwaiter {
+ TPauseState* State;
+
+ bool await_ready() const noexcept { return false; }
+ int await_resume() const noexcept {
+ return State->NextResult;
+ }
+ void await_suspend(std::coroutine_handle<> c) {
+ State->Next = c;
+ }
+ };
+ return TAwaiter{ this };
+ }
+
+ explicit operator bool() const {
+ return bool(Next);
+ }
+
+ void Resume(int result) {
+ Y_VERIFY(Next && !Next.done());
+ NextResult = result;
+ std::exchange(Next, {}).resume();
+ }
+ };
+
+ Y_UNIT_TEST(PausedAwait) {
+ TPauseState state;
+ auto callback = [&]{
+ return state.Wait();
+ };
+ auto task = CallTwice(callback);
+ std::optional<int> result;
+ AwaitThenCallback(std::move(task), [&](int value) {
+ result = value;
+ });
+ UNIT_ASSERT(!result);
+ UNIT_ASSERT(state);
+ state.Resume(11);
+ UNIT_ASSERT(!result);
+ UNIT_ASSERT(state);
+ state.Resume(22);
+ UNIT_ASSERT(result);
+ UNIT_ASSERT_VALUES_EQUAL(*result, 33);
+ }
+
+ Y_UNIT_TEST(ManuallyStartThenWhenDone) {
+ TPauseState state;
+ auto next = [&]{
+ return state.Wait();
+ };
+
+ auto task = [](auto next) -> TTask<int> {
+ int value = co_await next();
+ co_return value * 2;
+ }(next);
+
+ UNIT_ASSERT(task && !task.Done());
+ task.Start();
+ UNIT_ASSERT(!task.Done() && state);
+ bool finished = false;
+ AwaitThenCallback(task.WhenDone(), [&]{
+ finished = true;
+ });
+ UNIT_ASSERT(!finished && !task.Done());
+ state.Resume(11);
+ UNIT_ASSERT(finished && task.Done());
+ UNIT_ASSERT_VALUES_EQUAL(task.ExtractResult(), 22);
+ }
+
+ Y_UNIT_TEST(ManuallyStartThenAwait) {
+ TPauseState state;
+ auto next = [&]{
+ return state.Wait();
+ };
+
+ auto task = [](auto next) -> TTask<int> {
+ int value = co_await next();
+ co_return value * 2;
+ }(next);
+
+ UNIT_ASSERT(task && !task.Done());
+ task.Start();
+ UNIT_ASSERT(!task.Done() && state);
+
+ auto awaitTask = [](auto task) -> TTask<int> {
+ int value = co_await std::move(task);
+ co_return value * 3;
+ }(std::move(task));
+ UNIT_ASSERT(awaitTask && !awaitTask.Done());
+ std::optional<int> result;
+ AwaitThenCallback(std::move(awaitTask), [&](int value) {
+ result = value;
+ });
+ UNIT_ASSERT(!result);
+ state.Resume(11);
+ UNIT_ASSERT(result);
+ UNIT_ASSERT_VALUES_EQUAL(*result, 66);
+ }
+
+ Y_UNIT_TEST(GroupWithTwoSubTasks) {
+ TPauseState state1;
+ TPauseState state2;
+
+ std::vector<int> results;
+ auto task = [](auto& state1, auto& state2, auto& results) -> TTask<int> {
+ TTaskGroup<int> group;
+ group.AddTask(state1.Wait());
+ group.AddTask(state2.Wait());
+ int a = co_await group;
+ results.push_back(a);
+ int b = co_await group;
+ results.push_back(b);
+ co_return a + b;
+ }(state1, state2, results);
+
+ std::optional<int> result;
+ AwaitThenCallback(std::move(task), [&](int value) {
+ result = value;
+ });
+
+ // We must be waiting for both states
+ UNIT_ASSERT(state1);
+ UNIT_ASSERT(state2);
+ state2.Resume(22);
+ UNIT_ASSERT_VALUES_EQUAL(results.size(), 1u);
+ UNIT_ASSERT_VALUES_EQUAL(results.at(0), 22);
+ UNIT_ASSERT(!result);
+ state1.Resume(11);
+ UNIT_ASSERT_VALUES_EQUAL(results.size(), 2u);
+ UNIT_ASSERT_VALUES_EQUAL(results.at(1), 11);
+ UNIT_ASSERT(result);
+ UNIT_ASSERT_VALUES_EQUAL(*result, 33);
+ }
+
+ Y_UNIT_TEST(GroupWithTwoSubTasksDetached) {
+ TPauseState state1;
+ TPauseState state2;
+
+ std::vector<int> results;
+ auto task = [](auto& state1, auto& state2, auto& results) -> TTask<int> {
+ TTaskGroup<int> group;
+ group.AddTask(state1.Wait());
+ group.AddTask(state2.Wait());
+ int a = co_await group;
+ results.push_back(a);
+ co_return a;
+ }(state1, state2, results);
+
+ std::optional<int> result;
+ AwaitThenCallback(std::move(task), [&](int value) {
+ result = value;
+ });
+
+ // We must be waiting for both states
+ UNIT_ASSERT(state1);
+ UNIT_ASSERT(state2);
+ state2.Resume(22);
+ UNIT_ASSERT_VALUES_EQUAL(results.size(), 1u);
+ UNIT_ASSERT_VALUES_EQUAL(results.at(0), 22);
+ UNIT_ASSERT(result);
+ UNIT_ASSERT_VALUES_EQUAL(*result, 22);
+
+ // We must resume the first state (otherwise memory leaks), but result is ignored
+ state1.Resume(11);
+ }
+
+} // Y_UNIT_TEST_SUITE(Task)
diff --git a/library/cpp/actors/cppcoro/ut/CMakeLists.darwin-x86_64.txt b/library/cpp/actors/cppcoro/ut/CMakeLists.darwin-x86_64.txt
new file mode 100644
index 0000000000..7cebff01de
--- /dev/null
+++ b/library/cpp/actors/cppcoro/ut/CMakeLists.darwin-x86_64.txt
@@ -0,0 +1,69 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_executable(library-cpp-actors-cppcoro-ut)
+target_include_directories(library-cpp-actors-cppcoro-ut PRIVATE
+ ${CMAKE_SOURCE_DIR}/library/cpp/actors/cppcoro
+)
+target_link_libraries(library-cpp-actors-cppcoro-ut PUBLIC
+ contrib-libs-cxxsupp
+ yutil
+ library-cpp-cpuid_check
+ cpp-testing-unittest_main
+ cpp-actors-cppcoro
+ cpp-actors-testlib
+)
+target_link_options(library-cpp-actors-cppcoro-ut PRIVATE
+ -Wl,-platform_version,macos,11.0,11.0
+ -fPIC
+ -fPIC
+ -framework
+ CoreFoundation
+)
+target_sources(library-cpp-actors-cppcoro-ut PRIVATE
+ ${CMAKE_SOURCE_DIR}/library/cpp/actors/cppcoro/task_ut.cpp
+ ${CMAKE_SOURCE_DIR}/library/cpp/actors/cppcoro/task_actor_ut.cpp
+)
+set_property(
+ TARGET
+ library-cpp-actors-cppcoro-ut
+ PROPERTY
+ SPLIT_FACTOR
+ 1
+)
+add_yunittest(
+ NAME
+ library-cpp-actors-cppcoro-ut
+ TEST_TARGET
+ library-cpp-actors-cppcoro-ut
+ TEST_ARG
+ --print-before-suite
+ --print-before-test
+ --fork-tests
+ --print-times
+ --show-fails
+)
+set_yunittest_property(
+ TEST
+ library-cpp-actors-cppcoro-ut
+ PROPERTY
+ LABELS
+ SMALL
+)
+set_yunittest_property(
+ TEST
+ library-cpp-actors-cppcoro-ut
+ PROPERTY
+ PROCESSORS
+ 1
+)
+target_allocator(library-cpp-actors-cppcoro-ut
+ system_allocator
+)
+vcs_info(library-cpp-actors-cppcoro-ut)
diff --git a/library/cpp/actors/cppcoro/ut/CMakeLists.linux-aarch64.txt b/library/cpp/actors/cppcoro/ut/CMakeLists.linux-aarch64.txt
new file mode 100644
index 0000000000..4a11af3456
--- /dev/null
+++ b/library/cpp/actors/cppcoro/ut/CMakeLists.linux-aarch64.txt
@@ -0,0 +1,72 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_executable(library-cpp-actors-cppcoro-ut)
+target_include_directories(library-cpp-actors-cppcoro-ut PRIVATE
+ ${CMAKE_SOURCE_DIR}/library/cpp/actors/cppcoro
+)
+target_link_libraries(library-cpp-actors-cppcoro-ut PUBLIC
+ contrib-libs-linux-headers
+ contrib-libs-cxxsupp
+ yutil
+ cpp-testing-unittest_main
+ cpp-actors-cppcoro
+ cpp-actors-testlib
+)
+target_link_options(library-cpp-actors-cppcoro-ut PRIVATE
+ -ldl
+ -lrt
+ -Wl,--no-as-needed
+ -fPIC
+ -fPIC
+ -lpthread
+ -lrt
+ -ldl
+)
+target_sources(library-cpp-actors-cppcoro-ut PRIVATE
+ ${CMAKE_SOURCE_DIR}/library/cpp/actors/cppcoro/task_ut.cpp
+ ${CMAKE_SOURCE_DIR}/library/cpp/actors/cppcoro/task_actor_ut.cpp
+)
+set_property(
+ TARGET
+ library-cpp-actors-cppcoro-ut
+ PROPERTY
+ SPLIT_FACTOR
+ 1
+)
+add_yunittest(
+ NAME
+ library-cpp-actors-cppcoro-ut
+ TEST_TARGET
+ library-cpp-actors-cppcoro-ut
+ TEST_ARG
+ --print-before-suite
+ --print-before-test
+ --fork-tests
+ --print-times
+ --show-fails
+)
+set_yunittest_property(
+ TEST
+ library-cpp-actors-cppcoro-ut
+ PROPERTY
+ LABELS
+ SMALL
+)
+set_yunittest_property(
+ TEST
+ library-cpp-actors-cppcoro-ut
+ PROPERTY
+ PROCESSORS
+ 1
+)
+target_allocator(library-cpp-actors-cppcoro-ut
+ cpp-malloc-jemalloc
+)
+vcs_info(library-cpp-actors-cppcoro-ut)
diff --git a/library/cpp/actors/cppcoro/ut/CMakeLists.linux-x86_64.txt b/library/cpp/actors/cppcoro/ut/CMakeLists.linux-x86_64.txt
new file mode 100644
index 0000000000..2e2412f989
--- /dev/null
+++ b/library/cpp/actors/cppcoro/ut/CMakeLists.linux-x86_64.txt
@@ -0,0 +1,74 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_executable(library-cpp-actors-cppcoro-ut)
+target_include_directories(library-cpp-actors-cppcoro-ut PRIVATE
+ ${CMAKE_SOURCE_DIR}/library/cpp/actors/cppcoro
+)
+target_link_libraries(library-cpp-actors-cppcoro-ut PUBLIC
+ contrib-libs-linux-headers
+ contrib-libs-cxxsupp
+ yutil
+ library-cpp-cpuid_check
+ cpp-testing-unittest_main
+ cpp-actors-cppcoro
+ cpp-actors-testlib
+)
+target_link_options(library-cpp-actors-cppcoro-ut PRIVATE
+ -ldl
+ -lrt
+ -Wl,--no-as-needed
+ -fPIC
+ -fPIC
+ -lpthread
+ -lrt
+ -ldl
+)
+target_sources(library-cpp-actors-cppcoro-ut PRIVATE
+ ${CMAKE_SOURCE_DIR}/library/cpp/actors/cppcoro/task_ut.cpp
+ ${CMAKE_SOURCE_DIR}/library/cpp/actors/cppcoro/task_actor_ut.cpp
+)
+set_property(
+ TARGET
+ library-cpp-actors-cppcoro-ut
+ PROPERTY
+ SPLIT_FACTOR
+ 1
+)
+add_yunittest(
+ NAME
+ library-cpp-actors-cppcoro-ut
+ TEST_TARGET
+ library-cpp-actors-cppcoro-ut
+ TEST_ARG
+ --print-before-suite
+ --print-before-test
+ --fork-tests
+ --print-times
+ --show-fails
+)
+set_yunittest_property(
+ TEST
+ library-cpp-actors-cppcoro-ut
+ PROPERTY
+ LABELS
+ SMALL
+)
+set_yunittest_property(
+ TEST
+ library-cpp-actors-cppcoro-ut
+ PROPERTY
+ PROCESSORS
+ 1
+)
+target_allocator(library-cpp-actors-cppcoro-ut
+ cpp-malloc-tcmalloc
+ libs-tcmalloc-no_percpu_cache
+)
+vcs_info(library-cpp-actors-cppcoro-ut)
diff --git a/library/cpp/actors/cppcoro/ut/CMakeLists.txt b/library/cpp/actors/cppcoro/ut/CMakeLists.txt
new file mode 100644
index 0000000000..f8b31df0c1
--- /dev/null
+++ b/library/cpp/actors/cppcoro/ut/CMakeLists.txt
@@ -0,0 +1,17 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+if (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "aarch64" AND NOT HAVE_CUDA)
+ include(CMakeLists.linux-aarch64.txt)
+elseif (CMAKE_SYSTEM_NAME STREQUAL "Darwin" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64")
+ include(CMakeLists.darwin-x86_64.txt)
+elseif (WIN32 AND CMAKE_SYSTEM_PROCESSOR STREQUAL "AMD64" AND NOT HAVE_CUDA)
+ include(CMakeLists.windows-x86_64.txt)
+elseif (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" AND NOT HAVE_CUDA)
+ include(CMakeLists.linux-x86_64.txt)
+endif()
diff --git a/library/cpp/actors/cppcoro/ut/CMakeLists.windows-x86_64.txt b/library/cpp/actors/cppcoro/ut/CMakeLists.windows-x86_64.txt
new file mode 100644
index 0000000000..e3b8b019c8
--- /dev/null
+++ b/library/cpp/actors/cppcoro/ut/CMakeLists.windows-x86_64.txt
@@ -0,0 +1,62 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_executable(library-cpp-actors-cppcoro-ut)
+target_include_directories(library-cpp-actors-cppcoro-ut PRIVATE
+ ${CMAKE_SOURCE_DIR}/library/cpp/actors/cppcoro
+)
+target_link_libraries(library-cpp-actors-cppcoro-ut PUBLIC
+ contrib-libs-cxxsupp
+ yutil
+ library-cpp-cpuid_check
+ cpp-testing-unittest_main
+ cpp-actors-cppcoro
+ cpp-actors-testlib
+)
+target_sources(library-cpp-actors-cppcoro-ut PRIVATE
+ ${CMAKE_SOURCE_DIR}/library/cpp/actors/cppcoro/task_ut.cpp
+ ${CMAKE_SOURCE_DIR}/library/cpp/actors/cppcoro/task_actor_ut.cpp
+)
+set_property(
+ TARGET
+ library-cpp-actors-cppcoro-ut
+ PROPERTY
+ SPLIT_FACTOR
+ 1
+)
+add_yunittest(
+ NAME
+ library-cpp-actors-cppcoro-ut
+ TEST_TARGET
+ library-cpp-actors-cppcoro-ut
+ TEST_ARG
+ --print-before-suite
+ --print-before-test
+ --fork-tests
+ --print-times
+ --show-fails
+)
+set_yunittest_property(
+ TEST
+ library-cpp-actors-cppcoro-ut
+ PROPERTY
+ LABELS
+ SMALL
+)
+set_yunittest_property(
+ TEST
+ library-cpp-actors-cppcoro-ut
+ PROPERTY
+ PROCESSORS
+ 1
+)
+target_allocator(library-cpp-actors-cppcoro-ut
+ system_allocator
+)
+vcs_info(library-cpp-actors-cppcoro-ut)
diff --git a/library/cpp/actors/cppcoro/ut/ya.make b/library/cpp/actors/cppcoro/ut/ya.make
new file mode 100644
index 0000000000..24a9c73613
--- /dev/null
+++ b/library/cpp/actors/cppcoro/ut/ya.make
@@ -0,0 +1,12 @@
+UNITTEST_FOR(library/cpp/actors/cppcoro)
+
+PEERDIR(
+ library/cpp/actors/testlib
+)
+
+SRCS(
+ task_ut.cpp
+ task_actor_ut.cpp
+)
+
+END()
diff --git a/library/cpp/actors/cppcoro/ya.make b/library/cpp/actors/cppcoro/ya.make
new file mode 100644
index 0000000000..9890eccbee
--- /dev/null
+++ b/library/cpp/actors/cppcoro/ya.make
@@ -0,0 +1,22 @@
+LIBRARY()
+
+PEERDIR(
+ library/cpp/actors/core
+)
+
+SRCS(
+ await_callback.cpp
+ await_callback.h
+ task_actor.cpp
+ task_actor.h
+ task_group.cpp
+ task_group.h
+ task.cpp
+ task.h
+)
+
+END()
+
+RECURSE_FOR_TESTS(
+ ut
+)
diff --git a/library/cpp/actors/ya.make b/library/cpp/actors/ya.make
index 2612c414cd..00d7801798 100644
--- a/library/cpp/actors/ya.make
+++ b/library/cpp/actors/ya.make
@@ -1,6 +1,7 @@
RECURSE(
log_backend
core
+ cppcoro
dnsresolver
examples
interconnect