aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/messagebus/rain_check/core
diff options
context:
space:
mode:
authorDevtools Arcadia <arcadia-devtools@yandex-team.ru>2022-02-07 18:08:42 +0300
committerDevtools Arcadia <arcadia-devtools@mous.vla.yp-c.yandex.net>2022-02-07 18:08:42 +0300
commit1110808a9d39d4b808aef724c861a2e1a38d2a69 (patch)
treee26c9fed0de5d9873cce7e00bc214573dc2195b7 /library/cpp/messagebus/rain_check/core
downloadydb-1110808a9d39d4b808aef724c861a2e1a38d2a69.tar.gz
intermediate changes
ref:cde9a383711a11544ce7e107a78147fb96cc4029
Diffstat (limited to 'library/cpp/messagebus/rain_check/core')
-rw-r--r--library/cpp/messagebus/rain_check/core/coro.cpp60
-rw-r--r--library/cpp/messagebus/rain_check/core/coro.h58
-rw-r--r--library/cpp/messagebus/rain_check/core/coro_stack.cpp41
-rw-r--r--library/cpp/messagebus/rain_check/core/coro_stack.h54
-rw-r--r--library/cpp/messagebus/rain_check/core/coro_ut.cpp106
-rw-r--r--library/cpp/messagebus/rain_check/core/env.cpp3
-rw-r--r--library/cpp/messagebus/rain_check/core/env.h47
-rw-r--r--library/cpp/messagebus/rain_check/core/fwd.h18
-rw-r--r--library/cpp/messagebus/rain_check/core/rain_check.cpp1
-rw-r--r--library/cpp/messagebus/rain_check/core/rain_check.h8
-rw-r--r--library/cpp/messagebus/rain_check/core/simple.cpp18
-rw-r--r--library/cpp/messagebus/rain_check/core/simple.h62
-rw-r--r--library/cpp/messagebus/rain_check/core/simple_ut.cpp59
-rw-r--r--library/cpp/messagebus/rain_check/core/sleep.cpp47
-rw-r--r--library/cpp/messagebus/rain_check/core/sleep.h24
-rw-r--r--library/cpp/messagebus/rain_check/core/sleep_ut.cpp46
-rw-r--r--library/cpp/messagebus/rain_check/core/spawn.cpp5
-rw-r--r--library/cpp/messagebus/rain_check/core/spawn.h50
-rw-r--r--library/cpp/messagebus/rain_check/core/spawn_ut.cpp145
-rw-r--r--library/cpp/messagebus/rain_check/core/task.cpp216
-rw-r--r--library/cpp/messagebus/rain_check/core/task.h184
-rw-r--r--library/cpp/messagebus/rain_check/core/track.cpp66
-rw-r--r--library/cpp/messagebus/rain_check/core/track.h97
-rw-r--r--library/cpp/messagebus/rain_check/core/track_ut.cpp45
-rw-r--r--library/cpp/messagebus/rain_check/core/ya.make25
25 files changed, 1485 insertions, 0 deletions
diff --git a/library/cpp/messagebus/rain_check/core/coro.cpp b/library/cpp/messagebus/rain_check/core/coro.cpp
new file mode 100644
index 0000000000..500841dd5b
--- /dev/null
+++ b/library/cpp/messagebus/rain_check/core/coro.cpp
@@ -0,0 +1,60 @@
+#include "coro.h"
+
+#include "coro_stack.h"
+
+#include <util/system/tls.h>
+#include <util/system/yassert.h>
+
+using namespace NRainCheck;
+
+TContClosure TCoroTaskRunner::ContClosure(TCoroTaskRunner* runner, TArrayRef<char> memRegion) {
+ TContClosure contClosure;
+ contClosure.TrampoLine = runner;
+ contClosure.Stack = memRegion;
+ return contClosure;
+}
+
+TCoroTaskRunner::TCoroTaskRunner(IEnv* env, ISubtaskListener* parent, TAutoPtr<ICoroTask> impl)
+ : TTaskRunnerBase(env, parent, impl.Release())
+ , Stack(GetImpl()->StackSize)
+ , ContMachineContext(ContClosure(this, Stack.MemRegion()))
+ , CoroDone(false)
+{
+}
+
+TCoroTaskRunner::~TCoroTaskRunner() {
+ Y_ASSERT(CoroDone);
+}
+
+Y_POD_STATIC_THREAD(TContMachineContext*)
+CallerContext;
+Y_POD_STATIC_THREAD(TCoroTaskRunner*)
+Task;
+
+bool TCoroTaskRunner::ReplyReceived() {
+ Y_ASSERT(!CoroDone);
+
+ TContMachineContext me;
+
+ CallerContext = &me;
+ Task = this;
+
+ me.SwitchTo(&ContMachineContext);
+
+ Stack.VerifyNoStackOverflow();
+
+ Y_ASSERT(CallerContext == &me);
+ Y_ASSERT(Task == this);
+
+ return !CoroDone;
+}
+
+void NRainCheck::TCoroTaskRunner::DoRun() {
+ GetImpl()->Run();
+ CoroDone = true;
+ ContMachineContext.SwitchTo(CallerContext);
+}
+
+void NRainCheck::ICoroTask::WaitForSubtasks() {
+ Task->ContMachineContext.SwitchTo(CallerContext);
+}
diff --git a/library/cpp/messagebus/rain_check/core/coro.h b/library/cpp/messagebus/rain_check/core/coro.h
new file mode 100644
index 0000000000..95e2a30f9b
--- /dev/null
+++ b/library/cpp/messagebus/rain_check/core/coro.h
@@ -0,0 +1,58 @@
+#pragma once
+
+#include "coro_stack.h"
+#include "task.h"
+
+#include <util/generic/ptr.h>
+#include <util/memory/alloc.h>
+#include <util/system/align.h>
+#include <util/system/context.h>
+#include <util/system/valgrind.h>
+
+namespace NRainCheck {
+ class ICoroTask;
+
+ class TCoroTaskRunner: public TTaskRunnerBase, private ITrampoLine {
+ friend class ICoroTask;
+
+ private:
+ NPrivate::TCoroStack Stack;
+ TContMachineContext ContMachineContext;
+ bool CoroDone;
+
+ public:
+ TCoroTaskRunner(IEnv* env, ISubtaskListener* parent, TAutoPtr<ICoroTask> impl);
+ ~TCoroTaskRunner() override;
+
+ private:
+ static TContClosure ContClosure(TCoroTaskRunner* runner, TArrayRef<char> memRegion);
+
+ bool ReplyReceived() override /* override */;
+
+ void DoRun() override /* override */;
+
+ ICoroTask* GetImpl() {
+ return (ICoroTask*)GetImplBase();
+ }
+ };
+
+ class ICoroTask: public ITaskBase {
+ friend class TCoroTaskRunner;
+
+ private:
+ size_t StackSize;
+
+ public:
+ typedef TCoroTaskRunner TTaskRunner;
+ typedef ICoroTask ITask;
+
+ ICoroTask(size_t stackSize = 0x2000)
+ : StackSize(stackSize)
+ {
+ }
+
+ virtual void Run() = 0;
+ static void WaitForSubtasks();
+ };
+
+}
diff --git a/library/cpp/messagebus/rain_check/core/coro_stack.cpp b/library/cpp/messagebus/rain_check/core/coro_stack.cpp
new file mode 100644
index 0000000000..83b984ca6e
--- /dev/null
+++ b/library/cpp/messagebus/rain_check/core/coro_stack.cpp
@@ -0,0 +1,41 @@
+#include "coro_stack.h"
+
+#include <util/generic/singleton.h>
+#include <util/system/valgrind.h>
+
+#include <cstdlib>
+#include <stdio.h>
+
+using namespace NRainCheck;
+using namespace NRainCheck::NPrivate;
+
+TCoroStack::TCoroStack(size_t size)
+ : SizeValue(size)
+{
+ Y_VERIFY(size % sizeof(ui32) == 0);
+ Y_VERIFY(size >= 0x1000);
+
+ DataHolder.Reset(malloc(size));
+
+ // register in valgrind
+
+ *MagicNumberLocation() = MAGIC_NUMBER;
+
+#if defined(WITH_VALGRIND)
+ ValgrindStackId = VALGRIND_STACK_REGISTER(Data(), (char*)Data() + Size());
+#endif
+}
+
+TCoroStack::~TCoroStack() {
+#if defined(WITH_VALGRIND)
+ VALGRIND_STACK_DEREGISTER(ValgrindStackId);
+#endif
+
+ VerifyNoStackOverflow();
+}
+
+void TCoroStack::FailStackOverflow() {
+ static const char message[] = "stack overflow\n";
+ fputs(message, stderr);
+ abort();
+}
diff --git a/library/cpp/messagebus/rain_check/core/coro_stack.h b/library/cpp/messagebus/rain_check/core/coro_stack.h
new file mode 100644
index 0000000000..2f3520e6e4
--- /dev/null
+++ b/library/cpp/messagebus/rain_check/core/coro_stack.h
@@ -0,0 +1,54 @@
+#pragma once
+
+#include <util/generic/array_ref.h>
+#include <util/generic/ptr.h>
+#include <util/system/valgrind.h>
+
+namespace NRainCheck {
+ namespace NPrivate {
+ struct TCoroStack {
+ THolder<void, TFree> DataHolder;
+ size_t SizeValue;
+
+#if defined(WITH_VALGRIND)
+ size_t ValgrindStackId;
+#endif
+
+ TCoroStack(size_t size);
+ ~TCoroStack();
+
+ void* Data() {
+ return DataHolder.Get();
+ }
+
+ size_t Size() {
+ return SizeValue;
+ }
+
+ TArrayRef<char> MemRegion() {
+ return TArrayRef((char*)Data(), Size());
+ }
+
+ ui32* MagicNumberLocation() {
+#if STACK_GROW_DOWN == 1
+ return (ui32*)Data();
+#elif STACK_GROW_DOWN == 0
+ return ((ui32*)(((char*)Data()) + Size())) - 1;
+#else
+#error "unknown"
+#endif
+ }
+
+ static void FailStackOverflow();
+
+ inline void VerifyNoStackOverflow() noexcept {
+ if (Y_UNLIKELY(*MagicNumberLocation() != MAGIC_NUMBER)) {
+ FailStackOverflow();
+ }
+ }
+
+ static const ui32 MAGIC_NUMBER = 0xAB4D15FE;
+ };
+
+ }
+}
diff --git a/library/cpp/messagebus/rain_check/core/coro_ut.cpp b/library/cpp/messagebus/rain_check/core/coro_ut.cpp
new file mode 100644
index 0000000000..61a33584a5
--- /dev/null
+++ b/library/cpp/messagebus/rain_check/core/coro_ut.cpp
@@ -0,0 +1,106 @@
+#include <library/cpp/testing/unittest/registar.h>
+
+#include "coro.h"
+#include "spawn.h"
+
+#include <library/cpp/messagebus/rain_check/test/ut/test.h>
+
+using namespace NRainCheck;
+
+Y_UNIT_TEST_SUITE(RainCheckCoro) {
+ struct TSimpleCoroTask : ICoroTask {
+ TTestSync* const TestSync;
+
+ TSimpleCoroTask(TTestEnv*, TTestSync* testSync)
+ : TestSync(testSync)
+ {
+ }
+
+ void Run() override {
+ TestSync->WaitForAndIncrement(0);
+ }
+ };
+
+ Y_UNIT_TEST(Simple) {
+ TTestSync testSync;
+
+ TTestEnv env;
+
+ TIntrusivePtr<TCoroTaskRunner> task = env.SpawnTask<TSimpleCoroTask>(&testSync);
+ testSync.WaitForAndIncrement(1);
+ }
+
+ struct TSleepCoroTask : ICoroTask {
+ TTestEnv* const Env;
+ TTestSync* const TestSync;
+
+ TSleepCoroTask(TTestEnv* env, TTestSync* testSync)
+ : Env(env)
+ , TestSync(testSync)
+ {
+ }
+
+ TSubtaskCompletion SleepCompletion;
+
+ void Run() override {
+ Env->SleepService.Sleep(&SleepCompletion, TDuration::MilliSeconds(1));
+ WaitForSubtasks();
+ TestSync->WaitForAndIncrement(0);
+ }
+ };
+
+ Y_UNIT_TEST(Sleep) {
+ TTestSync testSync;
+
+ TTestEnv env;
+
+ TIntrusivePtr<TCoroTaskRunner> task = env.SpawnTask<TSleepCoroTask>(&testSync);
+
+ testSync.WaitForAndIncrement(1);
+ }
+
+ struct TSubtask : ICoroTask {
+ TTestEnv* const Env;
+ TTestSync* const TestSync;
+
+ TSubtask(TTestEnv* env, TTestSync* testSync)
+ : Env(env)
+ , TestSync(testSync)
+ {
+ }
+
+ void Run() override {
+ TestSync->CheckAndIncrement(1);
+ }
+ };
+
+ struct TSpawnCoroTask : ICoroTask {
+ TTestEnv* const Env;
+ TTestSync* const TestSync;
+
+ TSpawnCoroTask(TTestEnv* env, TTestSync* testSync)
+ : Env(env)
+ , TestSync(testSync)
+ {
+ }
+
+ TSubtaskCompletion SubtaskCompletion;
+
+ void Run() override {
+ TestSync->CheckAndIncrement(0);
+ SpawnSubtask<TSubtask>(Env, &SubtaskCompletion, TestSync);
+ WaitForSubtasks();
+ TestSync->CheckAndIncrement(2);
+ }
+ };
+
+ Y_UNIT_TEST(Spawn) {
+ TTestSync testSync;
+
+ TTestEnv env;
+
+ TIntrusivePtr<TCoroTaskRunner> task = env.SpawnTask<TSpawnCoroTask>(&testSync);
+
+ testSync.WaitForAndIncrement(3);
+ }
+}
diff --git a/library/cpp/messagebus/rain_check/core/env.cpp b/library/cpp/messagebus/rain_check/core/env.cpp
new file mode 100644
index 0000000000..fdc0000dbd
--- /dev/null
+++ b/library/cpp/messagebus/rain_check/core/env.cpp
@@ -0,0 +1,3 @@
+#include "env.h"
+
+using namespace NRainCheck;
diff --git a/library/cpp/messagebus/rain_check/core/env.h b/library/cpp/messagebus/rain_check/core/env.h
new file mode 100644
index 0000000000..f6dd7fceb6
--- /dev/null
+++ b/library/cpp/messagebus/rain_check/core/env.h
@@ -0,0 +1,47 @@
+#pragma once
+
+#include "sleep.h"
+#include "spawn.h"
+
+#include <library/cpp/messagebus/actor/executor.h>
+
+#include <util/generic/ptr.h>
+
+namespace NRainCheck {
+ struct IEnv {
+ virtual ::NActor::TExecutor* GetExecutor() = 0;
+ virtual ~IEnv() {
+ }
+ };
+
+ template <typename TSelf>
+ struct TEnvTemplate: public IEnv {
+ template <typename TTask, typename TParam>
+ TIntrusivePtr<typename TTask::TTaskRunner> SpawnTask(TParam param) {
+ return ::NRainCheck::SpawnTask<TTask, TSelf>((TSelf*)this, param);
+ }
+ };
+
+ template <typename TSelf>
+ struct TSimpleEnvTemplate: public TEnvTemplate<TSelf> {
+ ::NActor::TExecutorPtr Executor;
+ TSleepService SleepService;
+
+ TSimpleEnvTemplate(unsigned threadCount = 0)
+ : Executor(new ::NActor::TExecutor(threadCount != 0 ? threadCount : 4))
+ {
+ }
+
+ ::NActor::TExecutor* GetExecutor() override {
+ return Executor.Get();
+ }
+ };
+
+ struct TSimpleEnv: public TSimpleEnvTemplate<TSimpleEnv> {
+ TSimpleEnv(unsigned threadCount = 0)
+ : TSimpleEnvTemplate<TSimpleEnv>(threadCount)
+ {
+ }
+ };
+
+}
diff --git a/library/cpp/messagebus/rain_check/core/fwd.h b/library/cpp/messagebus/rain_check/core/fwd.h
new file mode 100644
index 0000000000..b43ff8c17c
--- /dev/null
+++ b/library/cpp/messagebus/rain_check/core/fwd.h
@@ -0,0 +1,18 @@
+#pragma once
+
+namespace NRainCheck {
+ namespace NPrivate {
+ }
+
+ class ITaskBase;
+ class ISimpleTask;
+ class ICoroTask;
+
+ struct ISubtaskListener;
+
+ class TTaskRunnerBase;
+
+ class TSubtaskCompletion;
+ struct IEnv;
+
+}
diff --git a/library/cpp/messagebus/rain_check/core/rain_check.cpp b/library/cpp/messagebus/rain_check/core/rain_check.cpp
new file mode 100644
index 0000000000..2ea1f9e21b
--- /dev/null
+++ b/library/cpp/messagebus/rain_check/core/rain_check.cpp
@@ -0,0 +1 @@
+#include "rain_check.h"
diff --git a/library/cpp/messagebus/rain_check/core/rain_check.h b/library/cpp/messagebus/rain_check/core/rain_check.h
new file mode 100644
index 0000000000..0f289717a2
--- /dev/null
+++ b/library/cpp/messagebus/rain_check/core/rain_check.h
@@ -0,0 +1,8 @@
+#pragma once
+
+#include "coro.h"
+#include "env.h"
+#include "simple.h"
+#include "sleep.h"
+#include "spawn.h"
+#include "task.h"
diff --git a/library/cpp/messagebus/rain_check/core/simple.cpp b/library/cpp/messagebus/rain_check/core/simple.cpp
new file mode 100644
index 0000000000..70182b2f93
--- /dev/null
+++ b/library/cpp/messagebus/rain_check/core/simple.cpp
@@ -0,0 +1,18 @@
+#include "simple.h"
+
+using namespace NRainCheck;
+
+TSimpleTaskRunner::TSimpleTaskRunner(IEnv* env, ISubtaskListener* parentTask, TAutoPtr<ISimpleTask> impl)
+ : TTaskRunnerBase(env, parentTask, impl.Release())
+ , ContinueFunc(&ISimpleTask::Start)
+{
+}
+
+TSimpleTaskRunner::~TSimpleTaskRunner() {
+ Y_ASSERT(!ContinueFunc);
+}
+
+bool TSimpleTaskRunner::ReplyReceived() {
+ ContinueFunc = (GetImpl()->*(ContinueFunc.Func))();
+ return !!ContinueFunc;
+}
diff --git a/library/cpp/messagebus/rain_check/core/simple.h b/library/cpp/messagebus/rain_check/core/simple.h
new file mode 100644
index 0000000000..20e1bf19f5
--- /dev/null
+++ b/library/cpp/messagebus/rain_check/core/simple.h
@@ -0,0 +1,62 @@
+#pragma once
+
+#include "task.h"
+
+namespace NRainCheck {
+ class ISimpleTask;
+
+ // Function called on continue
+ class TContinueFunc {
+ friend class TSimpleTaskRunner;
+
+ typedef TContinueFunc (ISimpleTask::*TFunc)();
+ TFunc Func;
+
+ public:
+ TContinueFunc()
+ : Func(nullptr)
+ {
+ }
+
+ TContinueFunc(void*)
+ : Func(nullptr)
+ {
+ }
+
+ template <typename TTask>
+ TContinueFunc(TContinueFunc (TTask::*func)())
+ : Func((TFunc)func)
+ {
+ static_assert((std::is_base_of<ISimpleTask, TTask>::value), "expect (std::is_base_of<ISimpleTask, TTask>::value)");
+ }
+
+ bool operator!() const {
+ return !Func;
+ }
+ };
+
+ class TSimpleTaskRunner: public TTaskRunnerBase {
+ public:
+ TSimpleTaskRunner(IEnv* env, ISubtaskListener* parentTask, TAutoPtr<ISimpleTask>);
+ ~TSimpleTaskRunner() override;
+
+ private:
+ // Function to be called on completion of all pending tasks.
+ TContinueFunc ContinueFunc;
+
+ bool ReplyReceived() override /* override */;
+
+ ISimpleTask* GetImpl() {
+ return (ISimpleTask*)GetImplBase();
+ }
+ };
+
+ class ISimpleTask: public ITaskBase {
+ public:
+ typedef TSimpleTaskRunner TTaskRunner;
+ typedef ISimpleTask ITask;
+
+ virtual TContinueFunc Start() = 0;
+ };
+
+}
diff --git a/library/cpp/messagebus/rain_check/core/simple_ut.cpp b/library/cpp/messagebus/rain_check/core/simple_ut.cpp
new file mode 100644
index 0000000000..d4545e05aa
--- /dev/null
+++ b/library/cpp/messagebus/rain_check/core/simple_ut.cpp
@@ -0,0 +1,59 @@
+#include <library/cpp/testing/unittest/registar.h>
+
+#include <library/cpp/messagebus/rain_check/test/ut/test.h>
+
+#include <library/cpp/messagebus/latch.h>
+
+#include <util/system/event.h>
+
+using namespace NRainCheck;
+
+Y_UNIT_TEST_SUITE(RainCheckSimple) {
+ struct TTaskWithCompletionCallback: public ISimpleTask {
+ TTestEnv* const Env;
+ TTestSync* const TestSync;
+
+ TTaskWithCompletionCallback(TTestEnv* env, TTestSync* testSync)
+ : Env(env)
+ , TestSync(testSync)
+ {
+ }
+
+ TSubtaskCompletion SleepCompletion;
+
+ TContinueFunc Start() override {
+ TestSync->CheckAndIncrement(0);
+
+ Env->SleepService.Sleep(&SleepCompletion, TDuration::MilliSeconds(1));
+ SleepCompletion.SetCompletionCallback(&TTaskWithCompletionCallback::SleepCompletionCallback);
+
+ return &TTaskWithCompletionCallback::Last;
+ }
+
+ void SleepCompletionCallback(TSubtaskCompletion* completion) {
+ Y_VERIFY(completion == &SleepCompletion);
+ TestSync->CheckAndIncrement(1);
+
+ Env->SleepService.Sleep(&SleepCompletion, TDuration::MilliSeconds(1));
+ SleepCompletion.SetCompletionCallback(&TTaskWithCompletionCallback::NextSleepCompletionCallback);
+ }
+
+ void NextSleepCompletionCallback(TSubtaskCompletion*) {
+ TestSync->CheckAndIncrement(2);
+ }
+
+ TContinueFunc Last() {
+ TestSync->CheckAndIncrement(3);
+ return nullptr;
+ }
+ };
+
+ Y_UNIT_TEST(CompletionCallback) {
+ TTestEnv env;
+ TTestSync testSync;
+
+ env.SpawnTask<TTaskWithCompletionCallback>(&testSync);
+
+ testSync.WaitForAndIncrement(4);
+ }
+}
diff --git a/library/cpp/messagebus/rain_check/core/sleep.cpp b/library/cpp/messagebus/rain_check/core/sleep.cpp
new file mode 100644
index 0000000000..f5d0b4cac9
--- /dev/null
+++ b/library/cpp/messagebus/rain_check/core/sleep.cpp
@@ -0,0 +1,47 @@
+#include "rain_check.h"
+
+#include <util/system/yassert.h>
+
+using namespace NRainCheck;
+using namespace NRainCheck::NPrivate;
+using namespace NBus;
+using namespace NBus::NPrivate;
+
+TSleepService::TSleepService(::NBus::NPrivate::TScheduler* scheduler)
+ : Scheduler(scheduler)
+{
+}
+
+NRainCheck::TSleepService::TSleepService()
+ : SchedulerHolder(new TScheduler)
+ , Scheduler(SchedulerHolder.Get())
+{
+}
+
+NRainCheck::TSleepService::~TSleepService() {
+ if (!!SchedulerHolder) {
+ Scheduler->Stop();
+ }
+}
+
+namespace {
+ struct TSleepServiceScheduleItem: public IScheduleItem {
+ ISubtaskListener* const Parent;
+
+ TSleepServiceScheduleItem(ISubtaskListener* parent, TInstant time)
+ : IScheduleItem(time)
+ , Parent(parent)
+ {
+ }
+
+ void Do() override {
+ Parent->SetDone();
+ }
+ };
+}
+
+void TSleepService::Sleep(TSubtaskCompletion* r, TDuration duration) {
+ TTaskRunnerBase* current = TTaskRunnerBase::CurrentTask();
+ r->SetRunning(current);
+ Scheduler->Schedule(new TSleepServiceScheduleItem(r, TInstant::Now() + duration));
+}
diff --git a/library/cpp/messagebus/rain_check/core/sleep.h b/library/cpp/messagebus/rain_check/core/sleep.h
new file mode 100644
index 0000000000..1a7a1f8674
--- /dev/null
+++ b/library/cpp/messagebus/rain_check/core/sleep.h
@@ -0,0 +1,24 @@
+#pragma once
+
+#include "fwd.h"
+
+#include <library/cpp/messagebus/scheduler/scheduler.h>
+
+#include <util/datetime/base.h>
+
+namespace NRainCheck {
+ class TSleepService {
+ private:
+ THolder< ::NBus::NPrivate::TScheduler> SchedulerHolder;
+ ::NBus::NPrivate::TScheduler* const Scheduler;
+
+ public:
+ TSleepService(::NBus::NPrivate::TScheduler*);
+ TSleepService();
+ ~TSleepService();
+
+ // Wake up a task after given duration.
+ void Sleep(TSubtaskCompletion* r, TDuration);
+ };
+
+}
diff --git a/library/cpp/messagebus/rain_check/core/sleep_ut.cpp b/library/cpp/messagebus/rain_check/core/sleep_ut.cpp
new file mode 100644
index 0000000000..2ae85a87b1
--- /dev/null
+++ b/library/cpp/messagebus/rain_check/core/sleep_ut.cpp
@@ -0,0 +1,46 @@
+#include <library/cpp/testing/unittest/registar.h>
+
+#include <library/cpp/messagebus/rain_check/test/ut/test.h>
+
+#include <util/system/event.h>
+
+using namespace NRainCheck;
+using namespace NActor;
+
+Y_UNIT_TEST_SUITE(Sleep) {
+ struct TTestTask: public ISimpleTask {
+ TSimpleEnv* const Env;
+ TTestSync* const TestSync;
+
+ TTestTask(TSimpleEnv* env, TTestSync* testSync)
+ : Env(env)
+ , TestSync(testSync)
+ {
+ }
+
+ TSubtaskCompletion Sleep;
+
+ TContinueFunc Start() override {
+ Env->SleepService.Sleep(&Sleep, TDuration::MilliSeconds(1));
+
+ TestSync->CheckAndIncrement(0);
+
+ return &TTestTask::Continue;
+ }
+
+ TContinueFunc Continue() {
+ TestSync->CheckAndIncrement(1);
+ return nullptr;
+ }
+ };
+
+ Y_UNIT_TEST(Test) {
+ TTestSync testSync;
+
+ TSimpleEnv env;
+
+ TIntrusivePtr<TSimpleTaskRunner> task = env.SpawnTask<TTestTask>(&testSync);
+
+ testSync.WaitForAndIncrement(2);
+ }
+}
diff --git a/library/cpp/messagebus/rain_check/core/spawn.cpp b/library/cpp/messagebus/rain_check/core/spawn.cpp
new file mode 100644
index 0000000000..c570355fbe
--- /dev/null
+++ b/library/cpp/messagebus/rain_check/core/spawn.cpp
@@ -0,0 +1,5 @@
+#include "spawn.h"
+
+void NRainCheck::NPrivate::SpawnTaskImpl(TTaskRunnerBase* task) {
+ task->Schedule();
+}
diff --git a/library/cpp/messagebus/rain_check/core/spawn.h b/library/cpp/messagebus/rain_check/core/spawn.h
new file mode 100644
index 0000000000..f2b146bf29
--- /dev/null
+++ b/library/cpp/messagebus/rain_check/core/spawn.h
@@ -0,0 +1,50 @@
+#pragma once
+
+#include "coro.h"
+#include "simple.h"
+#include "task.h"
+
+namespace NRainCheck {
+ namespace NPrivate {
+ void SpawnTaskImpl(TTaskRunnerBase* task);
+
+ template <typename TTask, typename ITask, typename TRunner, typename TEnv, typename TParam>
+ TIntrusivePtr<TRunner> SpawnTaskWithRunner(TEnv* env, TParam param1, ISubtaskListener* subtaskListener) {
+ static_assert((std::is_base_of<ITask, TTask>::value), "expect (std::is_base_of<ITask, TTask>::value)");
+ TIntrusivePtr<TRunner> task(new TRunner(env, subtaskListener, new TTask(env, param1)));
+ NPrivate::SpawnTaskImpl(task.Get());
+ return task;
+ }
+
+ template <typename TTask, typename ITask, typename TRunner, typename TEnv>
+ void SpawnSubtaskWithRunner(TEnv* env, TSubtaskCompletion* completion) {
+ static_assert((std::is_base_of<ITask, TTask>::value), "expect (std::is_base_of<ITask, TTask>::value)");
+ TTaskRunnerBase* current = TTaskRunnerBase::CurrentTask();
+ completion->SetRunning(current);
+ NPrivate::SpawnTaskImpl(new TRunner(env, completion, new TTask(env)));
+ }
+
+ template <typename TTask, typename ITask, typename TRunner, typename TEnv, typename TParam>
+ void SpawnSubtaskWithRunner(TEnv* env, TSubtaskCompletion* completion, TParam param) {
+ static_assert((std::is_base_of<ITask, TTask>::value), "expect (std::is_base_of<ITask, TTask>::value)");
+ TTaskRunnerBase* current = TTaskRunnerBase::CurrentTask();
+ completion->SetRunning(current);
+ NPrivate::SpawnTaskImpl(new TRunner(env, completion, new TTask(env, param)));
+ }
+
+ }
+
+ // Instantiate and start a task with given parameter.
+ template <typename TTask, typename TEnv, typename TParam>
+ TIntrusivePtr<typename TTask::TTaskRunner> SpawnTask(TEnv* env, TParam param1, ISubtaskListener* subtaskListener = &TNopSubtaskListener::Instance) {
+ return NPrivate::SpawnTaskWithRunner<
+ TTask, typename TTask::ITask, typename TTask::TTaskRunner, TEnv, TParam>(env, param1, subtaskListener);
+ }
+
+ // Instantiate and start subtask of given task.
+ template <typename TTask, typename TEnv, typename TParam>
+ void SpawnSubtask(TEnv* env, TSubtaskCompletion* completion, TParam param) {
+ return NPrivate::SpawnSubtaskWithRunner<TTask, typename TTask::ITask, typename TTask::TTaskRunner>(env, completion, param);
+ }
+
+}
diff --git a/library/cpp/messagebus/rain_check/core/spawn_ut.cpp b/library/cpp/messagebus/rain_check/core/spawn_ut.cpp
new file mode 100644
index 0000000000..ba5a5e41cf
--- /dev/null
+++ b/library/cpp/messagebus/rain_check/core/spawn_ut.cpp
@@ -0,0 +1,145 @@
+#include <library/cpp/testing/unittest/registar.h>
+
+#include <library/cpp/messagebus/rain_check/test/helper/misc.h>
+#include <library/cpp/messagebus/rain_check/test/ut/test.h>
+
+#include <library/cpp/messagebus/latch.h>
+
+#include <util/system/event.h>
+
+#include <array>
+
+using namespace NRainCheck;
+using namespace NActor;
+
+Y_UNIT_TEST_SUITE(Spawn) {
+ struct TTestTask: public ISimpleTask {
+ TTestSync* const TestSync;
+
+ TTestTask(TSimpleEnv*, TTestSync* testSync)
+ : TestSync(testSync)
+ , I(0)
+ {
+ }
+
+ TSystemEvent Started;
+
+ unsigned I;
+
+ TContinueFunc Start() override {
+ if (I < 4) {
+ I += 1;
+ return &TTestTask::Start;
+ }
+ TestSync->CheckAndIncrement(0);
+ return &TTestTask::Continue;
+ }
+
+ TContinueFunc Continue() {
+ TestSync->CheckAndIncrement(1);
+
+ Started.Signal();
+ return nullptr;
+ }
+ };
+
+ Y_UNIT_TEST(Continuation) {
+ TTestSync testSync;
+
+ TSimpleEnv env;
+
+ TIntrusivePtr<TSimpleTaskRunner> task = env.SpawnTask<TTestTask>(&testSync);
+
+ testSync.WaitForAndIncrement(2);
+ }
+
+ struct TSubtask: public ISimpleTask {
+ TTestEnv* const Env;
+ TTestSync* const TestSync;
+
+ TSubtask(TTestEnv* env, TTestSync* testSync)
+ : Env(env)
+ , TestSync(testSync)
+ {
+ }
+
+ TContinueFunc Start() override {
+ Sleep(TDuration::MilliSeconds(1));
+ TestSync->CheckAndIncrement(1);
+ return nullptr;
+ }
+ };
+
+ struct TSpawnTask: public ISimpleTask {
+ TTestEnv* const Env;
+ TTestSync* const TestSync;
+
+ TSpawnTask(TTestEnv* env, TTestSync* testSync)
+ : Env(env)
+ , TestSync(testSync)
+ {
+ }
+
+ TSubtaskCompletion SubtaskCompletion;
+
+ TContinueFunc Start() override {
+ TestSync->CheckAndIncrement(0);
+ SpawnSubtask<TSubtask>(Env, &SubtaskCompletion, TestSync);
+ return &TSpawnTask::Continue;
+ }
+
+ TContinueFunc Continue() {
+ TestSync->CheckAndIncrement(2);
+ return nullptr;
+ }
+ };
+
+ Y_UNIT_TEST(Subtask) {
+ TTestSync testSync;
+
+ TTestEnv env;
+
+ TIntrusivePtr<TSimpleTaskRunner> task = env.SpawnTask<TSpawnTask>(&testSync);
+
+ testSync.WaitForAndIncrement(3);
+ }
+
+ struct TSpawnLongTask: public ISimpleTask {
+ TTestEnv* const Env;
+ TTestSync* const TestSync;
+ unsigned I;
+
+ TSpawnLongTask(TTestEnv* env, TTestSync* testSync)
+ : Env(env)
+ , TestSync(testSync)
+ , I(0)
+ {
+ }
+
+ std::array<TSubtaskCompletion, 3> Subtasks;
+
+ TContinueFunc Start() override {
+ if (I == 1000) {
+ TestSync->CheckAndIncrement(0);
+ return nullptr;
+ }
+
+ for (auto& subtask : Subtasks) {
+ SpawnSubtask<TNopSimpleTask>(Env, &subtask, "");
+ }
+
+ ++I;
+ return &TSpawnLongTask::Start;
+ }
+ };
+
+ Y_UNIT_TEST(SubtaskLong) {
+ TTestSync testSync;
+
+ TTestEnv env;
+
+ TIntrusivePtr<TSimpleTaskRunner> task = env.SpawnTask<TSpawnLongTask>(&testSync);
+
+ testSync.WaitForAndIncrement(1);
+ }
+}
diff --git a/library/cpp/messagebus/rain_check/core/task.cpp b/library/cpp/messagebus/rain_check/core/task.cpp
new file mode 100644
index 0000000000..a098437d53
--- /dev/null
+++ b/library/cpp/messagebus/rain_check/core/task.cpp
@@ -0,0 +1,216 @@
+#include "rain_check.h"
+
+#include <library/cpp/messagebus/actor/temp_tls_vector.h>
+
+#include <util/system/type_name.h>
+#include <util/system/tls.h>
+
+using namespace NRainCheck;
+using namespace NRainCheck::NPrivate;
+
+using namespace NActor;
+
+namespace {
+ Y_POD_STATIC_THREAD(TTaskRunnerBase*)
+ ThreadCurrentTask;
+}
+
+void TNopSubtaskListener::SetDone() {
+}
+
+TNopSubtaskListener TNopSubtaskListener::Instance;
+
+TTaskRunnerBase::TTaskRunnerBase(IEnv* env, ISubtaskListener* parentTask, TAutoPtr<ITaskBase> impl)
+ : TActor<TTaskRunnerBase>(env->GetExecutor())
+ , Impl(impl)
+ , ParentTask(parentTask)
+ //, HoldsSelfReference(false)
+ , Done(false)
+ , SetDoneCalled(false)
+{
+}
+
+TTaskRunnerBase::~TTaskRunnerBase() {
+ Y_ASSERT(Done);
+}
+
+namespace {
+ struct TRunningInThisThreadGuard {
+ TTaskRunnerBase* const Task;
+ TRunningInThisThreadGuard(TTaskRunnerBase* task)
+ : Task(task)
+ {
+ Y_ASSERT(!ThreadCurrentTask);
+ ThreadCurrentTask = task;
+ }
+
+ ~TRunningInThisThreadGuard() {
+ Y_ASSERT(ThreadCurrentTask == Task);
+ ThreadCurrentTask = nullptr;
+ }
+ };
+}
+
+void NRainCheck::TTaskRunnerBase::Act(NActor::TDefaultTag) {
+ Y_ASSERT(RefCount() > 0);
+
+ TRunningInThisThreadGuard g(this);
+
+ //RetainRef();
+
+ for (;;) {
+ TTempTlsVector<TSubtaskCompletion*> temp;
+
+ temp.GetVector()->swap(Pending);
+
+ for (auto& pending : *temp.GetVector()) {
+ if (pending->IsComplete()) {
+ pending->FireCompletionCallback(GetImplBase());
+ } else {
+ Pending.push_back(pending);
+ }
+ }
+
+ if (!Pending.empty()) {
+ return;
+ }
+
+ if (!Done) {
+ Done = !ReplyReceived();
+ } else {
+ if (Pending.empty()) {
+ if (!SetDoneCalled) {
+ ParentTask->SetDone();
+ SetDoneCalled = true;
+ }
+ //ReleaseRef();
+ return;
+ }
+ }
+ }
+}
+
+bool TTaskRunnerBase::IsRunningInThisThread() const {
+ return ThreadCurrentTask == this;
+}
+
+TSubtaskCompletion::~TSubtaskCompletion() {
+ ESubtaskState state = State.Get();
+ Y_ASSERT(state == CREATED || state == DONE || state == CANCELED);
+}
+
+void TSubtaskCompletion::FireCompletionCallback(ITaskBase* task) {
+ Y_ASSERT(IsComplete());
+
+ if (!!CompletionFunc) {
+ TSubtaskCompletionFunc temp = CompletionFunc;
+ // completion func must be reset before calling it,
+ // because function may set it back
+ CompletionFunc = TSubtaskCompletionFunc();
+ (task->*(temp.Func))(this);
+ }
+}
+
+void NRainCheck::TSubtaskCompletion::Cancel() {
+ for (;;) {
+ ESubtaskState state = State.Get();
+ if (state == CREATED && State.CompareAndSet(CREATED, CANCELED)) {
+ return;
+ }
+ if (state == RUNNING && State.CompareAndSet(RUNNING, CANCEL_REQUESTED)) {
+ return;
+ }
+ if (state == DONE && State.CompareAndSet(DONE, CANCELED)) {
+ return;
+ }
+ if (state == CANCEL_REQUESTED || state == CANCELED) {
+ return;
+ }
+ }
+}
+
+void TSubtaskCompletion::SetRunning(TTaskRunnerBase* parent) {
+ Y_ASSERT(!TaskRunner);
+ Y_ASSERT(!!parent);
+
+ TaskRunner = parent;
+
+ parent->Pending.push_back(this);
+
+ parent->RefV();
+
+ for (;;) {
+ ESubtaskState current = State.Get();
+ if (current != CREATED && current != DONE) {
+ Y_FAIL("current state should be CREATED or DONE: %s", ToCString(current));
+ }
+ if (State.CompareAndSet(current, RUNNING)) {
+ return;
+ }
+ }
+}
+
+void TSubtaskCompletion::SetDone() {
+ Y_ASSERT(!!TaskRunner);
+ TTaskRunnerBase* temp = TaskRunner;
+ TaskRunner = nullptr;
+
+ for (;;) {
+ ESubtaskState state = State.Get();
+ if (state == RUNNING) {
+ if (State.CompareAndSet(RUNNING, DONE)) {
+ break;
+ }
+ } else if (state == CANCEL_REQUESTED) {
+ if (State.CompareAndSet(CANCEL_REQUESTED, CANCELED)) {
+ break;
+ }
+ } else {
+ Y_FAIL("cannot SetDone: unknown state: %s", ToCString(state));
+ }
+ }
+
+ temp->ScheduleV();
+ temp->UnRefV();
+}
+
+#if 0
+void NRainCheck::TTaskRunnerBase::RetainRef()
+{
+ if (HoldsSelfReference) {
+ return;
+ }
+ HoldsSelfReference = true;
+ Ref();
+}
+
+void NRainCheck::TTaskRunnerBase::ReleaseRef()
+{
+ if (!HoldsSelfReference) {
+ return;
+ }
+ HoldsSelfReference = false;
+ DecRef();
+}
+#endif
+
+void TTaskRunnerBase::AssertInThisThread() const {
+ Y_ASSERT(IsRunningInThisThread());
+}
+
+TTaskRunnerBase* TTaskRunnerBase::CurrentTask() {
+ Y_VERIFY(!!ThreadCurrentTask);
+ return ThreadCurrentTask;
+}
+
+ITaskBase* TTaskRunnerBase::CurrentTaskImpl() {
+ return CurrentTask()->GetImplBase();
+}
+
+TString TTaskRunnerBase::GetStatusSingleLine() {
+ return TypeName(*Impl);
+}
+
+bool NRainCheck::AreWeInsideTask() {
+ return ThreadCurrentTask != nullptr;
+}
diff --git a/library/cpp/messagebus/rain_check/core/task.h b/library/cpp/messagebus/rain_check/core/task.h
new file mode 100644
index 0000000000..7d8778bcda
--- /dev/null
+++ b/library/cpp/messagebus/rain_check/core/task.h
@@ -0,0 +1,184 @@
+#pragma once
+
+#include "fwd.h"
+
+#include <library/cpp/messagebus/actor/actor.h>
+#include <library/cpp/messagebus/misc/atomic_box.h>
+
+#include <library/cpp/deprecated/enum_codegen/enum_codegen.h>
+
+#include <util/generic/noncopyable.h>
+#include <util/generic/ptr.h>
+#include <util/thread/lfstack.h>
+
+namespace NRainCheck {
+ struct ISubtaskListener {
+ virtual void SetDone() = 0;
+ virtual ~ISubtaskListener() {
+ }
+ };
+
+ struct TNopSubtaskListener: public ISubtaskListener {
+ void SetDone() override;
+
+ static TNopSubtaskListener Instance;
+ };
+
+ class TSubtaskCompletionFunc {
+ friend class TSubtaskCompletion;
+
+ typedef void (ITaskBase::*TFunc)(TSubtaskCompletion*);
+ TFunc Func;
+
+ public:
+ TSubtaskCompletionFunc()
+ : Func(nullptr)
+ {
+ }
+
+ TSubtaskCompletionFunc(void*)
+ : Func(nullptr)
+ {
+ }
+
+ template <typename TTask>
+ TSubtaskCompletionFunc(void (TTask::*func)(TSubtaskCompletion*))
+ : Func((TFunc)func)
+ {
+ static_assert((std::is_base_of<ITaskBase, TTask>::value), "expect (std::is_base_of<ITaskBase, TTask>::value)");
+ }
+
+ bool operator!() const {
+ return !Func;
+ }
+ };
+
+ template <typename T>
+ class TTaskFuture;
+
+#define SUBTASK_STATE_MAP(XX) \
+ XX(CREATED, "Initial") \
+ XX(RUNNING, "Running") \
+ XX(DONE, "Completed") \
+ XX(CANCEL_REQUESTED, "Cancel requested, but still executing") \
+ XX(CANCELED, "Canceled") \
+ /**/
+
+ enum ESubtaskState {
+ SUBTASK_STATE_MAP(ENUM_VALUE_GEN_NO_VALUE)
+ };
+
+ ENUM_TO_STRING(ESubtaskState, SUBTASK_STATE_MAP)
+
+ class TSubtaskCompletion : TNonCopyable, public ISubtaskListener {
+ friend struct TTaskAccessor;
+
+ private:
+ TAtomicBox<ESubtaskState> State;
+ TTaskRunnerBase* volatile TaskRunner;
+ TSubtaskCompletionFunc CompletionFunc;
+
+ public:
+ TSubtaskCompletion()
+ : State(CREATED)
+ , TaskRunner()
+ {
+ }
+ ~TSubtaskCompletion() override;
+
+ // Either done or cancel requested or cancelled
+ bool IsComplete() const {
+ ESubtaskState state = State.Get();
+ switch (state) {
+ case RUNNING:
+ return false;
+ case DONE:
+ return true;
+ case CANCEL_REQUESTED:
+ return false;
+ case CANCELED:
+ return true;
+ case CREATED:
+ Y_FAIL("not started");
+ default:
+ Y_FAIL("unknown value: %u", (unsigned)state);
+ }
+ }
+
+ void FireCompletionCallback(ITaskBase*);
+
+ void SetCompletionCallback(TSubtaskCompletionFunc func) {
+ CompletionFunc = func;
+ }
+
+ // Completed, but not cancelled
+ bool IsDone() const {
+ return State.Get() == DONE;
+ }
+
+ // Request cancel by actor
+ // Does nothing but marks task cancelled,
+ // and allows proceeding to next callback
+ void Cancel();
+
+ // called by service provider implementations
+ // must not be called by actor
+ void SetRunning(TTaskRunnerBase* parent);
+ void SetDone() override;
+ };
+
+ // See ISimpleTask, ICoroTask
+ class TTaskRunnerBase: public TAtomicRefCount<TTaskRunnerBase>, public NActor::TActor<TTaskRunnerBase> {
+ friend class NActor::TActor<TTaskRunnerBase>;
+ friend class TContinueFunc;
+ friend struct TTaskAccessor;
+ friend class TSubtaskCompletion;
+
+ private:
+ THolder<ITaskBase> Impl;
+
+ ISubtaskListener* const ParentTask;
+ // While task is running, it holds extra reference to self.
+ //bool HoldsSelfReference;
+ bool Done;
+ bool SetDoneCalled;
+
+ // Subtasks currently executed.
+ TVector<TSubtaskCompletion*> Pending;
+
+ void Act(NActor::TDefaultTag);
+
+ public:
+ // Construct task. Task is not automatically started.
+ TTaskRunnerBase(IEnv*, ISubtaskListener* parent, TAutoPtr<ITaskBase> impl);
+ ~TTaskRunnerBase() override;
+
+ bool IsRunningInThisThread() const;
+ void AssertInThisThread() const;
+ static TTaskRunnerBase* CurrentTask();
+ static ITaskBase* CurrentTaskImpl();
+
+ TString GetStatusSingleLine();
+
+ protected:
+ //void RetainRef();
+ //void ReleaseRef();
+ ITaskBase* GetImplBase() {
+ return Impl.Get();
+ }
+
+ private:
+ // true if need to call again
+ virtual bool ReplyReceived() = 0;
+ };
+
+ class ITaskBase {
+ public:
+ virtual ~ITaskBase() {
+ }
+ };
+
+ // Check that current method executed inside some task.
+ bool AreWeInsideTask();
+
+}
diff --git a/library/cpp/messagebus/rain_check/core/track.cpp b/library/cpp/messagebus/rain_check/core/track.cpp
new file mode 100644
index 0000000000..092a51a214
--- /dev/null
+++ b/library/cpp/messagebus/rain_check/core/track.cpp
@@ -0,0 +1,66 @@
+#include "track.h"
+
+using namespace NRainCheck;
+using namespace NRainCheck::NPrivate;
+
+void TTaskTrackerReceipt::SetDone() {
+ TaskTracker->GetQueue<TTaskTrackerReceipt*>()->EnqueueAndSchedule(this);
+}
+
+TString TTaskTrackerReceipt::GetStatusSingleLine() {
+ return Task->GetStatusSingleLine();
+}
+
+TTaskTracker::TTaskTracker(NActor::TExecutor* executor)
+ : NActor::TActor<TTaskTracker>(executor)
+{
+}
+
+TTaskTracker::~TTaskTracker() {
+ Y_ASSERT(Tasks.Empty());
+}
+
+void TTaskTracker::Shutdown() {
+ ShutdownFlag.Set(true);
+ Schedule();
+ ShutdownEvent.WaitI();
+}
+
+void TTaskTracker::ProcessItem(NActor::TDefaultTag, NActor::TDefaultTag, ITaskFactory* taskFactory) {
+ THolder<ITaskFactory> holder(taskFactory);
+
+ THolder<TTaskTrackerReceipt> receipt(new TTaskTrackerReceipt(this));
+ receipt->Task = taskFactory->NewTask(receipt.Get());
+
+ Tasks.PushBack(receipt.Release());
+}
+
+void TTaskTracker::ProcessItem(NActor::TDefaultTag, NActor::TDefaultTag, TTaskTrackerReceipt* receipt) {
+ Y_ASSERT(!receipt->Empty());
+ receipt->Unlink();
+ delete receipt;
+}
+
+void TTaskTracker::ProcessItem(NActor::TDefaultTag, NActor::TDefaultTag, TAsyncResult<TTaskTrackerStatus>* status) {
+ TTaskTrackerStatus s;
+ s.Size = Tasks.Size();
+ status->SetResult(s);
+}
+
+void TTaskTracker::Act(NActor::TDefaultTag) {
+ GetQueue<TAsyncResult<TTaskTrackerStatus>*>()->DequeueAll();
+ GetQueue<ITaskFactory*>()->DequeueAll();
+ GetQueue<TTaskTrackerReceipt*>()->DequeueAll();
+
+ if (ShutdownFlag.Get()) {
+ if (Tasks.Empty()) {
+ ShutdownEvent.Signal();
+ }
+ }
+}
+
+ui32 TTaskTracker::Size() {
+ TAsyncResult<TTaskTrackerStatus> r;
+ GetQueue<TAsyncResult<TTaskTrackerStatus>*>()->EnqueueAndSchedule(&r);
+ return r.GetResult().Size;
+}
diff --git a/library/cpp/messagebus/rain_check/core/track.h b/library/cpp/messagebus/rain_check/core/track.h
new file mode 100644
index 0000000000..d387de7574
--- /dev/null
+++ b/library/cpp/messagebus/rain_check/core/track.h
@@ -0,0 +1,97 @@
+#pragma once
+
+#include "spawn.h"
+#include "task.h"
+
+#include <library/cpp/messagebus/async_result.h>
+#include <library/cpp/messagebus/actor/queue_in_actor.h>
+#include <library/cpp/messagebus/misc/atomic_box.h>
+
+#include <util/generic/intrlist.h>
+#include <util/system/event.h>
+
+namespace NRainCheck {
+ class TTaskTracker;
+
+ namespace NPrivate {
+ struct ITaskFactory {
+ virtual TIntrusivePtr<TTaskRunnerBase> NewTask(ISubtaskListener*) = 0;
+ virtual ~ITaskFactory() {
+ }
+ };
+
+ struct TTaskTrackerReceipt: public ISubtaskListener, public TIntrusiveListItem<TTaskTrackerReceipt> {
+ TTaskTracker* const TaskTracker;
+ TIntrusivePtr<TTaskRunnerBase> Task;
+
+ TTaskTrackerReceipt(TTaskTracker* taskTracker)
+ : TaskTracker(taskTracker)
+ {
+ }
+
+ void SetDone() override;
+
+ TString GetStatusSingleLine();
+ };
+
+ struct TTaskTrackerStatus {
+ ui32 Size;
+ };
+
+ }
+
+ class TTaskTracker
+ : public TAtomicRefCount<TTaskTracker>,
+ public NActor::TActor<TTaskTracker>,
+ public NActor::TQueueInActor<TTaskTracker, NPrivate::ITaskFactory*>,
+ public NActor::TQueueInActor<TTaskTracker, NPrivate::TTaskTrackerReceipt*>,
+ public NActor::TQueueInActor<TTaskTracker, TAsyncResult<NPrivate::TTaskTrackerStatus>*> {
+ friend struct NPrivate::TTaskTrackerReceipt;
+
+ private:
+ TAtomicBox<bool> ShutdownFlag;
+ TSystemEvent ShutdownEvent;
+
+ TIntrusiveList<NPrivate::TTaskTrackerReceipt> Tasks;
+
+ template <typename TItem>
+ NActor::TQueueInActor<TTaskTracker, TItem>* GetQueue() {
+ return this;
+ }
+
+ public:
+ TTaskTracker(NActor::TExecutor* executor);
+ ~TTaskTracker() override;
+
+ void Shutdown();
+
+ void ProcessItem(NActor::TDefaultTag, NActor::TDefaultTag, NPrivate::ITaskFactory*);
+ void ProcessItem(NActor::TDefaultTag, NActor::TDefaultTag, NPrivate::TTaskTrackerReceipt*);
+ void ProcessItem(NActor::TDefaultTag, NActor::TDefaultTag, TAsyncResult<NPrivate::TTaskTrackerStatus>*);
+
+ void Act(NActor::TDefaultTag);
+
+ template <typename TTask, typename TEnv, typename TParam>
+ void Spawn(TEnv* env, TParam param) {
+ struct TTaskFactory: public NPrivate::ITaskFactory {
+ TEnv* const Env;
+ TParam Param;
+
+ TTaskFactory(TEnv* env, TParam param)
+ : Env(env)
+ , Param(param)
+ {
+ }
+
+ TIntrusivePtr<TTaskRunnerBase> NewTask(ISubtaskListener* subtaskListener) override {
+ return NRainCheck::SpawnTask<TTask>(Env, Param, subtaskListener).Get();
+ }
+ };
+
+ GetQueue<NPrivate::ITaskFactory*>()->EnqueueAndSchedule(new TTaskFactory(env, param));
+ }
+
+ ui32 Size();
+ };
+
+}
diff --git a/library/cpp/messagebus/rain_check/core/track_ut.cpp b/library/cpp/messagebus/rain_check/core/track_ut.cpp
new file mode 100644
index 0000000000..05f7de1319
--- /dev/null
+++ b/library/cpp/messagebus/rain_check/core/track_ut.cpp
@@ -0,0 +1,45 @@
+#include <library/cpp/testing/unittest/registar.h>
+
+#include "track.h"
+
+#include <library/cpp/messagebus/rain_check/test/helper/misc.h>
+#include <library/cpp/messagebus/rain_check/test/ut/test.h>
+
+using namespace NRainCheck;
+
+Y_UNIT_TEST_SUITE(TaskTracker) {
+ struct TTaskForTracker: public ISimpleTask {
+ TTestSync* const TestSync;
+
+ TTaskForTracker(TTestEnv*, TTestSync* testSync)
+ : TestSync(testSync)
+ {
+ }
+
+ TContinueFunc Start() override {
+ TestSync->WaitForAndIncrement(0);
+ TestSync->WaitForAndIncrement(2);
+ return nullptr;
+ }
+ };
+
+ Y_UNIT_TEST(Simple) {
+ TTestEnv env;
+
+ TIntrusivePtr<TTaskTracker> tracker(new TTaskTracker(env.GetExecutor()));
+
+ TTestSync testSync;
+
+ tracker->Spawn<TTaskForTracker>(&env, &testSync);
+
+ testSync.WaitFor(1);
+
+ UNIT_ASSERT_VALUES_EQUAL(1u, tracker->Size());
+
+ testSync.CheckAndIncrement(1);
+
+ testSync.WaitForAndIncrement(3);
+
+ tracker->Shutdown();
+ }
+}
diff --git a/library/cpp/messagebus/rain_check/core/ya.make b/library/cpp/messagebus/rain_check/core/ya.make
new file mode 100644
index 0000000000..c6fb5640d4
--- /dev/null
+++ b/library/cpp/messagebus/rain_check/core/ya.make
@@ -0,0 +1,25 @@
+LIBRARY()
+
+OWNER(g:messagebus)
+
+PEERDIR(
+ library/cpp/coroutine/engine
+ library/cpp/deprecated/enum_codegen
+ library/cpp/messagebus
+ library/cpp/messagebus/actor
+ library/cpp/messagebus/scheduler
+)
+
+SRCS(
+ coro.cpp
+ coro_stack.cpp
+ env.cpp
+ rain_check.cpp
+ simple.cpp
+ sleep.cpp
+ spawn.cpp
+ task.cpp
+ track.cpp
+)
+
+END()