diff options
author | nga <nga@yandex-team.ru> | 2022-02-10 16:48:09 +0300 |
---|---|---|
committer | Daniil Cherednik <dcherednik@yandex-team.ru> | 2022-02-10 16:48:09 +0300 |
commit | 1f553f46fb4f3c5eec631352cdd900a0709016af (patch) | |
tree | a231fba2c03b440becaea6c86a2702d0bfb0336e /library/cpp/messagebus/rain_check | |
parent | c4de7efdedc25b49cbea74bd589eecb61b55b60a (diff) | |
download | ydb-1f553f46fb4f3c5eec631352cdd900a0709016af.tar.gz |
Restoring authorship annotation for <nga@yandex-team.ru>. Commit 1 of 2.
Diffstat (limited to 'library/cpp/messagebus/rain_check')
42 files changed, 1706 insertions, 1706 deletions
diff --git a/library/cpp/messagebus/rain_check/core/coro.cpp b/library/cpp/messagebus/rain_check/core/coro.cpp index 500841dd5b..eda2fab402 100644 --- a/library/cpp/messagebus/rain_check/core/coro.cpp +++ b/library/cpp/messagebus/rain_check/core/coro.cpp @@ -1,60 +1,60 @@ #include "coro.h" - -#include "coro_stack.h" - + +#include "coro_stack.h" + #include <util/system/tls.h> #include <util/system/yassert.h> - -using namespace NRainCheck; - + +using namespace NRainCheck; + TContClosure TCoroTaskRunner::ContClosure(TCoroTaskRunner* runner, TArrayRef<char> memRegion) { - TContClosure contClosure; - contClosure.TrampoLine = runner; - contClosure.Stack = memRegion; - return contClosure; -} - -TCoroTaskRunner::TCoroTaskRunner(IEnv* env, ISubtaskListener* parent, TAutoPtr<ICoroTask> impl) - : TTaskRunnerBase(env, parent, impl.Release()) - , Stack(GetImpl()->StackSize) - , ContMachineContext(ContClosure(this, Stack.MemRegion())) - , CoroDone(false) -{ -} - -TCoroTaskRunner::~TCoroTaskRunner() { + TContClosure contClosure; + contClosure.TrampoLine = runner; + contClosure.Stack = memRegion; + return contClosure; +} + +TCoroTaskRunner::TCoroTaskRunner(IEnv* env, ISubtaskListener* parent, TAutoPtr<ICoroTask> impl) + : TTaskRunnerBase(env, parent, impl.Release()) + , Stack(GetImpl()->StackSize) + , ContMachineContext(ContClosure(this, Stack.MemRegion())) + , CoroDone(false) +{ +} + +TCoroTaskRunner::~TCoroTaskRunner() { Y_ASSERT(CoroDone); -} - +} + Y_POD_STATIC_THREAD(TContMachineContext*) CallerContext; Y_POD_STATIC_THREAD(TCoroTaskRunner*) Task; - + bool TCoroTaskRunner::ReplyReceived() { Y_ASSERT(!CoroDone); - - TContMachineContext me; - - CallerContext = &me; - Task = this; - - me.SwitchTo(&ContMachineContext); - - Stack.VerifyNoStackOverflow(); - + + TContMachineContext me; + + CallerContext = &me; + Task = this; + + me.SwitchTo(&ContMachineContext); + + Stack.VerifyNoStackOverflow(); + Y_ASSERT(CallerContext == &me); Y_ASSERT(Task == this); - - return !CoroDone; -} - + + return !CoroDone; +} + void NRainCheck::TCoroTaskRunner::DoRun() { - GetImpl()->Run(); - CoroDone = true; - ContMachineContext.SwitchTo(CallerContext); -} - + GetImpl()->Run(); + CoroDone = true; + ContMachineContext.SwitchTo(CallerContext); +} + void NRainCheck::ICoroTask::WaitForSubtasks() { - Task->ContMachineContext.SwitchTo(CallerContext); -} + Task->ContMachineContext.SwitchTo(CallerContext); +} diff --git a/library/cpp/messagebus/rain_check/core/coro.h b/library/cpp/messagebus/rain_check/core/coro.h index 95e2a30f9b..bf2fca54bd 100644 --- a/library/cpp/messagebus/rain_check/core/coro.h +++ b/library/cpp/messagebus/rain_check/core/coro.h @@ -1,58 +1,58 @@ -#pragma once - +#pragma once + #include "coro_stack.h" #include "task.h" #include <util/generic/ptr.h> -#include <util/memory/alloc.h> -#include <util/system/align.h> +#include <util/memory/alloc.h> +#include <util/system/align.h> #include <util/system/context.h> #include <util/system/valgrind.h> - -namespace NRainCheck { - class ICoroTask; - - class TCoroTaskRunner: public TTaskRunnerBase, private ITrampoLine { - friend class ICoroTask; - - private: - NPrivate::TCoroStack Stack; - TContMachineContext ContMachineContext; - bool CoroDone; - - public: - TCoroTaskRunner(IEnv* env, ISubtaskListener* parent, TAutoPtr<ICoroTask> impl); + +namespace NRainCheck { + class ICoroTask; + + class TCoroTaskRunner: public TTaskRunnerBase, private ITrampoLine { + friend class ICoroTask; + + private: + NPrivate::TCoroStack Stack; + TContMachineContext ContMachineContext; + bool CoroDone; + + public: + TCoroTaskRunner(IEnv* env, ISubtaskListener* parent, TAutoPtr<ICoroTask> impl); ~TCoroTaskRunner() override; - - private: + + private: static TContClosure ContClosure(TCoroTaskRunner* runner, TArrayRef<char> memRegion); - + bool ReplyReceived() override /* override */; - + void DoRun() override /* override */; - + ICoroTask* GetImpl() { return (ICoroTask*)GetImplBase(); } - }; - - class ICoroTask: public ITaskBase { - friend class TCoroTaskRunner; - - private: - size_t StackSize; - - public: - typedef TCoroTaskRunner TTaskRunner; - typedef ICoroTask ITask; - + }; + + class ICoroTask: public ITaskBase { + friend class TCoroTaskRunner; + + private: + size_t StackSize; + + public: + typedef TCoroTaskRunner TTaskRunner; + typedef ICoroTask ITask; + ICoroTask(size_t stackSize = 0x2000) : StackSize(stackSize) { } - - virtual void Run() = 0; - static void WaitForSubtasks(); - }; - -} + + virtual void Run() = 0; + static void WaitForSubtasks(); + }; + +} diff --git a/library/cpp/messagebus/rain_check/core/coro_stack.cpp b/library/cpp/messagebus/rain_check/core/coro_stack.cpp index 83b984ca6e..888d965a23 100644 --- a/library/cpp/messagebus/rain_check/core/coro_stack.cpp +++ b/library/cpp/messagebus/rain_check/core/coro_stack.cpp @@ -1,41 +1,41 @@ #include "coro_stack.h" - -#include <util/generic/singleton.h> -#include <util/system/valgrind.h> - + +#include <util/generic/singleton.h> +#include <util/system/valgrind.h> + #include <cstdlib> #include <stdio.h> - -using namespace NRainCheck; -using namespace NRainCheck::NPrivate; - -TCoroStack::TCoroStack(size_t size) - : SizeValue(size) -{ + +using namespace NRainCheck; +using namespace NRainCheck::NPrivate; + +TCoroStack::TCoroStack(size_t size) + : SizeValue(size) +{ Y_VERIFY(size % sizeof(ui32) == 0); Y_VERIFY(size >= 0x1000); - - DataHolder.Reset(malloc(size)); - - // register in valgrind - - *MagicNumberLocation() = MAGIC_NUMBER; - -#if defined(WITH_VALGRIND) + + DataHolder.Reset(malloc(size)); + + // register in valgrind + + *MagicNumberLocation() = MAGIC_NUMBER; + +#if defined(WITH_VALGRIND) ValgrindStackId = VALGRIND_STACK_REGISTER(Data(), (char*)Data() + Size()); -#endif -} - +#endif +} + TCoroStack::~TCoroStack() { -#if defined(WITH_VALGRIND) - VALGRIND_STACK_DEREGISTER(ValgrindStackId); -#endif - - VerifyNoStackOverflow(); -} - -void TCoroStack::FailStackOverflow() { - static const char message[] = "stack overflow\n"; - fputs(message, stderr); - abort(); -} +#if defined(WITH_VALGRIND) + VALGRIND_STACK_DEREGISTER(ValgrindStackId); +#endif + + VerifyNoStackOverflow(); +} + +void TCoroStack::FailStackOverflow() { + static const char message[] = "stack overflow\n"; + fputs(message, stderr); + abort(); +} diff --git a/library/cpp/messagebus/rain_check/core/coro_stack.h b/library/cpp/messagebus/rain_check/core/coro_stack.h index 2f3520e6e4..41ac786470 100644 --- a/library/cpp/messagebus/rain_check/core/coro_stack.h +++ b/library/cpp/messagebus/rain_check/core/coro_stack.h @@ -1,54 +1,54 @@ -#pragma once - +#pragma once + #include <util/generic/array_ref.h> #include <util/generic/ptr.h> -#include <util/system/valgrind.h> - +#include <util/system/valgrind.h> + namespace NRainCheck { namespace NPrivate { struct TCoroStack { THolder<void, TFree> DataHolder; size_t SizeValue; - -#if defined(WITH_VALGRIND) + +#if defined(WITH_VALGRIND) size_t ValgrindStackId; -#endif - +#endif + TCoroStack(size_t size); ~TCoroStack(); - + void* Data() { return DataHolder.Get(); } - + size_t Size() { return SizeValue; } - + TArrayRef<char> MemRegion() { return TArrayRef((char*)Data(), Size()); } - + ui32* MagicNumberLocation() { -#if STACK_GROW_DOWN == 1 +#if STACK_GROW_DOWN == 1 return (ui32*)Data(); -#elif STACK_GROW_DOWN == 0 +#elif STACK_GROW_DOWN == 0 return ((ui32*)(((char*)Data()) + Size())) - 1; -#else -#error "unknown" -#endif +#else +#error "unknown" +#endif } - + static void FailStackOverflow(); - + inline void VerifyNoStackOverflow() noexcept { if (Y_UNLIKELY(*MagicNumberLocation() != MAGIC_NUMBER)) { FailStackOverflow(); } - } - + } + static const ui32 MAGIC_NUMBER = 0xAB4D15FE; }; - + } } diff --git a/library/cpp/messagebus/rain_check/core/coro_ut.cpp b/library/cpp/messagebus/rain_check/core/coro_ut.cpp index 61a33584a5..4ee688f4c1 100644 --- a/library/cpp/messagebus/rain_check/core/coro_ut.cpp +++ b/library/cpp/messagebus/rain_check/core/coro_ut.cpp @@ -1,106 +1,106 @@ #include <library/cpp/testing/unittest/registar.h> - + #include "coro.h" -#include "spawn.h" - +#include "spawn.h" + #include <library/cpp/messagebus/rain_check/test/ut/test.h> - -using namespace NRainCheck; - + +using namespace NRainCheck; + Y_UNIT_TEST_SUITE(RainCheckCoro) { struct TSimpleCoroTask : ICoroTask { - TTestSync* const TestSync; - - TSimpleCoroTask(TTestEnv*, TTestSync* testSync) - : TestSync(testSync) - { - } - + TTestSync* const TestSync; + + TSimpleCoroTask(TTestEnv*, TTestSync* testSync) + : TestSync(testSync) + { + } + void Run() override { - TestSync->WaitForAndIncrement(0); - } - }; - + TestSync->WaitForAndIncrement(0); + } + }; + Y_UNIT_TEST(Simple) { - TTestSync testSync; - - TTestEnv env; - - TIntrusivePtr<TCoroTaskRunner> task = env.SpawnTask<TSimpleCoroTask>(&testSync); - testSync.WaitForAndIncrement(1); - } - + TTestSync testSync; + + TTestEnv env; + + TIntrusivePtr<TCoroTaskRunner> task = env.SpawnTask<TSimpleCoroTask>(&testSync); + testSync.WaitForAndIncrement(1); + } + struct TSleepCoroTask : ICoroTask { - TTestEnv* const Env; - TTestSync* const TestSync; - - TSleepCoroTask(TTestEnv* env, TTestSync* testSync) - : Env(env) - , TestSync(testSync) - { - } - - TSubtaskCompletion SleepCompletion; - + TTestEnv* const Env; + TTestSync* const TestSync; + + TSleepCoroTask(TTestEnv* env, TTestSync* testSync) + : Env(env) + , TestSync(testSync) + { + } + + TSubtaskCompletion SleepCompletion; + void Run() override { - Env->SleepService.Sleep(&SleepCompletion, TDuration::MilliSeconds(1)); - WaitForSubtasks(); - TestSync->WaitForAndIncrement(0); - } - }; - + Env->SleepService.Sleep(&SleepCompletion, TDuration::MilliSeconds(1)); + WaitForSubtasks(); + TestSync->WaitForAndIncrement(0); + } + }; + Y_UNIT_TEST(Sleep) { - TTestSync testSync; - - TTestEnv env; - - TIntrusivePtr<TCoroTaskRunner> task = env.SpawnTask<TSleepCoroTask>(&testSync); - - testSync.WaitForAndIncrement(1); - } - + TTestSync testSync; + + TTestEnv env; + + TIntrusivePtr<TCoroTaskRunner> task = env.SpawnTask<TSleepCoroTask>(&testSync); + + testSync.WaitForAndIncrement(1); + } + struct TSubtask : ICoroTask { - TTestEnv* const Env; - TTestSync* const TestSync; - - TSubtask(TTestEnv* env, TTestSync* testSync) - : Env(env) - , TestSync(testSync) - { - } - + TTestEnv* const Env; + TTestSync* const TestSync; + + TSubtask(TTestEnv* env, TTestSync* testSync) + : Env(env) + , TestSync(testSync) + { + } + void Run() override { - TestSync->CheckAndIncrement(1); - } - }; - + TestSync->CheckAndIncrement(1); + } + }; + struct TSpawnCoroTask : ICoroTask { - TTestEnv* const Env; - TTestSync* const TestSync; - - TSpawnCoroTask(TTestEnv* env, TTestSync* testSync) - : Env(env) - , TestSync(testSync) - { - } - - TSubtaskCompletion SubtaskCompletion; - + TTestEnv* const Env; + TTestSync* const TestSync; + + TSpawnCoroTask(TTestEnv* env, TTestSync* testSync) + : Env(env) + , TestSync(testSync) + { + } + + TSubtaskCompletion SubtaskCompletion; + void Run() override { - TestSync->CheckAndIncrement(0); - SpawnSubtask<TSubtask>(Env, &SubtaskCompletion, TestSync); - WaitForSubtasks(); - TestSync->CheckAndIncrement(2); - } - }; - + TestSync->CheckAndIncrement(0); + SpawnSubtask<TSubtask>(Env, &SubtaskCompletion, TestSync); + WaitForSubtasks(); + TestSync->CheckAndIncrement(2); + } + }; + Y_UNIT_TEST(Spawn) { - TTestSync testSync; - - TTestEnv env; - - TIntrusivePtr<TCoroTaskRunner> task = env.SpawnTask<TSpawnCoroTask>(&testSync); - - testSync.WaitForAndIncrement(3); - } -} + TTestSync testSync; + + TTestEnv env; + + TIntrusivePtr<TCoroTaskRunner> task = env.SpawnTask<TSpawnCoroTask>(&testSync); + + testSync.WaitForAndIncrement(3); + } +} diff --git a/library/cpp/messagebus/rain_check/core/env.cpp b/library/cpp/messagebus/rain_check/core/env.cpp index fdc0000dbd..150d63d9bb 100644 --- a/library/cpp/messagebus/rain_check/core/env.cpp +++ b/library/cpp/messagebus/rain_check/core/env.cpp @@ -1,3 +1,3 @@ -#include "env.h" - -using namespace NRainCheck; +#include "env.h" + +using namespace NRainCheck; diff --git a/library/cpp/messagebus/rain_check/core/env.h b/library/cpp/messagebus/rain_check/core/env.h index f6dd7fceb6..4e289dbd3d 100644 --- a/library/cpp/messagebus/rain_check/core/env.h +++ b/library/cpp/messagebus/rain_check/core/env.h @@ -1,47 +1,47 @@ -#pragma once - +#pragma once + #include "sleep.h" #include "spawn.h" - + #include <library/cpp/messagebus/actor/executor.h> - + #include <util/generic/ptr.h> - -namespace NRainCheck { - struct IEnv { - virtual ::NActor::TExecutor* GetExecutor() = 0; + +namespace NRainCheck { + struct IEnv { + virtual ::NActor::TExecutor* GetExecutor() = 0; virtual ~IEnv() { } - }; - - template <typename TSelf> - struct TEnvTemplate: public IEnv { - template <typename TTask, typename TParam> - TIntrusivePtr<typename TTask::TTaskRunner> SpawnTask(TParam param) { + }; + + template <typename TSelf> + struct TEnvTemplate: public IEnv { + template <typename TTask, typename TParam> + TIntrusivePtr<typename TTask::TTaskRunner> SpawnTask(TParam param) { return ::NRainCheck::SpawnTask<TTask, TSelf>((TSelf*)this, param); - } - }; - - template <typename TSelf> - struct TSimpleEnvTemplate: public TEnvTemplate<TSelf> { - ::NActor::TExecutorPtr Executor; - TSleepService SleepService; - - TSimpleEnvTemplate(unsigned threadCount = 0) - : Executor(new ::NActor::TExecutor(threadCount != 0 ? threadCount : 4)) + } + }; + + template <typename TSelf> + struct TSimpleEnvTemplate: public TEnvTemplate<TSelf> { + ::NActor::TExecutorPtr Executor; + TSleepService SleepService; + + TSimpleEnvTemplate(unsigned threadCount = 0) + : Executor(new ::NActor::TExecutor(threadCount != 0 ? threadCount : 4)) { } - + ::NActor::TExecutor* GetExecutor() override { return Executor.Get(); } - }; - - struct TSimpleEnv: public TSimpleEnvTemplate<TSimpleEnv> { + }; + + struct TSimpleEnv: public TSimpleEnvTemplate<TSimpleEnv> { TSimpleEnv(unsigned threadCount = 0) : TSimpleEnvTemplate<TSimpleEnv>(threadCount) { } - }; - -} + }; + +} diff --git a/library/cpp/messagebus/rain_check/core/fwd.h b/library/cpp/messagebus/rain_check/core/fwd.h index b43ff8c17c..2f8f1d4754 100644 --- a/library/cpp/messagebus/rain_check/core/fwd.h +++ b/library/cpp/messagebus/rain_check/core/fwd.h @@ -1,18 +1,18 @@ -#pragma once - -namespace NRainCheck { - namespace NPrivate { - } - - class ITaskBase; - class ISimpleTask; - class ICoroTask; - - struct ISubtaskListener; - - class TTaskRunnerBase; - - class TSubtaskCompletion; - struct IEnv; - -} +#pragma once + +namespace NRainCheck { + namespace NPrivate { + } + + class ITaskBase; + class ISimpleTask; + class ICoroTask; + + struct ISubtaskListener; + + class TTaskRunnerBase; + + class TSubtaskCompletion; + struct IEnv; + +} diff --git a/library/cpp/messagebus/rain_check/core/rain_check.cpp b/library/cpp/messagebus/rain_check/core/rain_check.cpp index 2ea1f9e21b..63bc300554 100644 --- a/library/cpp/messagebus/rain_check/core/rain_check.cpp +++ b/library/cpp/messagebus/rain_check/core/rain_check.cpp @@ -1 +1 @@ -#include "rain_check.h" +#include "rain_check.h" diff --git a/library/cpp/messagebus/rain_check/core/rain_check.h b/library/cpp/messagebus/rain_check/core/rain_check.h index 0f289717a2..a97de4537e 100644 --- a/library/cpp/messagebus/rain_check/core/rain_check.h +++ b/library/cpp/messagebus/rain_check/core/rain_check.h @@ -1,8 +1,8 @@ -#pragma once - -#include "coro.h" +#pragma once + +#include "coro.h" #include "env.h" -#include "simple.h" -#include "sleep.h" -#include "spawn.h" +#include "simple.h" +#include "sleep.h" +#include "spawn.h" #include "task.h" diff --git a/library/cpp/messagebus/rain_check/core/simple.cpp b/library/cpp/messagebus/rain_check/core/simple.cpp index 70182b2f93..8dc71a84ee 100644 --- a/library/cpp/messagebus/rain_check/core/simple.cpp +++ b/library/cpp/messagebus/rain_check/core/simple.cpp @@ -1,18 +1,18 @@ -#include "simple.h" - -using namespace NRainCheck; - -TSimpleTaskRunner::TSimpleTaskRunner(IEnv* env, ISubtaskListener* parentTask, TAutoPtr<ISimpleTask> impl) - : TTaskRunnerBase(env, parentTask, impl.Release()) - , ContinueFunc(&ISimpleTask::Start) -{ -} - -TSimpleTaskRunner::~TSimpleTaskRunner() { +#include "simple.h" + +using namespace NRainCheck; + +TSimpleTaskRunner::TSimpleTaskRunner(IEnv* env, ISubtaskListener* parentTask, TAutoPtr<ISimpleTask> impl) + : TTaskRunnerBase(env, parentTask, impl.Release()) + , ContinueFunc(&ISimpleTask::Start) +{ +} + +TSimpleTaskRunner::~TSimpleTaskRunner() { Y_ASSERT(!ContinueFunc); -} - +} + bool TSimpleTaskRunner::ReplyReceived() { - ContinueFunc = (GetImpl()->*(ContinueFunc.Func))(); - return !!ContinueFunc; -} + ContinueFunc = (GetImpl()->*(ContinueFunc.Func))(); + return !!ContinueFunc; +} diff --git a/library/cpp/messagebus/rain_check/core/simple.h b/library/cpp/messagebus/rain_check/core/simple.h index 20e1bf19f5..eeee4c8c23 100644 --- a/library/cpp/messagebus/rain_check/core/simple.h +++ b/library/cpp/messagebus/rain_check/core/simple.h @@ -1,62 +1,62 @@ -#pragma once - -#include "task.h" - -namespace NRainCheck { - class ISimpleTask; - - // Function called on continue - class TContinueFunc { - friend class TSimpleTaskRunner; - - typedef TContinueFunc (ISimpleTask::*TFunc)(); - TFunc Func; - - public: - TContinueFunc() +#pragma once + +#include "task.h" + +namespace NRainCheck { + class ISimpleTask; + + // Function called on continue + class TContinueFunc { + friend class TSimpleTaskRunner; + + typedef TContinueFunc (ISimpleTask::*TFunc)(); + TFunc Func; + + public: + TContinueFunc() : Func(nullptr) { } - - TContinueFunc(void*) + + TContinueFunc(void*) : Func(nullptr) { } - - template <typename TTask> - TContinueFunc(TContinueFunc (TTask::*func)()) + + template <typename TTask> + TContinueFunc(TContinueFunc (TTask::*func)()) : Func((TFunc)func) - { + { static_assert((std::is_base_of<ISimpleTask, TTask>::value), "expect (std::is_base_of<ISimpleTask, TTask>::value)"); - } - - bool operator!() const { - return !Func; - } - }; - - class TSimpleTaskRunner: public TTaskRunnerBase { - public: - TSimpleTaskRunner(IEnv* env, ISubtaskListener* parentTask, TAutoPtr<ISimpleTask>); + } + + bool operator!() const { + return !Func; + } + }; + + class TSimpleTaskRunner: public TTaskRunnerBase { + public: + TSimpleTaskRunner(IEnv* env, ISubtaskListener* parentTask, TAutoPtr<ISimpleTask>); ~TSimpleTaskRunner() override; - - private: - // Function to be called on completion of all pending tasks. - TContinueFunc ContinueFunc; - + + private: + // Function to be called on completion of all pending tasks. + TContinueFunc ContinueFunc; + bool ReplyReceived() override /* override */; - + ISimpleTask* GetImpl() { return (ISimpleTask*)GetImplBase(); } - }; - - class ISimpleTask: public ITaskBase { - public: - typedef TSimpleTaskRunner TTaskRunner; - typedef ISimpleTask ITask; - - virtual TContinueFunc Start() = 0; - }; - -} + }; + + class ISimpleTask: public ITaskBase { + public: + typedef TSimpleTaskRunner TTaskRunner; + typedef ISimpleTask ITask; + + virtual TContinueFunc Start() = 0; + }; + +} diff --git a/library/cpp/messagebus/rain_check/core/simple_ut.cpp b/library/cpp/messagebus/rain_check/core/simple_ut.cpp index d4545e05aa..97b5db2d89 100644 --- a/library/cpp/messagebus/rain_check/core/simple_ut.cpp +++ b/library/cpp/messagebus/rain_check/core/simple_ut.cpp @@ -1,59 +1,59 @@ #include <library/cpp/testing/unittest/registar.h> - + #include <library/cpp/messagebus/rain_check/test/ut/test.h> #include <library/cpp/messagebus/latch.h> - + #include <util/system/event.h> - -using namespace NRainCheck; - + +using namespace NRainCheck; + Y_UNIT_TEST_SUITE(RainCheckSimple) { - struct TTaskWithCompletionCallback: public ISimpleTask { - TTestEnv* const Env; - TTestSync* const TestSync; - - TTaskWithCompletionCallback(TTestEnv* env, TTestSync* testSync) - : Env(env) - , TestSync(testSync) + struct TTaskWithCompletionCallback: public ISimpleTask { + TTestEnv* const Env; + TTestSync* const TestSync; + + TTaskWithCompletionCallback(TTestEnv* env, TTestSync* testSync) + : Env(env) + , TestSync(testSync) { } - - TSubtaskCompletion SleepCompletion; - + + TSubtaskCompletion SleepCompletion; + TContinueFunc Start() override { - TestSync->CheckAndIncrement(0); - - Env->SleepService.Sleep(&SleepCompletion, TDuration::MilliSeconds(1)); - SleepCompletion.SetCompletionCallback(&TTaskWithCompletionCallback::SleepCompletionCallback); - - return &TTaskWithCompletionCallback::Last; - } - - void SleepCompletionCallback(TSubtaskCompletion* completion) { + TestSync->CheckAndIncrement(0); + + Env->SleepService.Sleep(&SleepCompletion, TDuration::MilliSeconds(1)); + SleepCompletion.SetCompletionCallback(&TTaskWithCompletionCallback::SleepCompletionCallback); + + return &TTaskWithCompletionCallback::Last; + } + + void SleepCompletionCallback(TSubtaskCompletion* completion) { Y_VERIFY(completion == &SleepCompletion); - TestSync->CheckAndIncrement(1); - - Env->SleepService.Sleep(&SleepCompletion, TDuration::MilliSeconds(1)); - SleepCompletion.SetCompletionCallback(&TTaskWithCompletionCallback::NextSleepCompletionCallback); - } - + TestSync->CheckAndIncrement(1); + + Env->SleepService.Sleep(&SleepCompletion, TDuration::MilliSeconds(1)); + SleepCompletion.SetCompletionCallback(&TTaskWithCompletionCallback::NextSleepCompletionCallback); + } + void NextSleepCompletionCallback(TSubtaskCompletion*) { - TestSync->CheckAndIncrement(2); - } - - TContinueFunc Last() { - TestSync->CheckAndIncrement(3); + TestSync->CheckAndIncrement(2); + } + + TContinueFunc Last() { + TestSync->CheckAndIncrement(3); return nullptr; - } - }; - + } + }; + Y_UNIT_TEST(CompletionCallback) { - TTestEnv env; - TTestSync testSync; - - env.SpawnTask<TTaskWithCompletionCallback>(&testSync); - - testSync.WaitForAndIncrement(4); - } -} + TTestEnv env; + TTestSync testSync; + + env.SpawnTask<TTaskWithCompletionCallback>(&testSync); + + testSync.WaitForAndIncrement(4); + } +} diff --git a/library/cpp/messagebus/rain_check/core/sleep.cpp b/library/cpp/messagebus/rain_check/core/sleep.cpp index f5d0b4cac9..10b875bc79 100644 --- a/library/cpp/messagebus/rain_check/core/sleep.cpp +++ b/library/cpp/messagebus/rain_check/core/sleep.cpp @@ -1,47 +1,47 @@ #include "rain_check.h" -#include <util/system/yassert.h> - -using namespace NRainCheck; -using namespace NRainCheck::NPrivate; -using namespace NBus; -using namespace NBus::NPrivate; - -TSleepService::TSleepService(::NBus::NPrivate::TScheduler* scheduler) - : Scheduler(scheduler) -{ -} - -NRainCheck::TSleepService::TSleepService() - : SchedulerHolder(new TScheduler) - , Scheduler(SchedulerHolder.Get()) -{ -} - +#include <util/system/yassert.h> + +using namespace NRainCheck; +using namespace NRainCheck::NPrivate; +using namespace NBus; +using namespace NBus::NPrivate; + +TSleepService::TSleepService(::NBus::NPrivate::TScheduler* scheduler) + : Scheduler(scheduler) +{ +} + +NRainCheck::TSleepService::TSleepService() + : SchedulerHolder(new TScheduler) + , Scheduler(SchedulerHolder.Get()) +{ +} + NRainCheck::TSleepService::~TSleepService() { - if (!!SchedulerHolder) { - Scheduler->Stop(); - } -} - -namespace { - struct TSleepServiceScheduleItem: public IScheduleItem { - ISubtaskListener* const Parent; - - TSleepServiceScheduleItem(ISubtaskListener* parent, TInstant time) - : IScheduleItem(time) - , Parent(parent) - { - } - + if (!!SchedulerHolder) { + Scheduler->Stop(); + } +} + +namespace { + struct TSleepServiceScheduleItem: public IScheduleItem { + ISubtaskListener* const Parent; + + TSleepServiceScheduleItem(ISubtaskListener* parent, TInstant time) + : IScheduleItem(time) + , Parent(parent) + { + } + void Do() override { - Parent->SetDone(); - } - }; -} - + Parent->SetDone(); + } + }; +} + void TSleepService::Sleep(TSubtaskCompletion* r, TDuration duration) { - TTaskRunnerBase* current = TTaskRunnerBase::CurrentTask(); - r->SetRunning(current); - Scheduler->Schedule(new TSleepServiceScheduleItem(r, TInstant::Now() + duration)); -} + TTaskRunnerBase* current = TTaskRunnerBase::CurrentTask(); + r->SetRunning(current); + Scheduler->Schedule(new TSleepServiceScheduleItem(r, TInstant::Now() + duration)); +} diff --git a/library/cpp/messagebus/rain_check/core/sleep.h b/library/cpp/messagebus/rain_check/core/sleep.h index 1a7a1f8674..b5b343de98 100644 --- a/library/cpp/messagebus/rain_check/core/sleep.h +++ b/library/cpp/messagebus/rain_check/core/sleep.h @@ -1,24 +1,24 @@ -#pragma once - +#pragma once + #include "fwd.h" - + #include <library/cpp/messagebus/scheduler/scheduler.h> - + #include <util/datetime/base.h> + +namespace NRainCheck { + class TSleepService { + private: + THolder< ::NBus::NPrivate::TScheduler> SchedulerHolder; + ::NBus::NPrivate::TScheduler* const Scheduler; -namespace NRainCheck { - class TSleepService { - private: - THolder< ::NBus::NPrivate::TScheduler> SchedulerHolder; - ::NBus::NPrivate::TScheduler* const Scheduler; - - public: - TSleepService(::NBus::NPrivate::TScheduler*); - TSleepService(); - ~TSleepService(); - - // Wake up a task after given duration. - void Sleep(TSubtaskCompletion* r, TDuration); - }; - -} + public: + TSleepService(::NBus::NPrivate::TScheduler*); + TSleepService(); + ~TSleepService(); + + // Wake up a task after given duration. + void Sleep(TSubtaskCompletion* r, TDuration); + }; + +} diff --git a/library/cpp/messagebus/rain_check/core/sleep_ut.cpp b/library/cpp/messagebus/rain_check/core/sleep_ut.cpp index 2ae85a87b1..3c92fa2ca7 100644 --- a/library/cpp/messagebus/rain_check/core/sleep_ut.cpp +++ b/library/cpp/messagebus/rain_check/core/sleep_ut.cpp @@ -1,46 +1,46 @@ #include <library/cpp/testing/unittest/registar.h> - + #include <library/cpp/messagebus/rain_check/test/ut/test.h> - + #include <util/system/event.h> -using namespace NRainCheck; -using namespace NActor; - +using namespace NRainCheck; +using namespace NActor; + Y_UNIT_TEST_SUITE(Sleep) { - struct TTestTask: public ISimpleTask { - TSimpleEnv* const Env; - TTestSync* const TestSync; - - TTestTask(TSimpleEnv* env, TTestSync* testSync) - : Env(env) - , TestSync(testSync) + struct TTestTask: public ISimpleTask { + TSimpleEnv* const Env; + TTestSync* const TestSync; + + TTestTask(TSimpleEnv* env, TTestSync* testSync) + : Env(env) + , TestSync(testSync) { } - - TSubtaskCompletion Sleep; - + + TSubtaskCompletion Sleep; + TContinueFunc Start() override { - Env->SleepService.Sleep(&Sleep, TDuration::MilliSeconds(1)); - - TestSync->CheckAndIncrement(0); - - return &TTestTask::Continue; - } - - TContinueFunc Continue() { - TestSync->CheckAndIncrement(1); + Env->SleepService.Sleep(&Sleep, TDuration::MilliSeconds(1)); + + TestSync->CheckAndIncrement(0); + + return &TTestTask::Continue; + } + + TContinueFunc Continue() { + TestSync->CheckAndIncrement(1); return nullptr; - } - }; - + } + }; + Y_UNIT_TEST(Test) { - TTestSync testSync; - - TSimpleEnv env; - - TIntrusivePtr<TSimpleTaskRunner> task = env.SpawnTask<TTestTask>(&testSync); - - testSync.WaitForAndIncrement(2); - } -} + TTestSync testSync; + + TSimpleEnv env; + + TIntrusivePtr<TSimpleTaskRunner> task = env.SpawnTask<TTestTask>(&testSync); + + testSync.WaitForAndIncrement(2); + } +} diff --git a/library/cpp/messagebus/rain_check/core/spawn.cpp b/library/cpp/messagebus/rain_check/core/spawn.cpp index c570355fbe..d8fc78c129 100644 --- a/library/cpp/messagebus/rain_check/core/spawn.cpp +++ b/library/cpp/messagebus/rain_check/core/spawn.cpp @@ -1,5 +1,5 @@ -#include "spawn.h" - +#include "spawn.h" + void NRainCheck::NPrivate::SpawnTaskImpl(TTaskRunnerBase* task) { - task->Schedule(); -} + task->Schedule(); +} diff --git a/library/cpp/messagebus/rain_check/core/spawn.h b/library/cpp/messagebus/rain_check/core/spawn.h index f2b146bf29..33ba955e0a 100644 --- a/library/cpp/messagebus/rain_check/core/spawn.h +++ b/library/cpp/messagebus/rain_check/core/spawn.h @@ -1,50 +1,50 @@ -#pragma once - -#include "coro.h" -#include "simple.h" +#pragma once + +#include "coro.h" +#include "simple.h" #include "task.h" - -namespace NRainCheck { - namespace NPrivate { - void SpawnTaskImpl(TTaskRunnerBase* task); - - template <typename TTask, typename ITask, typename TRunner, typename TEnv, typename TParam> - TIntrusivePtr<TRunner> SpawnTaskWithRunner(TEnv* env, TParam param1, ISubtaskListener* subtaskListener) { + +namespace NRainCheck { + namespace NPrivate { + void SpawnTaskImpl(TTaskRunnerBase* task); + + template <typename TTask, typename ITask, typename TRunner, typename TEnv, typename TParam> + TIntrusivePtr<TRunner> SpawnTaskWithRunner(TEnv* env, TParam param1, ISubtaskListener* subtaskListener) { static_assert((std::is_base_of<ITask, TTask>::value), "expect (std::is_base_of<ITask, TTask>::value)"); - TIntrusivePtr<TRunner> task(new TRunner(env, subtaskListener, new TTask(env, param1))); - NPrivate::SpawnTaskImpl(task.Get()); - return task; - } - - template <typename TTask, typename ITask, typename TRunner, typename TEnv> - void SpawnSubtaskWithRunner(TEnv* env, TSubtaskCompletion* completion) { + TIntrusivePtr<TRunner> task(new TRunner(env, subtaskListener, new TTask(env, param1))); + NPrivate::SpawnTaskImpl(task.Get()); + return task; + } + + template <typename TTask, typename ITask, typename TRunner, typename TEnv> + void SpawnSubtaskWithRunner(TEnv* env, TSubtaskCompletion* completion) { static_assert((std::is_base_of<ITask, TTask>::value), "expect (std::is_base_of<ITask, TTask>::value)"); - TTaskRunnerBase* current = TTaskRunnerBase::CurrentTask(); - completion->SetRunning(current); - NPrivate::SpawnTaskImpl(new TRunner(env, completion, new TTask(env))); - } - - template <typename TTask, typename ITask, typename TRunner, typename TEnv, typename TParam> - void SpawnSubtaskWithRunner(TEnv* env, TSubtaskCompletion* completion, TParam param) { + TTaskRunnerBase* current = TTaskRunnerBase::CurrentTask(); + completion->SetRunning(current); + NPrivate::SpawnTaskImpl(new TRunner(env, completion, new TTask(env))); + } + + template <typename TTask, typename ITask, typename TRunner, typename TEnv, typename TParam> + void SpawnSubtaskWithRunner(TEnv* env, TSubtaskCompletion* completion, TParam param) { static_assert((std::is_base_of<ITask, TTask>::value), "expect (std::is_base_of<ITask, TTask>::value)"); - TTaskRunnerBase* current = TTaskRunnerBase::CurrentTask(); - completion->SetRunning(current); - NPrivate::SpawnTaskImpl(new TRunner(env, completion, new TTask(env, param))); - } - - } - - // Instantiate and start a task with given parameter. - template <typename TTask, typename TEnv, typename TParam> - TIntrusivePtr<typename TTask::TTaskRunner> SpawnTask(TEnv* env, TParam param1, ISubtaskListener* subtaskListener = &TNopSubtaskListener::Instance) { - return NPrivate::SpawnTaskWithRunner< + TTaskRunnerBase* current = TTaskRunnerBase::CurrentTask(); + completion->SetRunning(current); + NPrivate::SpawnTaskImpl(new TRunner(env, completion, new TTask(env, param))); + } + + } + + // Instantiate and start a task with given parameter. + template <typename TTask, typename TEnv, typename TParam> + TIntrusivePtr<typename TTask::TTaskRunner> SpawnTask(TEnv* env, TParam param1, ISubtaskListener* subtaskListener = &TNopSubtaskListener::Instance) { + return NPrivate::SpawnTaskWithRunner< TTask, typename TTask::ITask, typename TTask::TTaskRunner, TEnv, TParam>(env, param1, subtaskListener); - } - - // Instantiate and start subtask of given task. - template <typename TTask, typename TEnv, typename TParam> - void SpawnSubtask(TEnv* env, TSubtaskCompletion* completion, TParam param) { - return NPrivate::SpawnSubtaskWithRunner<TTask, typename TTask::ITask, typename TTask::TTaskRunner>(env, completion, param); - } - -} + } + + // Instantiate and start subtask of given task. + template <typename TTask, typename TEnv, typename TParam> + void SpawnSubtask(TEnv* env, TSubtaskCompletion* completion, TParam param) { + return NPrivate::SpawnSubtaskWithRunner<TTask, typename TTask::ITask, typename TTask::TTaskRunner>(env, completion, param); + } + +} diff --git a/library/cpp/messagebus/rain_check/core/spawn_ut.cpp b/library/cpp/messagebus/rain_check/core/spawn_ut.cpp index ba5a5e41cf..2b3ef75c67 100644 --- a/library/cpp/messagebus/rain_check/core/spawn_ut.cpp +++ b/library/cpp/messagebus/rain_check/core/spawn_ut.cpp @@ -1,145 +1,145 @@ #include <library/cpp/testing/unittest/registar.h> - + #include <library/cpp/messagebus/rain_check/test/helper/misc.h> #include <library/cpp/messagebus/rain_check/test/ut/test.h> #include <library/cpp/messagebus/latch.h> - + #include <util/system/event.h> - + #include <array> - -using namespace NRainCheck; -using namespace NActor; - + +using namespace NRainCheck; +using namespace NActor; + Y_UNIT_TEST_SUITE(Spawn) { - struct TTestTask: public ISimpleTask { - TTestSync* const TestSync; - - TTestTask(TSimpleEnv*, TTestSync* testSync) - : TestSync(testSync) - , I(0) + struct TTestTask: public ISimpleTask { + TTestSync* const TestSync; + + TTestTask(TSimpleEnv*, TTestSync* testSync) + : TestSync(testSync) + , I(0) { } - + TSystemEvent Started; - - unsigned I; - + + unsigned I; + TContinueFunc Start() override { - if (I < 4) { - I += 1; - return &TTestTask::Start; - } - TestSync->CheckAndIncrement(0); - return &TTestTask::Continue; - } - - TContinueFunc Continue() { - TestSync->CheckAndIncrement(1); - - Started.Signal(); + if (I < 4) { + I += 1; + return &TTestTask::Start; + } + TestSync->CheckAndIncrement(0); + return &TTestTask::Continue; + } + + TContinueFunc Continue() { + TestSync->CheckAndIncrement(1); + + Started.Signal(); return nullptr; - } - }; - + } + }; + Y_UNIT_TEST(Continuation) { - TTestSync testSync; - - TSimpleEnv env; - - TIntrusivePtr<TSimpleTaskRunner> task = env.SpawnTask<TTestTask>(&testSync); - - testSync.WaitForAndIncrement(2); - } - - struct TSubtask: public ISimpleTask { - TTestEnv* const Env; - TTestSync* const TestSync; - - TSubtask(TTestEnv* env, TTestSync* testSync) - : Env(env) - , TestSync(testSync) + TTestSync testSync; + + TSimpleEnv env; + + TIntrusivePtr<TSimpleTaskRunner> task = env.SpawnTask<TTestTask>(&testSync); + + testSync.WaitForAndIncrement(2); + } + + struct TSubtask: public ISimpleTask { + TTestEnv* const Env; + TTestSync* const TestSync; + + TSubtask(TTestEnv* env, TTestSync* testSync) + : Env(env) + , TestSync(testSync) { } - + TContinueFunc Start() override { - Sleep(TDuration::MilliSeconds(1)); - TestSync->CheckAndIncrement(1); + Sleep(TDuration::MilliSeconds(1)); + TestSync->CheckAndIncrement(1); return nullptr; - } - }; - - struct TSpawnTask: public ISimpleTask { - TTestEnv* const Env; - TTestSync* const TestSync; - - TSpawnTask(TTestEnv* env, TTestSync* testSync) - : Env(env) - , TestSync(testSync) + } + }; + + struct TSpawnTask: public ISimpleTask { + TTestEnv* const Env; + TTestSync* const TestSync; + + TSpawnTask(TTestEnv* env, TTestSync* testSync) + : Env(env) + , TestSync(testSync) { } - - TSubtaskCompletion SubtaskCompletion; - + + TSubtaskCompletion SubtaskCompletion; + TContinueFunc Start() override { - TestSync->CheckAndIncrement(0); - SpawnSubtask<TSubtask>(Env, &SubtaskCompletion, TestSync); - return &TSpawnTask::Continue; - } - - TContinueFunc Continue() { - TestSync->CheckAndIncrement(2); + TestSync->CheckAndIncrement(0); + SpawnSubtask<TSubtask>(Env, &SubtaskCompletion, TestSync); + return &TSpawnTask::Continue; + } + + TContinueFunc Continue() { + TestSync->CheckAndIncrement(2); return nullptr; - } - }; - + } + }; + Y_UNIT_TEST(Subtask) { - TTestSync testSync; - - TTestEnv env; - - TIntrusivePtr<TSimpleTaskRunner> task = env.SpawnTask<TSpawnTask>(&testSync); - - testSync.WaitForAndIncrement(3); - } - - struct TSpawnLongTask: public ISimpleTask { - TTestEnv* const Env; - TTestSync* const TestSync; - unsigned I; - + TTestSync testSync; + + TTestEnv env; + + TIntrusivePtr<TSimpleTaskRunner> task = env.SpawnTask<TSpawnTask>(&testSync); + + testSync.WaitForAndIncrement(3); + } + + struct TSpawnLongTask: public ISimpleTask { + TTestEnv* const Env; + TTestSync* const TestSync; + unsigned I; + TSpawnLongTask(TTestEnv* env, TTestSync* testSync) : Env(env) , TestSync(testSync) , I(0) { } - + std::array<TSubtaskCompletion, 3> Subtasks; - + TContinueFunc Start() override { - if (I == 1000) { - TestSync->CheckAndIncrement(0); + if (I == 1000) { + TestSync->CheckAndIncrement(0); return nullptr; - } - + } + for (auto& subtask : Subtasks) { SpawnSubtask<TNopSimpleTask>(Env, &subtask, ""); - } - - ++I; - return &TSpawnLongTask::Start; - } - }; - + } + + ++I; + return &TSpawnLongTask::Start; + } + }; + Y_UNIT_TEST(SubtaskLong) { - TTestSync testSync; - - TTestEnv env; - - TIntrusivePtr<TSimpleTaskRunner> task = env.SpawnTask<TSpawnLongTask>(&testSync); - - testSync.WaitForAndIncrement(1); - } -} + TTestSync testSync; + + TTestEnv env; + + TIntrusivePtr<TSimpleTaskRunner> task = env.SpawnTask<TSpawnLongTask>(&testSync); + + testSync.WaitForAndIncrement(1); + } +} diff --git a/library/cpp/messagebus/rain_check/core/task.cpp b/library/cpp/messagebus/rain_check/core/task.cpp index a098437d53..d20ae30402 100644 --- a/library/cpp/messagebus/rain_check/core/task.cpp +++ b/library/cpp/messagebus/rain_check/core/task.cpp @@ -1,216 +1,216 @@ #include "rain_check.h" - + #include <library/cpp/messagebus/actor/temp_tls_vector.h> - + #include <util/system/type_name.h> #include <util/system/tls.h> - -using namespace NRainCheck; -using namespace NRainCheck::NPrivate; - -using namespace NActor; - -namespace { + +using namespace NRainCheck; +using namespace NRainCheck::NPrivate; + +using namespace NActor; + +namespace { Y_POD_STATIC_THREAD(TTaskRunnerBase*) ThreadCurrentTask; -} - +} + void TNopSubtaskListener::SetDone() { } - -TNopSubtaskListener TNopSubtaskListener::Instance; - -TTaskRunnerBase::TTaskRunnerBase(IEnv* env, ISubtaskListener* parentTask, TAutoPtr<ITaskBase> impl) - : TActor<TTaskRunnerBase>(env->GetExecutor()) - , Impl(impl) - , ParentTask(parentTask) - //, HoldsSelfReference(false) - , Done(false) - , SetDoneCalled(false) -{ -} - -TTaskRunnerBase::~TTaskRunnerBase() { + +TNopSubtaskListener TNopSubtaskListener::Instance; + +TTaskRunnerBase::TTaskRunnerBase(IEnv* env, ISubtaskListener* parentTask, TAutoPtr<ITaskBase> impl) + : TActor<TTaskRunnerBase>(env->GetExecutor()) + , Impl(impl) + , ParentTask(parentTask) + //, HoldsSelfReference(false) + , Done(false) + , SetDoneCalled(false) +{ +} + +TTaskRunnerBase::~TTaskRunnerBase() { Y_ASSERT(Done); -} - -namespace { - struct TRunningInThisThreadGuard { - TTaskRunnerBase* const Task; - TRunningInThisThreadGuard(TTaskRunnerBase* task) - : Task(task) - { +} + +namespace { + struct TRunningInThisThreadGuard { + TTaskRunnerBase* const Task; + TRunningInThisThreadGuard(TTaskRunnerBase* task) + : Task(task) + { Y_ASSERT(!ThreadCurrentTask); - ThreadCurrentTask = task; - } - - ~TRunningInThisThreadGuard() { + ThreadCurrentTask = task; + } + + ~TRunningInThisThreadGuard() { Y_ASSERT(ThreadCurrentTask == Task); ThreadCurrentTask = nullptr; - } - }; -} - + } + }; +} + void NRainCheck::TTaskRunnerBase::Act(NActor::TDefaultTag) { Y_ASSERT(RefCount() > 0); - - TRunningInThisThreadGuard g(this); - - //RetainRef(); - - for (;;) { - TTempTlsVector<TSubtaskCompletion*> temp; - - temp.GetVector()->swap(Pending); - + + TRunningInThisThreadGuard g(this); + + //RetainRef(); + + for (;;) { + TTempTlsVector<TSubtaskCompletion*> temp; + + temp.GetVector()->swap(Pending); + for (auto& pending : *temp.GetVector()) { if (pending->IsComplete()) { pending->FireCompletionCallback(GetImplBase()); - } else { + } else { Pending.push_back(pending); - } - } - - if (!Pending.empty()) { - return; - } - - if (!Done) { - Done = !ReplyReceived(); - } else { - if (Pending.empty()) { - if (!SetDoneCalled) { - ParentTask->SetDone(); - SetDoneCalled = true; - } - //ReleaseRef(); - return; - } - } - } -} - + } + } + + if (!Pending.empty()) { + return; + } + + if (!Done) { + Done = !ReplyReceived(); + } else { + if (Pending.empty()) { + if (!SetDoneCalled) { + ParentTask->SetDone(); + SetDoneCalled = true; + } + //ReleaseRef(); + return; + } + } + } +} + bool TTaskRunnerBase::IsRunningInThisThread() const { - return ThreadCurrentTask == this; -} - + return ThreadCurrentTask == this; +} + TSubtaskCompletion::~TSubtaskCompletion() { - ESubtaskState state = State.Get(); + ESubtaskState state = State.Get(); Y_ASSERT(state == CREATED || state == DONE || state == CANCELED); -} - +} + void TSubtaskCompletion::FireCompletionCallback(ITaskBase* task) { Y_ASSERT(IsComplete()); - - if (!!CompletionFunc) { - TSubtaskCompletionFunc temp = CompletionFunc; - // completion func must be reset before calling it, - // because function may set it back - CompletionFunc = TSubtaskCompletionFunc(); - (task->*(temp.Func))(this); - } -} - + + if (!!CompletionFunc) { + TSubtaskCompletionFunc temp = CompletionFunc; + // completion func must be reset before calling it, + // because function may set it back + CompletionFunc = TSubtaskCompletionFunc(); + (task->*(temp.Func))(this); + } +} + void NRainCheck::TSubtaskCompletion::Cancel() { - for (;;) { - ESubtaskState state = State.Get(); - if (state == CREATED && State.CompareAndSet(CREATED, CANCELED)) { - return; - } - if (state == RUNNING && State.CompareAndSet(RUNNING, CANCEL_REQUESTED)) { - return; - } - if (state == DONE && State.CompareAndSet(DONE, CANCELED)) { - return; - } - if (state == CANCEL_REQUESTED || state == CANCELED) { - return; - } - } -} - + for (;;) { + ESubtaskState state = State.Get(); + if (state == CREATED && State.CompareAndSet(CREATED, CANCELED)) { + return; + } + if (state == RUNNING && State.CompareAndSet(RUNNING, CANCEL_REQUESTED)) { + return; + } + if (state == DONE && State.CompareAndSet(DONE, CANCELED)) { + return; + } + if (state == CANCEL_REQUESTED || state == CANCELED) { + return; + } + } +} + void TSubtaskCompletion::SetRunning(TTaskRunnerBase* parent) { Y_ASSERT(!TaskRunner); Y_ASSERT(!!parent); - - TaskRunner = parent; - - parent->Pending.push_back(this); - - parent->RefV(); - - for (;;) { - ESubtaskState current = State.Get(); - if (current != CREATED && current != DONE) { + + TaskRunner = parent; + + parent->Pending.push_back(this); + + parent->RefV(); + + for (;;) { + ESubtaskState current = State.Get(); + if (current != CREATED && current != DONE) { Y_FAIL("current state should be CREATED or DONE: %s", ToCString(current)); - } - if (State.CompareAndSet(current, RUNNING)) { - return; - } - } -} - + } + if (State.CompareAndSet(current, RUNNING)) { + return; + } + } +} + void TSubtaskCompletion::SetDone() { Y_ASSERT(!!TaskRunner); - TTaskRunnerBase* temp = TaskRunner; + TTaskRunnerBase* temp = TaskRunner; TaskRunner = nullptr; - - for (;;) { - ESubtaskState state = State.Get(); - if (state == RUNNING) { - if (State.CompareAndSet(RUNNING, DONE)) { - break; - } - } else if (state == CANCEL_REQUESTED) { - if (State.CompareAndSet(CANCEL_REQUESTED, CANCELED)) { - break; - } - } else { + + for (;;) { + ESubtaskState state = State.Get(); + if (state == RUNNING) { + if (State.CompareAndSet(RUNNING, DONE)) { + break; + } + } else if (state == CANCEL_REQUESTED) { + if (State.CompareAndSet(CANCEL_REQUESTED, CANCELED)) { + break; + } + } else { Y_FAIL("cannot SetDone: unknown state: %s", ToCString(state)); - } - } - - temp->ScheduleV(); - temp->UnRefV(); -} - -#if 0 -void NRainCheck::TTaskRunnerBase::RetainRef() -{ - if (HoldsSelfReference) { - return; - } - HoldsSelfReference = true; - Ref(); -} - -void NRainCheck::TTaskRunnerBase::ReleaseRef() -{ - if (!HoldsSelfReference) { - return; - } - HoldsSelfReference = false; - DecRef(); -} -#endif - + } + } + + temp->ScheduleV(); + temp->UnRefV(); +} + +#if 0 +void NRainCheck::TTaskRunnerBase::RetainRef() +{ + if (HoldsSelfReference) { + return; + } + HoldsSelfReference = true; + Ref(); +} + +void NRainCheck::TTaskRunnerBase::ReleaseRef() +{ + if (!HoldsSelfReference) { + return; + } + HoldsSelfReference = false; + DecRef(); +} +#endif + void TTaskRunnerBase::AssertInThisThread() const { Y_ASSERT(IsRunningInThisThread()); -} - +} + TTaskRunnerBase* TTaskRunnerBase::CurrentTask() { Y_VERIFY(!!ThreadCurrentTask); - return ThreadCurrentTask; -} - + return ThreadCurrentTask; +} + ITaskBase* TTaskRunnerBase::CurrentTaskImpl() { return CurrentTask()->GetImplBase(); } TString TTaskRunnerBase::GetStatusSingleLine() { return TypeName(*Impl); -} - +} + bool NRainCheck::AreWeInsideTask() { return ThreadCurrentTask != nullptr; } diff --git a/library/cpp/messagebus/rain_check/core/task.h b/library/cpp/messagebus/rain_check/core/task.h index 7d8778bcda..b84e62a1eb 100644 --- a/library/cpp/messagebus/rain_check/core/task.h +++ b/library/cpp/messagebus/rain_check/core/task.h @@ -1,5 +1,5 @@ -#pragma once - +#pragma once + #include "fwd.h" #include <library/cpp/messagebus/actor/actor.h> @@ -7,55 +7,55 @@ #include <library/cpp/deprecated/enum_codegen/enum_codegen.h> -#include <util/generic/noncopyable.h> -#include <util/generic/ptr.h> -#include <util/thread/lfstack.h> - -namespace NRainCheck { - struct ISubtaskListener { - virtual void SetDone() = 0; +#include <util/generic/noncopyable.h> +#include <util/generic/ptr.h> +#include <util/thread/lfstack.h> + +namespace NRainCheck { + struct ISubtaskListener { + virtual void SetDone() = 0; virtual ~ISubtaskListener() { } - }; - - struct TNopSubtaskListener: public ISubtaskListener { + }; + + struct TNopSubtaskListener: public ISubtaskListener { void SetDone() override; - - static TNopSubtaskListener Instance; - }; - - class TSubtaskCompletionFunc { - friend class TSubtaskCompletion; - - typedef void (ITaskBase::*TFunc)(TSubtaskCompletion*); - TFunc Func; - - public: - TSubtaskCompletionFunc() + + static TNopSubtaskListener Instance; + }; + + class TSubtaskCompletionFunc { + friend class TSubtaskCompletion; + + typedef void (ITaskBase::*TFunc)(TSubtaskCompletion*); + TFunc Func; + + public: + TSubtaskCompletionFunc() : Func(nullptr) { } - - TSubtaskCompletionFunc(void*) + + TSubtaskCompletionFunc(void*) : Func(nullptr) { } - - template <typename TTask> - TSubtaskCompletionFunc(void (TTask::*func)(TSubtaskCompletion*)) + + template <typename TTask> + TSubtaskCompletionFunc(void (TTask::*func)(TSubtaskCompletion*)) : Func((TFunc)func) - { + { static_assert((std::is_base_of<ITaskBase, TTask>::value), "expect (std::is_base_of<ITaskBase, TTask>::value)"); - } - - bool operator!() const { - return !Func; - } - }; - - template <typename T> - class TTaskFuture; - + } + + bool operator!() const { + return !Func; + } + }; + + template <typename T> + class TTaskFuture; + #define SUBTASK_STATE_MAP(XX) \ XX(CREATED, "Initial") \ XX(RUNNING, "Running") \ @@ -63,33 +63,33 @@ namespace NRainCheck { XX(CANCEL_REQUESTED, "Cancel requested, but still executing") \ XX(CANCELED, "Canceled") \ /**/ - - enum ESubtaskState { - SUBTASK_STATE_MAP(ENUM_VALUE_GEN_NO_VALUE) - }; - - ENUM_TO_STRING(ESubtaskState, SUBTASK_STATE_MAP) - + + enum ESubtaskState { + SUBTASK_STATE_MAP(ENUM_VALUE_GEN_NO_VALUE) + }; + + ENUM_TO_STRING(ESubtaskState, SUBTASK_STATE_MAP) + class TSubtaskCompletion : TNonCopyable, public ISubtaskListener { - friend struct TTaskAccessor; + friend struct TTaskAccessor; - private: - TAtomicBox<ESubtaskState> State; - TTaskRunnerBase* volatile TaskRunner; - TSubtaskCompletionFunc CompletionFunc; + private: + TAtomicBox<ESubtaskState> State; + TTaskRunnerBase* volatile TaskRunner; + TSubtaskCompletionFunc CompletionFunc; - public: + public: TSubtaskCompletion() : State(CREATED) , TaskRunner() { } ~TSubtaskCompletion() override; - - // Either done or cancel requested or cancelled - bool IsComplete() const { - ESubtaskState state = State.Get(); - switch (state) { + + // Either done or cancel requested or cancelled + bool IsComplete() const { + ESubtaskState state = State.Get(); + switch (state) { case RUNNING: return false; case DONE: @@ -102,82 +102,82 @@ namespace NRainCheck { Y_FAIL("not started"); default: Y_FAIL("unknown value: %u", (unsigned)state); - } - } - - void FireCompletionCallback(ITaskBase*); - - void SetCompletionCallback(TSubtaskCompletionFunc func) { - CompletionFunc = func; - } - - // Completed, but not cancelled - bool IsDone() const { - return State.Get() == DONE; - } - - // Request cancel by actor - // Does nothing but marks task cancelled, - // and allows proceeding to next callback - void Cancel(); - - // called by service provider implementations - // must not be called by actor - void SetRunning(TTaskRunnerBase* parent); + } + } + + void FireCompletionCallback(ITaskBase*); + + void SetCompletionCallback(TSubtaskCompletionFunc func) { + CompletionFunc = func; + } + + // Completed, but not cancelled + bool IsDone() const { + return State.Get() == DONE; + } + + // Request cancel by actor + // Does nothing but marks task cancelled, + // and allows proceeding to next callback + void Cancel(); + + // called by service provider implementations + // must not be called by actor + void SetRunning(TTaskRunnerBase* parent); void SetDone() override; - }; - - // See ISimpleTask, ICoroTask + }; + + // See ISimpleTask, ICoroTask class TTaskRunnerBase: public TAtomicRefCount<TTaskRunnerBase>, public NActor::TActor<TTaskRunnerBase> { - friend class NActor::TActor<TTaskRunnerBase>; - friend class TContinueFunc; - friend struct TTaskAccessor; - friend class TSubtaskCompletion; - - private: - THolder<ITaskBase> Impl; - - ISubtaskListener* const ParentTask; - // While task is running, it holds extra reference to self. - //bool HoldsSelfReference; - bool Done; - bool SetDoneCalled; - - // Subtasks currently executed. + friend class NActor::TActor<TTaskRunnerBase>; + friend class TContinueFunc; + friend struct TTaskAccessor; + friend class TSubtaskCompletion; + + private: + THolder<ITaskBase> Impl; + + ISubtaskListener* const ParentTask; + // While task is running, it holds extra reference to self. + //bool HoldsSelfReference; + bool Done; + bool SetDoneCalled; + + // Subtasks currently executed. TVector<TSubtaskCompletion*> Pending; - - void Act(NActor::TDefaultTag); - - public: - // Construct task. Task is not automatically started. - TTaskRunnerBase(IEnv*, ISubtaskListener* parent, TAutoPtr<ITaskBase> impl); + + void Act(NActor::TDefaultTag); + + public: + // Construct task. Task is not automatically started. + TTaskRunnerBase(IEnv*, ISubtaskListener* parent, TAutoPtr<ITaskBase> impl); ~TTaskRunnerBase() override; - - bool IsRunningInThisThread() const; - void AssertInThisThread() const; - static TTaskRunnerBase* CurrentTask(); + + bool IsRunningInThisThread() const; + void AssertInThisThread() const; + static TTaskRunnerBase* CurrentTask(); static ITaskBase* CurrentTaskImpl(); - + TString GetStatusSingleLine(); - - protected: - //void RetainRef(); - //void ReleaseRef(); + + protected: + //void RetainRef(); + //void ReleaseRef(); ITaskBase* GetImplBase() { return Impl.Get(); } - - private: - // true if need to call again - virtual bool ReplyReceived() = 0; - }; - - class ITaskBase { - public: + + private: + // true if need to call again + virtual bool ReplyReceived() = 0; + }; + + class ITaskBase { + public: virtual ~ITaskBase() { } - }; - + }; + // Check that current method executed inside some task. bool AreWeInsideTask(); diff --git a/library/cpp/messagebus/rain_check/core/track.cpp b/library/cpp/messagebus/rain_check/core/track.cpp index 092a51a214..cc3747b9f6 100644 --- a/library/cpp/messagebus/rain_check/core/track.cpp +++ b/library/cpp/messagebus/rain_check/core/track.cpp @@ -1,66 +1,66 @@ -#include "track.h" - -using namespace NRainCheck; -using namespace NRainCheck::NPrivate; - +#include "track.h" + +using namespace NRainCheck; +using namespace NRainCheck::NPrivate; + void TTaskTrackerReceipt::SetDone() { - TaskTracker->GetQueue<TTaskTrackerReceipt*>()->EnqueueAndSchedule(this); -} - + TaskTracker->GetQueue<TTaskTrackerReceipt*>()->EnqueueAndSchedule(this); +} + TString TTaskTrackerReceipt::GetStatusSingleLine() { - return Task->GetStatusSingleLine(); -} - + return Task->GetStatusSingleLine(); +} + TTaskTracker::TTaskTracker(NActor::TExecutor* executor) : NActor::TActor<TTaskTracker>(executor) -{ -} - +{ +} + TTaskTracker::~TTaskTracker() { Y_ASSERT(Tasks.Empty()); -} - -void TTaskTracker::Shutdown() { - ShutdownFlag.Set(true); - Schedule(); - ShutdownEvent.WaitI(); -} - -void TTaskTracker::ProcessItem(NActor::TDefaultTag, NActor::TDefaultTag, ITaskFactory* taskFactory) { - THolder<ITaskFactory> holder(taskFactory); - - THolder<TTaskTrackerReceipt> receipt(new TTaskTrackerReceipt(this)); - receipt->Task = taskFactory->NewTask(receipt.Get()); - - Tasks.PushBack(receipt.Release()); -} - -void TTaskTracker::ProcessItem(NActor::TDefaultTag, NActor::TDefaultTag, TTaskTrackerReceipt* receipt) { +} + +void TTaskTracker::Shutdown() { + ShutdownFlag.Set(true); + Schedule(); + ShutdownEvent.WaitI(); +} + +void TTaskTracker::ProcessItem(NActor::TDefaultTag, NActor::TDefaultTag, ITaskFactory* taskFactory) { + THolder<ITaskFactory> holder(taskFactory); + + THolder<TTaskTrackerReceipt> receipt(new TTaskTrackerReceipt(this)); + receipt->Task = taskFactory->NewTask(receipt.Get()); + + Tasks.PushBack(receipt.Release()); +} + +void TTaskTracker::ProcessItem(NActor::TDefaultTag, NActor::TDefaultTag, TTaskTrackerReceipt* receipt) { Y_ASSERT(!receipt->Empty()); - receipt->Unlink(); - delete receipt; -} - -void TTaskTracker::ProcessItem(NActor::TDefaultTag, NActor::TDefaultTag, TAsyncResult<TTaskTrackerStatus>* status) { - TTaskTrackerStatus s; - s.Size = Tasks.Size(); - status->SetResult(s); -} - + receipt->Unlink(); + delete receipt; +} + +void TTaskTracker::ProcessItem(NActor::TDefaultTag, NActor::TDefaultTag, TAsyncResult<TTaskTrackerStatus>* status) { + TTaskTrackerStatus s; + s.Size = Tasks.Size(); + status->SetResult(s); +} + void TTaskTracker::Act(NActor::TDefaultTag) { - GetQueue<TAsyncResult<TTaskTrackerStatus>*>()->DequeueAll(); - GetQueue<ITaskFactory*>()->DequeueAll(); - GetQueue<TTaskTrackerReceipt*>()->DequeueAll(); - - if (ShutdownFlag.Get()) { - if (Tasks.Empty()) { - ShutdownEvent.Signal(); - } - } -} - -ui32 TTaskTracker::Size() { - TAsyncResult<TTaskTrackerStatus> r; - GetQueue<TAsyncResult<TTaskTrackerStatus>*>()->EnqueueAndSchedule(&r); - return r.GetResult().Size; -} + GetQueue<TAsyncResult<TTaskTrackerStatus>*>()->DequeueAll(); + GetQueue<ITaskFactory*>()->DequeueAll(); + GetQueue<TTaskTrackerReceipt*>()->DequeueAll(); + + if (ShutdownFlag.Get()) { + if (Tasks.Empty()) { + ShutdownEvent.Signal(); + } + } +} + +ui32 TTaskTracker::Size() { + TAsyncResult<TTaskTrackerStatus> r; + GetQueue<TAsyncResult<TTaskTrackerStatus>*>()->EnqueueAndSchedule(&r); + return r.GetResult().Size; +} diff --git a/library/cpp/messagebus/rain_check/core/track.h b/library/cpp/messagebus/rain_check/core/track.h index d387de7574..a7f3d099f0 100644 --- a/library/cpp/messagebus/rain_check/core/track.h +++ b/library/cpp/messagebus/rain_check/core/track.h @@ -1,97 +1,97 @@ -#pragma once - +#pragma once + #include "spawn.h" #include "task.h" - + #include <library/cpp/messagebus/async_result.h> #include <library/cpp/messagebus/actor/queue_in_actor.h> #include <library/cpp/messagebus/misc/atomic_box.h> - + #include <util/generic/intrlist.h> #include <util/system/event.h> -namespace NRainCheck { - class TTaskTracker; - - namespace NPrivate { - struct ITaskFactory { - virtual TIntrusivePtr<TTaskRunnerBase> NewTask(ISubtaskListener*) = 0; +namespace NRainCheck { + class TTaskTracker; + + namespace NPrivate { + struct ITaskFactory { + virtual TIntrusivePtr<TTaskRunnerBase> NewTask(ISubtaskListener*) = 0; virtual ~ITaskFactory() { } - }; - - struct TTaskTrackerReceipt: public ISubtaskListener, public TIntrusiveListItem<TTaskTrackerReceipt> { - TTaskTracker* const TaskTracker; - TIntrusivePtr<TTaskRunnerBase> Task; - + }; + + struct TTaskTrackerReceipt: public ISubtaskListener, public TIntrusiveListItem<TTaskTrackerReceipt> { + TTaskTracker* const TaskTracker; + TIntrusivePtr<TTaskRunnerBase> Task; + TTaskTrackerReceipt(TTaskTracker* taskTracker) : TaskTracker(taskTracker) { } - + void SetDone() override; - + TString GetStatusSingleLine(); - }; - - struct TTaskTrackerStatus { - ui32 Size; - }; - - } - - class TTaskTracker + }; + + struct TTaskTrackerStatus { + ui32 Size; + }; + + } + + class TTaskTracker : public TAtomicRefCount<TTaskTracker>, public NActor::TActor<TTaskTracker>, public NActor::TQueueInActor<TTaskTracker, NPrivate::ITaskFactory*>, public NActor::TQueueInActor<TTaskTracker, NPrivate::TTaskTrackerReceipt*>, public NActor::TQueueInActor<TTaskTracker, TAsyncResult<NPrivate::TTaskTrackerStatus>*> { - friend struct NPrivate::TTaskTrackerReceipt; - - private: - TAtomicBox<bool> ShutdownFlag; + friend struct NPrivate::TTaskTrackerReceipt; + + private: + TAtomicBox<bool> ShutdownFlag; TSystemEvent ShutdownEvent; - - TIntrusiveList<NPrivate::TTaskTrackerReceipt> Tasks; - - template <typename TItem> - NActor::TQueueInActor<TTaskTracker, TItem>* GetQueue() { - return this; - } - - public: - TTaskTracker(NActor::TExecutor* executor); + + TIntrusiveList<NPrivate::TTaskTrackerReceipt> Tasks; + + template <typename TItem> + NActor::TQueueInActor<TTaskTracker, TItem>* GetQueue() { + return this; + } + + public: + TTaskTracker(NActor::TExecutor* executor); ~TTaskTracker() override; - - void Shutdown(); - - void ProcessItem(NActor::TDefaultTag, NActor::TDefaultTag, NPrivate::ITaskFactory*); - void ProcessItem(NActor::TDefaultTag, NActor::TDefaultTag, NPrivate::TTaskTrackerReceipt*); - void ProcessItem(NActor::TDefaultTag, NActor::TDefaultTag, TAsyncResult<NPrivate::TTaskTrackerStatus>*); - - void Act(NActor::TDefaultTag); - - template <typename TTask, typename TEnv, typename TParam> - void Spawn(TEnv* env, TParam param) { - struct TTaskFactory: public NPrivate::ITaskFactory { - TEnv* const Env; - TParam Param; - + + void Shutdown(); + + void ProcessItem(NActor::TDefaultTag, NActor::TDefaultTag, NPrivate::ITaskFactory*); + void ProcessItem(NActor::TDefaultTag, NActor::TDefaultTag, NPrivate::TTaskTrackerReceipt*); + void ProcessItem(NActor::TDefaultTag, NActor::TDefaultTag, TAsyncResult<NPrivate::TTaskTrackerStatus>*); + + void Act(NActor::TDefaultTag); + + template <typename TTask, typename TEnv, typename TParam> + void Spawn(TEnv* env, TParam param) { + struct TTaskFactory: public NPrivate::ITaskFactory { + TEnv* const Env; + TParam Param; + TTaskFactory(TEnv* env, TParam param) : Env(env) , Param(param) { } - + TIntrusivePtr<TTaskRunnerBase> NewTask(ISubtaskListener* subtaskListener) override { - return NRainCheck::SpawnTask<TTask>(Env, Param, subtaskListener).Get(); - } - }; - - GetQueue<NPrivate::ITaskFactory*>()->EnqueueAndSchedule(new TTaskFactory(env, param)); - } - - ui32 Size(); - }; - -} + return NRainCheck::SpawnTask<TTask>(Env, Param, subtaskListener).Get(); + } + }; + + GetQueue<NPrivate::ITaskFactory*>()->EnqueueAndSchedule(new TTaskFactory(env, param)); + } + + ui32 Size(); + }; + +} diff --git a/library/cpp/messagebus/rain_check/core/track_ut.cpp b/library/cpp/messagebus/rain_check/core/track_ut.cpp index 05f7de1319..f2ac90fa3c 100644 --- a/library/cpp/messagebus/rain_check/core/track_ut.cpp +++ b/library/cpp/messagebus/rain_check/core/track_ut.cpp @@ -1,45 +1,45 @@ #include <library/cpp/testing/unittest/registar.h> - + #include "track.h" #include <library/cpp/messagebus/rain_check/test/helper/misc.h> #include <library/cpp/messagebus/rain_check/test/ut/test.h> - -using namespace NRainCheck; - + +using namespace NRainCheck; + Y_UNIT_TEST_SUITE(TaskTracker) { - struct TTaskForTracker: public ISimpleTask { - TTestSync* const TestSync; - + struct TTaskForTracker: public ISimpleTask { + TTestSync* const TestSync; + TTaskForTracker(TTestEnv*, TTestSync* testSync) : TestSync(testSync) { } - + TContinueFunc Start() override { - TestSync->WaitForAndIncrement(0); - TestSync->WaitForAndIncrement(2); + TestSync->WaitForAndIncrement(0); + TestSync->WaitForAndIncrement(2); return nullptr; - } - }; - + } + }; + Y_UNIT_TEST(Simple) { - TTestEnv env; - - TIntrusivePtr<TTaskTracker> tracker(new TTaskTracker(env.GetExecutor())); - - TTestSync testSync; - - tracker->Spawn<TTaskForTracker>(&env, &testSync); - - testSync.WaitFor(1); - + TTestEnv env; + + TIntrusivePtr<TTaskTracker> tracker(new TTaskTracker(env.GetExecutor())); + + TTestSync testSync; + + tracker->Spawn<TTaskForTracker>(&env, &testSync); + + testSync.WaitFor(1); + UNIT_ASSERT_VALUES_EQUAL(1u, tracker->Size()); - - testSync.CheckAndIncrement(1); - - testSync.WaitForAndIncrement(3); - - tracker->Shutdown(); - } -} + + testSync.CheckAndIncrement(1); + + testSync.WaitForAndIncrement(3); + + tracker->Shutdown(); + } +} diff --git a/library/cpp/messagebus/rain_check/core/ya.make b/library/cpp/messagebus/rain_check/core/ya.make index c6fb5640d4..497e452729 100644 --- a/library/cpp/messagebus/rain_check/core/ya.make +++ b/library/cpp/messagebus/rain_check/core/ya.make @@ -1,25 +1,25 @@ -LIBRARY() - +LIBRARY() + OWNER(g:messagebus) - -PEERDIR( + +PEERDIR( library/cpp/coroutine/engine library/cpp/deprecated/enum_codegen library/cpp/messagebus library/cpp/messagebus/actor library/cpp/messagebus/scheduler -) - -SRCS( - coro.cpp - coro_stack.cpp - env.cpp - rain_check.cpp - simple.cpp - sleep.cpp - spawn.cpp - task.cpp - track.cpp -) - -END() +) + +SRCS( + coro.cpp + coro_stack.cpp + env.cpp + rain_check.cpp + simple.cpp + sleep.cpp + spawn.cpp + task.cpp + track.cpp +) + +END() diff --git a/library/cpp/messagebus/rain_check/http/client_ut.cpp b/library/cpp/messagebus/rain_check/http/client_ut.cpp index 1628114391..c6e4a151bd 100644 --- a/library/cpp/messagebus/rain_check/http/client_ut.cpp +++ b/library/cpp/messagebus/rain_check/http/client_ut.cpp @@ -25,7 +25,7 @@ #include <utility> using namespace NRainCheck; -using namespace NBus::NTest; +using namespace NBus::NTest; namespace { class THttpClientEnv: public TTestEnvTemplate<THttpClientEnv> { @@ -145,11 +145,11 @@ Y_UNIT_TEST_SUITE(RainCheckHttpClient) { static const TIpPort SERVER_PORT = 4000; Y_UNIT_TEST(Simple) { - // TODO: randomize port - if (!IsFixedPortTestAllowed()) { - return; - } - + // TODO: randomize port + if (!IsFixedPortTestAllowed()) { + return; + } + TSimpleServer server; NNeh::IServicesRef runner = RunServer(SERVER_PORT, server); diff --git a/library/cpp/messagebus/rain_check/messagebus/messagebus_client.cpp b/library/cpp/messagebus/rain_check/messagebus/messagebus_client.cpp index daac8d9a99..13d3132fb7 100644 --- a/library/cpp/messagebus/rain_check/messagebus/messagebus_client.cpp +++ b/library/cpp/messagebus/rain_check/messagebus/messagebus_client.cpp @@ -1,98 +1,98 @@ -#include "messagebus_client.h" - -using namespace NRainCheck; -using namespace NBus; - -TBusClientService::TBusClientService( +#include "messagebus_client.h" + +using namespace NRainCheck; +using namespace NBus; + +TBusClientService::TBusClientService( const NBus::TBusSessionConfig& config, NBus::TBusProtocol* proto, NBus::TBusMessageQueue* queue) { - Session = queue->CreateSource(proto, this, config); -} - + Session = queue->CreateSource(proto, this, config); +} + TBusClientService::~TBusClientService() { - Session->Shutdown(); -} - + Session->Shutdown(); +} + void TBusClientService::SendCommon(NBus::TBusMessage* message, const NBus::TNetAddr&, TBusFuture* future) { - TTaskRunnerBase* current = TTaskRunnerBase::CurrentTask(); - - future->SetRunning(current); - - future->Task = current; - - // after this statement message is owned by both messagebus and future - future->Request.Reset(message); - - // TODO: allow cookie in messagebus - message->Data = future; -} - -void TBusClientService::ProcessResultCommon(NBus::TBusMessageAutoPtr message, + TTaskRunnerBase* current = TTaskRunnerBase::CurrentTask(); + + future->SetRunning(current); + + future->Task = current; + + // after this statement message is owned by both messagebus and future + future->Request.Reset(message); + + // TODO: allow cookie in messagebus + message->Data = future; +} + +void TBusClientService::ProcessResultCommon(NBus::TBusMessageAutoPtr message, const NBus::TNetAddr&, TBusFuture* future, NBus::EMessageStatus status) { Y_UNUSED(message.Release()); - - if (status == NBus::MESSAGE_OK) { - return; - } - + + if (status == NBus::MESSAGE_OK) { + return; + } + future->SetDoneAndSchedule(status, nullptr); -} - -void TBusClientService::SendOneWay( +} + +void TBusClientService::SendOneWay( NBus::TBusMessageAutoPtr message, const NBus::TNetAddr& addr, TBusFuture* future) { - SendCommon(message.Get(), addr, future); - - EMessageStatus ok = Session->SendMessageOneWay(message.Get(), &addr, false); - ProcessResultCommon(message, addr, future, ok); -} - + SendCommon(message.Get(), addr, future); + + EMessageStatus ok = Session->SendMessageOneWay(message.Get(), &addr, false); + ProcessResultCommon(message, addr, future, ok); +} + NBus::TBusClientSessionPtr TBusClientService::GetSessionForMonitoring() const { return Session; } -void TBusClientService::Send( +void TBusClientService::Send( TBusMessageAutoPtr message, const TNetAddr& addr, TBusFuture* future) { - SendCommon(message.Get(), addr, future); - - EMessageStatus ok = Session->SendMessage(message.Get(), &addr, false); - ProcessResultCommon(message, addr, future, ok); -} - -void TBusClientService::OnReply( + SendCommon(message.Get(), addr, future); + + EMessageStatus ok = Session->SendMessage(message.Get(), &addr, false); + ProcessResultCommon(message, addr, future, ok); +} + +void TBusClientService::OnReply( TAutoPtr<TBusMessage> request, TAutoPtr<TBusMessage> response) { TBusFuture* future = (TBusFuture*)request->Data; Y_ASSERT(future->Request.Get() == request.Get()); Y_UNUSED(request.Release()); - future->SetDoneAndSchedule(MESSAGE_OK, response); -} - -void NRainCheck::TBusClientService::OnMessageSentOneWay( + future->SetDoneAndSchedule(MESSAGE_OK, response); +} + +void NRainCheck::TBusClientService::OnMessageSentOneWay( TAutoPtr<NBus::TBusMessage> request) { TBusFuture* future = (TBusFuture*)request->Data; Y_ASSERT(future->Request.Get() == request.Get()); Y_UNUSED(request.Release()); future->SetDoneAndSchedule(MESSAGE_OK, nullptr); -} - -void TBusClientService::OnError( +} + +void TBusClientService::OnError( TAutoPtr<TBusMessage> message, NBus::EMessageStatus status) { if (message->Data == nullptr) { - return; - } - + return; + } + TBusFuture* future = (TBusFuture*)message->Data; Y_ASSERT(future->Request.Get() == message.Get()); Y_UNUSED(message.Release()); future->SetDoneAndSchedule(status, nullptr); -} - +} + void TBusFuture::SetDoneAndSchedule(EMessageStatus status, TAutoPtr<TBusMessage> response) { - Status = status; - Response.Reset(response.Release()); - SetDone(); -} + Status = status; + Response.Reset(response.Release()); + SetDone(); +} diff --git a/library/cpp/messagebus/rain_check/messagebus/messagebus_client.h b/library/cpp/messagebus/rain_check/messagebus/messagebus_client.h index 0a291cdea6..8bcc03b8d9 100644 --- a/library/cpp/messagebus/rain_check/messagebus/messagebus_client.h +++ b/library/cpp/messagebus/rain_check/messagebus/messagebus_client.h @@ -1,67 +1,67 @@ -#pragma once - +#pragma once + #include <library/cpp/messagebus/rain_check/core/task.h> #include <library/cpp/messagebus/ybus.h> - -namespace NRainCheck { - class TBusFuture: public TSubtaskCompletion { - friend class TBusClientService; - - private: - THolder<NBus::TBusMessage> Request; - THolder<NBus::TBusMessage> Response; - NBus::EMessageStatus Status; - - private: - TTaskRunnerBase* Task; - - void SetDoneAndSchedule(NBus::EMessageStatus, TAutoPtr<NBus::TBusMessage>); - - public: - // TODO: add MESSAGE_UNDEFINED + +namespace NRainCheck { + class TBusFuture: public TSubtaskCompletion { + friend class TBusClientService; + + private: + THolder<NBus::TBusMessage> Request; + THolder<NBus::TBusMessage> Response; + NBus::EMessageStatus Status; + + private: + TTaskRunnerBase* Task; + + void SetDoneAndSchedule(NBus::EMessageStatus, TAutoPtr<NBus::TBusMessage>); + + public: + // TODO: add MESSAGE_UNDEFINED TBusFuture() : Status(NBus::MESSAGE_DONT_ASK) , Task(nullptr) { } - - NBus::TBusMessage* GetRequest() const { - return Request.Get(); - } - - NBus::TBusMessage* GetResponse() const { + + NBus::TBusMessage* GetRequest() const { + return Request.Get(); + } + + NBus::TBusMessage* GetResponse() const { Y_ASSERT(IsDone()); - return Response.Get(); - } - - NBus::EMessageStatus GetStatus() const { + return Response.Get(); + } + + NBus::EMessageStatus GetStatus() const { Y_ASSERT(IsDone()); - return Status; - } - }; - - class TBusClientService: private NBus::IBusClientHandler { - private: - NBus::TBusClientSessionPtr Session; - - public: - TBusClientService(const NBus::TBusSessionConfig&, NBus::TBusProtocol*, NBus::TBusMessageQueue*); + return Status; + } + }; + + class TBusClientService: private NBus::IBusClientHandler { + private: + NBus::TBusClientSessionPtr Session; + + public: + TBusClientService(const NBus::TBusSessionConfig&, NBus::TBusProtocol*, NBus::TBusMessageQueue*); ~TBusClientService() override; - - void Send(NBus::TBusMessageAutoPtr, const NBus::TNetAddr&, TBusFuture* future); - void SendOneWay(NBus::TBusMessageAutoPtr, const NBus::TNetAddr&, TBusFuture* future); - + + void Send(NBus::TBusMessageAutoPtr, const NBus::TNetAddr&, TBusFuture* future); + void SendOneWay(NBus::TBusMessageAutoPtr, const NBus::TNetAddr&, TBusFuture* future); + // Use it only for monitoring NBus::TBusClientSessionPtr GetSessionForMonitoring() const; - private: - void SendCommon(NBus::TBusMessage*, const NBus::TNetAddr&, TBusFuture* future); - void ProcessResultCommon(NBus::TBusMessageAutoPtr, const NBus::TNetAddr&, TBusFuture* future, NBus::EMessageStatus); - + private: + void SendCommon(NBus::TBusMessage*, const NBus::TNetAddr&, TBusFuture* future); + void ProcessResultCommon(NBus::TBusMessageAutoPtr, const NBus::TNetAddr&, TBusFuture* future, NBus::EMessageStatus); + void OnReply(TAutoPtr<NBus::TBusMessage> pMessage, TAutoPtr<NBus::TBusMessage> pReply) override; void OnError(TAutoPtr<NBus::TBusMessage> pMessage, NBus::EMessageStatus status) override; void OnMessageSentOneWay(TAutoPtr<NBus::TBusMessage>) override; - }; - -} + }; + +} diff --git a/library/cpp/messagebus/rain_check/messagebus/messagebus_client_ut.cpp b/library/cpp/messagebus/rain_check/messagebus/messagebus_client_ut.cpp index 1b3618558b..4571f6f74a 100644 --- a/library/cpp/messagebus/rain_check/messagebus/messagebus_client_ut.cpp +++ b/library/cpp/messagebus/rain_check/messagebus/messagebus_client_ut.cpp @@ -1,146 +1,146 @@ #include <library/cpp/testing/unittest/registar.h> - + #include "messagebus_client.h" #include <library/cpp/messagebus/rain_check/test/ut/test.h> #include <library/cpp/messagebus/test/helper/example.h> #include <library/cpp/messagebus/test/helper/object_count_check.h> - + #include <util/generic/cast.h> - -using namespace NBus; -using namespace NBus::NTest; -using namespace NRainCheck; - -struct TMessageBusClientEnv: public TTestEnvTemplate<TMessageBusClientEnv> { - // TODO: use same thread pool - TBusMessageQueuePtr Queue; - TExampleProtocol Proto; - TBusClientService BusClientService; - - static TBusQueueConfig QueueConfig() { - TBusQueueConfig r; - r.NumWorkers = 4; - return r; - } - - TMessageBusClientEnv() - : Queue(CreateMessageQueue(GetExecutor())) - , BusClientService(TBusSessionConfig(), &Proto, Queue.Get()) + +using namespace NBus; +using namespace NBus::NTest; +using namespace NRainCheck; + +struct TMessageBusClientEnv: public TTestEnvTemplate<TMessageBusClientEnv> { + // TODO: use same thread pool + TBusMessageQueuePtr Queue; + TExampleProtocol Proto; + TBusClientService BusClientService; + + static TBusQueueConfig QueueConfig() { + TBusQueueConfig r; + r.NumWorkers = 4; + return r; + } + + TMessageBusClientEnv() + : Queue(CreateMessageQueue(GetExecutor())) + , BusClientService(TBusSessionConfig(), &Proto, Queue.Get()) { } -}; - +}; + Y_UNIT_TEST_SUITE(RainCheckMessageBusClient) { - struct TSimpleTask: public ISimpleTask { - TMessageBusClientEnv* const Env; - - const unsigned ServerPort; - - TSimpleTask(TMessageBusClientEnv* env, unsigned serverPort) - : Env(env) - , ServerPort(serverPort) - { - } - + struct TSimpleTask: public ISimpleTask { + TMessageBusClientEnv* const Env; + + const unsigned ServerPort; + + TSimpleTask(TMessageBusClientEnv* env, unsigned serverPort) + : Env(env) + , ServerPort(serverPort) + { + } + TVector<TSimpleSharedPtr<TBusFuture>> Requests; - + TContinueFunc Start() override { - for (unsigned i = 0; i < 3; ++i) { - Requests.push_back(new TBusFuture); - TNetAddr addr("localhost", ServerPort); - Env->BusClientService.Send(new TExampleRequest(&Env->Proto.RequestCount), addr, Requests[i].Get()); - } - - return TContinueFunc(&TSimpleTask::GotReplies); - } - - TContinueFunc GotReplies() { - for (unsigned i = 0; i < Requests.size(); ++i) { + for (unsigned i = 0; i < 3; ++i) { + Requests.push_back(new TBusFuture); + TNetAddr addr("localhost", ServerPort); + Env->BusClientService.Send(new TExampleRequest(&Env->Proto.RequestCount), addr, Requests[i].Get()); + } + + return TContinueFunc(&TSimpleTask::GotReplies); + } + + TContinueFunc GotReplies() { + for (unsigned i = 0; i < Requests.size(); ++i) { Y_VERIFY(Requests[i]->GetStatus() == MESSAGE_OK); - VerifyDynamicCast<TExampleResponse*>(Requests[i]->GetResponse()); - } - Env->TestSync.CheckAndIncrement(0); + VerifyDynamicCast<TExampleResponse*>(Requests[i]->GetResponse()); + } + Env->TestSync.CheckAndIncrement(0); return nullptr; - } - }; - + } + }; + Y_UNIT_TEST(Simple) { - TObjectCountCheck objectCountCheck; - - TExampleServer server; - - TMessageBusClientEnv env; - - TIntrusivePtr<TSimpleTaskRunner> task = env.SpawnTask<TSimpleTask>(server.GetActualListenPort()); - - env.TestSync.WaitForAndIncrement(1); - } - - struct TOneWayServer: public NBus::IBusServerHandler { - TTestSync* const TestSync; - TExampleProtocol Proto; - NBus::TBusMessageQueuePtr Queue; - NBus::TBusServerSessionPtr Session; - - TOneWayServer(TTestSync* testSync) - : TestSync(testSync) - { - Queue = CreateMessageQueue(); - Session = Queue->CreateDestination(&Proto, this, NBus::TBusSessionConfig()); - } - + TObjectCountCheck objectCountCheck; + + TExampleServer server; + + TMessageBusClientEnv env; + + TIntrusivePtr<TSimpleTaskRunner> task = env.SpawnTask<TSimpleTask>(server.GetActualListenPort()); + + env.TestSync.WaitForAndIncrement(1); + } + + struct TOneWayServer: public NBus::IBusServerHandler { + TTestSync* const TestSync; + TExampleProtocol Proto; + NBus::TBusMessageQueuePtr Queue; + NBus::TBusServerSessionPtr Session; + + TOneWayServer(TTestSync* testSync) + : TestSync(testSync) + { + Queue = CreateMessageQueue(); + Session = Queue->CreateDestination(&Proto, this, NBus::TBusSessionConfig()); + } + void OnMessage(NBus::TOnMessageContext& context) override { - TestSync->CheckAndIncrement(1); - context.ForgetRequest(); - } - }; - - struct TOneWayTask: public ISimpleTask { - TMessageBusClientEnv* const Env; - - const unsigned ServerPort; - - TOneWayTask(TMessageBusClientEnv* env, unsigned serverPort) - : Env(env) - , ServerPort(serverPort) - { - } - + TestSync->CheckAndIncrement(1); + context.ForgetRequest(); + } + }; + + struct TOneWayTask: public ISimpleTask { + TMessageBusClientEnv* const Env; + + const unsigned ServerPort; + + TOneWayTask(TMessageBusClientEnv* env, unsigned serverPort) + : Env(env) + , ServerPort(serverPort) + { + } + TVector<TSimpleSharedPtr<TBusFuture>> Requests; - + TContinueFunc Start() override { - Env->TestSync.CheckAndIncrement(0); - - for (unsigned i = 0; i < 1; ++i) { - Requests.push_back(new TBusFuture); - TNetAddr addr("localhost", ServerPort); - Env->BusClientService.SendOneWay(new TExampleRequest(&Env->Proto.RequestCount), addr, Requests[i].Get()); - } - - return TContinueFunc(&TOneWayTask::GotReplies); - } - - TContinueFunc GotReplies() { - for (unsigned i = 0; i < Requests.size(); ++i) { + Env->TestSync.CheckAndIncrement(0); + + for (unsigned i = 0; i < 1; ++i) { + Requests.push_back(new TBusFuture); + TNetAddr addr("localhost", ServerPort); + Env->BusClientService.SendOneWay(new TExampleRequest(&Env->Proto.RequestCount), addr, Requests[i].Get()); + } + + return TContinueFunc(&TOneWayTask::GotReplies); + } + + TContinueFunc GotReplies() { + for (unsigned i = 0; i < Requests.size(); ++i) { Y_VERIFY(Requests[i]->GetStatus() == MESSAGE_OK); Y_VERIFY(!Requests[i]->GetResponse()); - } - Env->TestSync.WaitForAndIncrement(2); + } + Env->TestSync.WaitForAndIncrement(2); return nullptr; - } - }; - + } + }; + Y_UNIT_TEST(OneWay) { - TObjectCountCheck objectCountCheck; - - TMessageBusClientEnv env; - - TOneWayServer server(&env.TestSync); - - TIntrusivePtr<TSimpleTaskRunner> task = env.SpawnTask<TOneWayTask>(server.Session->GetActualListenPort()); - - env.TestSync.WaitForAndIncrement(3); - } -} + TObjectCountCheck objectCountCheck; + + TMessageBusClientEnv env; + + TOneWayServer server(&env.TestSync); + + TIntrusivePtr<TSimpleTaskRunner> task = env.SpawnTask<TOneWayTask>(server.Session->GetActualListenPort()); + + env.TestSync.WaitForAndIncrement(3); + } +} diff --git a/library/cpp/messagebus/rain_check/messagebus/messagebus_server.cpp b/library/cpp/messagebus/rain_check/messagebus/messagebus_server.cpp index 5d4b13d664..1346ef3243 100644 --- a/library/cpp/messagebus/rain_check/messagebus/messagebus_server.cpp +++ b/library/cpp/messagebus/rain_check/messagebus/messagebus_server.cpp @@ -1,17 +1,17 @@ #include "messagebus_server.h" #include <library/cpp/messagebus/rain_check/core/spawn.h> - -using namespace NRainCheck; - -TBusTaskStarter::TBusTaskStarter(TAutoPtr<ITaskFactory> taskFactory) - : TaskFactory(taskFactory) -{ -} - + +using namespace NRainCheck; + +TBusTaskStarter::TBusTaskStarter(TAutoPtr<ITaskFactory> taskFactory) + : TaskFactory(taskFactory) +{ +} + void TBusTaskStarter::OnMessage(NBus::TOnMessageContext& onMessage) { - TaskFactory->NewTask(onMessage); -} - + TaskFactory->NewTask(onMessage); +} + TBusTaskStarter::~TBusTaskStarter() { -} +} diff --git a/library/cpp/messagebus/rain_check/messagebus/messagebus_server.h b/library/cpp/messagebus/rain_check/messagebus/messagebus_server.h index 1334f05fe4..28d016599a 100644 --- a/library/cpp/messagebus/rain_check/messagebus/messagebus_server.h +++ b/library/cpp/messagebus/rain_check/messagebus/messagebus_server.h @@ -1,46 +1,46 @@ -#pragma once - +#pragma once + #include <library/cpp/messagebus/rain_check/core/spawn.h> #include <library/cpp/messagebus/rain_check/core/task.h> - + #include <library/cpp/messagebus/ybus.h> - + #include <util/system/yassert.h> - -namespace NRainCheck { - class TBusTaskStarter: public NBus::IBusServerHandler { - private: - struct ITaskFactory { - virtual void NewTask(NBus::TOnMessageContext&) = 0; + +namespace NRainCheck { + class TBusTaskStarter: public NBus::IBusServerHandler { + private: + struct ITaskFactory { + virtual void NewTask(NBus::TOnMessageContext&) = 0; virtual ~ITaskFactory() { } - }; - - THolder<ITaskFactory> TaskFactory; - + }; + + THolder<ITaskFactory> TaskFactory; + void OnMessage(NBus::TOnMessageContext&) override; - public: - TBusTaskStarter(TAutoPtr<ITaskFactory>); + public: + TBusTaskStarter(TAutoPtr<ITaskFactory>); ~TBusTaskStarter() override; - - public: - template <typename TTask, typename TEnv> - static TAutoPtr<TBusTaskStarter> NewStarter(TEnv* env) { - struct TTaskFactory: public ITaskFactory { - TEnv* const Env; - + + public: + template <typename TTask, typename TEnv> + static TAutoPtr<TBusTaskStarter> NewStarter(TEnv* env) { + struct TTaskFactory: public ITaskFactory { + TEnv* const Env; + TTaskFactory(TEnv* env) : Env(env) { } - + void NewTask(NBus::TOnMessageContext& context) override { - SpawnTask<TTask, TEnv, NBus::TOnMessageContext&>(Env, context); - } - }; - - return new TBusTaskStarter(new TTaskFactory(env)); - } - }; -} + SpawnTask<TTask, TEnv, NBus::TOnMessageContext&>(Env, context); + } + }; + + return new TBusTaskStarter(new TTaskFactory(env)); + } + }; +} diff --git a/library/cpp/messagebus/rain_check/messagebus/messagebus_server_ut.cpp b/library/cpp/messagebus/rain_check/messagebus/messagebus_server_ut.cpp index 7c11399f1b..fcb718c3ba 100644 --- a/library/cpp/messagebus/rain_check/messagebus/messagebus_server_ut.cpp +++ b/library/cpp/messagebus/rain_check/messagebus/messagebus_server_ut.cpp @@ -1,51 +1,51 @@ #include <library/cpp/testing/unittest/registar.h> - + #include "messagebus_server.h" - + #include <library/cpp/messagebus/rain_check/test/ut/test.h> - + #include <library/cpp/messagebus/test/helper/example.h> - -using namespace NBus; -using namespace NBus::NTest; -using namespace NRainCheck; - -struct TMessageBusServerEnv: public TTestEnvTemplate<TMessageBusServerEnv> { - TExampleProtocol Proto; -}; - + +using namespace NBus; +using namespace NBus::NTest; +using namespace NRainCheck; + +struct TMessageBusServerEnv: public TTestEnvTemplate<TMessageBusServerEnv> { + TExampleProtocol Proto; +}; + Y_UNIT_TEST_SUITE(RainCheckMessageBusServer) { - struct TSimpleServerTask: public ISimpleTask { - private: - TMessageBusServerEnv* const Env; - TOnMessageContext MessageContext; - - public: - TSimpleServerTask(TMessageBusServerEnv* env, TOnMessageContext& messageContext) - : Env(env) - { - MessageContext.Swap(messageContext); - } - + struct TSimpleServerTask: public ISimpleTask { + private: + TMessageBusServerEnv* const Env; + TOnMessageContext MessageContext; + + public: + TSimpleServerTask(TMessageBusServerEnv* env, TOnMessageContext& messageContext) + : Env(env) + { + MessageContext.Swap(messageContext); + } + TContinueFunc Start() override { - MessageContext.SendReplyMove(new TExampleResponse(&Env->Proto.ResponseCount)); + MessageContext.SendReplyMove(new TExampleResponse(&Env->Proto.ResponseCount)); return nullptr; - } - }; - + } + }; + Y_UNIT_TEST(Simple) { - TMessageBusServerEnv env; - - THolder<TBusTaskStarter> starter(TBusTaskStarter::NewStarter<TSimpleServerTask>(&env)); - - TBusMessageQueuePtr queue(CreateMessageQueue(env.GetExecutor())); - - TExampleProtocol proto; - - TBusServerSessionPtr session = queue->CreateDestination(&env.Proto, starter.Get(), TBusSessionConfig()); - - TExampleClient client; - - client.SendMessagesWaitReplies(1, TNetAddr("localhost", session->GetActualListenPort())); - } -} + TMessageBusServerEnv env; + + THolder<TBusTaskStarter> starter(TBusTaskStarter::NewStarter<TSimpleServerTask>(&env)); + + TBusMessageQueuePtr queue(CreateMessageQueue(env.GetExecutor())); + + TExampleProtocol proto; + + TBusServerSessionPtr session = queue->CreateDestination(&env.Proto, starter.Get(), TBusSessionConfig()); + + TExampleClient client; + + client.SendMessagesWaitReplies(1, TNetAddr("localhost", session->GetActualListenPort())); + } +} diff --git a/library/cpp/messagebus/rain_check/messagebus/ya.make b/library/cpp/messagebus/rain_check/messagebus/ya.make index defdac9a61..d7dc902ad1 100644 --- a/library/cpp/messagebus/rain_check/messagebus/ya.make +++ b/library/cpp/messagebus/rain_check/messagebus/ya.make @@ -1,15 +1,15 @@ -LIBRARY() - +LIBRARY() + OWNER(g:messagebus) - -PEERDIR( + +PEERDIR( library/cpp/messagebus library/cpp/messagebus/rain_check/core -) - -SRCS( - messagebus_client.cpp - messagebus_server.cpp -) - -END() +) + +SRCS( + messagebus_client.cpp + messagebus_server.cpp +) + +END() diff --git a/library/cpp/messagebus/rain_check/test/helper/misc.cpp b/library/cpp/messagebus/rain_check/test/helper/misc.cpp index c0fcb27252..2a75c42744 100644 --- a/library/cpp/messagebus/rain_check/test/helper/misc.cpp +++ b/library/cpp/messagebus/rain_check/test/helper/misc.cpp @@ -1,27 +1,27 @@ #include "misc.h" -#include <util/system/yassert.h> - -using namespace NRainCheck; - +#include <util/system/yassert.h> + +using namespace NRainCheck; + void TSpawnNopTasksCoroTask::Run() { Y_VERIFY(Count <= Completion.size()); - for (unsigned i = 0; i < Count; ++i) { - SpawnSubtask<TNopCoroTask>(Env, &Completion[i], ""); - } - - WaitForSubtasks(); -} - + for (unsigned i = 0; i < Count; ++i) { + SpawnSubtask<TNopCoroTask>(Env, &Completion[i], ""); + } + + WaitForSubtasks(); +} + TContinueFunc TSpawnNopTasksSimpleTask::Start() { Y_VERIFY(Count <= Completion.size()); - for (unsigned i = 0; i < Count; ++i) { - SpawnSubtask<TNopSimpleTask>(Env, &Completion[i], ""); - } - - return &TSpawnNopTasksSimpleTask::Join; -} - + for (unsigned i = 0; i < Count; ++i) { + SpawnSubtask<TNopSimpleTask>(Env, &Completion[i], ""); + } + + return &TSpawnNopTasksSimpleTask::Join; +} + TContinueFunc TSpawnNopTasksSimpleTask::Join() { return nullptr; -} +} diff --git a/library/cpp/messagebus/rain_check/test/helper/misc.h b/library/cpp/messagebus/rain_check/test/helper/misc.h index 9150be4d2f..dbcc04778d 100644 --- a/library/cpp/messagebus/rain_check/test/helper/misc.h +++ b/library/cpp/messagebus/rain_check/test/helper/misc.h @@ -1,57 +1,57 @@ -#pragma once - +#pragma once + #include <library/cpp/messagebus/rain_check/core/rain_check.h> - + #include <array> - -namespace NRainCheck { - struct TNopSimpleTask: public ISimpleTask { + +namespace NRainCheck { + struct TNopSimpleTask: public ISimpleTask { TNopSimpleTask(IEnv*, const void*) { } - + TContinueFunc Start() override { return nullptr; - } - }; - - struct TNopCoroTask: public ICoroTask { + } + }; + + struct TNopCoroTask: public ICoroTask { TNopCoroTask(IEnv*, const void*) { } - + void Run() override { } - }; - - struct TSpawnNopTasksCoroTask: public ICoroTask { - IEnv* const Env; - unsigned const Count; - - TSpawnNopTasksCoroTask(IEnv* env, unsigned count) - : Env(env) - , Count(count) + }; + + struct TSpawnNopTasksCoroTask: public ICoroTask { + IEnv* const Env; + unsigned const Count; + + TSpawnNopTasksCoroTask(IEnv* env, unsigned count) + : Env(env) + , Count(count) { } - + std::array<TSubtaskCompletion, 2> Completion; - + void Run() override; - }; - - struct TSpawnNopTasksSimpleTask: public ISimpleTask { - IEnv* const Env; - unsigned const Count; - - TSpawnNopTasksSimpleTask(IEnv* env, unsigned count) - : Env(env) - , Count(count) + }; + + struct TSpawnNopTasksSimpleTask: public ISimpleTask { + IEnv* const Env; + unsigned const Count; + + TSpawnNopTasksSimpleTask(IEnv* env, unsigned count) + : Env(env) + , Count(count) { } - + std::array<TSubtaskCompletion, 2> Completion; - + TContinueFunc Start() override; - - TContinueFunc Join(); - }; - -} + + TContinueFunc Join(); + }; + +} diff --git a/library/cpp/messagebus/rain_check/test/helper/ya.make b/library/cpp/messagebus/rain_check/test/helper/ya.make index aa9e4e6d81..08265167a7 100644 --- a/library/cpp/messagebus/rain_check/test/helper/ya.make +++ b/library/cpp/messagebus/rain_check/test/helper/ya.make @@ -1,13 +1,13 @@ -LIBRARY(messagebus-rain_check-test-helper) - +LIBRARY(messagebus-rain_check-test-helper) + OWNER(g:messagebus) - + PEERDIR( library/cpp/messagebus/rain_check/core ) -SRCS( - misc.cpp -) - -END() +SRCS( + misc.cpp +) + +END() diff --git a/library/cpp/messagebus/rain_check/test/perftest/perftest.cpp b/library/cpp/messagebus/rain_check/test/perftest/perftest.cpp index 22edbd8c6b..d0c6451f47 100644 --- a/library/cpp/messagebus/rain_check/test/perftest/perftest.cpp +++ b/library/cpp/messagebus/rain_check/test/perftest/perftest.cpp @@ -1,154 +1,154 @@ #include <library/cpp/messagebus/rain_check/test/helper/misc.h> - + #include <library/cpp/messagebus/rain_check/core/rain_check.h> - + #include <util/datetime/base.h> #include <array> - -using namespace NRainCheck; - -static const unsigned SUBTASKS = 2; - -struct TRainCheckPerftestEnv: public TSimpleEnvTemplate<TRainCheckPerftestEnv> { - unsigned SubtasksPerTask; - - TRainCheckPerftestEnv() - : TSimpleEnvTemplate<TRainCheckPerftestEnv>(4) - , SubtasksPerTask(1000) + +using namespace NRainCheck; + +static const unsigned SUBTASKS = 2; + +struct TRainCheckPerftestEnv: public TSimpleEnvTemplate<TRainCheckPerftestEnv> { + unsigned SubtasksPerTask; + + TRainCheckPerftestEnv() + : TSimpleEnvTemplate<TRainCheckPerftestEnv>(4) + , SubtasksPerTask(1000) { } -}; - -struct TCoroOuter: public ICoroTask { - TRainCheckPerftestEnv* const Env; - +}; + +struct TCoroOuter: public ICoroTask { + TRainCheckPerftestEnv* const Env; + TCoroOuter(TRainCheckPerftestEnv* env) : Env(env) { } - + void Run() override { - for (;;) { - TInstant start = TInstant::Now(); - - unsigned count = 0; - - unsigned current = 1000; - - do { - for (unsigned i = 0; i < current; ++i) { + for (;;) { + TInstant start = TInstant::Now(); + + unsigned count = 0; + + unsigned current = 1000; + + do { + for (unsigned i = 0; i < current; ++i) { std::array<TSubtaskCompletion, SUBTASKS> completion; - - for (unsigned j = 0; j < SUBTASKS; ++j) { - //SpawnSubtask<TNopSimpleTask>(Env, &completion[j]); - //SpawnSubtask<TSpawnNopTasksCoroTask>(Env, &completion[j], SUBTASKS); - SpawnSubtask<TSpawnNopTasksSimpleTask>(Env, &completion[j], SUBTASKS); - } - - WaitForSubtasks(); - } - - count += current; - current *= 2; - } while (TInstant::Now() - start < TDuration::Seconds(1)); - - TDuration d = TInstant::Now() - start; - unsigned dns = d.NanoSeconds() / count; - Cerr << dns << "ns per spawn/join\n"; - } - } -}; - -struct TSimpleOuter: public ISimpleTask { - TRainCheckPerftestEnv* const Env; - + + for (unsigned j = 0; j < SUBTASKS; ++j) { + //SpawnSubtask<TNopSimpleTask>(Env, &completion[j]); + //SpawnSubtask<TSpawnNopTasksCoroTask>(Env, &completion[j], SUBTASKS); + SpawnSubtask<TSpawnNopTasksSimpleTask>(Env, &completion[j], SUBTASKS); + } + + WaitForSubtasks(); + } + + count += current; + current *= 2; + } while (TInstant::Now() - start < TDuration::Seconds(1)); + + TDuration d = TInstant::Now() - start; + unsigned dns = d.NanoSeconds() / count; + Cerr << dns << "ns per spawn/join\n"; + } + } +}; + +struct TSimpleOuter: public ISimpleTask { + TRainCheckPerftestEnv* const Env; + TSimpleOuter(TRainCheckPerftestEnv* env, const void*) : Env(env) { } - - TInstant StartInstant; - unsigned Count; - unsigned Current; - unsigned I; - + + TInstant StartInstant; + unsigned Count; + unsigned Current; + unsigned I; + TContinueFunc Start() override { - StartInstant = TInstant::Now(); - Count = 0; - Current = 1000; - I = 0; - - return &TSimpleOuter::Spawn; - } - + StartInstant = TInstant::Now(); + Count = 0; + Current = 1000; + I = 0; + + return &TSimpleOuter::Spawn; + } + std::array<TSubtaskCompletion, SUBTASKS> Completion; - - TContinueFunc Spawn() { - for (unsigned j = 0; j < SUBTASKS; ++j) { - //SpawnSubtask<TNopSimpleTask>(Env, &Completion[j]); - //SpawnSubtask<TSpawnNopTasksCoroTask>(Env, &Completion[j], SUBTASKS); - SpawnSubtask<TSpawnNopTasksSimpleTask>(Env, &Completion[j], SUBTASKS); - } - - return &TSimpleOuter::Join; - } - - TContinueFunc Join() { - I += 1; - if (I != Current) { - return &TSimpleOuter::Spawn; - } - - I = 0; - Count += Current; - Current *= 2; - - TDuration d = TInstant::Now() - StartInstant; - if (d < TDuration::Seconds(1)) { - return &TSimpleOuter::Spawn; - } - - unsigned dns = d.NanoSeconds() / Count; - Cerr << dns << "ns per spawn/join\n"; - - return &TSimpleOuter::Start; - } -}; - -struct TReproduceCrashTask: public ISimpleTask { - TRainCheckPerftestEnv* const Env; - + + TContinueFunc Spawn() { + for (unsigned j = 0; j < SUBTASKS; ++j) { + //SpawnSubtask<TNopSimpleTask>(Env, &Completion[j]); + //SpawnSubtask<TSpawnNopTasksCoroTask>(Env, &Completion[j], SUBTASKS); + SpawnSubtask<TSpawnNopTasksSimpleTask>(Env, &Completion[j], SUBTASKS); + } + + return &TSimpleOuter::Join; + } + + TContinueFunc Join() { + I += 1; + if (I != Current) { + return &TSimpleOuter::Spawn; + } + + I = 0; + Count += Current; + Current *= 2; + + TDuration d = TInstant::Now() - StartInstant; + if (d < TDuration::Seconds(1)) { + return &TSimpleOuter::Spawn; + } + + unsigned dns = d.NanoSeconds() / Count; + Cerr << dns << "ns per spawn/join\n"; + + return &TSimpleOuter::Start; + } +}; + +struct TReproduceCrashTask: public ISimpleTask { + TRainCheckPerftestEnv* const Env; + TReproduceCrashTask(TRainCheckPerftestEnv* env) : Env(env) { } - + std::array<TSubtaskCompletion, SUBTASKS> Completion; - + TContinueFunc Start() override { - for (unsigned j = 0; j < 2; ++j) { - //SpawnSubtask<TNopSimpleTask>(Env, &Completion[j]); - SpawnSubtask<TSpawnNopTasksSimpleTask>(Env, &Completion[j], SUBTASKS); - } - - return &TReproduceCrashTask::Start; - } -}; - -int main(int argc, char** argv) { + for (unsigned j = 0; j < 2; ++j) { + //SpawnSubtask<TNopSimpleTask>(Env, &Completion[j]); + SpawnSubtask<TSpawnNopTasksSimpleTask>(Env, &Completion[j], SUBTASKS); + } + + return &TReproduceCrashTask::Start; + } +}; + +int main(int argc, char** argv) { Y_UNUSED(argc); Y_UNUSED(argv); - - TRainCheckPerftestEnv env; - - env.SpawnTask<TSimpleOuter>(""); - //env.SpawnTask<TCoroOuter>(); - //env.SpawnTask<TReproduceCrashTask>(); - - for (;;) { - Sleep(TDuration::Hours(1)); - } - - return 0; -} + + TRainCheckPerftestEnv env; + + env.SpawnTask<TSimpleOuter>(""); + //env.SpawnTask<TCoroOuter>(); + //env.SpawnTask<TReproduceCrashTask>(); + + for (;;) { + Sleep(TDuration::Hours(1)); + } + + return 0; +} diff --git a/library/cpp/messagebus/rain_check/test/perftest/ya.make b/library/cpp/messagebus/rain_check/test/perftest/ya.make index 7330a71700..f80ddf2c05 100644 --- a/library/cpp/messagebus/rain_check/test/perftest/ya.make +++ b/library/cpp/messagebus/rain_check/test/perftest/ya.make @@ -1,14 +1,14 @@ -PROGRAM(messagebus_rain_check_perftest) - +PROGRAM(messagebus_rain_check_perftest) + OWNER(g:messagebus) - -PEERDIR( + +PEERDIR( library/cpp/messagebus/rain_check/core library/cpp/messagebus/rain_check/test/helper -) - -SRCS( - perftest.cpp -) - -END() +) + +SRCS( + perftest.cpp +) + +END() diff --git a/library/cpp/messagebus/rain_check/test/ut/test.h b/library/cpp/messagebus/rain_check/test/ut/test.h index 724f6b7530..922f0f06cb 100644 --- a/library/cpp/messagebus/rain_check/test/ut/test.h +++ b/library/cpp/messagebus/rain_check/test/ut/test.h @@ -1,13 +1,13 @@ -#pragma once - +#pragma once + #include <library/cpp/messagebus/rain_check/core/rain_check.h> #include <library/cpp/messagebus/misc/test_sync.h> - -template <typename TSelf> -struct TTestEnvTemplate: public NRainCheck::TSimpleEnvTemplate<TSelf> { - TTestSync TestSync; -}; - -struct TTestEnv: public TTestEnvTemplate<TTestEnv> { -}; + +template <typename TSelf> +struct TTestEnvTemplate: public NRainCheck::TSimpleEnvTemplate<TSelf> { + TTestSync TestSync; +}; + +struct TTestEnv: public TTestEnvTemplate<TTestEnv> { +}; diff --git a/library/cpp/messagebus/rain_check/test/ut/ya.make b/library/cpp/messagebus/rain_check/test/ut/ya.make index 9f7a93417a..6191fe9fe0 100644 --- a/library/cpp/messagebus/rain_check/test/ut/ya.make +++ b/library/cpp/messagebus/rain_check/test/ut/ya.make @@ -1,24 +1,24 @@ PROGRAM(library-messagebus-rain_check-test-ut) - + OWNER(g:messagebus) - -PEERDIR( + +PEERDIR( library/cpp/testing/unittest_main library/cpp/messagebus/rain_check/core library/cpp/messagebus/rain_check/http library/cpp/messagebus/rain_check/messagebus library/cpp/messagebus/test/helper -) - -SRCS( - ../../core/coro_ut.cpp - ../../core/simple_ut.cpp - ../../core/sleep_ut.cpp - ../../core/spawn_ut.cpp - ../../core/track_ut.cpp +) + +SRCS( + ../../core/coro_ut.cpp + ../../core/simple_ut.cpp + ../../core/sleep_ut.cpp + ../../core/spawn_ut.cpp + ../../core/track_ut.cpp ../../http/client_ut.cpp - ../../messagebus/messagebus_client_ut.cpp - ../../messagebus/messagebus_server_ut.cpp -) - -END() + ../../messagebus/messagebus_client_ut.cpp + ../../messagebus/messagebus_server_ut.cpp +) + +END() diff --git a/library/cpp/messagebus/rain_check/test/ya.make b/library/cpp/messagebus/rain_check/test/ya.make index 4c1d6f8161..83cdb16977 100644 --- a/library/cpp/messagebus/rain_check/test/ya.make +++ b/library/cpp/messagebus/rain_check/test/ya.make @@ -1,6 +1,6 @@ OWNER(g:messagebus) -RECURSE( +RECURSE( perftest ut -) +) diff --git a/library/cpp/messagebus/rain_check/ya.make b/library/cpp/messagebus/rain_check/ya.make index 966d54c232..c408615f42 100644 --- a/library/cpp/messagebus/rain_check/ya.make +++ b/library/cpp/messagebus/rain_check/ya.make @@ -1,8 +1,8 @@ OWNER(g:messagebus) -RECURSE( +RECURSE( core http messagebus test -) +) |