diff options
author | snaury <snaury@ydb.tech> | 2023-08-07 13:35:52 +0300 |
---|---|---|
committer | snaury <snaury@ydb.tech> | 2023-08-07 14:26:11 +0300 |
commit | 884279adc343b71eaadb4d75d088c1338aeda378 (patch) | |
tree | 483c1ecc0788d8d597dd60cee1e1c2adce5b6345 /library/cpp | |
parent | d79a2a297724679b1adf2ab348defdbdae3e73be (diff) | |
download | ydb-884279adc343b71eaadb4d75d088c1338aeda378.tar.gz |
Support running C++ coroutines as actors KIKIMR-18962
Diffstat (limited to 'library/cpp')
24 files changed, 1783 insertions, 0 deletions
diff --git a/library/cpp/actors/CMakeLists.txt b/library/cpp/actors/CMakeLists.txt index 45fe46a94d..becd73cd24 100644 --- a/library/cpp/actors/CMakeLists.txt +++ b/library/cpp/actors/CMakeLists.txt @@ -8,6 +8,7 @@ add_subdirectory(actor_type) add_subdirectory(core) +add_subdirectory(cppcoro) add_subdirectory(dnscachelib) add_subdirectory(dnsresolver) add_subdirectory(examples) diff --git a/library/cpp/actors/cppcoro/CMakeLists.darwin-x86_64.txt b/library/cpp/actors/cppcoro/CMakeLists.darwin-x86_64.txt new file mode 100644 index 0000000000..ecac0aa784 --- /dev/null +++ b/library/cpp/actors/cppcoro/CMakeLists.darwin-x86_64.txt @@ -0,0 +1,22 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + +add_subdirectory(ut) + +add_library(cpp-actors-cppcoro) +target_link_libraries(cpp-actors-cppcoro PUBLIC + contrib-libs-cxxsupp + yutil + cpp-actors-core +) +target_sources(cpp-actors-cppcoro PRIVATE + ${CMAKE_SOURCE_DIR}/library/cpp/actors/cppcoro/await_callback.cpp + ${CMAKE_SOURCE_DIR}/library/cpp/actors/cppcoro/task_actor.cpp + ${CMAKE_SOURCE_DIR}/library/cpp/actors/cppcoro/task_group.cpp + ${CMAKE_SOURCE_DIR}/library/cpp/actors/cppcoro/task.cpp +) diff --git a/library/cpp/actors/cppcoro/CMakeLists.linux-aarch64.txt b/library/cpp/actors/cppcoro/CMakeLists.linux-aarch64.txt new file mode 100644 index 0000000000..ff385af6fa --- /dev/null +++ b/library/cpp/actors/cppcoro/CMakeLists.linux-aarch64.txt @@ -0,0 +1,23 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + +add_subdirectory(ut) + +add_library(cpp-actors-cppcoro) +target_link_libraries(cpp-actors-cppcoro PUBLIC + contrib-libs-linux-headers + contrib-libs-cxxsupp + yutil + cpp-actors-core +) +target_sources(cpp-actors-cppcoro PRIVATE + ${CMAKE_SOURCE_DIR}/library/cpp/actors/cppcoro/await_callback.cpp + ${CMAKE_SOURCE_DIR}/library/cpp/actors/cppcoro/task_actor.cpp + ${CMAKE_SOURCE_DIR}/library/cpp/actors/cppcoro/task_group.cpp + ${CMAKE_SOURCE_DIR}/library/cpp/actors/cppcoro/task.cpp +) diff --git a/library/cpp/actors/cppcoro/CMakeLists.linux-x86_64.txt b/library/cpp/actors/cppcoro/CMakeLists.linux-x86_64.txt new file mode 100644 index 0000000000..ff385af6fa --- /dev/null +++ b/library/cpp/actors/cppcoro/CMakeLists.linux-x86_64.txt @@ -0,0 +1,23 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + +add_subdirectory(ut) + +add_library(cpp-actors-cppcoro) +target_link_libraries(cpp-actors-cppcoro PUBLIC + contrib-libs-linux-headers + contrib-libs-cxxsupp + yutil + cpp-actors-core +) +target_sources(cpp-actors-cppcoro PRIVATE + ${CMAKE_SOURCE_DIR}/library/cpp/actors/cppcoro/await_callback.cpp + ${CMAKE_SOURCE_DIR}/library/cpp/actors/cppcoro/task_actor.cpp + ${CMAKE_SOURCE_DIR}/library/cpp/actors/cppcoro/task_group.cpp + ${CMAKE_SOURCE_DIR}/library/cpp/actors/cppcoro/task.cpp +) diff --git a/library/cpp/actors/cppcoro/CMakeLists.txt b/library/cpp/actors/cppcoro/CMakeLists.txt new file mode 100644 index 0000000000..f8b31df0c1 --- /dev/null +++ b/library/cpp/actors/cppcoro/CMakeLists.txt @@ -0,0 +1,17 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + +if (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "aarch64" AND NOT HAVE_CUDA) + include(CMakeLists.linux-aarch64.txt) +elseif (CMAKE_SYSTEM_NAME STREQUAL "Darwin" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64") + include(CMakeLists.darwin-x86_64.txt) +elseif (WIN32 AND CMAKE_SYSTEM_PROCESSOR STREQUAL "AMD64" AND NOT HAVE_CUDA) + include(CMakeLists.windows-x86_64.txt) +elseif (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" AND NOT HAVE_CUDA) + include(CMakeLists.linux-x86_64.txt) +endif() diff --git a/library/cpp/actors/cppcoro/CMakeLists.windows-x86_64.txt b/library/cpp/actors/cppcoro/CMakeLists.windows-x86_64.txt new file mode 100644 index 0000000000..ecac0aa784 --- /dev/null +++ b/library/cpp/actors/cppcoro/CMakeLists.windows-x86_64.txt @@ -0,0 +1,22 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + +add_subdirectory(ut) + +add_library(cpp-actors-cppcoro) +target_link_libraries(cpp-actors-cppcoro PUBLIC + contrib-libs-cxxsupp + yutil + cpp-actors-core +) +target_sources(cpp-actors-cppcoro PRIVATE + ${CMAKE_SOURCE_DIR}/library/cpp/actors/cppcoro/await_callback.cpp + ${CMAKE_SOURCE_DIR}/library/cpp/actors/cppcoro/task_actor.cpp + ${CMAKE_SOURCE_DIR}/library/cpp/actors/cppcoro/task_group.cpp + ${CMAKE_SOURCE_DIR}/library/cpp/actors/cppcoro/task.cpp +) diff --git a/library/cpp/actors/cppcoro/await_callback.cpp b/library/cpp/actors/cppcoro/await_callback.cpp new file mode 100644 index 0000000000..5132131a8e --- /dev/null +++ b/library/cpp/actors/cppcoro/await_callback.cpp @@ -0,0 +1 @@ +#include "await_callback.h" diff --git a/library/cpp/actors/cppcoro/await_callback.h b/library/cpp/actors/cppcoro/await_callback.h new file mode 100644 index 0000000000..9f23d5e0db --- /dev/null +++ b/library/cpp/actors/cppcoro/await_callback.h @@ -0,0 +1,98 @@ +#include <coroutine> +#include <exception> +#include <concepts> + +namespace NActors { + + namespace NDetail { + template<class TAwaitable> + decltype(auto) GetAwaiter(TAwaitable&& awaitable) { + if constexpr (requires { ((TAwaitable&&) awaitable).operator co_await(); }) { + return ((TAwaitable&&) awaitable).operator co_await(); + } else if constexpr (requires { operator co_await((TAwaitable&&) awaitable); }) { + return operator co_await((TAwaitable&&) awaitable); + } else { + return ((TAwaitable&&) awaitable); + } + } + + template<class TAwaitable> + using TAwaitResult = decltype(GetAwaiter(std::declval<TAwaitable>()).await_resume()); + + template<class TCallback, class TResult> + class TCallbackResult { + public: + TCallbackResult(TCallback& callback) + : Callback_(callback) + {} + + template<class TRealResult> + void return_value(TRealResult&& result) noexcept { + Callback_(std::forward<TRealResult>(result)); + } + + private: + TCallback& Callback_; + }; + + template<class TCallback> + class TCallbackResult<TCallback, void> { + public: + TCallbackResult(TCallback& callback) + : Callback_(callback) + {} + + void return_void() noexcept { + Callback_(); + } + + private: + TCallback& Callback_; + }; + + template<class TAwaitable, class TCallback> + class TAwaitThenCallbackPromise + : public TCallbackResult<TCallback, TAwaitResult<TAwaitable>> + { + public: + using THandle = std::coroutine_handle<TAwaitThenCallbackPromise<TAwaitable, TCallback>>; + + TAwaitThenCallbackPromise(TAwaitable&, TCallback& callback) + : TCallbackResult<TCallback, TAwaitResult<TAwaitable>>(callback) + {} + + THandle get_return_object() noexcept { + return THandle::from_promise(*this); + } + + static auto initial_suspend() noexcept { return std::suspend_never{}; } + static auto final_suspend() noexcept { return std::suspend_never{}; } + + void unhandled_exception() noexcept { + std::terminate(); + } + }; + + template<class TAwaitable, class TCallback> + class TAwaitThenCallback { + public: + using promise_type = TAwaitThenCallbackPromise<TAwaitable, TCallback>; + + using THandle = typename promise_type::THandle; + + TAwaitThenCallback(THandle) noexcept {} + }; + } + + /** + * Awaits the awaitable and calls callback with the result. + * + * Note: program terminates if awaitable or callback throw an exception. + */ + template<class TAwaitable, class TCallback> + NDetail::TAwaitThenCallback<TAwaitable, TCallback> AwaitThenCallback(TAwaitable awaitable, TCallback) { + // Note: underlying promise takes callback argument address and calls it when we return + co_return co_await std::move(awaitable); + } + +} // namespace NActors diff --git a/library/cpp/actors/cppcoro/task.cpp b/library/cpp/actors/cppcoro/task.cpp new file mode 100644 index 0000000000..204c27c573 --- /dev/null +++ b/library/cpp/actors/cppcoro/task.cpp @@ -0,0 +1 @@ +#include "task.h" diff --git a/library/cpp/actors/cppcoro/task.h b/library/cpp/actors/cppcoro/task.h new file mode 100644 index 0000000000..dade638ddb --- /dev/null +++ b/library/cpp/actors/cppcoro/task.h @@ -0,0 +1,312 @@ +#pragma once +#include <util/system/compiler.h> +#include <util/system/yassert.h> +#include <coroutine> +#include <exception> +#include <variant> + +namespace NActors { + + template<class T> + class TTask; + + namespace NDetail { + + class TTaskPromiseBase { + public: + static auto initial_suspend() noexcept { + return std::suspend_always{}; + } + + struct TFinalSuspend { + static bool await_ready() noexcept { return false; } + static void await_resume() noexcept { std::terminate(); } + + template<class TPromise> + static std::coroutine_handle<> await_suspend(std::coroutine_handle<TPromise> h) noexcept { + TTaskPromiseBase& promise = h.promise(); + return std::exchange(promise.Continuation_, {}); + } + }; + + static auto final_suspend() noexcept { + return TFinalSuspend{}; + } + + bool HasStarted() const noexcept { + return Flags_ & 1; + } + + void SetStarted() noexcept { + Flags_ |= 1; + } + + bool HasContinuation() const noexcept { + return Flags_ & 2; + } + + void SetContinuation(std::coroutine_handle<> continuation) noexcept { + Y_VERIFY_DEBUG(continuation, "Attempt to set an invalid continuation"); + Y_VERIFY_DEBUG(!HasContinuation(), "Attempt to set multiple continuations"); + Continuation_ = continuation; + Flags_ |= 2; + } + + private: + // Default is used when task is resumed without a continuation + std::coroutine_handle<> Continuation_ = std::noop_coroutine(); + unsigned char Flags_ = 0; + }; + + template<class T> + class TTaskPromise; + + template<class T> + using TTaskHandle = std::coroutine_handle<TTaskPromise<T>>; + + template<class T> + class TTaskPromise final : public TTaskPromiseBase { + public: + TTask<T> get_return_object() noexcept; + + std::coroutine_handle<> Start() noexcept { + if (Y_LIKELY(!HasStarted())) { + SetStarted(); + return TTaskHandle<T>::from_promise(*this); + } else { + // After coroutine starts is cannot be safely resumed, because + // it is waiting for something and must be resumed via its + // continuation. + return std::noop_coroutine(); + } + } + + void unhandled_exception() noexcept { + Result_.template emplace<std::exception_ptr>(std::current_exception()); + } + + template<class TResult> + void return_value(TResult&& result) { + Result_.template emplace<T>(std::forward<TResult>(result)); + } + + T ExtractResult() { + switch (Result_.index()) { + case 0: { + std::rethrow_exception(std::get<0>(std::move(Result_))); + } + case 1: { + return std::get<1>(std::move(Result_)); + } + } + std::terminate(); + } + + private: + std::variant<std::exception_ptr, T> Result_; + }; + + template<> + class TTaskPromise<void> final : public TTaskPromiseBase { + public: + TTask<void> get_return_object() noexcept; + + std::coroutine_handle<> Start() noexcept { + if (Y_LIKELY(!HasStarted())) { + SetStarted(); + return TTaskHandle<void>::from_promise(*this); + } else { + // After coroutine starts is cannot be safely resumed, because + // it is waiting for something and must be resumed via its + // continuation. + return std::noop_coroutine(); + } + } + + void unhandled_exception() noexcept { + Exception_ = std::current_exception(); + } + + void return_void() noexcept { + Exception_ = nullptr; + } + + void ExtractResult() { + if (Exception_) { + std::rethrow_exception(std::move(Exception_)); + } + } + + private: + std::exception_ptr Exception_; + }; + + } // namespace NDetail + + /** + * A bare-bones lazy task implementation + * + * This task is not thread safe and assumes external synchronization, e.g. + * races between destructor and await resume are not allowed and not safe. + */ + template<class T> + class TTask final { + public: + using promise_type = NDetail::TTaskPromise<T>; + using value_type = T; + + public: + TTask() noexcept = default; + + explicit TTask(NDetail::TTaskHandle<T> handle) noexcept + : Handle_(handle) + {} + + TTask(TTask&& rhs) noexcept + : Handle_(std::exchange(rhs.Handle_, {})) + {} + + ~TTask() { + if (Handle_) { + Handle_.destroy(); + } + } + + TTask& operator=(TTask&& rhs) noexcept { + if (Y_LIKELY(this != &rhs)) { + auto handle = std::exchange(Handle_, {}); + Handle_ = std::exchange(rhs.Handle_, {}); + if (handle) { + handle.destroy(); + } + } + return *this; + } + + /** + * Returns true for a valid task object + */ + explicit operator bool() const noexcept { + return bool(Handle_); + } + + /** + * Returns true if the task finished executing (produced a result) + */ + bool Done() const { + Y_VERIFY_DEBUG(Handle_); + return Handle_.done(); + } + + /** + * Manually start the task, only possible once + */ + void Start() const { + Y_VERIFY_DEBUG(!Done()); + Y_VERIFY_DEBUG(!Promise().HasStarted()); + Y_VERIFY_DEBUG(!Promise().HasContinuation()); + Promise().Start().resume(); + } + + /** + * Implementation of awaiter for WhenDone + */ + class TWhenDoneAwaiter { + public: + TWhenDoneAwaiter(NDetail::TTaskHandle<T> handle) noexcept + : Handle_(handle) + {} + + bool await_ready() const noexcept { + return Handle_.done(); + } + + std::coroutine_handle<> await_suspend(std::coroutine_handle<> continuation) const noexcept { + Handle_.promise().SetContinuation(continuation); + return Handle_.promise().Start(); + } + + void await_resume() const noexcept { + // nothing + } + + private: + NDetail::TTaskHandle<T> Handle_; + }; + + /** + * Returns an awaitable that completes when task finishes executing + * + * Note the result of the task is not consumed. + */ + auto WhenDone() const noexcept { + return TWhenDoneAwaiter(Handle_); + } + + /** + * Extracts result of the task + */ + T ExtractResult() { + Y_VERIFY_DEBUG(Done()); + return Promise().ExtractResult(); + } + + /** + * Implementation of awaiter for co_await + */ + class TAwaiter { + public: + TAwaiter(TTask&& task) noexcept + : Task_(std::move(task)) + {} + + bool await_ready() const noexcept { + return Task_.Done(); + } + + std::coroutine_handle<> await_suspend(std::coroutine_handle<> continuation) const noexcept { + Task_.Promise().SetContinuation(continuation); + return Task_.Promise().Start(); + } + + T await_resume() { + // We destroy task state before we return + TTask task(std::move(Task_)); + return task.ExtractResult(); + } + + private: + TTask Task_; + }; + + /** + * Returns the task result when it finishes + */ + auto operator co_await() && noexcept { + return TAwaiter(std::move(*this)); + } + + private: + NDetail::TTaskPromise<T>& Promise() const noexcept { + Y_VERIFY_DEBUG(Handle_); + return Handle_.promise(); + } + + private: + NDetail::TTaskHandle<T> Handle_; + }; + + namespace NDetail { + + template<class T> + inline TTask<T> TTaskPromise<T>::get_return_object() noexcept { + return TTask<T>(TTaskHandle<T>::from_promise(*this)); + } + + inline TTask<void> TTaskPromise<void>::get_return_object() noexcept { + return TTask<void>(TTaskHandle<void>::from_promise(*this)); + } + + } // namespace NDetail + +} // namespace NActors diff --git a/library/cpp/actors/cppcoro/task_actor.cpp b/library/cpp/actors/cppcoro/task_actor.cpp new file mode 100644 index 0000000000..756d7e2f32 --- /dev/null +++ b/library/cpp/actors/cppcoro/task_actor.cpp @@ -0,0 +1,157 @@ +#include "task_actor.h" +#include <library/cpp/actors/core/actor.h> +#include <library/cpp/actors/core/hfunc.h> + +namespace NActors { + + class TTaskActorImpl; + + static Y_POD_THREAD(TTaskActorImpl*) TlsCurrentTaskActor{nullptr}; + + struct TCurrentTaskActorGuard { + TCurrentTaskActorGuard(TTaskActorImpl* current) noexcept { + Y_VERIFY(TlsCurrentTaskActor == nullptr); + TlsCurrentTaskActor = current; + } + + ~TCurrentTaskActorGuard() noexcept { + TlsCurrentTaskActor = nullptr; + } + }; + + enum : ui32 { + EvResumeTask = EventSpaceBegin(TEvents::ES_SYSTEM) + 256, + }; + + struct TEvResumeTask : public TEventLocal<TEvResumeTask, EvResumeTask> { + std::coroutine_handle<> Handle; + + explicit TEvResumeTask(std::coroutine_handle<> handle) noexcept + : Handle(handle) + {} + + ~TEvResumeTask() noexcept { + // TODO: actor may be dead already + } + }; + + class TTaskActorImpl : public TActor<TTaskActorImpl> { + friend class TTaskActor; + friend struct TAfterAwaiter; + friend struct TBindAwaiter; + + public: + TTaskActorImpl(TTask<void>&& task) + : TActor(&TThis::StateBoot) + , Task(std::move(task)) + { + Y_VERIFY(Task); + } + + void Registered(TActorSystem* sys, const TActorId& parent) override { + ParentId = parent; + sys->Send(new IEventHandle(TEvents::TSystem::Bootstrap, 0, SelfId(), SelfId(), {}, 0)); + } + + STATEFN(StateBoot) { + Y_VERIFY(ev->GetTypeRewrite() == TEvents::TSystem::Bootstrap, "Expected bootstrap event"); + TCurrentTaskActorGuard guard(this); + Become(&TThis::StateWork); + Task.Start(); + Check(); + } + + STATEFN(StateWork) { + TCurrentTaskActorGuard guard(this); + switch (ev->GetTypeRewrite()) { + hFunc(TEvResumeTask, Handle); + default: + Y_VERIFY(EventWaiter); + Event.reset(ev.Release()); + std::exchange(EventWaiter, {}).resume(); + } + Check(); + } + + void Handle(TEvResumeTask::TPtr& ev) { + auto* msg = ev->Get(); + std::exchange(msg->Handle, {}).resume(); + } + + bool Check() { + if (Task.Done()) { + Y_VERIFY(!EventWaiter, "Task terminated while waiting for the next event"); + Task.ExtractResult(); + PassAway(); + return false; + } + + Y_VERIFY(EventWaiter, "Task suspended without waiting for the next event"); + Event.reset(); + return true; + } + + void WaitForEvent(std::coroutine_handle<> h) noexcept { + Y_VERIFY(!EventWaiter, "Task cannot have multiple waiters for the next event"); + EventWaiter = h; + } + + std::unique_ptr<IEventHandle> FinishWaitForEvent() noexcept { + Y_VERIFY(Event, "Task does not have current event"); + return std::move(Event); + } + + private: + TTask<void> Task; + TActorId ParentId; + std::coroutine_handle<> EventWaiter; + std::unique_ptr<IEventHandle> Event; + }; + + void TTaskActorNextEvent::await_suspend(std::coroutine_handle<> h) noexcept { + Y_VERIFY(TlsCurrentTaskActor, "Not in a task actor context"); + TlsCurrentTaskActor->WaitForEvent(h); + } + + std::unique_ptr<IEventHandle> TTaskActorNextEvent::await_resume() noexcept { + Y_VERIFY(TlsCurrentTaskActor, "Not in a task actor context"); + return TlsCurrentTaskActor->FinishWaitForEvent(); + } + + IActor* TTaskActor::Create(TTask<void>&& task) { + return new TTaskActorImpl(std::move(task)); + } + + TActorIdentity TTaskActor::SelfId() noexcept { + Y_VERIFY(TlsCurrentTaskActor, "Not in a task actor context"); + return TlsCurrentTaskActor->SelfId(); + } + + TActorId TTaskActor::ParentId() noexcept { + Y_VERIFY(TlsCurrentTaskActor, "Not in a task actor context"); + return TlsCurrentTaskActor->ParentId; + } + + void TAfterAwaiter::await_suspend(std::coroutine_handle<> h) noexcept { + Y_VERIFY(TlsCurrentTaskActor, "Not in a task actor context"); + TlsCurrentTaskActor->Schedule(Duration, new TEvResumeTask(h)); + } + + void TAfterAwaiter::await_resume() { + } + + bool TBindAwaiter::await_ready() noexcept { + if (TlsCurrentTaskActor && TlsCurrentTaskActor->SelfId() == ActorId) { + return true; + } + return false; + } + + void TBindAwaiter::await_suspend(std::coroutine_handle<> h) noexcept { + Sys->Send(new IEventHandle(ActorId, ActorId, new TEvResumeTask(h))); + } + + void TBindAwaiter::await_resume() { + } + +} // namespace NActors diff --git a/library/cpp/actors/cppcoro/task_actor.h b/library/cpp/actors/cppcoro/task_actor.h new file mode 100644 index 0000000000..e4a1c9df3e --- /dev/null +++ b/library/cpp/actors/cppcoro/task_actor.h @@ -0,0 +1,89 @@ +#include <library/cpp/actors/core/actor.h> +#include "task.h" + +namespace NActors { + + struct TTaskActorNextEvent { + static constexpr bool await_ready() noexcept { return false; } + + static void await_suspend(std::coroutine_handle<> h) noexcept; + + static std::unique_ptr<IEventHandle> await_resume() noexcept; + }; + + struct TAfterAwaiter { + TDuration Duration; + + static constexpr bool await_ready() noexcept { return false; } + + void await_suspend(std::coroutine_handle<> h) noexcept; + + void await_resume(); + }; + + struct TBindAwaiter { + TActorSystem* Sys; + TActorId ActorId; + + bool await_ready() noexcept; + + void await_suspend(std::coroutine_handle<> h) noexcept; + + void await_resume(); + }; + + class TTaskActor { + public: + /** + * Creates a new actor that will run the specified task. + */ + static IActor* Create(TTask<void>&& task); + + /** + * Returns the next actor event when awaited + */ + static constexpr TTaskActorNextEvent NextEvent{}; + + /** + * Returns the identity of current task actor. + */ + static TActorIdentity SelfId() noexcept; + + /** + * Returns an actor id of the actor that registered current task actor. + */ + static TActorId ParentId() noexcept; + + /** + * Returns awaiter that completes after the specified timeout. + */ + static TAfterAwaiter After(TDuration duration) noexcept { + return TAfterAwaiter{ duration }; + } + + /** + * Returns awaiter that completes on actor thread when awaited. + */ + static TBindAwaiter Bind() noexcept { + TActorId actorId = SelfId(); + TActorSystem* sys = TActivationContext::ActorSystem(); + return TBindAwaiter{ sys, actorId }; + } + + /** + * Returns a task that runs the specified task, but binds the result + * back to the actor thread. Useful when the specified task may be + * working with non-actor coroutines. + */ + template<class T> + static TTask<T> Bind(TTask<T>&& task) { + // TODO: may run on non-actor thread, protect from unwind + return [](TTask<T> task, TBindAwaiter bindTask) -> TTask<T> { + co_await task.WhenDone(); + co_await bindTask; + co_return task.ExtractResult(); + }(std::move(task), Bind()); + } + }; + +} // namespace NActors diff --git a/library/cpp/actors/cppcoro/task_actor_ut.cpp b/library/cpp/actors/cppcoro/task_actor_ut.cpp new file mode 100644 index 0000000000..8037dd418d --- /dev/null +++ b/library/cpp/actors/cppcoro/task_actor_ut.cpp @@ -0,0 +1,93 @@ +#include "task_actor.h" +#include <library/cpp/actors/core/executor_pool_basic.h> +#include <library/cpp/actors/core/scheduler_basic.h> + +#include <library/cpp/testing/unittest/registar.h> + +Y_UNIT_TEST_SUITE(TaskActor) { + + using namespace NActors; + + enum : ui32 { + EvBegin = EventSpaceBegin(TEvents::ES_USERSPACE), + EvRequest, + EvResponse, + EvStop, + }; + + struct TEvRequest: public TEventLocal<TEvRequest, EvRequest> { + }; + + struct TEvResponse: public TEventLocal<TEvResponse, EvResponse> { + }; + + struct TEvStop: public TEventLocal<TEvStop, EvStop> { + }; + + TTask<void> SimpleResponder() { + for (;;) { + auto ev = co_await TTaskActor::NextEvent; + Y_VERIFY(ev->GetTypeRewrite() == TEvRequest::EventType); + auto* msg = ev->Get<TEvRequest>(); + Y_UNUSED(msg); + TTaskActor::SelfId().Send(ev->Sender, new TEvResponse); + } + } + + TTask<void> SimpleRequester(TActorId responder, TManualEvent& doneEvent, std::atomic<int>& itemsProcessed) { + // Note: it's ok to use lambda capture because captures outlive this coroutine + auto singleRequest = [&]() -> TTask<bool> { + TTaskActor::SelfId().Send(responder, new TEvRequest); + auto ev = co_await TTaskActor::NextEvent; + switch (ev->GetTypeRewrite()) { + case TEvResponse::EventType: + co_return true; + case TEvStop::EventType: + co_return false; + default: + Y_FAIL("Unexpected event"); + } + }; + while (co_await singleRequest()) { + ++itemsProcessed; + } + doneEvent.Signal(); + } + + void Check(TDuration duration, std::unique_ptr<IEventBase> stopEvent) { + THolder<TActorSystemSetup> setup = MakeHolder<TActorSystemSetup>(); + setup->NodeId = 0; + setup->ExecutorsCount = 1; + setup->Executors.Reset(new TAutoPtr<IExecutorPool>[setup->ExecutorsCount]); + for (ui32 i = 0; i < setup->ExecutorsCount; ++i) { + setup->Executors[i] = new TBasicExecutorPool(i, 5, 10, "basic"); + } + setup->Scheduler = new TBasicSchedulerThread; + + TActorSystem actorSystem(setup); + + actorSystem.Start(); + + TManualEvent doneEvent; + std::atomic<int> itemsProcessed{0}; + + auto responder = actorSystem.Register(TTaskActor::Create(SimpleResponder())); + auto requester = actorSystem.Register(TTaskActor::Create(SimpleRequester(responder, doneEvent, itemsProcessed))); + auto deadline = TMonotonic::Now() + duration; + while (itemsProcessed.load() < 10) { + UNIT_ASSERT_C(TMonotonic::Now() < deadline, "cannot observe 10 responses in " << duration); + Sleep(TDuration::MilliSeconds(100)); + } + actorSystem.Send(requester, stopEvent.release()); + doneEvent.WaitI(); + + UNIT_ASSERT_GE(itemsProcessed.load(), 10); + + actorSystem.Stop(); + } + + Y_UNIT_TEST(Basic) { + Check(TDuration::Seconds(10), std::make_unique<TEvStop>()); + } + +} // Y_UNIT_TEST_SUITE(TaskActor) diff --git a/library/cpp/actors/cppcoro/task_group.cpp b/library/cpp/actors/cppcoro/task_group.cpp new file mode 100644 index 0000000000..9ddd30d707 --- /dev/null +++ b/library/cpp/actors/cppcoro/task_group.cpp @@ -0,0 +1 @@ +#include "task_group.h" diff --git a/library/cpp/actors/cppcoro/task_group.h b/library/cpp/actors/cppcoro/task_group.h new file mode 100644 index 0000000000..b57cf59529 --- /dev/null +++ b/library/cpp/actors/cppcoro/task_group.h @@ -0,0 +1,333 @@ +#pragma once +#include <util/generic/ptr.h> +#include <util/system/compiler.h> +#include <util/system/yassert.h> +#include <coroutine> +#include <exception> +#include <variant> +#include <atomic> +#include <memory> + +namespace NActors { + + namespace NDetail { + + template<class T> + struct TTaskGroupResult final { + TTaskGroupResult* Next; + std::variant<std::exception_ptr, T> Result_; + + void SetException() { + Result_.template emplace<0>(std::current_exception()); + } + + template<class TResult> + void SetValue(TResult&& result) { + Result_.template emplace<1>(std::forward<TResult>(result)); + } + + T Extract() { + switch (Result_.index()) { + case 0: { + std::rethrow_exception(std::get<0>(std::move(Result_))); + } + case 1: { + return std::get<1>(std::move(Result_)); + } + } + std::terminate(); + } + }; + + template<> + struct TTaskGroupResult<void> final { + TTaskGroupResult* Next; + std::exception_ptr Exception_; + + void SetException() { + Exception_ = std::current_exception(); + } + + void SetValue() { + // nothing + } + + void Extract() { + if (Exception_) { + std::rethrow_exception(std::move(Exception_)); + } + } + }; + + template<class T> + struct TTaskGroupSink final + : public TAtomicRefCount<TTaskGroupSink<T>> + { + std::atomic<void*> LastReady{ nullptr }; + TTaskGroupResult<T>* ReadyQueue = nullptr; + std::coroutine_handle<> Continuation; + + static constexpr uintptr_t MarkerAwaiting = 1; + static constexpr uintptr_t MarkerDetached = 2; + + std::coroutine_handle<> Push(std::unique_ptr<TTaskGroupResult<T>>&& result) noexcept { + void* currentValue = LastReady.load(std::memory_order_acquire); + for (;;) { + if (currentValue == (void*)MarkerAwaiting) { + if (Y_UNLIKELY(!LastReady.compare_exchange_weak(currentValue, nullptr, std::memory_order_acquire))) { + continue; + } + // We consume the awaiter + Y_VERIFY(ReadyQueue == nullptr, "TaskGroup is awaiting with non-empty ready queue"); + result->Next = nullptr; + ReadyQueue = result.release(); + return std::exchange(Continuation, {}); + } + if (currentValue == (void*)MarkerDetached) { + // Task group is detached, discard the result + return std::noop_coroutine(); + } + TTaskGroupResult<T>* current = reinterpret_cast<TTaskGroupResult<T>*>(currentValue); + result->Next = current; + void* nextValue = result.get(); + if (Y_LIKELY(LastReady.compare_exchange_weak(currentValue, nextValue, std::memory_order_acq_rel))) { + // Result successfully added + result.release(); + return std::noop_coroutine(); + } + } + } + + bool Ready() const noexcept { + return ReadyQueue != nullptr || LastReady.load(std::memory_order_acquire) != nullptr; + } + + Y_NO_INLINE std::coroutine_handle<> Suspend(std::coroutine_handle<> h) noexcept { + Y_VERIFY(ReadyQueue == nullptr, "Caller suspending with non-empty ready queue"); + Continuation = h; + void* currentValue = LastReady.load(std::memory_order_acquire); + for (;;) { + if (currentValue == nullptr) { + if (Y_UNLIKELY(!LastReady.compare_exchange_weak(currentValue, (void*)MarkerAwaiting, std::memory_order_release))) { + continue; + } + // Continuation may wake up on another thread + return std::noop_coroutine(); + } + Y_VERIFY(currentValue != (void*)MarkerAwaiting, "TaskGroup is suspending with an awaiting marker"); + Y_VERIFY(currentValue != (void*)MarkerDetached, "TaskGroup is suspending with a detached marker"); + // Race: ready queue is not actually empty + Continuation = {}; + return h; + } + } + + std::unique_ptr<TTaskGroupResult<T>> Resume() noexcept { + std::unique_ptr<TTaskGroupResult<T>> result; + if (ReadyQueue == nullptr) { + void* headValue = LastReady.exchange(nullptr, std::memory_order_acq_rel); + Y_VERIFY(headValue != (void*)MarkerAwaiting, "TaskGroup is resuming with an awaiting marker"); + Y_VERIFY(headValue != (void*)MarkerDetached, "TaskGroup is resuming with a detached marker"); + Y_VERIFY(headValue, "TaskGroup is resuming with an empty queue"); + TTaskGroupResult<T>* head = reinterpret_cast<TTaskGroupResult<T>*>(headValue); + while (head) { + auto* next = std::exchange(head->Next, nullptr); + head->Next = ReadyQueue; + ReadyQueue = head; + head = next; + } + } + Y_VERIFY(ReadyQueue != nullptr); + result.reset(ReadyQueue); + ReadyQueue = std::exchange(result->Next, nullptr); + return result; + } + + static void Dispose(TTaskGroupResult<T>* head) noexcept { + while (head) { + auto* next = std::exchange(head->Next, nullptr); + std::unique_ptr<TTaskGroupResult<T>> ptr(head); + head = next; + } + } + + void Detach() noexcept { + // After this exchange all new results will be discarded + void* headValue = LastReady.exchange((void*)MarkerDetached, std::memory_order_acq_rel); + Y_VERIFY(headValue != (void*)MarkerAwaiting, "TaskGroup is detaching with an awaiting marker"); + Y_VERIFY(headValue != (void*)MarkerDetached, "TaskGroup is detaching with a detached marker"); + if (headValue) { + Dispose(reinterpret_cast<TTaskGroupResult<T>*>(headValue)); + } + if (ReadyQueue) { + Dispose(std::exchange(ReadyQueue, nullptr)); + } + } + }; + + template<class T> + class TTaskGroupPromiseBase { + public: + static auto initial_suspend() noexcept { return std::suspend_always{}; } + + class TFinalSuspend { + public: + TFinalSuspend(TTaskGroupPromiseBase& promise) + : Promise_(promise) + {} + + static bool await_ready() noexcept { return false; } + + Y_NO_INLINE std::coroutine_handle<> await_suspend(std::coroutine_handle<> h) noexcept { + auto next = Promise_.Sink_->Push(std::move(Promise_.Result_)); + h.destroy(); + return next; + } + + static void await_resume() noexcept { std::terminate(); } + + private: + TTaskGroupPromiseBase& Promise_; + }; + + auto final_suspend() noexcept { return TFinalSuspend(*this); } + + void unhandled_exception() noexcept { + Result_->SetException(); + Sink_->Push(std::move(Result_)); + } + + void SetSink(const TIntrusivePtr<TTaskGroupSink<T>>& sink) { + Sink_ = sink; + } + + protected: + std::unique_ptr<TTaskGroupResult<T>> Result_ = std::make_unique<TTaskGroupResult<T>>(); + TIntrusivePtr<TTaskGroupSink<T>> Sink_; + }; + + template<class T> + class TTaskGroupPromise final : public TTaskGroupPromiseBase<T> { + public: + using THandle = std::coroutine_handle<TTaskGroupPromise<T>>; + + THandle get_return_object() noexcept { + return THandle::from_promise(*this); + } + + template<class TResult> + void return_value(TResult&& result) { + this->Result_->SetValue(std::forward<TResult>(result)); + } + }; + + template<> + class TTaskGroupPromise<void> final : public TTaskGroupPromiseBase<void> { + public: + using THandle = std::coroutine_handle<TTaskGroupPromise<void>>; + + THandle get_return_object() noexcept { + return THandle::from_promise(*this); + } + + void return_void() { + this->Result_->SetValue(); + } + }; + + template<class T> + class TTaskGroupTask final { + public: + using THandle = std::coroutine_handle<TTaskGroupPromise<T>>; + using promise_type = TTaskGroupPromise<T>; + using value_type = T; + + public: + TTaskGroupTask(THandle handle) + : Handle_(handle) + {} + + void Start(const TIntrusivePtr<TTaskGroupSink<T>>& sink) { + Handle_.promise().SetSink(sink); + Handle_.resume(); + } + + private: + THandle Handle_; + }; + + template<class T, class TAwaitable> + TTaskGroupTask<T> CreateTaskGroupTask(TAwaitable awaitable) { + co_return co_await std::move(awaitable); + } + + } // namespace NDetail + + /** + * A task group allows starting multiple subtasks of the same result type + * and awaiting them in a structured way. When task group is destroyed + * all subtasks are detached in a thread-safe way. + */ + template<class T> + class TTaskGroup { + public: + TTaskGroup() = default; + + ~TTaskGroup() { + Sink_->Detach(); + } + + /** + * Add task to the group that will await the result of awaitable + */ + template<class TAwaitable> + void AddTask(TAwaitable&& awaitable) { + auto task = NDetail::CreateTaskGroupTask<T>(std::forward<TAwaitable>(awaitable)); + task.Start(Sink_); + ++TaskCount_; + } + + /** + * Returns the number of tasks left unawaited + */ + size_t TaskCount() const { + return TaskCount_; + } + + class TAwaiter { + public: + explicit TAwaiter(TTaskGroup& taskGroup) noexcept + : TaskGroup_(taskGroup) + {} + + bool await_ready() const noexcept { + Y_VERIFY(TaskGroup_.TaskCount_ > 0, "Not enough tasks to await"); + --TaskGroup_.TaskCount_; + return TaskGroup_.Sink_->Ready(); + } + + std::coroutine_handle<> await_suspend(std::coroutine_handle<> h) noexcept { + return TaskGroup_.Sink_->Suspend(h); + } + + T await_resume() { + return TaskGroup_.Sink_->Resume()->Extract(); + } + + private: + TTaskGroup& TaskGroup_; + }; + + /** + * Await result of the next task in the task group + */ + TAwaiter operator co_await() noexcept { + return TAwaiter(*this); + } + + private: + TIntrusivePtr<NDetail::TTaskGroupSink<T>> Sink_ = MakeIntrusive<NDetail::TTaskGroupSink<T>>(); + size_t TaskCount_ = 0; + }; + +} // namespace NActors diff --git a/library/cpp/actors/cppcoro/task_ut.cpp b/library/cpp/actors/cppcoro/task_ut.cpp new file mode 100644 index 0000000000..52c1b0e591 --- /dev/null +++ b/library/cpp/actors/cppcoro/task_ut.cpp @@ -0,0 +1,261 @@ +#include "task.h" +#include "task_group.h" +#include "await_callback.h" +#include <library/cpp/testing/unittest/registar.h> + +using namespace NActors; + +Y_UNIT_TEST_SUITE(Task) { + + TTask<void> SimpleReturnVoid() { + co_return; + } + + TTask<int> SimpleReturn42() { + co_return 42; + } + + Y_UNIT_TEST(SimpleVoidCoroutine) { + bool finished = false; + AwaitThenCallback(SimpleReturnVoid(), [&]() { + finished = true; + }); + UNIT_ASSERT(finished); + } + + Y_UNIT_TEST(SimpleIntCoroutine) { + std::optional<int> result; + AwaitThenCallback(SimpleReturn42(), [&](int value) { + result = value; + }); + UNIT_ASSERT(result); + UNIT_ASSERT_VALUES_EQUAL(*result, 42); + } + + Y_UNIT_TEST(DoneAndWhenDone) { + auto task = SimpleReturn42(); + UNIT_ASSERT(task); + UNIT_ASSERT(!task.Done()); + + bool whenDoneFinished = false; + AwaitThenCallback(task.WhenDone(), [&]() { + whenDoneFinished = true; + }); + UNIT_ASSERT(whenDoneFinished); + UNIT_ASSERT(task.Done()); + + // WhenDone can be used even when task is already done + whenDoneFinished = false; + AwaitThenCallback(task.WhenDone(), [&]() { + whenDoneFinished = true; + }); + UNIT_ASSERT(whenDoneFinished); + + std::optional<int> result; + AwaitThenCallback(std::move(task), [&](int value) { + result = value; + }); + UNIT_ASSERT(result); + UNIT_ASSERT_VALUES_EQUAL(*result, 42); + UNIT_ASSERT(!task); + } + + Y_UNIT_TEST(ManualStart) { + auto task = SimpleReturn42(); + UNIT_ASSERT(task && !task.Done()); + task.Start(); + UNIT_ASSERT(task.Done()); + UNIT_ASSERT_VALUES_EQUAL(task.ExtractResult(), 42); + } + + template<class TCallback> + TTask<int> CallTwice(TCallback&& callback) { + int a = co_await callback(); + int b = co_await callback(); + co_return a + b; + } + + Y_UNIT_TEST(NestedAwait) { + auto task = CallTwice([]{ + return SimpleReturn42(); + }); + std::optional<int> result; + AwaitThenCallback(std::move(task), [&](int value) { + result = value; + }); + UNIT_ASSERT(result); + UNIT_ASSERT_VALUES_EQUAL(*result, 84); + } + + struct TPauseState { + std::coroutine_handle<> Next; + int NextResult; + + auto Wait() { + struct TAwaiter { + TPauseState* State; + + bool await_ready() const noexcept { return false; } + int await_resume() const noexcept { + return State->NextResult; + } + void await_suspend(std::coroutine_handle<> c) { + State->Next = c; + } + }; + return TAwaiter{ this }; + } + + explicit operator bool() const { + return bool(Next); + } + + void Resume(int result) { + Y_VERIFY(Next && !Next.done()); + NextResult = result; + std::exchange(Next, {}).resume(); + } + }; + + Y_UNIT_TEST(PausedAwait) { + TPauseState state; + auto callback = [&]{ + return state.Wait(); + }; + auto task = CallTwice(callback); + std::optional<int> result; + AwaitThenCallback(std::move(task), [&](int value) { + result = value; + }); + UNIT_ASSERT(!result); + UNIT_ASSERT(state); + state.Resume(11); + UNIT_ASSERT(!result); + UNIT_ASSERT(state); + state.Resume(22); + UNIT_ASSERT(result); + UNIT_ASSERT_VALUES_EQUAL(*result, 33); + } + + Y_UNIT_TEST(ManuallyStartThenWhenDone) { + TPauseState state; + auto next = [&]{ + return state.Wait(); + }; + + auto task = [](auto next) -> TTask<int> { + int value = co_await next(); + co_return value * 2; + }(next); + + UNIT_ASSERT(task && !task.Done()); + task.Start(); + UNIT_ASSERT(!task.Done() && state); + bool finished = false; + AwaitThenCallback(task.WhenDone(), [&]{ + finished = true; + }); + UNIT_ASSERT(!finished && !task.Done()); + state.Resume(11); + UNIT_ASSERT(finished && task.Done()); + UNIT_ASSERT_VALUES_EQUAL(task.ExtractResult(), 22); + } + + Y_UNIT_TEST(ManuallyStartThenAwait) { + TPauseState state; + auto next = [&]{ + return state.Wait(); + }; + + auto task = [](auto next) -> TTask<int> { + int value = co_await next(); + co_return value * 2; + }(next); + + UNIT_ASSERT(task && !task.Done()); + task.Start(); + UNIT_ASSERT(!task.Done() && state); + + auto awaitTask = [](auto task) -> TTask<int> { + int value = co_await std::move(task); + co_return value * 3; + }(std::move(task)); + UNIT_ASSERT(awaitTask && !awaitTask.Done()); + std::optional<int> result; + AwaitThenCallback(std::move(awaitTask), [&](int value) { + result = value; + }); + UNIT_ASSERT(!result); + state.Resume(11); + UNIT_ASSERT(result); + UNIT_ASSERT_VALUES_EQUAL(*result, 66); + } + + Y_UNIT_TEST(GroupWithTwoSubTasks) { + TPauseState state1; + TPauseState state2; + + std::vector<int> results; + auto task = [](auto& state1, auto& state2, auto& results) -> TTask<int> { + TTaskGroup<int> group; + group.AddTask(state1.Wait()); + group.AddTask(state2.Wait()); + int a = co_await group; + results.push_back(a); + int b = co_await group; + results.push_back(b); + co_return a + b; + }(state1, state2, results); + + std::optional<int> result; + AwaitThenCallback(std::move(task), [&](int value) { + result = value; + }); + + // We must be waiting for both states + UNIT_ASSERT(state1); + UNIT_ASSERT(state2); + state2.Resume(22); + UNIT_ASSERT_VALUES_EQUAL(results.size(), 1u); + UNIT_ASSERT_VALUES_EQUAL(results.at(0), 22); + UNIT_ASSERT(!result); + state1.Resume(11); + UNIT_ASSERT_VALUES_EQUAL(results.size(), 2u); + UNIT_ASSERT_VALUES_EQUAL(results.at(1), 11); + UNIT_ASSERT(result); + UNIT_ASSERT_VALUES_EQUAL(*result, 33); + } + + Y_UNIT_TEST(GroupWithTwoSubTasksDetached) { + TPauseState state1; + TPauseState state2; + + std::vector<int> results; + auto task = [](auto& state1, auto& state2, auto& results) -> TTask<int> { + TTaskGroup<int> group; + group.AddTask(state1.Wait()); + group.AddTask(state2.Wait()); + int a = co_await group; + results.push_back(a); + co_return a; + }(state1, state2, results); + + std::optional<int> result; + AwaitThenCallback(std::move(task), [&](int value) { + result = value; + }); + + // We must be waiting for both states + UNIT_ASSERT(state1); + UNIT_ASSERT(state2); + state2.Resume(22); + UNIT_ASSERT_VALUES_EQUAL(results.size(), 1u); + UNIT_ASSERT_VALUES_EQUAL(results.at(0), 22); + UNIT_ASSERT(result); + UNIT_ASSERT_VALUES_EQUAL(*result, 22); + + // We must resume the first state (otherwise memory leaks), but result is ignored + state1.Resume(11); + } + +} // Y_UNIT_TEST_SUITE(Task) diff --git a/library/cpp/actors/cppcoro/ut/CMakeLists.darwin-x86_64.txt b/library/cpp/actors/cppcoro/ut/CMakeLists.darwin-x86_64.txt new file mode 100644 index 0000000000..7cebff01de --- /dev/null +++ b/library/cpp/actors/cppcoro/ut/CMakeLists.darwin-x86_64.txt @@ -0,0 +1,69 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_executable(library-cpp-actors-cppcoro-ut) +target_include_directories(library-cpp-actors-cppcoro-ut PRIVATE + ${CMAKE_SOURCE_DIR}/library/cpp/actors/cppcoro +) +target_link_libraries(library-cpp-actors-cppcoro-ut PUBLIC + contrib-libs-cxxsupp + yutil + library-cpp-cpuid_check + cpp-testing-unittest_main + cpp-actors-cppcoro + cpp-actors-testlib +) +target_link_options(library-cpp-actors-cppcoro-ut PRIVATE + -Wl,-platform_version,macos,11.0,11.0 + -fPIC + -fPIC + -framework + CoreFoundation +) +target_sources(library-cpp-actors-cppcoro-ut PRIVATE + ${CMAKE_SOURCE_DIR}/library/cpp/actors/cppcoro/task_ut.cpp + ${CMAKE_SOURCE_DIR}/library/cpp/actors/cppcoro/task_actor_ut.cpp +) +set_property( + TARGET + library-cpp-actors-cppcoro-ut + PROPERTY + SPLIT_FACTOR + 1 +) +add_yunittest( + NAME + library-cpp-actors-cppcoro-ut + TEST_TARGET + library-cpp-actors-cppcoro-ut + TEST_ARG + --print-before-suite + --print-before-test + --fork-tests + --print-times + --show-fails +) +set_yunittest_property( + TEST + library-cpp-actors-cppcoro-ut + PROPERTY + LABELS + SMALL +) +set_yunittest_property( + TEST + library-cpp-actors-cppcoro-ut + PROPERTY + PROCESSORS + 1 +) +target_allocator(library-cpp-actors-cppcoro-ut + system_allocator +) +vcs_info(library-cpp-actors-cppcoro-ut) diff --git a/library/cpp/actors/cppcoro/ut/CMakeLists.linux-aarch64.txt b/library/cpp/actors/cppcoro/ut/CMakeLists.linux-aarch64.txt new file mode 100644 index 0000000000..4a11af3456 --- /dev/null +++ b/library/cpp/actors/cppcoro/ut/CMakeLists.linux-aarch64.txt @@ -0,0 +1,72 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_executable(library-cpp-actors-cppcoro-ut) +target_include_directories(library-cpp-actors-cppcoro-ut PRIVATE + ${CMAKE_SOURCE_DIR}/library/cpp/actors/cppcoro +) +target_link_libraries(library-cpp-actors-cppcoro-ut PUBLIC + contrib-libs-linux-headers + contrib-libs-cxxsupp + yutil + cpp-testing-unittest_main + cpp-actors-cppcoro + cpp-actors-testlib +) +target_link_options(library-cpp-actors-cppcoro-ut PRIVATE + -ldl + -lrt + -Wl,--no-as-needed + -fPIC + -fPIC + -lpthread + -lrt + -ldl +) +target_sources(library-cpp-actors-cppcoro-ut PRIVATE + ${CMAKE_SOURCE_DIR}/library/cpp/actors/cppcoro/task_ut.cpp + ${CMAKE_SOURCE_DIR}/library/cpp/actors/cppcoro/task_actor_ut.cpp +) +set_property( + TARGET + library-cpp-actors-cppcoro-ut + PROPERTY + SPLIT_FACTOR + 1 +) +add_yunittest( + NAME + library-cpp-actors-cppcoro-ut + TEST_TARGET + library-cpp-actors-cppcoro-ut + TEST_ARG + --print-before-suite + --print-before-test + --fork-tests + --print-times + --show-fails +) +set_yunittest_property( + TEST + library-cpp-actors-cppcoro-ut + PROPERTY + LABELS + SMALL +) +set_yunittest_property( + TEST + library-cpp-actors-cppcoro-ut + PROPERTY + PROCESSORS + 1 +) +target_allocator(library-cpp-actors-cppcoro-ut + cpp-malloc-jemalloc +) +vcs_info(library-cpp-actors-cppcoro-ut) diff --git a/library/cpp/actors/cppcoro/ut/CMakeLists.linux-x86_64.txt b/library/cpp/actors/cppcoro/ut/CMakeLists.linux-x86_64.txt new file mode 100644 index 0000000000..2e2412f989 --- /dev/null +++ b/library/cpp/actors/cppcoro/ut/CMakeLists.linux-x86_64.txt @@ -0,0 +1,74 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_executable(library-cpp-actors-cppcoro-ut) +target_include_directories(library-cpp-actors-cppcoro-ut PRIVATE + ${CMAKE_SOURCE_DIR}/library/cpp/actors/cppcoro +) +target_link_libraries(library-cpp-actors-cppcoro-ut PUBLIC + contrib-libs-linux-headers + contrib-libs-cxxsupp + yutil + library-cpp-cpuid_check + cpp-testing-unittest_main + cpp-actors-cppcoro + cpp-actors-testlib +) +target_link_options(library-cpp-actors-cppcoro-ut PRIVATE + -ldl + -lrt + -Wl,--no-as-needed + -fPIC + -fPIC + -lpthread + -lrt + -ldl +) +target_sources(library-cpp-actors-cppcoro-ut PRIVATE + ${CMAKE_SOURCE_DIR}/library/cpp/actors/cppcoro/task_ut.cpp + ${CMAKE_SOURCE_DIR}/library/cpp/actors/cppcoro/task_actor_ut.cpp +) +set_property( + TARGET + library-cpp-actors-cppcoro-ut + PROPERTY + SPLIT_FACTOR + 1 +) +add_yunittest( + NAME + library-cpp-actors-cppcoro-ut + TEST_TARGET + library-cpp-actors-cppcoro-ut + TEST_ARG + --print-before-suite + --print-before-test + --fork-tests + --print-times + --show-fails +) +set_yunittest_property( + TEST + library-cpp-actors-cppcoro-ut + PROPERTY + LABELS + SMALL +) +set_yunittest_property( + TEST + library-cpp-actors-cppcoro-ut + PROPERTY + PROCESSORS + 1 +) +target_allocator(library-cpp-actors-cppcoro-ut + cpp-malloc-tcmalloc + libs-tcmalloc-no_percpu_cache +) +vcs_info(library-cpp-actors-cppcoro-ut) diff --git a/library/cpp/actors/cppcoro/ut/CMakeLists.txt b/library/cpp/actors/cppcoro/ut/CMakeLists.txt new file mode 100644 index 0000000000..f8b31df0c1 --- /dev/null +++ b/library/cpp/actors/cppcoro/ut/CMakeLists.txt @@ -0,0 +1,17 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + +if (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "aarch64" AND NOT HAVE_CUDA) + include(CMakeLists.linux-aarch64.txt) +elseif (CMAKE_SYSTEM_NAME STREQUAL "Darwin" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64") + include(CMakeLists.darwin-x86_64.txt) +elseif (WIN32 AND CMAKE_SYSTEM_PROCESSOR STREQUAL "AMD64" AND NOT HAVE_CUDA) + include(CMakeLists.windows-x86_64.txt) +elseif (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" AND NOT HAVE_CUDA) + include(CMakeLists.linux-x86_64.txt) +endif() diff --git a/library/cpp/actors/cppcoro/ut/CMakeLists.windows-x86_64.txt b/library/cpp/actors/cppcoro/ut/CMakeLists.windows-x86_64.txt new file mode 100644 index 0000000000..e3b8b019c8 --- /dev/null +++ b/library/cpp/actors/cppcoro/ut/CMakeLists.windows-x86_64.txt @@ -0,0 +1,62 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_executable(library-cpp-actors-cppcoro-ut) +target_include_directories(library-cpp-actors-cppcoro-ut PRIVATE + ${CMAKE_SOURCE_DIR}/library/cpp/actors/cppcoro +) +target_link_libraries(library-cpp-actors-cppcoro-ut PUBLIC + contrib-libs-cxxsupp + yutil + library-cpp-cpuid_check + cpp-testing-unittest_main + cpp-actors-cppcoro + cpp-actors-testlib +) +target_sources(library-cpp-actors-cppcoro-ut PRIVATE + ${CMAKE_SOURCE_DIR}/library/cpp/actors/cppcoro/task_ut.cpp + ${CMAKE_SOURCE_DIR}/library/cpp/actors/cppcoro/task_actor_ut.cpp +) +set_property( + TARGET + library-cpp-actors-cppcoro-ut + PROPERTY + SPLIT_FACTOR + 1 +) +add_yunittest( + NAME + library-cpp-actors-cppcoro-ut + TEST_TARGET + library-cpp-actors-cppcoro-ut + TEST_ARG + --print-before-suite + --print-before-test + --fork-tests + --print-times + --show-fails +) +set_yunittest_property( + TEST + library-cpp-actors-cppcoro-ut + PROPERTY + LABELS + SMALL +) +set_yunittest_property( + TEST + library-cpp-actors-cppcoro-ut + PROPERTY + PROCESSORS + 1 +) +target_allocator(library-cpp-actors-cppcoro-ut + system_allocator +) +vcs_info(library-cpp-actors-cppcoro-ut) diff --git a/library/cpp/actors/cppcoro/ut/ya.make b/library/cpp/actors/cppcoro/ut/ya.make new file mode 100644 index 0000000000..24a9c73613 --- /dev/null +++ b/library/cpp/actors/cppcoro/ut/ya.make @@ -0,0 +1,12 @@ +UNITTEST_FOR(library/cpp/actors/cppcoro) + +PEERDIR( + library/cpp/actors/testlib +) + +SRCS( + task_ut.cpp + task_actor_ut.cpp +) + +END() diff --git a/library/cpp/actors/cppcoro/ya.make b/library/cpp/actors/cppcoro/ya.make new file mode 100644 index 0000000000..9890eccbee --- /dev/null +++ b/library/cpp/actors/cppcoro/ya.make @@ -0,0 +1,22 @@ +LIBRARY() + +PEERDIR( + library/cpp/actors/core +) + +SRCS( + await_callback.cpp + await_callback.h + task_actor.cpp + task_actor.h + task_group.cpp + task_group.h + task.cpp + task.h +) + +END() + +RECURSE_FOR_TESTS( + ut +) diff --git a/library/cpp/actors/ya.make b/library/cpp/actors/ya.make index 2612c414cd..00d7801798 100644 --- a/library/cpp/actors/ya.make +++ b/library/cpp/actors/ya.make @@ -1,6 +1,7 @@ RECURSE( log_backend core + cppcoro dnsresolver examples interconnect |