diff options
author | snaury <snaury@ydb.tech> | 2023-08-07 13:35:52 +0300 |
---|---|---|
committer | snaury <snaury@ydb.tech> | 2023-08-07 14:26:11 +0300 |
commit | 884279adc343b71eaadb4d75d088c1338aeda378 (patch) | |
tree | 483c1ecc0788d8d597dd60cee1e1c2adce5b6345 /library/cpp/actors/cppcoro/task_ut.cpp | |
parent | d79a2a297724679b1adf2ab348defdbdae3e73be (diff) | |
download | ydb-884279adc343b71eaadb4d75d088c1338aeda378.tar.gz |
Support running C++ coroutines as actors KIKIMR-18962
Diffstat (limited to 'library/cpp/actors/cppcoro/task_ut.cpp')
-rw-r--r-- | library/cpp/actors/cppcoro/task_ut.cpp | 261 |
1 files changed, 261 insertions, 0 deletions
diff --git a/library/cpp/actors/cppcoro/task_ut.cpp b/library/cpp/actors/cppcoro/task_ut.cpp new file mode 100644 index 0000000000..52c1b0e591 --- /dev/null +++ b/library/cpp/actors/cppcoro/task_ut.cpp @@ -0,0 +1,261 @@ +#include "task.h" +#include "task_group.h" +#include "await_callback.h" +#include <library/cpp/testing/unittest/registar.h> + +using namespace NActors; + +Y_UNIT_TEST_SUITE(Task) { + + TTask<void> SimpleReturnVoid() { + co_return; + } + + TTask<int> SimpleReturn42() { + co_return 42; + } + + Y_UNIT_TEST(SimpleVoidCoroutine) { + bool finished = false; + AwaitThenCallback(SimpleReturnVoid(), [&]() { + finished = true; + }); + UNIT_ASSERT(finished); + } + + Y_UNIT_TEST(SimpleIntCoroutine) { + std::optional<int> result; + AwaitThenCallback(SimpleReturn42(), [&](int value) { + result = value; + }); + UNIT_ASSERT(result); + UNIT_ASSERT_VALUES_EQUAL(*result, 42); + } + + Y_UNIT_TEST(DoneAndWhenDone) { + auto task = SimpleReturn42(); + UNIT_ASSERT(task); + UNIT_ASSERT(!task.Done()); + + bool whenDoneFinished = false; + AwaitThenCallback(task.WhenDone(), [&]() { + whenDoneFinished = true; + }); + UNIT_ASSERT(whenDoneFinished); + UNIT_ASSERT(task.Done()); + + // WhenDone can be used even when task is already done + whenDoneFinished = false; + AwaitThenCallback(task.WhenDone(), [&]() { + whenDoneFinished = true; + }); + UNIT_ASSERT(whenDoneFinished); + + std::optional<int> result; + AwaitThenCallback(std::move(task), [&](int value) { + result = value; + }); + UNIT_ASSERT(result); + UNIT_ASSERT_VALUES_EQUAL(*result, 42); + UNIT_ASSERT(!task); + } + + Y_UNIT_TEST(ManualStart) { + auto task = SimpleReturn42(); + UNIT_ASSERT(task && !task.Done()); + task.Start(); + UNIT_ASSERT(task.Done()); + UNIT_ASSERT_VALUES_EQUAL(task.ExtractResult(), 42); + } + + template<class TCallback> + TTask<int> CallTwice(TCallback&& callback) { + int a = co_await callback(); + int b = co_await callback(); + co_return a + b; + } + + Y_UNIT_TEST(NestedAwait) { + auto task = CallTwice([]{ + return SimpleReturn42(); + }); + std::optional<int> result; + AwaitThenCallback(std::move(task), [&](int value) { + result = value; + }); + UNIT_ASSERT(result); + UNIT_ASSERT_VALUES_EQUAL(*result, 84); + } + + struct TPauseState { + std::coroutine_handle<> Next; + int NextResult; + + auto Wait() { + struct TAwaiter { + TPauseState* State; + + bool await_ready() const noexcept { return false; } + int await_resume() const noexcept { + return State->NextResult; + } + void await_suspend(std::coroutine_handle<> c) { + State->Next = c; + } + }; + return TAwaiter{ this }; + } + + explicit operator bool() const { + return bool(Next); + } + + void Resume(int result) { + Y_VERIFY(Next && !Next.done()); + NextResult = result; + std::exchange(Next, {}).resume(); + } + }; + + Y_UNIT_TEST(PausedAwait) { + TPauseState state; + auto callback = [&]{ + return state.Wait(); + }; + auto task = CallTwice(callback); + std::optional<int> result; + AwaitThenCallback(std::move(task), [&](int value) { + result = value; + }); + UNIT_ASSERT(!result); + UNIT_ASSERT(state); + state.Resume(11); + UNIT_ASSERT(!result); + UNIT_ASSERT(state); + state.Resume(22); + UNIT_ASSERT(result); + UNIT_ASSERT_VALUES_EQUAL(*result, 33); + } + + Y_UNIT_TEST(ManuallyStartThenWhenDone) { + TPauseState state; + auto next = [&]{ + return state.Wait(); + }; + + auto task = [](auto next) -> TTask<int> { + int value = co_await next(); + co_return value * 2; + }(next); + + UNIT_ASSERT(task && !task.Done()); + task.Start(); + UNIT_ASSERT(!task.Done() && state); + bool finished = false; + AwaitThenCallback(task.WhenDone(), [&]{ + finished = true; + }); + UNIT_ASSERT(!finished && !task.Done()); + state.Resume(11); + UNIT_ASSERT(finished && task.Done()); + UNIT_ASSERT_VALUES_EQUAL(task.ExtractResult(), 22); + } + + Y_UNIT_TEST(ManuallyStartThenAwait) { + TPauseState state; + auto next = [&]{ + return state.Wait(); + }; + + auto task = [](auto next) -> TTask<int> { + int value = co_await next(); + co_return value * 2; + }(next); + + UNIT_ASSERT(task && !task.Done()); + task.Start(); + UNIT_ASSERT(!task.Done() && state); + + auto awaitTask = [](auto task) -> TTask<int> { + int value = co_await std::move(task); + co_return value * 3; + }(std::move(task)); + UNIT_ASSERT(awaitTask && !awaitTask.Done()); + std::optional<int> result; + AwaitThenCallback(std::move(awaitTask), [&](int value) { + result = value; + }); + UNIT_ASSERT(!result); + state.Resume(11); + UNIT_ASSERT(result); + UNIT_ASSERT_VALUES_EQUAL(*result, 66); + } + + Y_UNIT_TEST(GroupWithTwoSubTasks) { + TPauseState state1; + TPauseState state2; + + std::vector<int> results; + auto task = [](auto& state1, auto& state2, auto& results) -> TTask<int> { + TTaskGroup<int> group; + group.AddTask(state1.Wait()); + group.AddTask(state2.Wait()); + int a = co_await group; + results.push_back(a); + int b = co_await group; + results.push_back(b); + co_return a + b; + }(state1, state2, results); + + std::optional<int> result; + AwaitThenCallback(std::move(task), [&](int value) { + result = value; + }); + + // We must be waiting for both states + UNIT_ASSERT(state1); + UNIT_ASSERT(state2); + state2.Resume(22); + UNIT_ASSERT_VALUES_EQUAL(results.size(), 1u); + UNIT_ASSERT_VALUES_EQUAL(results.at(0), 22); + UNIT_ASSERT(!result); + state1.Resume(11); + UNIT_ASSERT_VALUES_EQUAL(results.size(), 2u); + UNIT_ASSERT_VALUES_EQUAL(results.at(1), 11); + UNIT_ASSERT(result); + UNIT_ASSERT_VALUES_EQUAL(*result, 33); + } + + Y_UNIT_TEST(GroupWithTwoSubTasksDetached) { + TPauseState state1; + TPauseState state2; + + std::vector<int> results; + auto task = [](auto& state1, auto& state2, auto& results) -> TTask<int> { + TTaskGroup<int> group; + group.AddTask(state1.Wait()); + group.AddTask(state2.Wait()); + int a = co_await group; + results.push_back(a); + co_return a; + }(state1, state2, results); + + std::optional<int> result; + AwaitThenCallback(std::move(task), [&](int value) { + result = value; + }); + + // We must be waiting for both states + UNIT_ASSERT(state1); + UNIT_ASSERT(state2); + state2.Resume(22); + UNIT_ASSERT_VALUES_EQUAL(results.size(), 1u); + UNIT_ASSERT_VALUES_EQUAL(results.at(0), 22); + UNIT_ASSERT(result); + UNIT_ASSERT_VALUES_EQUAL(*result, 22); + + // We must resume the first state (otherwise memory leaks), but result is ignored + state1.Resume(11); + } + +} // Y_UNIT_TEST_SUITE(Task) |