diff options
author | Devtools Arcadia <arcadia-devtools@yandex-team.ru> | 2022-02-07 18:08:42 +0300 |
---|---|---|
committer | Devtools Arcadia <arcadia-devtools@mous.vla.yp-c.yandex.net> | 2022-02-07 18:08:42 +0300 |
commit | 1110808a9d39d4b808aef724c861a2e1a38d2a69 (patch) | |
tree | e26c9fed0de5d9873cce7e00bc214573dc2195b7 /library/cpp/messagebus/rain_check/core | |
download | ydb-1110808a9d39d4b808aef724c861a2e1a38d2a69.tar.gz |
intermediate changes
ref:cde9a383711a11544ce7e107a78147fb96cc4029
Diffstat (limited to 'library/cpp/messagebus/rain_check/core')
25 files changed, 1485 insertions, 0 deletions
diff --git a/library/cpp/messagebus/rain_check/core/coro.cpp b/library/cpp/messagebus/rain_check/core/coro.cpp new file mode 100644 index 0000000000..500841dd5b --- /dev/null +++ b/library/cpp/messagebus/rain_check/core/coro.cpp @@ -0,0 +1,60 @@ +#include "coro.h" + +#include "coro_stack.h" + +#include <util/system/tls.h> +#include <util/system/yassert.h> + +using namespace NRainCheck; + +TContClosure TCoroTaskRunner::ContClosure(TCoroTaskRunner* runner, TArrayRef<char> memRegion) { + TContClosure contClosure; + contClosure.TrampoLine = runner; + contClosure.Stack = memRegion; + return contClosure; +} + +TCoroTaskRunner::TCoroTaskRunner(IEnv* env, ISubtaskListener* parent, TAutoPtr<ICoroTask> impl) + : TTaskRunnerBase(env, parent, impl.Release()) + , Stack(GetImpl()->StackSize) + , ContMachineContext(ContClosure(this, Stack.MemRegion())) + , CoroDone(false) +{ +} + +TCoroTaskRunner::~TCoroTaskRunner() { + Y_ASSERT(CoroDone); +} + +Y_POD_STATIC_THREAD(TContMachineContext*) +CallerContext; +Y_POD_STATIC_THREAD(TCoroTaskRunner*) +Task; + +bool TCoroTaskRunner::ReplyReceived() { + Y_ASSERT(!CoroDone); + + TContMachineContext me; + + CallerContext = &me; + Task = this; + + me.SwitchTo(&ContMachineContext); + + Stack.VerifyNoStackOverflow(); + + Y_ASSERT(CallerContext == &me); + Y_ASSERT(Task == this); + + return !CoroDone; +} + +void NRainCheck::TCoroTaskRunner::DoRun() { + GetImpl()->Run(); + CoroDone = true; + ContMachineContext.SwitchTo(CallerContext); +} + +void NRainCheck::ICoroTask::WaitForSubtasks() { + Task->ContMachineContext.SwitchTo(CallerContext); +} diff --git a/library/cpp/messagebus/rain_check/core/coro.h b/library/cpp/messagebus/rain_check/core/coro.h new file mode 100644 index 0000000000..95e2a30f9b --- /dev/null +++ b/library/cpp/messagebus/rain_check/core/coro.h @@ -0,0 +1,58 @@ +#pragma once + +#include "coro_stack.h" +#include "task.h" + +#include <util/generic/ptr.h> +#include <util/memory/alloc.h> +#include <util/system/align.h> +#include <util/system/context.h> +#include <util/system/valgrind.h> + +namespace NRainCheck { + class ICoroTask; + + class TCoroTaskRunner: public TTaskRunnerBase, private ITrampoLine { + friend class ICoroTask; + + private: + NPrivate::TCoroStack Stack; + TContMachineContext ContMachineContext; + bool CoroDone; + + public: + TCoroTaskRunner(IEnv* env, ISubtaskListener* parent, TAutoPtr<ICoroTask> impl); + ~TCoroTaskRunner() override; + + private: + static TContClosure ContClosure(TCoroTaskRunner* runner, TArrayRef<char> memRegion); + + bool ReplyReceived() override /* override */; + + void DoRun() override /* override */; + + ICoroTask* GetImpl() { + return (ICoroTask*)GetImplBase(); + } + }; + + class ICoroTask: public ITaskBase { + friend class TCoroTaskRunner; + + private: + size_t StackSize; + + public: + typedef TCoroTaskRunner TTaskRunner; + typedef ICoroTask ITask; + + ICoroTask(size_t stackSize = 0x2000) + : StackSize(stackSize) + { + } + + virtual void Run() = 0; + static void WaitForSubtasks(); + }; + +} diff --git a/library/cpp/messagebus/rain_check/core/coro_stack.cpp b/library/cpp/messagebus/rain_check/core/coro_stack.cpp new file mode 100644 index 0000000000..83b984ca6e --- /dev/null +++ b/library/cpp/messagebus/rain_check/core/coro_stack.cpp @@ -0,0 +1,41 @@ +#include "coro_stack.h" + +#include <util/generic/singleton.h> +#include <util/system/valgrind.h> + +#include <cstdlib> +#include <stdio.h> + +using namespace NRainCheck; +using namespace NRainCheck::NPrivate; + +TCoroStack::TCoroStack(size_t size) + : SizeValue(size) +{ + Y_VERIFY(size % sizeof(ui32) == 0); + Y_VERIFY(size >= 0x1000); + + DataHolder.Reset(malloc(size)); + + // register in valgrind + + *MagicNumberLocation() = MAGIC_NUMBER; + +#if defined(WITH_VALGRIND) + ValgrindStackId = VALGRIND_STACK_REGISTER(Data(), (char*)Data() + Size()); +#endif +} + +TCoroStack::~TCoroStack() { +#if defined(WITH_VALGRIND) + VALGRIND_STACK_DEREGISTER(ValgrindStackId); +#endif + + VerifyNoStackOverflow(); +} + +void TCoroStack::FailStackOverflow() { + static const char message[] = "stack overflow\n"; + fputs(message, stderr); + abort(); +} diff --git a/library/cpp/messagebus/rain_check/core/coro_stack.h b/library/cpp/messagebus/rain_check/core/coro_stack.h new file mode 100644 index 0000000000..2f3520e6e4 --- /dev/null +++ b/library/cpp/messagebus/rain_check/core/coro_stack.h @@ -0,0 +1,54 @@ +#pragma once + +#include <util/generic/array_ref.h> +#include <util/generic/ptr.h> +#include <util/system/valgrind.h> + +namespace NRainCheck { + namespace NPrivate { + struct TCoroStack { + THolder<void, TFree> DataHolder; + size_t SizeValue; + +#if defined(WITH_VALGRIND) + size_t ValgrindStackId; +#endif + + TCoroStack(size_t size); + ~TCoroStack(); + + void* Data() { + return DataHolder.Get(); + } + + size_t Size() { + return SizeValue; + } + + TArrayRef<char> MemRegion() { + return TArrayRef((char*)Data(), Size()); + } + + ui32* MagicNumberLocation() { +#if STACK_GROW_DOWN == 1 + return (ui32*)Data(); +#elif STACK_GROW_DOWN == 0 + return ((ui32*)(((char*)Data()) + Size())) - 1; +#else +#error "unknown" +#endif + } + + static void FailStackOverflow(); + + inline void VerifyNoStackOverflow() noexcept { + if (Y_UNLIKELY(*MagicNumberLocation() != MAGIC_NUMBER)) { + FailStackOverflow(); + } + } + + static const ui32 MAGIC_NUMBER = 0xAB4D15FE; + }; + + } +} diff --git a/library/cpp/messagebus/rain_check/core/coro_ut.cpp b/library/cpp/messagebus/rain_check/core/coro_ut.cpp new file mode 100644 index 0000000000..61a33584a5 --- /dev/null +++ b/library/cpp/messagebus/rain_check/core/coro_ut.cpp @@ -0,0 +1,106 @@ +#include <library/cpp/testing/unittest/registar.h> + +#include "coro.h" +#include "spawn.h" + +#include <library/cpp/messagebus/rain_check/test/ut/test.h> + +using namespace NRainCheck; + +Y_UNIT_TEST_SUITE(RainCheckCoro) { + struct TSimpleCoroTask : ICoroTask { + TTestSync* const TestSync; + + TSimpleCoroTask(TTestEnv*, TTestSync* testSync) + : TestSync(testSync) + { + } + + void Run() override { + TestSync->WaitForAndIncrement(0); + } + }; + + Y_UNIT_TEST(Simple) { + TTestSync testSync; + + TTestEnv env; + + TIntrusivePtr<TCoroTaskRunner> task = env.SpawnTask<TSimpleCoroTask>(&testSync); + testSync.WaitForAndIncrement(1); + } + + struct TSleepCoroTask : ICoroTask { + TTestEnv* const Env; + TTestSync* const TestSync; + + TSleepCoroTask(TTestEnv* env, TTestSync* testSync) + : Env(env) + , TestSync(testSync) + { + } + + TSubtaskCompletion SleepCompletion; + + void Run() override { + Env->SleepService.Sleep(&SleepCompletion, TDuration::MilliSeconds(1)); + WaitForSubtasks(); + TestSync->WaitForAndIncrement(0); + } + }; + + Y_UNIT_TEST(Sleep) { + TTestSync testSync; + + TTestEnv env; + + TIntrusivePtr<TCoroTaskRunner> task = env.SpawnTask<TSleepCoroTask>(&testSync); + + testSync.WaitForAndIncrement(1); + } + + struct TSubtask : ICoroTask { + TTestEnv* const Env; + TTestSync* const TestSync; + + TSubtask(TTestEnv* env, TTestSync* testSync) + : Env(env) + , TestSync(testSync) + { + } + + void Run() override { + TestSync->CheckAndIncrement(1); + } + }; + + struct TSpawnCoroTask : ICoroTask { + TTestEnv* const Env; + TTestSync* const TestSync; + + TSpawnCoroTask(TTestEnv* env, TTestSync* testSync) + : Env(env) + , TestSync(testSync) + { + } + + TSubtaskCompletion SubtaskCompletion; + + void Run() override { + TestSync->CheckAndIncrement(0); + SpawnSubtask<TSubtask>(Env, &SubtaskCompletion, TestSync); + WaitForSubtasks(); + TestSync->CheckAndIncrement(2); + } + }; + + Y_UNIT_TEST(Spawn) { + TTestSync testSync; + + TTestEnv env; + + TIntrusivePtr<TCoroTaskRunner> task = env.SpawnTask<TSpawnCoroTask>(&testSync); + + testSync.WaitForAndIncrement(3); + } +} diff --git a/library/cpp/messagebus/rain_check/core/env.cpp b/library/cpp/messagebus/rain_check/core/env.cpp new file mode 100644 index 0000000000..fdc0000dbd --- /dev/null +++ b/library/cpp/messagebus/rain_check/core/env.cpp @@ -0,0 +1,3 @@ +#include "env.h" + +using namespace NRainCheck; diff --git a/library/cpp/messagebus/rain_check/core/env.h b/library/cpp/messagebus/rain_check/core/env.h new file mode 100644 index 0000000000..f6dd7fceb6 --- /dev/null +++ b/library/cpp/messagebus/rain_check/core/env.h @@ -0,0 +1,47 @@ +#pragma once + +#include "sleep.h" +#include "spawn.h" + +#include <library/cpp/messagebus/actor/executor.h> + +#include <util/generic/ptr.h> + +namespace NRainCheck { + struct IEnv { + virtual ::NActor::TExecutor* GetExecutor() = 0; + virtual ~IEnv() { + } + }; + + template <typename TSelf> + struct TEnvTemplate: public IEnv { + template <typename TTask, typename TParam> + TIntrusivePtr<typename TTask::TTaskRunner> SpawnTask(TParam param) { + return ::NRainCheck::SpawnTask<TTask, TSelf>((TSelf*)this, param); + } + }; + + template <typename TSelf> + struct TSimpleEnvTemplate: public TEnvTemplate<TSelf> { + ::NActor::TExecutorPtr Executor; + TSleepService SleepService; + + TSimpleEnvTemplate(unsigned threadCount = 0) + : Executor(new ::NActor::TExecutor(threadCount != 0 ? threadCount : 4)) + { + } + + ::NActor::TExecutor* GetExecutor() override { + return Executor.Get(); + } + }; + + struct TSimpleEnv: public TSimpleEnvTemplate<TSimpleEnv> { + TSimpleEnv(unsigned threadCount = 0) + : TSimpleEnvTemplate<TSimpleEnv>(threadCount) + { + } + }; + +} diff --git a/library/cpp/messagebus/rain_check/core/fwd.h b/library/cpp/messagebus/rain_check/core/fwd.h new file mode 100644 index 0000000000..b43ff8c17c --- /dev/null +++ b/library/cpp/messagebus/rain_check/core/fwd.h @@ -0,0 +1,18 @@ +#pragma once + +namespace NRainCheck { + namespace NPrivate { + } + + class ITaskBase; + class ISimpleTask; + class ICoroTask; + + struct ISubtaskListener; + + class TTaskRunnerBase; + + class TSubtaskCompletion; + struct IEnv; + +} diff --git a/library/cpp/messagebus/rain_check/core/rain_check.cpp b/library/cpp/messagebus/rain_check/core/rain_check.cpp new file mode 100644 index 0000000000..2ea1f9e21b --- /dev/null +++ b/library/cpp/messagebus/rain_check/core/rain_check.cpp @@ -0,0 +1 @@ +#include "rain_check.h" diff --git a/library/cpp/messagebus/rain_check/core/rain_check.h b/library/cpp/messagebus/rain_check/core/rain_check.h new file mode 100644 index 0000000000..0f289717a2 --- /dev/null +++ b/library/cpp/messagebus/rain_check/core/rain_check.h @@ -0,0 +1,8 @@ +#pragma once + +#include "coro.h" +#include "env.h" +#include "simple.h" +#include "sleep.h" +#include "spawn.h" +#include "task.h" diff --git a/library/cpp/messagebus/rain_check/core/simple.cpp b/library/cpp/messagebus/rain_check/core/simple.cpp new file mode 100644 index 0000000000..70182b2f93 --- /dev/null +++ b/library/cpp/messagebus/rain_check/core/simple.cpp @@ -0,0 +1,18 @@ +#include "simple.h" + +using namespace NRainCheck; + +TSimpleTaskRunner::TSimpleTaskRunner(IEnv* env, ISubtaskListener* parentTask, TAutoPtr<ISimpleTask> impl) + : TTaskRunnerBase(env, parentTask, impl.Release()) + , ContinueFunc(&ISimpleTask::Start) +{ +} + +TSimpleTaskRunner::~TSimpleTaskRunner() { + Y_ASSERT(!ContinueFunc); +} + +bool TSimpleTaskRunner::ReplyReceived() { + ContinueFunc = (GetImpl()->*(ContinueFunc.Func))(); + return !!ContinueFunc; +} diff --git a/library/cpp/messagebus/rain_check/core/simple.h b/library/cpp/messagebus/rain_check/core/simple.h new file mode 100644 index 0000000000..20e1bf19f5 --- /dev/null +++ b/library/cpp/messagebus/rain_check/core/simple.h @@ -0,0 +1,62 @@ +#pragma once + +#include "task.h" + +namespace NRainCheck { + class ISimpleTask; + + // Function called on continue + class TContinueFunc { + friend class TSimpleTaskRunner; + + typedef TContinueFunc (ISimpleTask::*TFunc)(); + TFunc Func; + + public: + TContinueFunc() + : Func(nullptr) + { + } + + TContinueFunc(void*) + : Func(nullptr) + { + } + + template <typename TTask> + TContinueFunc(TContinueFunc (TTask::*func)()) + : Func((TFunc)func) + { + static_assert((std::is_base_of<ISimpleTask, TTask>::value), "expect (std::is_base_of<ISimpleTask, TTask>::value)"); + } + + bool operator!() const { + return !Func; + } + }; + + class TSimpleTaskRunner: public TTaskRunnerBase { + public: + TSimpleTaskRunner(IEnv* env, ISubtaskListener* parentTask, TAutoPtr<ISimpleTask>); + ~TSimpleTaskRunner() override; + + private: + // Function to be called on completion of all pending tasks. + TContinueFunc ContinueFunc; + + bool ReplyReceived() override /* override */; + + ISimpleTask* GetImpl() { + return (ISimpleTask*)GetImplBase(); + } + }; + + class ISimpleTask: public ITaskBase { + public: + typedef TSimpleTaskRunner TTaskRunner; + typedef ISimpleTask ITask; + + virtual TContinueFunc Start() = 0; + }; + +} diff --git a/library/cpp/messagebus/rain_check/core/simple_ut.cpp b/library/cpp/messagebus/rain_check/core/simple_ut.cpp new file mode 100644 index 0000000000..d4545e05aa --- /dev/null +++ b/library/cpp/messagebus/rain_check/core/simple_ut.cpp @@ -0,0 +1,59 @@ +#include <library/cpp/testing/unittest/registar.h> + +#include <library/cpp/messagebus/rain_check/test/ut/test.h> + +#include <library/cpp/messagebus/latch.h> + +#include <util/system/event.h> + +using namespace NRainCheck; + +Y_UNIT_TEST_SUITE(RainCheckSimple) { + struct TTaskWithCompletionCallback: public ISimpleTask { + TTestEnv* const Env; + TTestSync* const TestSync; + + TTaskWithCompletionCallback(TTestEnv* env, TTestSync* testSync) + : Env(env) + , TestSync(testSync) + { + } + + TSubtaskCompletion SleepCompletion; + + TContinueFunc Start() override { + TestSync->CheckAndIncrement(0); + + Env->SleepService.Sleep(&SleepCompletion, TDuration::MilliSeconds(1)); + SleepCompletion.SetCompletionCallback(&TTaskWithCompletionCallback::SleepCompletionCallback); + + return &TTaskWithCompletionCallback::Last; + } + + void SleepCompletionCallback(TSubtaskCompletion* completion) { + Y_VERIFY(completion == &SleepCompletion); + TestSync->CheckAndIncrement(1); + + Env->SleepService.Sleep(&SleepCompletion, TDuration::MilliSeconds(1)); + SleepCompletion.SetCompletionCallback(&TTaskWithCompletionCallback::NextSleepCompletionCallback); + } + + void NextSleepCompletionCallback(TSubtaskCompletion*) { + TestSync->CheckAndIncrement(2); + } + + TContinueFunc Last() { + TestSync->CheckAndIncrement(3); + return nullptr; + } + }; + + Y_UNIT_TEST(CompletionCallback) { + TTestEnv env; + TTestSync testSync; + + env.SpawnTask<TTaskWithCompletionCallback>(&testSync); + + testSync.WaitForAndIncrement(4); + } +} diff --git a/library/cpp/messagebus/rain_check/core/sleep.cpp b/library/cpp/messagebus/rain_check/core/sleep.cpp new file mode 100644 index 0000000000..f5d0b4cac9 --- /dev/null +++ b/library/cpp/messagebus/rain_check/core/sleep.cpp @@ -0,0 +1,47 @@ +#include "rain_check.h" + +#include <util/system/yassert.h> + +using namespace NRainCheck; +using namespace NRainCheck::NPrivate; +using namespace NBus; +using namespace NBus::NPrivate; + +TSleepService::TSleepService(::NBus::NPrivate::TScheduler* scheduler) + : Scheduler(scheduler) +{ +} + +NRainCheck::TSleepService::TSleepService() + : SchedulerHolder(new TScheduler) + , Scheduler(SchedulerHolder.Get()) +{ +} + +NRainCheck::TSleepService::~TSleepService() { + if (!!SchedulerHolder) { + Scheduler->Stop(); + } +} + +namespace { + struct TSleepServiceScheduleItem: public IScheduleItem { + ISubtaskListener* const Parent; + + TSleepServiceScheduleItem(ISubtaskListener* parent, TInstant time) + : IScheduleItem(time) + , Parent(parent) + { + } + + void Do() override { + Parent->SetDone(); + } + }; +} + +void TSleepService::Sleep(TSubtaskCompletion* r, TDuration duration) { + TTaskRunnerBase* current = TTaskRunnerBase::CurrentTask(); + r->SetRunning(current); + Scheduler->Schedule(new TSleepServiceScheduleItem(r, TInstant::Now() + duration)); +} diff --git a/library/cpp/messagebus/rain_check/core/sleep.h b/library/cpp/messagebus/rain_check/core/sleep.h new file mode 100644 index 0000000000..1a7a1f8674 --- /dev/null +++ b/library/cpp/messagebus/rain_check/core/sleep.h @@ -0,0 +1,24 @@ +#pragma once + +#include "fwd.h" + +#include <library/cpp/messagebus/scheduler/scheduler.h> + +#include <util/datetime/base.h> + +namespace NRainCheck { + class TSleepService { + private: + THolder< ::NBus::NPrivate::TScheduler> SchedulerHolder; + ::NBus::NPrivate::TScheduler* const Scheduler; + + public: + TSleepService(::NBus::NPrivate::TScheduler*); + TSleepService(); + ~TSleepService(); + + // Wake up a task after given duration. + void Sleep(TSubtaskCompletion* r, TDuration); + }; + +} diff --git a/library/cpp/messagebus/rain_check/core/sleep_ut.cpp b/library/cpp/messagebus/rain_check/core/sleep_ut.cpp new file mode 100644 index 0000000000..2ae85a87b1 --- /dev/null +++ b/library/cpp/messagebus/rain_check/core/sleep_ut.cpp @@ -0,0 +1,46 @@ +#include <library/cpp/testing/unittest/registar.h> + +#include <library/cpp/messagebus/rain_check/test/ut/test.h> + +#include <util/system/event.h> + +using namespace NRainCheck; +using namespace NActor; + +Y_UNIT_TEST_SUITE(Sleep) { + struct TTestTask: public ISimpleTask { + TSimpleEnv* const Env; + TTestSync* const TestSync; + + TTestTask(TSimpleEnv* env, TTestSync* testSync) + : Env(env) + , TestSync(testSync) + { + } + + TSubtaskCompletion Sleep; + + TContinueFunc Start() override { + Env->SleepService.Sleep(&Sleep, TDuration::MilliSeconds(1)); + + TestSync->CheckAndIncrement(0); + + return &TTestTask::Continue; + } + + TContinueFunc Continue() { + TestSync->CheckAndIncrement(1); + return nullptr; + } + }; + + Y_UNIT_TEST(Test) { + TTestSync testSync; + + TSimpleEnv env; + + TIntrusivePtr<TSimpleTaskRunner> task = env.SpawnTask<TTestTask>(&testSync); + + testSync.WaitForAndIncrement(2); + } +} diff --git a/library/cpp/messagebus/rain_check/core/spawn.cpp b/library/cpp/messagebus/rain_check/core/spawn.cpp new file mode 100644 index 0000000000..c570355fbe --- /dev/null +++ b/library/cpp/messagebus/rain_check/core/spawn.cpp @@ -0,0 +1,5 @@ +#include "spawn.h" + +void NRainCheck::NPrivate::SpawnTaskImpl(TTaskRunnerBase* task) { + task->Schedule(); +} diff --git a/library/cpp/messagebus/rain_check/core/spawn.h b/library/cpp/messagebus/rain_check/core/spawn.h new file mode 100644 index 0000000000..f2b146bf29 --- /dev/null +++ b/library/cpp/messagebus/rain_check/core/spawn.h @@ -0,0 +1,50 @@ +#pragma once + +#include "coro.h" +#include "simple.h" +#include "task.h" + +namespace NRainCheck { + namespace NPrivate { + void SpawnTaskImpl(TTaskRunnerBase* task); + + template <typename TTask, typename ITask, typename TRunner, typename TEnv, typename TParam> + TIntrusivePtr<TRunner> SpawnTaskWithRunner(TEnv* env, TParam param1, ISubtaskListener* subtaskListener) { + static_assert((std::is_base_of<ITask, TTask>::value), "expect (std::is_base_of<ITask, TTask>::value)"); + TIntrusivePtr<TRunner> task(new TRunner(env, subtaskListener, new TTask(env, param1))); + NPrivate::SpawnTaskImpl(task.Get()); + return task; + } + + template <typename TTask, typename ITask, typename TRunner, typename TEnv> + void SpawnSubtaskWithRunner(TEnv* env, TSubtaskCompletion* completion) { + static_assert((std::is_base_of<ITask, TTask>::value), "expect (std::is_base_of<ITask, TTask>::value)"); + TTaskRunnerBase* current = TTaskRunnerBase::CurrentTask(); + completion->SetRunning(current); + NPrivate::SpawnTaskImpl(new TRunner(env, completion, new TTask(env))); + } + + template <typename TTask, typename ITask, typename TRunner, typename TEnv, typename TParam> + void SpawnSubtaskWithRunner(TEnv* env, TSubtaskCompletion* completion, TParam param) { + static_assert((std::is_base_of<ITask, TTask>::value), "expect (std::is_base_of<ITask, TTask>::value)"); + TTaskRunnerBase* current = TTaskRunnerBase::CurrentTask(); + completion->SetRunning(current); + NPrivate::SpawnTaskImpl(new TRunner(env, completion, new TTask(env, param))); + } + + } + + // Instantiate and start a task with given parameter. + template <typename TTask, typename TEnv, typename TParam> + TIntrusivePtr<typename TTask::TTaskRunner> SpawnTask(TEnv* env, TParam param1, ISubtaskListener* subtaskListener = &TNopSubtaskListener::Instance) { + return NPrivate::SpawnTaskWithRunner< + TTask, typename TTask::ITask, typename TTask::TTaskRunner, TEnv, TParam>(env, param1, subtaskListener); + } + + // Instantiate and start subtask of given task. + template <typename TTask, typename TEnv, typename TParam> + void SpawnSubtask(TEnv* env, TSubtaskCompletion* completion, TParam param) { + return NPrivate::SpawnSubtaskWithRunner<TTask, typename TTask::ITask, typename TTask::TTaskRunner>(env, completion, param); + } + +} diff --git a/library/cpp/messagebus/rain_check/core/spawn_ut.cpp b/library/cpp/messagebus/rain_check/core/spawn_ut.cpp new file mode 100644 index 0000000000..ba5a5e41cf --- /dev/null +++ b/library/cpp/messagebus/rain_check/core/spawn_ut.cpp @@ -0,0 +1,145 @@ +#include <library/cpp/testing/unittest/registar.h> + +#include <library/cpp/messagebus/rain_check/test/helper/misc.h> +#include <library/cpp/messagebus/rain_check/test/ut/test.h> + +#include <library/cpp/messagebus/latch.h> + +#include <util/system/event.h> + +#include <array> + +using namespace NRainCheck; +using namespace NActor; + +Y_UNIT_TEST_SUITE(Spawn) { + struct TTestTask: public ISimpleTask { + TTestSync* const TestSync; + + TTestTask(TSimpleEnv*, TTestSync* testSync) + : TestSync(testSync) + , I(0) + { + } + + TSystemEvent Started; + + unsigned I; + + TContinueFunc Start() override { + if (I < 4) { + I += 1; + return &TTestTask::Start; + } + TestSync->CheckAndIncrement(0); + return &TTestTask::Continue; + } + + TContinueFunc Continue() { + TestSync->CheckAndIncrement(1); + + Started.Signal(); + return nullptr; + } + }; + + Y_UNIT_TEST(Continuation) { + TTestSync testSync; + + TSimpleEnv env; + + TIntrusivePtr<TSimpleTaskRunner> task = env.SpawnTask<TTestTask>(&testSync); + + testSync.WaitForAndIncrement(2); + } + + struct TSubtask: public ISimpleTask { + TTestEnv* const Env; + TTestSync* const TestSync; + + TSubtask(TTestEnv* env, TTestSync* testSync) + : Env(env) + , TestSync(testSync) + { + } + + TContinueFunc Start() override { + Sleep(TDuration::MilliSeconds(1)); + TestSync->CheckAndIncrement(1); + return nullptr; + } + }; + + struct TSpawnTask: public ISimpleTask { + TTestEnv* const Env; + TTestSync* const TestSync; + + TSpawnTask(TTestEnv* env, TTestSync* testSync) + : Env(env) + , TestSync(testSync) + { + } + + TSubtaskCompletion SubtaskCompletion; + + TContinueFunc Start() override { + TestSync->CheckAndIncrement(0); + SpawnSubtask<TSubtask>(Env, &SubtaskCompletion, TestSync); + return &TSpawnTask::Continue; + } + + TContinueFunc Continue() { + TestSync->CheckAndIncrement(2); + return nullptr; + } + }; + + Y_UNIT_TEST(Subtask) { + TTestSync testSync; + + TTestEnv env; + + TIntrusivePtr<TSimpleTaskRunner> task = env.SpawnTask<TSpawnTask>(&testSync); + + testSync.WaitForAndIncrement(3); + } + + struct TSpawnLongTask: public ISimpleTask { + TTestEnv* const Env; + TTestSync* const TestSync; + unsigned I; + + TSpawnLongTask(TTestEnv* env, TTestSync* testSync) + : Env(env) + , TestSync(testSync) + , I(0) + { + } + + std::array<TSubtaskCompletion, 3> Subtasks; + + TContinueFunc Start() override { + if (I == 1000) { + TestSync->CheckAndIncrement(0); + return nullptr; + } + + for (auto& subtask : Subtasks) { + SpawnSubtask<TNopSimpleTask>(Env, &subtask, ""); + } + + ++I; + return &TSpawnLongTask::Start; + } + }; + + Y_UNIT_TEST(SubtaskLong) { + TTestSync testSync; + + TTestEnv env; + + TIntrusivePtr<TSimpleTaskRunner> task = env.SpawnTask<TSpawnLongTask>(&testSync); + + testSync.WaitForAndIncrement(1); + } +} diff --git a/library/cpp/messagebus/rain_check/core/task.cpp b/library/cpp/messagebus/rain_check/core/task.cpp new file mode 100644 index 0000000000..a098437d53 --- /dev/null +++ b/library/cpp/messagebus/rain_check/core/task.cpp @@ -0,0 +1,216 @@ +#include "rain_check.h" + +#include <library/cpp/messagebus/actor/temp_tls_vector.h> + +#include <util/system/type_name.h> +#include <util/system/tls.h> + +using namespace NRainCheck; +using namespace NRainCheck::NPrivate; + +using namespace NActor; + +namespace { + Y_POD_STATIC_THREAD(TTaskRunnerBase*) + ThreadCurrentTask; +} + +void TNopSubtaskListener::SetDone() { +} + +TNopSubtaskListener TNopSubtaskListener::Instance; + +TTaskRunnerBase::TTaskRunnerBase(IEnv* env, ISubtaskListener* parentTask, TAutoPtr<ITaskBase> impl) + : TActor<TTaskRunnerBase>(env->GetExecutor()) + , Impl(impl) + , ParentTask(parentTask) + //, HoldsSelfReference(false) + , Done(false) + , SetDoneCalled(false) +{ +} + +TTaskRunnerBase::~TTaskRunnerBase() { + Y_ASSERT(Done); +} + +namespace { + struct TRunningInThisThreadGuard { + TTaskRunnerBase* const Task; + TRunningInThisThreadGuard(TTaskRunnerBase* task) + : Task(task) + { + Y_ASSERT(!ThreadCurrentTask); + ThreadCurrentTask = task; + } + + ~TRunningInThisThreadGuard() { + Y_ASSERT(ThreadCurrentTask == Task); + ThreadCurrentTask = nullptr; + } + }; +} + +void NRainCheck::TTaskRunnerBase::Act(NActor::TDefaultTag) { + Y_ASSERT(RefCount() > 0); + + TRunningInThisThreadGuard g(this); + + //RetainRef(); + + for (;;) { + TTempTlsVector<TSubtaskCompletion*> temp; + + temp.GetVector()->swap(Pending); + + for (auto& pending : *temp.GetVector()) { + if (pending->IsComplete()) { + pending->FireCompletionCallback(GetImplBase()); + } else { + Pending.push_back(pending); + } + } + + if (!Pending.empty()) { + return; + } + + if (!Done) { + Done = !ReplyReceived(); + } else { + if (Pending.empty()) { + if (!SetDoneCalled) { + ParentTask->SetDone(); + SetDoneCalled = true; + } + //ReleaseRef(); + return; + } + } + } +} + +bool TTaskRunnerBase::IsRunningInThisThread() const { + return ThreadCurrentTask == this; +} + +TSubtaskCompletion::~TSubtaskCompletion() { + ESubtaskState state = State.Get(); + Y_ASSERT(state == CREATED || state == DONE || state == CANCELED); +} + +void TSubtaskCompletion::FireCompletionCallback(ITaskBase* task) { + Y_ASSERT(IsComplete()); + + if (!!CompletionFunc) { + TSubtaskCompletionFunc temp = CompletionFunc; + // completion func must be reset before calling it, + // because function may set it back + CompletionFunc = TSubtaskCompletionFunc(); + (task->*(temp.Func))(this); + } +} + +void NRainCheck::TSubtaskCompletion::Cancel() { + for (;;) { + ESubtaskState state = State.Get(); + if (state == CREATED && State.CompareAndSet(CREATED, CANCELED)) { + return; + } + if (state == RUNNING && State.CompareAndSet(RUNNING, CANCEL_REQUESTED)) { + return; + } + if (state == DONE && State.CompareAndSet(DONE, CANCELED)) { + return; + } + if (state == CANCEL_REQUESTED || state == CANCELED) { + return; + } + } +} + +void TSubtaskCompletion::SetRunning(TTaskRunnerBase* parent) { + Y_ASSERT(!TaskRunner); + Y_ASSERT(!!parent); + + TaskRunner = parent; + + parent->Pending.push_back(this); + + parent->RefV(); + + for (;;) { + ESubtaskState current = State.Get(); + if (current != CREATED && current != DONE) { + Y_FAIL("current state should be CREATED or DONE: %s", ToCString(current)); + } + if (State.CompareAndSet(current, RUNNING)) { + return; + } + } +} + +void TSubtaskCompletion::SetDone() { + Y_ASSERT(!!TaskRunner); + TTaskRunnerBase* temp = TaskRunner; + TaskRunner = nullptr; + + for (;;) { + ESubtaskState state = State.Get(); + if (state == RUNNING) { + if (State.CompareAndSet(RUNNING, DONE)) { + break; + } + } else if (state == CANCEL_REQUESTED) { + if (State.CompareAndSet(CANCEL_REQUESTED, CANCELED)) { + break; + } + } else { + Y_FAIL("cannot SetDone: unknown state: %s", ToCString(state)); + } + } + + temp->ScheduleV(); + temp->UnRefV(); +} + +#if 0 +void NRainCheck::TTaskRunnerBase::RetainRef() +{ + if (HoldsSelfReference) { + return; + } + HoldsSelfReference = true; + Ref(); +} + +void NRainCheck::TTaskRunnerBase::ReleaseRef() +{ + if (!HoldsSelfReference) { + return; + } + HoldsSelfReference = false; + DecRef(); +} +#endif + +void TTaskRunnerBase::AssertInThisThread() const { + Y_ASSERT(IsRunningInThisThread()); +} + +TTaskRunnerBase* TTaskRunnerBase::CurrentTask() { + Y_VERIFY(!!ThreadCurrentTask); + return ThreadCurrentTask; +} + +ITaskBase* TTaskRunnerBase::CurrentTaskImpl() { + return CurrentTask()->GetImplBase(); +} + +TString TTaskRunnerBase::GetStatusSingleLine() { + return TypeName(*Impl); +} + +bool NRainCheck::AreWeInsideTask() { + return ThreadCurrentTask != nullptr; +} diff --git a/library/cpp/messagebus/rain_check/core/task.h b/library/cpp/messagebus/rain_check/core/task.h new file mode 100644 index 0000000000..7d8778bcda --- /dev/null +++ b/library/cpp/messagebus/rain_check/core/task.h @@ -0,0 +1,184 @@ +#pragma once + +#include "fwd.h" + +#include <library/cpp/messagebus/actor/actor.h> +#include <library/cpp/messagebus/misc/atomic_box.h> + +#include <library/cpp/deprecated/enum_codegen/enum_codegen.h> + +#include <util/generic/noncopyable.h> +#include <util/generic/ptr.h> +#include <util/thread/lfstack.h> + +namespace NRainCheck { + struct ISubtaskListener { + virtual void SetDone() = 0; + virtual ~ISubtaskListener() { + } + }; + + struct TNopSubtaskListener: public ISubtaskListener { + void SetDone() override; + + static TNopSubtaskListener Instance; + }; + + class TSubtaskCompletionFunc { + friend class TSubtaskCompletion; + + typedef void (ITaskBase::*TFunc)(TSubtaskCompletion*); + TFunc Func; + + public: + TSubtaskCompletionFunc() + : Func(nullptr) + { + } + + TSubtaskCompletionFunc(void*) + : Func(nullptr) + { + } + + template <typename TTask> + TSubtaskCompletionFunc(void (TTask::*func)(TSubtaskCompletion*)) + : Func((TFunc)func) + { + static_assert((std::is_base_of<ITaskBase, TTask>::value), "expect (std::is_base_of<ITaskBase, TTask>::value)"); + } + + bool operator!() const { + return !Func; + } + }; + + template <typename T> + class TTaskFuture; + +#define SUBTASK_STATE_MAP(XX) \ + XX(CREATED, "Initial") \ + XX(RUNNING, "Running") \ + XX(DONE, "Completed") \ + XX(CANCEL_REQUESTED, "Cancel requested, but still executing") \ + XX(CANCELED, "Canceled") \ + /**/ + + enum ESubtaskState { + SUBTASK_STATE_MAP(ENUM_VALUE_GEN_NO_VALUE) + }; + + ENUM_TO_STRING(ESubtaskState, SUBTASK_STATE_MAP) + + class TSubtaskCompletion : TNonCopyable, public ISubtaskListener { + friend struct TTaskAccessor; + + private: + TAtomicBox<ESubtaskState> State; + TTaskRunnerBase* volatile TaskRunner; + TSubtaskCompletionFunc CompletionFunc; + + public: + TSubtaskCompletion() + : State(CREATED) + , TaskRunner() + { + } + ~TSubtaskCompletion() override; + + // Either done or cancel requested or cancelled + bool IsComplete() const { + ESubtaskState state = State.Get(); + switch (state) { + case RUNNING: + return false; + case DONE: + return true; + case CANCEL_REQUESTED: + return false; + case CANCELED: + return true; + case CREATED: + Y_FAIL("not started"); + default: + Y_FAIL("unknown value: %u", (unsigned)state); + } + } + + void FireCompletionCallback(ITaskBase*); + + void SetCompletionCallback(TSubtaskCompletionFunc func) { + CompletionFunc = func; + } + + // Completed, but not cancelled + bool IsDone() const { + return State.Get() == DONE; + } + + // Request cancel by actor + // Does nothing but marks task cancelled, + // and allows proceeding to next callback + void Cancel(); + + // called by service provider implementations + // must not be called by actor + void SetRunning(TTaskRunnerBase* parent); + void SetDone() override; + }; + + // See ISimpleTask, ICoroTask + class TTaskRunnerBase: public TAtomicRefCount<TTaskRunnerBase>, public NActor::TActor<TTaskRunnerBase> { + friend class NActor::TActor<TTaskRunnerBase>; + friend class TContinueFunc; + friend struct TTaskAccessor; + friend class TSubtaskCompletion; + + private: + THolder<ITaskBase> Impl; + + ISubtaskListener* const ParentTask; + // While task is running, it holds extra reference to self. + //bool HoldsSelfReference; + bool Done; + bool SetDoneCalled; + + // Subtasks currently executed. + TVector<TSubtaskCompletion*> Pending; + + void Act(NActor::TDefaultTag); + + public: + // Construct task. Task is not automatically started. + TTaskRunnerBase(IEnv*, ISubtaskListener* parent, TAutoPtr<ITaskBase> impl); + ~TTaskRunnerBase() override; + + bool IsRunningInThisThread() const; + void AssertInThisThread() const; + static TTaskRunnerBase* CurrentTask(); + static ITaskBase* CurrentTaskImpl(); + + TString GetStatusSingleLine(); + + protected: + //void RetainRef(); + //void ReleaseRef(); + ITaskBase* GetImplBase() { + return Impl.Get(); + } + + private: + // true if need to call again + virtual bool ReplyReceived() = 0; + }; + + class ITaskBase { + public: + virtual ~ITaskBase() { + } + }; + + // Check that current method executed inside some task. + bool AreWeInsideTask(); + +} diff --git a/library/cpp/messagebus/rain_check/core/track.cpp b/library/cpp/messagebus/rain_check/core/track.cpp new file mode 100644 index 0000000000..092a51a214 --- /dev/null +++ b/library/cpp/messagebus/rain_check/core/track.cpp @@ -0,0 +1,66 @@ +#include "track.h" + +using namespace NRainCheck; +using namespace NRainCheck::NPrivate; + +void TTaskTrackerReceipt::SetDone() { + TaskTracker->GetQueue<TTaskTrackerReceipt*>()->EnqueueAndSchedule(this); +} + +TString TTaskTrackerReceipt::GetStatusSingleLine() { + return Task->GetStatusSingleLine(); +} + +TTaskTracker::TTaskTracker(NActor::TExecutor* executor) + : NActor::TActor<TTaskTracker>(executor) +{ +} + +TTaskTracker::~TTaskTracker() { + Y_ASSERT(Tasks.Empty()); +} + +void TTaskTracker::Shutdown() { + ShutdownFlag.Set(true); + Schedule(); + ShutdownEvent.WaitI(); +} + +void TTaskTracker::ProcessItem(NActor::TDefaultTag, NActor::TDefaultTag, ITaskFactory* taskFactory) { + THolder<ITaskFactory> holder(taskFactory); + + THolder<TTaskTrackerReceipt> receipt(new TTaskTrackerReceipt(this)); + receipt->Task = taskFactory->NewTask(receipt.Get()); + + Tasks.PushBack(receipt.Release()); +} + +void TTaskTracker::ProcessItem(NActor::TDefaultTag, NActor::TDefaultTag, TTaskTrackerReceipt* receipt) { + Y_ASSERT(!receipt->Empty()); + receipt->Unlink(); + delete receipt; +} + +void TTaskTracker::ProcessItem(NActor::TDefaultTag, NActor::TDefaultTag, TAsyncResult<TTaskTrackerStatus>* status) { + TTaskTrackerStatus s; + s.Size = Tasks.Size(); + status->SetResult(s); +} + +void TTaskTracker::Act(NActor::TDefaultTag) { + GetQueue<TAsyncResult<TTaskTrackerStatus>*>()->DequeueAll(); + GetQueue<ITaskFactory*>()->DequeueAll(); + GetQueue<TTaskTrackerReceipt*>()->DequeueAll(); + + if (ShutdownFlag.Get()) { + if (Tasks.Empty()) { + ShutdownEvent.Signal(); + } + } +} + +ui32 TTaskTracker::Size() { + TAsyncResult<TTaskTrackerStatus> r; + GetQueue<TAsyncResult<TTaskTrackerStatus>*>()->EnqueueAndSchedule(&r); + return r.GetResult().Size; +} diff --git a/library/cpp/messagebus/rain_check/core/track.h b/library/cpp/messagebus/rain_check/core/track.h new file mode 100644 index 0000000000..d387de7574 --- /dev/null +++ b/library/cpp/messagebus/rain_check/core/track.h @@ -0,0 +1,97 @@ +#pragma once + +#include "spawn.h" +#include "task.h" + +#include <library/cpp/messagebus/async_result.h> +#include <library/cpp/messagebus/actor/queue_in_actor.h> +#include <library/cpp/messagebus/misc/atomic_box.h> + +#include <util/generic/intrlist.h> +#include <util/system/event.h> + +namespace NRainCheck { + class TTaskTracker; + + namespace NPrivate { + struct ITaskFactory { + virtual TIntrusivePtr<TTaskRunnerBase> NewTask(ISubtaskListener*) = 0; + virtual ~ITaskFactory() { + } + }; + + struct TTaskTrackerReceipt: public ISubtaskListener, public TIntrusiveListItem<TTaskTrackerReceipt> { + TTaskTracker* const TaskTracker; + TIntrusivePtr<TTaskRunnerBase> Task; + + TTaskTrackerReceipt(TTaskTracker* taskTracker) + : TaskTracker(taskTracker) + { + } + + void SetDone() override; + + TString GetStatusSingleLine(); + }; + + struct TTaskTrackerStatus { + ui32 Size; + }; + + } + + class TTaskTracker + : public TAtomicRefCount<TTaskTracker>, + public NActor::TActor<TTaskTracker>, + public NActor::TQueueInActor<TTaskTracker, NPrivate::ITaskFactory*>, + public NActor::TQueueInActor<TTaskTracker, NPrivate::TTaskTrackerReceipt*>, + public NActor::TQueueInActor<TTaskTracker, TAsyncResult<NPrivate::TTaskTrackerStatus>*> { + friend struct NPrivate::TTaskTrackerReceipt; + + private: + TAtomicBox<bool> ShutdownFlag; + TSystemEvent ShutdownEvent; + + TIntrusiveList<NPrivate::TTaskTrackerReceipt> Tasks; + + template <typename TItem> + NActor::TQueueInActor<TTaskTracker, TItem>* GetQueue() { + return this; + } + + public: + TTaskTracker(NActor::TExecutor* executor); + ~TTaskTracker() override; + + void Shutdown(); + + void ProcessItem(NActor::TDefaultTag, NActor::TDefaultTag, NPrivate::ITaskFactory*); + void ProcessItem(NActor::TDefaultTag, NActor::TDefaultTag, NPrivate::TTaskTrackerReceipt*); + void ProcessItem(NActor::TDefaultTag, NActor::TDefaultTag, TAsyncResult<NPrivate::TTaskTrackerStatus>*); + + void Act(NActor::TDefaultTag); + + template <typename TTask, typename TEnv, typename TParam> + void Spawn(TEnv* env, TParam param) { + struct TTaskFactory: public NPrivate::ITaskFactory { + TEnv* const Env; + TParam Param; + + TTaskFactory(TEnv* env, TParam param) + : Env(env) + , Param(param) + { + } + + TIntrusivePtr<TTaskRunnerBase> NewTask(ISubtaskListener* subtaskListener) override { + return NRainCheck::SpawnTask<TTask>(Env, Param, subtaskListener).Get(); + } + }; + + GetQueue<NPrivate::ITaskFactory*>()->EnqueueAndSchedule(new TTaskFactory(env, param)); + } + + ui32 Size(); + }; + +} diff --git a/library/cpp/messagebus/rain_check/core/track_ut.cpp b/library/cpp/messagebus/rain_check/core/track_ut.cpp new file mode 100644 index 0000000000..05f7de1319 --- /dev/null +++ b/library/cpp/messagebus/rain_check/core/track_ut.cpp @@ -0,0 +1,45 @@ +#include <library/cpp/testing/unittest/registar.h> + +#include "track.h" + +#include <library/cpp/messagebus/rain_check/test/helper/misc.h> +#include <library/cpp/messagebus/rain_check/test/ut/test.h> + +using namespace NRainCheck; + +Y_UNIT_TEST_SUITE(TaskTracker) { + struct TTaskForTracker: public ISimpleTask { + TTestSync* const TestSync; + + TTaskForTracker(TTestEnv*, TTestSync* testSync) + : TestSync(testSync) + { + } + + TContinueFunc Start() override { + TestSync->WaitForAndIncrement(0); + TestSync->WaitForAndIncrement(2); + return nullptr; + } + }; + + Y_UNIT_TEST(Simple) { + TTestEnv env; + + TIntrusivePtr<TTaskTracker> tracker(new TTaskTracker(env.GetExecutor())); + + TTestSync testSync; + + tracker->Spawn<TTaskForTracker>(&env, &testSync); + + testSync.WaitFor(1); + + UNIT_ASSERT_VALUES_EQUAL(1u, tracker->Size()); + + testSync.CheckAndIncrement(1); + + testSync.WaitForAndIncrement(3); + + tracker->Shutdown(); + } +} diff --git a/library/cpp/messagebus/rain_check/core/ya.make b/library/cpp/messagebus/rain_check/core/ya.make new file mode 100644 index 0000000000..c6fb5640d4 --- /dev/null +++ b/library/cpp/messagebus/rain_check/core/ya.make @@ -0,0 +1,25 @@ +LIBRARY() + +OWNER(g:messagebus) + +PEERDIR( + library/cpp/coroutine/engine + library/cpp/deprecated/enum_codegen + library/cpp/messagebus + library/cpp/messagebus/actor + library/cpp/messagebus/scheduler +) + +SRCS( + coro.cpp + coro_stack.cpp + env.cpp + rain_check.cpp + simple.cpp + sleep.cpp + spawn.cpp + task.cpp + track.cpp +) + +END() |