aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/messagebus/rain_check
diff options
context:
space:
mode:
authornga <nga@yandex-team.ru>2022-02-10 16:48:09 +0300
committerDaniil Cherednik <dcherednik@yandex-team.ru>2022-02-10 16:48:09 +0300
commit1f553f46fb4f3c5eec631352cdd900a0709016af (patch)
treea231fba2c03b440becaea6c86a2702d0bfb0336e /library/cpp/messagebus/rain_check
parentc4de7efdedc25b49cbea74bd589eecb61b55b60a (diff)
downloadydb-1f553f46fb4f3c5eec631352cdd900a0709016af.tar.gz
Restoring authorship annotation for <nga@yandex-team.ru>. Commit 1 of 2.
Diffstat (limited to 'library/cpp/messagebus/rain_check')
-rw-r--r--library/cpp/messagebus/rain_check/core/coro.cpp90
-rw-r--r--library/cpp/messagebus/rain_check/core/coro.h82
-rw-r--r--library/cpp/messagebus/rain_check/core/coro_stack.cpp68
-rw-r--r--library/cpp/messagebus/rain_check/core/coro_stack.h44
-rw-r--r--library/cpp/messagebus/rain_check/core/coro_ut.cpp182
-rw-r--r--library/cpp/messagebus/rain_check/core/env.cpp6
-rw-r--r--library/cpp/messagebus/rain_check/core/env.h62
-rw-r--r--library/cpp/messagebus/rain_check/core/fwd.h36
-rw-r--r--library/cpp/messagebus/rain_check/core/rain_check.cpp2
-rw-r--r--library/cpp/messagebus/rain_check/core/rain_check.h12
-rw-r--r--library/cpp/messagebus/rain_check/core/simple.cpp32
-rw-r--r--library/cpp/messagebus/rain_check/core/simple.h98
-rw-r--r--library/cpp/messagebus/rain_check/core/simple_ut.cpp92
-rw-r--r--library/cpp/messagebus/rain_check/core/sleep.cpp84
-rw-r--r--library/cpp/messagebus/rain_check/core/sleep.h40
-rw-r--r--library/cpp/messagebus/rain_check/core/sleep_ut.cpp72
-rw-r--r--library/cpp/messagebus/rain_check/core/spawn.cpp8
-rw-r--r--library/cpp/messagebus/rain_check/core/spawn.h90
-rw-r--r--library/cpp/messagebus/rain_check/core/spawn_ut.cpp220
-rw-r--r--library/cpp/messagebus/rain_check/core/task.cpp342
-rw-r--r--library/cpp/messagebus/rain_check/core/task.h246
-rw-r--r--library/cpp/messagebus/rain_check/core/track.cpp116
-rw-r--r--library/cpp/messagebus/rain_check/core/track.h136
-rw-r--r--library/cpp/messagebus/rain_check/core/track_ut.cpp62
-rw-r--r--library/cpp/messagebus/rain_check/core/ya.make38
-rw-r--r--library/cpp/messagebus/rain_check/http/client_ut.cpp12
-rw-r--r--library/cpp/messagebus/rain_check/messagebus/messagebus_client.cpp128
-rw-r--r--library/cpp/messagebus/rain_check/messagebus/messagebus_client.h100
-rw-r--r--library/cpp/messagebus/rain_check/messagebus/messagebus_client_ut.cpp246
-rw-r--r--library/cpp/messagebus/rain_check/messagebus/messagebus_server.cpp24
-rw-r--r--library/cpp/messagebus/rain_check/messagebus/messagebus_server.h64
-rw-r--r--library/cpp/messagebus/rain_check/messagebus/messagebus_server_ut.cpp86
-rw-r--r--library/cpp/messagebus/rain_check/messagebus/ya.make24
-rw-r--r--library/cpp/messagebus/rain_check/test/helper/misc.cpp38
-rw-r--r--library/cpp/messagebus/rain_check/test/helper/misc.h78
-rw-r--r--library/cpp/messagebus/rain_check/test/helper/ya.make16
-rw-r--r--library/cpp/messagebus/rain_check/test/perftest/perftest.cpp254
-rw-r--r--library/cpp/messagebus/rain_check/test/perftest/ya.make22
-rw-r--r--library/cpp/messagebus/rain_check/test/ut/test.h20
-rw-r--r--library/cpp/messagebus/rain_check/test/ut/ya.make32
-rw-r--r--library/cpp/messagebus/rain_check/test/ya.make4
-rw-r--r--library/cpp/messagebus/rain_check/ya.make4
42 files changed, 1706 insertions, 1706 deletions
diff --git a/library/cpp/messagebus/rain_check/core/coro.cpp b/library/cpp/messagebus/rain_check/core/coro.cpp
index 500841dd5b..eda2fab402 100644
--- a/library/cpp/messagebus/rain_check/core/coro.cpp
+++ b/library/cpp/messagebus/rain_check/core/coro.cpp
@@ -1,60 +1,60 @@
#include "coro.h"
-
-#include "coro_stack.h"
-
+
+#include "coro_stack.h"
+
#include <util/system/tls.h>
#include <util/system/yassert.h>
-
-using namespace NRainCheck;
-
+
+using namespace NRainCheck;
+
TContClosure TCoroTaskRunner::ContClosure(TCoroTaskRunner* runner, TArrayRef<char> memRegion) {
- TContClosure contClosure;
- contClosure.TrampoLine = runner;
- contClosure.Stack = memRegion;
- return contClosure;
-}
-
-TCoroTaskRunner::TCoroTaskRunner(IEnv* env, ISubtaskListener* parent, TAutoPtr<ICoroTask> impl)
- : TTaskRunnerBase(env, parent, impl.Release())
- , Stack(GetImpl()->StackSize)
- , ContMachineContext(ContClosure(this, Stack.MemRegion()))
- , CoroDone(false)
-{
-}
-
-TCoroTaskRunner::~TCoroTaskRunner() {
+ TContClosure contClosure;
+ contClosure.TrampoLine = runner;
+ contClosure.Stack = memRegion;
+ return contClosure;
+}
+
+TCoroTaskRunner::TCoroTaskRunner(IEnv* env, ISubtaskListener* parent, TAutoPtr<ICoroTask> impl)
+ : TTaskRunnerBase(env, parent, impl.Release())
+ , Stack(GetImpl()->StackSize)
+ , ContMachineContext(ContClosure(this, Stack.MemRegion()))
+ , CoroDone(false)
+{
+}
+
+TCoroTaskRunner::~TCoroTaskRunner() {
Y_ASSERT(CoroDone);
-}
-
+}
+
Y_POD_STATIC_THREAD(TContMachineContext*)
CallerContext;
Y_POD_STATIC_THREAD(TCoroTaskRunner*)
Task;
-
+
bool TCoroTaskRunner::ReplyReceived() {
Y_ASSERT(!CoroDone);
-
- TContMachineContext me;
-
- CallerContext = &me;
- Task = this;
-
- me.SwitchTo(&ContMachineContext);
-
- Stack.VerifyNoStackOverflow();
-
+
+ TContMachineContext me;
+
+ CallerContext = &me;
+ Task = this;
+
+ me.SwitchTo(&ContMachineContext);
+
+ Stack.VerifyNoStackOverflow();
+
Y_ASSERT(CallerContext == &me);
Y_ASSERT(Task == this);
-
- return !CoroDone;
-}
-
+
+ return !CoroDone;
+}
+
void NRainCheck::TCoroTaskRunner::DoRun() {
- GetImpl()->Run();
- CoroDone = true;
- ContMachineContext.SwitchTo(CallerContext);
-}
-
+ GetImpl()->Run();
+ CoroDone = true;
+ ContMachineContext.SwitchTo(CallerContext);
+}
+
void NRainCheck::ICoroTask::WaitForSubtasks() {
- Task->ContMachineContext.SwitchTo(CallerContext);
-}
+ Task->ContMachineContext.SwitchTo(CallerContext);
+}
diff --git a/library/cpp/messagebus/rain_check/core/coro.h b/library/cpp/messagebus/rain_check/core/coro.h
index 95e2a30f9b..bf2fca54bd 100644
--- a/library/cpp/messagebus/rain_check/core/coro.h
+++ b/library/cpp/messagebus/rain_check/core/coro.h
@@ -1,58 +1,58 @@
-#pragma once
-
+#pragma once
+
#include "coro_stack.h"
#include "task.h"
#include <util/generic/ptr.h>
-#include <util/memory/alloc.h>
-#include <util/system/align.h>
+#include <util/memory/alloc.h>
+#include <util/system/align.h>
#include <util/system/context.h>
#include <util/system/valgrind.h>
-
-namespace NRainCheck {
- class ICoroTask;
-
- class TCoroTaskRunner: public TTaskRunnerBase, private ITrampoLine {
- friend class ICoroTask;
-
- private:
- NPrivate::TCoroStack Stack;
- TContMachineContext ContMachineContext;
- bool CoroDone;
-
- public:
- TCoroTaskRunner(IEnv* env, ISubtaskListener* parent, TAutoPtr<ICoroTask> impl);
+
+namespace NRainCheck {
+ class ICoroTask;
+
+ class TCoroTaskRunner: public TTaskRunnerBase, private ITrampoLine {
+ friend class ICoroTask;
+
+ private:
+ NPrivate::TCoroStack Stack;
+ TContMachineContext ContMachineContext;
+ bool CoroDone;
+
+ public:
+ TCoroTaskRunner(IEnv* env, ISubtaskListener* parent, TAutoPtr<ICoroTask> impl);
~TCoroTaskRunner() override;
-
- private:
+
+ private:
static TContClosure ContClosure(TCoroTaskRunner* runner, TArrayRef<char> memRegion);
-
+
bool ReplyReceived() override /* override */;
-
+
void DoRun() override /* override */;
-
+
ICoroTask* GetImpl() {
return (ICoroTask*)GetImplBase();
}
- };
-
- class ICoroTask: public ITaskBase {
- friend class TCoroTaskRunner;
-
- private:
- size_t StackSize;
-
- public:
- typedef TCoroTaskRunner TTaskRunner;
- typedef ICoroTask ITask;
-
+ };
+
+ class ICoroTask: public ITaskBase {
+ friend class TCoroTaskRunner;
+
+ private:
+ size_t StackSize;
+
+ public:
+ typedef TCoroTaskRunner TTaskRunner;
+ typedef ICoroTask ITask;
+
ICoroTask(size_t stackSize = 0x2000)
: StackSize(stackSize)
{
}
-
- virtual void Run() = 0;
- static void WaitForSubtasks();
- };
-
-}
+
+ virtual void Run() = 0;
+ static void WaitForSubtasks();
+ };
+
+}
diff --git a/library/cpp/messagebus/rain_check/core/coro_stack.cpp b/library/cpp/messagebus/rain_check/core/coro_stack.cpp
index 83b984ca6e..888d965a23 100644
--- a/library/cpp/messagebus/rain_check/core/coro_stack.cpp
+++ b/library/cpp/messagebus/rain_check/core/coro_stack.cpp
@@ -1,41 +1,41 @@
#include "coro_stack.h"
-
-#include <util/generic/singleton.h>
-#include <util/system/valgrind.h>
-
+
+#include <util/generic/singleton.h>
+#include <util/system/valgrind.h>
+
#include <cstdlib>
#include <stdio.h>
-
-using namespace NRainCheck;
-using namespace NRainCheck::NPrivate;
-
-TCoroStack::TCoroStack(size_t size)
- : SizeValue(size)
-{
+
+using namespace NRainCheck;
+using namespace NRainCheck::NPrivate;
+
+TCoroStack::TCoroStack(size_t size)
+ : SizeValue(size)
+{
Y_VERIFY(size % sizeof(ui32) == 0);
Y_VERIFY(size >= 0x1000);
-
- DataHolder.Reset(malloc(size));
-
- // register in valgrind
-
- *MagicNumberLocation() = MAGIC_NUMBER;
-
-#if defined(WITH_VALGRIND)
+
+ DataHolder.Reset(malloc(size));
+
+ // register in valgrind
+
+ *MagicNumberLocation() = MAGIC_NUMBER;
+
+#if defined(WITH_VALGRIND)
ValgrindStackId = VALGRIND_STACK_REGISTER(Data(), (char*)Data() + Size());
-#endif
-}
-
+#endif
+}
+
TCoroStack::~TCoroStack() {
-#if defined(WITH_VALGRIND)
- VALGRIND_STACK_DEREGISTER(ValgrindStackId);
-#endif
-
- VerifyNoStackOverflow();
-}
-
-void TCoroStack::FailStackOverflow() {
- static const char message[] = "stack overflow\n";
- fputs(message, stderr);
- abort();
-}
+#if defined(WITH_VALGRIND)
+ VALGRIND_STACK_DEREGISTER(ValgrindStackId);
+#endif
+
+ VerifyNoStackOverflow();
+}
+
+void TCoroStack::FailStackOverflow() {
+ static const char message[] = "stack overflow\n";
+ fputs(message, stderr);
+ abort();
+}
diff --git a/library/cpp/messagebus/rain_check/core/coro_stack.h b/library/cpp/messagebus/rain_check/core/coro_stack.h
index 2f3520e6e4..41ac786470 100644
--- a/library/cpp/messagebus/rain_check/core/coro_stack.h
+++ b/library/cpp/messagebus/rain_check/core/coro_stack.h
@@ -1,54 +1,54 @@
-#pragma once
-
+#pragma once
+
#include <util/generic/array_ref.h>
#include <util/generic/ptr.h>
-#include <util/system/valgrind.h>
-
+#include <util/system/valgrind.h>
+
namespace NRainCheck {
namespace NPrivate {
struct TCoroStack {
THolder<void, TFree> DataHolder;
size_t SizeValue;
-
-#if defined(WITH_VALGRIND)
+
+#if defined(WITH_VALGRIND)
size_t ValgrindStackId;
-#endif
-
+#endif
+
TCoroStack(size_t size);
~TCoroStack();
-
+
void* Data() {
return DataHolder.Get();
}
-
+
size_t Size() {
return SizeValue;
}
-
+
TArrayRef<char> MemRegion() {
return TArrayRef((char*)Data(), Size());
}
-
+
ui32* MagicNumberLocation() {
-#if STACK_GROW_DOWN == 1
+#if STACK_GROW_DOWN == 1
return (ui32*)Data();
-#elif STACK_GROW_DOWN == 0
+#elif STACK_GROW_DOWN == 0
return ((ui32*)(((char*)Data()) + Size())) - 1;
-#else
-#error "unknown"
-#endif
+#else
+#error "unknown"
+#endif
}
-
+
static void FailStackOverflow();
-
+
inline void VerifyNoStackOverflow() noexcept {
if (Y_UNLIKELY(*MagicNumberLocation() != MAGIC_NUMBER)) {
FailStackOverflow();
}
- }
-
+ }
+
static const ui32 MAGIC_NUMBER = 0xAB4D15FE;
};
-
+
}
}
diff --git a/library/cpp/messagebus/rain_check/core/coro_ut.cpp b/library/cpp/messagebus/rain_check/core/coro_ut.cpp
index 61a33584a5..4ee688f4c1 100644
--- a/library/cpp/messagebus/rain_check/core/coro_ut.cpp
+++ b/library/cpp/messagebus/rain_check/core/coro_ut.cpp
@@ -1,106 +1,106 @@
#include <library/cpp/testing/unittest/registar.h>
-
+
#include "coro.h"
-#include "spawn.h"
-
+#include "spawn.h"
+
#include <library/cpp/messagebus/rain_check/test/ut/test.h>
-
-using namespace NRainCheck;
-
+
+using namespace NRainCheck;
+
Y_UNIT_TEST_SUITE(RainCheckCoro) {
struct TSimpleCoroTask : ICoroTask {
- TTestSync* const TestSync;
-
- TSimpleCoroTask(TTestEnv*, TTestSync* testSync)
- : TestSync(testSync)
- {
- }
-
+ TTestSync* const TestSync;
+
+ TSimpleCoroTask(TTestEnv*, TTestSync* testSync)
+ : TestSync(testSync)
+ {
+ }
+
void Run() override {
- TestSync->WaitForAndIncrement(0);
- }
- };
-
+ TestSync->WaitForAndIncrement(0);
+ }
+ };
+
Y_UNIT_TEST(Simple) {
- TTestSync testSync;
-
- TTestEnv env;
-
- TIntrusivePtr<TCoroTaskRunner> task = env.SpawnTask<TSimpleCoroTask>(&testSync);
- testSync.WaitForAndIncrement(1);
- }
-
+ TTestSync testSync;
+
+ TTestEnv env;
+
+ TIntrusivePtr<TCoroTaskRunner> task = env.SpawnTask<TSimpleCoroTask>(&testSync);
+ testSync.WaitForAndIncrement(1);
+ }
+
struct TSleepCoroTask : ICoroTask {
- TTestEnv* const Env;
- TTestSync* const TestSync;
-
- TSleepCoroTask(TTestEnv* env, TTestSync* testSync)
- : Env(env)
- , TestSync(testSync)
- {
- }
-
- TSubtaskCompletion SleepCompletion;
-
+ TTestEnv* const Env;
+ TTestSync* const TestSync;
+
+ TSleepCoroTask(TTestEnv* env, TTestSync* testSync)
+ : Env(env)
+ , TestSync(testSync)
+ {
+ }
+
+ TSubtaskCompletion SleepCompletion;
+
void Run() override {
- Env->SleepService.Sleep(&SleepCompletion, TDuration::MilliSeconds(1));
- WaitForSubtasks();
- TestSync->WaitForAndIncrement(0);
- }
- };
-
+ Env->SleepService.Sleep(&SleepCompletion, TDuration::MilliSeconds(1));
+ WaitForSubtasks();
+ TestSync->WaitForAndIncrement(0);
+ }
+ };
+
Y_UNIT_TEST(Sleep) {
- TTestSync testSync;
-
- TTestEnv env;
-
- TIntrusivePtr<TCoroTaskRunner> task = env.SpawnTask<TSleepCoroTask>(&testSync);
-
- testSync.WaitForAndIncrement(1);
- }
-
+ TTestSync testSync;
+
+ TTestEnv env;
+
+ TIntrusivePtr<TCoroTaskRunner> task = env.SpawnTask<TSleepCoroTask>(&testSync);
+
+ testSync.WaitForAndIncrement(1);
+ }
+
struct TSubtask : ICoroTask {
- TTestEnv* const Env;
- TTestSync* const TestSync;
-
- TSubtask(TTestEnv* env, TTestSync* testSync)
- : Env(env)
- , TestSync(testSync)
- {
- }
-
+ TTestEnv* const Env;
+ TTestSync* const TestSync;
+
+ TSubtask(TTestEnv* env, TTestSync* testSync)
+ : Env(env)
+ , TestSync(testSync)
+ {
+ }
+
void Run() override {
- TestSync->CheckAndIncrement(1);
- }
- };
-
+ TestSync->CheckAndIncrement(1);
+ }
+ };
+
struct TSpawnCoroTask : ICoroTask {
- TTestEnv* const Env;
- TTestSync* const TestSync;
-
- TSpawnCoroTask(TTestEnv* env, TTestSync* testSync)
- : Env(env)
- , TestSync(testSync)
- {
- }
-
- TSubtaskCompletion SubtaskCompletion;
-
+ TTestEnv* const Env;
+ TTestSync* const TestSync;
+
+ TSpawnCoroTask(TTestEnv* env, TTestSync* testSync)
+ : Env(env)
+ , TestSync(testSync)
+ {
+ }
+
+ TSubtaskCompletion SubtaskCompletion;
+
void Run() override {
- TestSync->CheckAndIncrement(0);
- SpawnSubtask<TSubtask>(Env, &SubtaskCompletion, TestSync);
- WaitForSubtasks();
- TestSync->CheckAndIncrement(2);
- }
- };
-
+ TestSync->CheckAndIncrement(0);
+ SpawnSubtask<TSubtask>(Env, &SubtaskCompletion, TestSync);
+ WaitForSubtasks();
+ TestSync->CheckAndIncrement(2);
+ }
+ };
+
Y_UNIT_TEST(Spawn) {
- TTestSync testSync;
-
- TTestEnv env;
-
- TIntrusivePtr<TCoroTaskRunner> task = env.SpawnTask<TSpawnCoroTask>(&testSync);
-
- testSync.WaitForAndIncrement(3);
- }
-}
+ TTestSync testSync;
+
+ TTestEnv env;
+
+ TIntrusivePtr<TCoroTaskRunner> task = env.SpawnTask<TSpawnCoroTask>(&testSync);
+
+ testSync.WaitForAndIncrement(3);
+ }
+}
diff --git a/library/cpp/messagebus/rain_check/core/env.cpp b/library/cpp/messagebus/rain_check/core/env.cpp
index fdc0000dbd..150d63d9bb 100644
--- a/library/cpp/messagebus/rain_check/core/env.cpp
+++ b/library/cpp/messagebus/rain_check/core/env.cpp
@@ -1,3 +1,3 @@
-#include "env.h"
-
-using namespace NRainCheck;
+#include "env.h"
+
+using namespace NRainCheck;
diff --git a/library/cpp/messagebus/rain_check/core/env.h b/library/cpp/messagebus/rain_check/core/env.h
index f6dd7fceb6..4e289dbd3d 100644
--- a/library/cpp/messagebus/rain_check/core/env.h
+++ b/library/cpp/messagebus/rain_check/core/env.h
@@ -1,47 +1,47 @@
-#pragma once
-
+#pragma once
+
#include "sleep.h"
#include "spawn.h"
-
+
#include <library/cpp/messagebus/actor/executor.h>
-
+
#include <util/generic/ptr.h>
-
-namespace NRainCheck {
- struct IEnv {
- virtual ::NActor::TExecutor* GetExecutor() = 0;
+
+namespace NRainCheck {
+ struct IEnv {
+ virtual ::NActor::TExecutor* GetExecutor() = 0;
virtual ~IEnv() {
}
- };
-
- template <typename TSelf>
- struct TEnvTemplate: public IEnv {
- template <typename TTask, typename TParam>
- TIntrusivePtr<typename TTask::TTaskRunner> SpawnTask(TParam param) {
+ };
+
+ template <typename TSelf>
+ struct TEnvTemplate: public IEnv {
+ template <typename TTask, typename TParam>
+ TIntrusivePtr<typename TTask::TTaskRunner> SpawnTask(TParam param) {
return ::NRainCheck::SpawnTask<TTask, TSelf>((TSelf*)this, param);
- }
- };
-
- template <typename TSelf>
- struct TSimpleEnvTemplate: public TEnvTemplate<TSelf> {
- ::NActor::TExecutorPtr Executor;
- TSleepService SleepService;
-
- TSimpleEnvTemplate(unsigned threadCount = 0)
- : Executor(new ::NActor::TExecutor(threadCount != 0 ? threadCount : 4))
+ }
+ };
+
+ template <typename TSelf>
+ struct TSimpleEnvTemplate: public TEnvTemplate<TSelf> {
+ ::NActor::TExecutorPtr Executor;
+ TSleepService SleepService;
+
+ TSimpleEnvTemplate(unsigned threadCount = 0)
+ : Executor(new ::NActor::TExecutor(threadCount != 0 ? threadCount : 4))
{
}
-
+
::NActor::TExecutor* GetExecutor() override {
return Executor.Get();
}
- };
-
- struct TSimpleEnv: public TSimpleEnvTemplate<TSimpleEnv> {
+ };
+
+ struct TSimpleEnv: public TSimpleEnvTemplate<TSimpleEnv> {
TSimpleEnv(unsigned threadCount = 0)
: TSimpleEnvTemplate<TSimpleEnv>(threadCount)
{
}
- };
-
-}
+ };
+
+}
diff --git a/library/cpp/messagebus/rain_check/core/fwd.h b/library/cpp/messagebus/rain_check/core/fwd.h
index b43ff8c17c..2f8f1d4754 100644
--- a/library/cpp/messagebus/rain_check/core/fwd.h
+++ b/library/cpp/messagebus/rain_check/core/fwd.h
@@ -1,18 +1,18 @@
-#pragma once
-
-namespace NRainCheck {
- namespace NPrivate {
- }
-
- class ITaskBase;
- class ISimpleTask;
- class ICoroTask;
-
- struct ISubtaskListener;
-
- class TTaskRunnerBase;
-
- class TSubtaskCompletion;
- struct IEnv;
-
-}
+#pragma once
+
+namespace NRainCheck {
+ namespace NPrivate {
+ }
+
+ class ITaskBase;
+ class ISimpleTask;
+ class ICoroTask;
+
+ struct ISubtaskListener;
+
+ class TTaskRunnerBase;
+
+ class TSubtaskCompletion;
+ struct IEnv;
+
+}
diff --git a/library/cpp/messagebus/rain_check/core/rain_check.cpp b/library/cpp/messagebus/rain_check/core/rain_check.cpp
index 2ea1f9e21b..63bc300554 100644
--- a/library/cpp/messagebus/rain_check/core/rain_check.cpp
+++ b/library/cpp/messagebus/rain_check/core/rain_check.cpp
@@ -1 +1 @@
-#include "rain_check.h"
+#include "rain_check.h"
diff --git a/library/cpp/messagebus/rain_check/core/rain_check.h b/library/cpp/messagebus/rain_check/core/rain_check.h
index 0f289717a2..a97de4537e 100644
--- a/library/cpp/messagebus/rain_check/core/rain_check.h
+++ b/library/cpp/messagebus/rain_check/core/rain_check.h
@@ -1,8 +1,8 @@
-#pragma once
-
-#include "coro.h"
+#pragma once
+
+#include "coro.h"
#include "env.h"
-#include "simple.h"
-#include "sleep.h"
-#include "spawn.h"
+#include "simple.h"
+#include "sleep.h"
+#include "spawn.h"
#include "task.h"
diff --git a/library/cpp/messagebus/rain_check/core/simple.cpp b/library/cpp/messagebus/rain_check/core/simple.cpp
index 70182b2f93..8dc71a84ee 100644
--- a/library/cpp/messagebus/rain_check/core/simple.cpp
+++ b/library/cpp/messagebus/rain_check/core/simple.cpp
@@ -1,18 +1,18 @@
-#include "simple.h"
-
-using namespace NRainCheck;
-
-TSimpleTaskRunner::TSimpleTaskRunner(IEnv* env, ISubtaskListener* parentTask, TAutoPtr<ISimpleTask> impl)
- : TTaskRunnerBase(env, parentTask, impl.Release())
- , ContinueFunc(&ISimpleTask::Start)
-{
-}
-
-TSimpleTaskRunner::~TSimpleTaskRunner() {
+#include "simple.h"
+
+using namespace NRainCheck;
+
+TSimpleTaskRunner::TSimpleTaskRunner(IEnv* env, ISubtaskListener* parentTask, TAutoPtr<ISimpleTask> impl)
+ : TTaskRunnerBase(env, parentTask, impl.Release())
+ , ContinueFunc(&ISimpleTask::Start)
+{
+}
+
+TSimpleTaskRunner::~TSimpleTaskRunner() {
Y_ASSERT(!ContinueFunc);
-}
-
+}
+
bool TSimpleTaskRunner::ReplyReceived() {
- ContinueFunc = (GetImpl()->*(ContinueFunc.Func))();
- return !!ContinueFunc;
-}
+ ContinueFunc = (GetImpl()->*(ContinueFunc.Func))();
+ return !!ContinueFunc;
+}
diff --git a/library/cpp/messagebus/rain_check/core/simple.h b/library/cpp/messagebus/rain_check/core/simple.h
index 20e1bf19f5..eeee4c8c23 100644
--- a/library/cpp/messagebus/rain_check/core/simple.h
+++ b/library/cpp/messagebus/rain_check/core/simple.h
@@ -1,62 +1,62 @@
-#pragma once
-
-#include "task.h"
-
-namespace NRainCheck {
- class ISimpleTask;
-
- // Function called on continue
- class TContinueFunc {
- friend class TSimpleTaskRunner;
-
- typedef TContinueFunc (ISimpleTask::*TFunc)();
- TFunc Func;
-
- public:
- TContinueFunc()
+#pragma once
+
+#include "task.h"
+
+namespace NRainCheck {
+ class ISimpleTask;
+
+ // Function called on continue
+ class TContinueFunc {
+ friend class TSimpleTaskRunner;
+
+ typedef TContinueFunc (ISimpleTask::*TFunc)();
+ TFunc Func;
+
+ public:
+ TContinueFunc()
: Func(nullptr)
{
}
-
- TContinueFunc(void*)
+
+ TContinueFunc(void*)
: Func(nullptr)
{
}
-
- template <typename TTask>
- TContinueFunc(TContinueFunc (TTask::*func)())
+
+ template <typename TTask>
+ TContinueFunc(TContinueFunc (TTask::*func)())
: Func((TFunc)func)
- {
+ {
static_assert((std::is_base_of<ISimpleTask, TTask>::value), "expect (std::is_base_of<ISimpleTask, TTask>::value)");
- }
-
- bool operator!() const {
- return !Func;
- }
- };
-
- class TSimpleTaskRunner: public TTaskRunnerBase {
- public:
- TSimpleTaskRunner(IEnv* env, ISubtaskListener* parentTask, TAutoPtr<ISimpleTask>);
+ }
+
+ bool operator!() const {
+ return !Func;
+ }
+ };
+
+ class TSimpleTaskRunner: public TTaskRunnerBase {
+ public:
+ TSimpleTaskRunner(IEnv* env, ISubtaskListener* parentTask, TAutoPtr<ISimpleTask>);
~TSimpleTaskRunner() override;
-
- private:
- // Function to be called on completion of all pending tasks.
- TContinueFunc ContinueFunc;
-
+
+ private:
+ // Function to be called on completion of all pending tasks.
+ TContinueFunc ContinueFunc;
+
bool ReplyReceived() override /* override */;
-
+
ISimpleTask* GetImpl() {
return (ISimpleTask*)GetImplBase();
}
- };
-
- class ISimpleTask: public ITaskBase {
- public:
- typedef TSimpleTaskRunner TTaskRunner;
- typedef ISimpleTask ITask;
-
- virtual TContinueFunc Start() = 0;
- };
-
-}
+ };
+
+ class ISimpleTask: public ITaskBase {
+ public:
+ typedef TSimpleTaskRunner TTaskRunner;
+ typedef ISimpleTask ITask;
+
+ virtual TContinueFunc Start() = 0;
+ };
+
+}
diff --git a/library/cpp/messagebus/rain_check/core/simple_ut.cpp b/library/cpp/messagebus/rain_check/core/simple_ut.cpp
index d4545e05aa..97b5db2d89 100644
--- a/library/cpp/messagebus/rain_check/core/simple_ut.cpp
+++ b/library/cpp/messagebus/rain_check/core/simple_ut.cpp
@@ -1,59 +1,59 @@
#include <library/cpp/testing/unittest/registar.h>
-
+
#include <library/cpp/messagebus/rain_check/test/ut/test.h>
#include <library/cpp/messagebus/latch.h>
-
+
#include <util/system/event.h>
-
-using namespace NRainCheck;
-
+
+using namespace NRainCheck;
+
Y_UNIT_TEST_SUITE(RainCheckSimple) {
- struct TTaskWithCompletionCallback: public ISimpleTask {
- TTestEnv* const Env;
- TTestSync* const TestSync;
-
- TTaskWithCompletionCallback(TTestEnv* env, TTestSync* testSync)
- : Env(env)
- , TestSync(testSync)
+ struct TTaskWithCompletionCallback: public ISimpleTask {
+ TTestEnv* const Env;
+ TTestSync* const TestSync;
+
+ TTaskWithCompletionCallback(TTestEnv* env, TTestSync* testSync)
+ : Env(env)
+ , TestSync(testSync)
{
}
-
- TSubtaskCompletion SleepCompletion;
-
+
+ TSubtaskCompletion SleepCompletion;
+
TContinueFunc Start() override {
- TestSync->CheckAndIncrement(0);
-
- Env->SleepService.Sleep(&SleepCompletion, TDuration::MilliSeconds(1));
- SleepCompletion.SetCompletionCallback(&TTaskWithCompletionCallback::SleepCompletionCallback);
-
- return &TTaskWithCompletionCallback::Last;
- }
-
- void SleepCompletionCallback(TSubtaskCompletion* completion) {
+ TestSync->CheckAndIncrement(0);
+
+ Env->SleepService.Sleep(&SleepCompletion, TDuration::MilliSeconds(1));
+ SleepCompletion.SetCompletionCallback(&TTaskWithCompletionCallback::SleepCompletionCallback);
+
+ return &TTaskWithCompletionCallback::Last;
+ }
+
+ void SleepCompletionCallback(TSubtaskCompletion* completion) {
Y_VERIFY(completion == &SleepCompletion);
- TestSync->CheckAndIncrement(1);
-
- Env->SleepService.Sleep(&SleepCompletion, TDuration::MilliSeconds(1));
- SleepCompletion.SetCompletionCallback(&TTaskWithCompletionCallback::NextSleepCompletionCallback);
- }
-
+ TestSync->CheckAndIncrement(1);
+
+ Env->SleepService.Sleep(&SleepCompletion, TDuration::MilliSeconds(1));
+ SleepCompletion.SetCompletionCallback(&TTaskWithCompletionCallback::NextSleepCompletionCallback);
+ }
+
void NextSleepCompletionCallback(TSubtaskCompletion*) {
- TestSync->CheckAndIncrement(2);
- }
-
- TContinueFunc Last() {
- TestSync->CheckAndIncrement(3);
+ TestSync->CheckAndIncrement(2);
+ }
+
+ TContinueFunc Last() {
+ TestSync->CheckAndIncrement(3);
return nullptr;
- }
- };
-
+ }
+ };
+
Y_UNIT_TEST(CompletionCallback) {
- TTestEnv env;
- TTestSync testSync;
-
- env.SpawnTask<TTaskWithCompletionCallback>(&testSync);
-
- testSync.WaitForAndIncrement(4);
- }
-}
+ TTestEnv env;
+ TTestSync testSync;
+
+ env.SpawnTask<TTaskWithCompletionCallback>(&testSync);
+
+ testSync.WaitForAndIncrement(4);
+ }
+}
diff --git a/library/cpp/messagebus/rain_check/core/sleep.cpp b/library/cpp/messagebus/rain_check/core/sleep.cpp
index f5d0b4cac9..10b875bc79 100644
--- a/library/cpp/messagebus/rain_check/core/sleep.cpp
+++ b/library/cpp/messagebus/rain_check/core/sleep.cpp
@@ -1,47 +1,47 @@
#include "rain_check.h"
-#include <util/system/yassert.h>
-
-using namespace NRainCheck;
-using namespace NRainCheck::NPrivate;
-using namespace NBus;
-using namespace NBus::NPrivate;
-
-TSleepService::TSleepService(::NBus::NPrivate::TScheduler* scheduler)
- : Scheduler(scheduler)
-{
-}
-
-NRainCheck::TSleepService::TSleepService()
- : SchedulerHolder(new TScheduler)
- , Scheduler(SchedulerHolder.Get())
-{
-}
-
+#include <util/system/yassert.h>
+
+using namespace NRainCheck;
+using namespace NRainCheck::NPrivate;
+using namespace NBus;
+using namespace NBus::NPrivate;
+
+TSleepService::TSleepService(::NBus::NPrivate::TScheduler* scheduler)
+ : Scheduler(scheduler)
+{
+}
+
+NRainCheck::TSleepService::TSleepService()
+ : SchedulerHolder(new TScheduler)
+ , Scheduler(SchedulerHolder.Get())
+{
+}
+
NRainCheck::TSleepService::~TSleepService() {
- if (!!SchedulerHolder) {
- Scheduler->Stop();
- }
-}
-
-namespace {
- struct TSleepServiceScheduleItem: public IScheduleItem {
- ISubtaskListener* const Parent;
-
- TSleepServiceScheduleItem(ISubtaskListener* parent, TInstant time)
- : IScheduleItem(time)
- , Parent(parent)
- {
- }
-
+ if (!!SchedulerHolder) {
+ Scheduler->Stop();
+ }
+}
+
+namespace {
+ struct TSleepServiceScheduleItem: public IScheduleItem {
+ ISubtaskListener* const Parent;
+
+ TSleepServiceScheduleItem(ISubtaskListener* parent, TInstant time)
+ : IScheduleItem(time)
+ , Parent(parent)
+ {
+ }
+
void Do() override {
- Parent->SetDone();
- }
- };
-}
-
+ Parent->SetDone();
+ }
+ };
+}
+
void TSleepService::Sleep(TSubtaskCompletion* r, TDuration duration) {
- TTaskRunnerBase* current = TTaskRunnerBase::CurrentTask();
- r->SetRunning(current);
- Scheduler->Schedule(new TSleepServiceScheduleItem(r, TInstant::Now() + duration));
-}
+ TTaskRunnerBase* current = TTaskRunnerBase::CurrentTask();
+ r->SetRunning(current);
+ Scheduler->Schedule(new TSleepServiceScheduleItem(r, TInstant::Now() + duration));
+}
diff --git a/library/cpp/messagebus/rain_check/core/sleep.h b/library/cpp/messagebus/rain_check/core/sleep.h
index 1a7a1f8674..b5b343de98 100644
--- a/library/cpp/messagebus/rain_check/core/sleep.h
+++ b/library/cpp/messagebus/rain_check/core/sleep.h
@@ -1,24 +1,24 @@
-#pragma once
-
+#pragma once
+
#include "fwd.h"
-
+
#include <library/cpp/messagebus/scheduler/scheduler.h>
-
+
#include <util/datetime/base.h>
+
+namespace NRainCheck {
+ class TSleepService {
+ private:
+ THolder< ::NBus::NPrivate::TScheduler> SchedulerHolder;
+ ::NBus::NPrivate::TScheduler* const Scheduler;
-namespace NRainCheck {
- class TSleepService {
- private:
- THolder< ::NBus::NPrivate::TScheduler> SchedulerHolder;
- ::NBus::NPrivate::TScheduler* const Scheduler;
-
- public:
- TSleepService(::NBus::NPrivate::TScheduler*);
- TSleepService();
- ~TSleepService();
-
- // Wake up a task after given duration.
- void Sleep(TSubtaskCompletion* r, TDuration);
- };
-
-}
+ public:
+ TSleepService(::NBus::NPrivate::TScheduler*);
+ TSleepService();
+ ~TSleepService();
+
+ // Wake up a task after given duration.
+ void Sleep(TSubtaskCompletion* r, TDuration);
+ };
+
+}
diff --git a/library/cpp/messagebus/rain_check/core/sleep_ut.cpp b/library/cpp/messagebus/rain_check/core/sleep_ut.cpp
index 2ae85a87b1..3c92fa2ca7 100644
--- a/library/cpp/messagebus/rain_check/core/sleep_ut.cpp
+++ b/library/cpp/messagebus/rain_check/core/sleep_ut.cpp
@@ -1,46 +1,46 @@
#include <library/cpp/testing/unittest/registar.h>
-
+
#include <library/cpp/messagebus/rain_check/test/ut/test.h>
-
+
#include <util/system/event.h>
-using namespace NRainCheck;
-using namespace NActor;
-
+using namespace NRainCheck;
+using namespace NActor;
+
Y_UNIT_TEST_SUITE(Sleep) {
- struct TTestTask: public ISimpleTask {
- TSimpleEnv* const Env;
- TTestSync* const TestSync;
-
- TTestTask(TSimpleEnv* env, TTestSync* testSync)
- : Env(env)
- , TestSync(testSync)
+ struct TTestTask: public ISimpleTask {
+ TSimpleEnv* const Env;
+ TTestSync* const TestSync;
+
+ TTestTask(TSimpleEnv* env, TTestSync* testSync)
+ : Env(env)
+ , TestSync(testSync)
{
}
-
- TSubtaskCompletion Sleep;
-
+
+ TSubtaskCompletion Sleep;
+
TContinueFunc Start() override {
- Env->SleepService.Sleep(&Sleep, TDuration::MilliSeconds(1));
-
- TestSync->CheckAndIncrement(0);
-
- return &TTestTask::Continue;
- }
-
- TContinueFunc Continue() {
- TestSync->CheckAndIncrement(1);
+ Env->SleepService.Sleep(&Sleep, TDuration::MilliSeconds(1));
+
+ TestSync->CheckAndIncrement(0);
+
+ return &TTestTask::Continue;
+ }
+
+ TContinueFunc Continue() {
+ TestSync->CheckAndIncrement(1);
return nullptr;
- }
- };
-
+ }
+ };
+
Y_UNIT_TEST(Test) {
- TTestSync testSync;
-
- TSimpleEnv env;
-
- TIntrusivePtr<TSimpleTaskRunner> task = env.SpawnTask<TTestTask>(&testSync);
-
- testSync.WaitForAndIncrement(2);
- }
-}
+ TTestSync testSync;
+
+ TSimpleEnv env;
+
+ TIntrusivePtr<TSimpleTaskRunner> task = env.SpawnTask<TTestTask>(&testSync);
+
+ testSync.WaitForAndIncrement(2);
+ }
+}
diff --git a/library/cpp/messagebus/rain_check/core/spawn.cpp b/library/cpp/messagebus/rain_check/core/spawn.cpp
index c570355fbe..d8fc78c129 100644
--- a/library/cpp/messagebus/rain_check/core/spawn.cpp
+++ b/library/cpp/messagebus/rain_check/core/spawn.cpp
@@ -1,5 +1,5 @@
-#include "spawn.h"
-
+#include "spawn.h"
+
void NRainCheck::NPrivate::SpawnTaskImpl(TTaskRunnerBase* task) {
- task->Schedule();
-}
+ task->Schedule();
+}
diff --git a/library/cpp/messagebus/rain_check/core/spawn.h b/library/cpp/messagebus/rain_check/core/spawn.h
index f2b146bf29..33ba955e0a 100644
--- a/library/cpp/messagebus/rain_check/core/spawn.h
+++ b/library/cpp/messagebus/rain_check/core/spawn.h
@@ -1,50 +1,50 @@
-#pragma once
-
-#include "coro.h"
-#include "simple.h"
+#pragma once
+
+#include "coro.h"
+#include "simple.h"
#include "task.h"
-
-namespace NRainCheck {
- namespace NPrivate {
- void SpawnTaskImpl(TTaskRunnerBase* task);
-
- template <typename TTask, typename ITask, typename TRunner, typename TEnv, typename TParam>
- TIntrusivePtr<TRunner> SpawnTaskWithRunner(TEnv* env, TParam param1, ISubtaskListener* subtaskListener) {
+
+namespace NRainCheck {
+ namespace NPrivate {
+ void SpawnTaskImpl(TTaskRunnerBase* task);
+
+ template <typename TTask, typename ITask, typename TRunner, typename TEnv, typename TParam>
+ TIntrusivePtr<TRunner> SpawnTaskWithRunner(TEnv* env, TParam param1, ISubtaskListener* subtaskListener) {
static_assert((std::is_base_of<ITask, TTask>::value), "expect (std::is_base_of<ITask, TTask>::value)");
- TIntrusivePtr<TRunner> task(new TRunner(env, subtaskListener, new TTask(env, param1)));
- NPrivate::SpawnTaskImpl(task.Get());
- return task;
- }
-
- template <typename TTask, typename ITask, typename TRunner, typename TEnv>
- void SpawnSubtaskWithRunner(TEnv* env, TSubtaskCompletion* completion) {
+ TIntrusivePtr<TRunner> task(new TRunner(env, subtaskListener, new TTask(env, param1)));
+ NPrivate::SpawnTaskImpl(task.Get());
+ return task;
+ }
+
+ template <typename TTask, typename ITask, typename TRunner, typename TEnv>
+ void SpawnSubtaskWithRunner(TEnv* env, TSubtaskCompletion* completion) {
static_assert((std::is_base_of<ITask, TTask>::value), "expect (std::is_base_of<ITask, TTask>::value)");
- TTaskRunnerBase* current = TTaskRunnerBase::CurrentTask();
- completion->SetRunning(current);
- NPrivate::SpawnTaskImpl(new TRunner(env, completion, new TTask(env)));
- }
-
- template <typename TTask, typename ITask, typename TRunner, typename TEnv, typename TParam>
- void SpawnSubtaskWithRunner(TEnv* env, TSubtaskCompletion* completion, TParam param) {
+ TTaskRunnerBase* current = TTaskRunnerBase::CurrentTask();
+ completion->SetRunning(current);
+ NPrivate::SpawnTaskImpl(new TRunner(env, completion, new TTask(env)));
+ }
+
+ template <typename TTask, typename ITask, typename TRunner, typename TEnv, typename TParam>
+ void SpawnSubtaskWithRunner(TEnv* env, TSubtaskCompletion* completion, TParam param) {
static_assert((std::is_base_of<ITask, TTask>::value), "expect (std::is_base_of<ITask, TTask>::value)");
- TTaskRunnerBase* current = TTaskRunnerBase::CurrentTask();
- completion->SetRunning(current);
- NPrivate::SpawnTaskImpl(new TRunner(env, completion, new TTask(env, param)));
- }
-
- }
-
- // Instantiate and start a task with given parameter.
- template <typename TTask, typename TEnv, typename TParam>
- TIntrusivePtr<typename TTask::TTaskRunner> SpawnTask(TEnv* env, TParam param1, ISubtaskListener* subtaskListener = &TNopSubtaskListener::Instance) {
- return NPrivate::SpawnTaskWithRunner<
+ TTaskRunnerBase* current = TTaskRunnerBase::CurrentTask();
+ completion->SetRunning(current);
+ NPrivate::SpawnTaskImpl(new TRunner(env, completion, new TTask(env, param)));
+ }
+
+ }
+
+ // Instantiate and start a task with given parameter.
+ template <typename TTask, typename TEnv, typename TParam>
+ TIntrusivePtr<typename TTask::TTaskRunner> SpawnTask(TEnv* env, TParam param1, ISubtaskListener* subtaskListener = &TNopSubtaskListener::Instance) {
+ return NPrivate::SpawnTaskWithRunner<
TTask, typename TTask::ITask, typename TTask::TTaskRunner, TEnv, TParam>(env, param1, subtaskListener);
- }
-
- // Instantiate and start subtask of given task.
- template <typename TTask, typename TEnv, typename TParam>
- void SpawnSubtask(TEnv* env, TSubtaskCompletion* completion, TParam param) {
- return NPrivate::SpawnSubtaskWithRunner<TTask, typename TTask::ITask, typename TTask::TTaskRunner>(env, completion, param);
- }
-
-}
+ }
+
+ // Instantiate and start subtask of given task.
+ template <typename TTask, typename TEnv, typename TParam>
+ void SpawnSubtask(TEnv* env, TSubtaskCompletion* completion, TParam param) {
+ return NPrivate::SpawnSubtaskWithRunner<TTask, typename TTask::ITask, typename TTask::TTaskRunner>(env, completion, param);
+ }
+
+}
diff --git a/library/cpp/messagebus/rain_check/core/spawn_ut.cpp b/library/cpp/messagebus/rain_check/core/spawn_ut.cpp
index ba5a5e41cf..2b3ef75c67 100644
--- a/library/cpp/messagebus/rain_check/core/spawn_ut.cpp
+++ b/library/cpp/messagebus/rain_check/core/spawn_ut.cpp
@@ -1,145 +1,145 @@
#include <library/cpp/testing/unittest/registar.h>
-
+
#include <library/cpp/messagebus/rain_check/test/helper/misc.h>
#include <library/cpp/messagebus/rain_check/test/ut/test.h>
#include <library/cpp/messagebus/latch.h>
-
+
#include <util/system/event.h>
-
+
#include <array>
-
-using namespace NRainCheck;
-using namespace NActor;
-
+
+using namespace NRainCheck;
+using namespace NActor;
+
Y_UNIT_TEST_SUITE(Spawn) {
- struct TTestTask: public ISimpleTask {
- TTestSync* const TestSync;
-
- TTestTask(TSimpleEnv*, TTestSync* testSync)
- : TestSync(testSync)
- , I(0)
+ struct TTestTask: public ISimpleTask {
+ TTestSync* const TestSync;
+
+ TTestTask(TSimpleEnv*, TTestSync* testSync)
+ : TestSync(testSync)
+ , I(0)
{
}
-
+
TSystemEvent Started;
-
- unsigned I;
-
+
+ unsigned I;
+
TContinueFunc Start() override {
- if (I < 4) {
- I += 1;
- return &TTestTask::Start;
- }
- TestSync->CheckAndIncrement(0);
- return &TTestTask::Continue;
- }
-
- TContinueFunc Continue() {
- TestSync->CheckAndIncrement(1);
-
- Started.Signal();
+ if (I < 4) {
+ I += 1;
+ return &TTestTask::Start;
+ }
+ TestSync->CheckAndIncrement(0);
+ return &TTestTask::Continue;
+ }
+
+ TContinueFunc Continue() {
+ TestSync->CheckAndIncrement(1);
+
+ Started.Signal();
return nullptr;
- }
- };
-
+ }
+ };
+
Y_UNIT_TEST(Continuation) {
- TTestSync testSync;
-
- TSimpleEnv env;
-
- TIntrusivePtr<TSimpleTaskRunner> task = env.SpawnTask<TTestTask>(&testSync);
-
- testSync.WaitForAndIncrement(2);
- }
-
- struct TSubtask: public ISimpleTask {
- TTestEnv* const Env;
- TTestSync* const TestSync;
-
- TSubtask(TTestEnv* env, TTestSync* testSync)
- : Env(env)
- , TestSync(testSync)
+ TTestSync testSync;
+
+ TSimpleEnv env;
+
+ TIntrusivePtr<TSimpleTaskRunner> task = env.SpawnTask<TTestTask>(&testSync);
+
+ testSync.WaitForAndIncrement(2);
+ }
+
+ struct TSubtask: public ISimpleTask {
+ TTestEnv* const Env;
+ TTestSync* const TestSync;
+
+ TSubtask(TTestEnv* env, TTestSync* testSync)
+ : Env(env)
+ , TestSync(testSync)
{
}
-
+
TContinueFunc Start() override {
- Sleep(TDuration::MilliSeconds(1));
- TestSync->CheckAndIncrement(1);
+ Sleep(TDuration::MilliSeconds(1));
+ TestSync->CheckAndIncrement(1);
return nullptr;
- }
- };
-
- struct TSpawnTask: public ISimpleTask {
- TTestEnv* const Env;
- TTestSync* const TestSync;
-
- TSpawnTask(TTestEnv* env, TTestSync* testSync)
- : Env(env)
- , TestSync(testSync)
+ }
+ };
+
+ struct TSpawnTask: public ISimpleTask {
+ TTestEnv* const Env;
+ TTestSync* const TestSync;
+
+ TSpawnTask(TTestEnv* env, TTestSync* testSync)
+ : Env(env)
+ , TestSync(testSync)
{
}
-
- TSubtaskCompletion SubtaskCompletion;
-
+
+ TSubtaskCompletion SubtaskCompletion;
+
TContinueFunc Start() override {
- TestSync->CheckAndIncrement(0);
- SpawnSubtask<TSubtask>(Env, &SubtaskCompletion, TestSync);
- return &TSpawnTask::Continue;
- }
-
- TContinueFunc Continue() {
- TestSync->CheckAndIncrement(2);
+ TestSync->CheckAndIncrement(0);
+ SpawnSubtask<TSubtask>(Env, &SubtaskCompletion, TestSync);
+ return &TSpawnTask::Continue;
+ }
+
+ TContinueFunc Continue() {
+ TestSync->CheckAndIncrement(2);
return nullptr;
- }
- };
-
+ }
+ };
+
Y_UNIT_TEST(Subtask) {
- TTestSync testSync;
-
- TTestEnv env;
-
- TIntrusivePtr<TSimpleTaskRunner> task = env.SpawnTask<TSpawnTask>(&testSync);
-
- testSync.WaitForAndIncrement(3);
- }
-
- struct TSpawnLongTask: public ISimpleTask {
- TTestEnv* const Env;
- TTestSync* const TestSync;
- unsigned I;
-
+ TTestSync testSync;
+
+ TTestEnv env;
+
+ TIntrusivePtr<TSimpleTaskRunner> task = env.SpawnTask<TSpawnTask>(&testSync);
+
+ testSync.WaitForAndIncrement(3);
+ }
+
+ struct TSpawnLongTask: public ISimpleTask {
+ TTestEnv* const Env;
+ TTestSync* const TestSync;
+ unsigned I;
+
TSpawnLongTask(TTestEnv* env, TTestSync* testSync)
: Env(env)
, TestSync(testSync)
, I(0)
{
}
-
+
std::array<TSubtaskCompletion, 3> Subtasks;
-
+
TContinueFunc Start() override {
- if (I == 1000) {
- TestSync->CheckAndIncrement(0);
+ if (I == 1000) {
+ TestSync->CheckAndIncrement(0);
return nullptr;
- }
-
+ }
+
for (auto& subtask : Subtasks) {
SpawnSubtask<TNopSimpleTask>(Env, &subtask, "");
- }
-
- ++I;
- return &TSpawnLongTask::Start;
- }
- };
-
+ }
+
+ ++I;
+ return &TSpawnLongTask::Start;
+ }
+ };
+
Y_UNIT_TEST(SubtaskLong) {
- TTestSync testSync;
-
- TTestEnv env;
-
- TIntrusivePtr<TSimpleTaskRunner> task = env.SpawnTask<TSpawnLongTask>(&testSync);
-
- testSync.WaitForAndIncrement(1);
- }
-}
+ TTestSync testSync;
+
+ TTestEnv env;
+
+ TIntrusivePtr<TSimpleTaskRunner> task = env.SpawnTask<TSpawnLongTask>(&testSync);
+
+ testSync.WaitForAndIncrement(1);
+ }
+}
diff --git a/library/cpp/messagebus/rain_check/core/task.cpp b/library/cpp/messagebus/rain_check/core/task.cpp
index a098437d53..d20ae30402 100644
--- a/library/cpp/messagebus/rain_check/core/task.cpp
+++ b/library/cpp/messagebus/rain_check/core/task.cpp
@@ -1,216 +1,216 @@
#include "rain_check.h"
-
+
#include <library/cpp/messagebus/actor/temp_tls_vector.h>
-
+
#include <util/system/type_name.h>
#include <util/system/tls.h>
-
-using namespace NRainCheck;
-using namespace NRainCheck::NPrivate;
-
-using namespace NActor;
-
-namespace {
+
+using namespace NRainCheck;
+using namespace NRainCheck::NPrivate;
+
+using namespace NActor;
+
+namespace {
Y_POD_STATIC_THREAD(TTaskRunnerBase*)
ThreadCurrentTask;
-}
-
+}
+
void TNopSubtaskListener::SetDone() {
}
-
-TNopSubtaskListener TNopSubtaskListener::Instance;
-
-TTaskRunnerBase::TTaskRunnerBase(IEnv* env, ISubtaskListener* parentTask, TAutoPtr<ITaskBase> impl)
- : TActor<TTaskRunnerBase>(env->GetExecutor())
- , Impl(impl)
- , ParentTask(parentTask)
- //, HoldsSelfReference(false)
- , Done(false)
- , SetDoneCalled(false)
-{
-}
-
-TTaskRunnerBase::~TTaskRunnerBase() {
+
+TNopSubtaskListener TNopSubtaskListener::Instance;
+
+TTaskRunnerBase::TTaskRunnerBase(IEnv* env, ISubtaskListener* parentTask, TAutoPtr<ITaskBase> impl)
+ : TActor<TTaskRunnerBase>(env->GetExecutor())
+ , Impl(impl)
+ , ParentTask(parentTask)
+ //, HoldsSelfReference(false)
+ , Done(false)
+ , SetDoneCalled(false)
+{
+}
+
+TTaskRunnerBase::~TTaskRunnerBase() {
Y_ASSERT(Done);
-}
-
-namespace {
- struct TRunningInThisThreadGuard {
- TTaskRunnerBase* const Task;
- TRunningInThisThreadGuard(TTaskRunnerBase* task)
- : Task(task)
- {
+}
+
+namespace {
+ struct TRunningInThisThreadGuard {
+ TTaskRunnerBase* const Task;
+ TRunningInThisThreadGuard(TTaskRunnerBase* task)
+ : Task(task)
+ {
Y_ASSERT(!ThreadCurrentTask);
- ThreadCurrentTask = task;
- }
-
- ~TRunningInThisThreadGuard() {
+ ThreadCurrentTask = task;
+ }
+
+ ~TRunningInThisThreadGuard() {
Y_ASSERT(ThreadCurrentTask == Task);
ThreadCurrentTask = nullptr;
- }
- };
-}
-
+ }
+ };
+}
+
void NRainCheck::TTaskRunnerBase::Act(NActor::TDefaultTag) {
Y_ASSERT(RefCount() > 0);
-
- TRunningInThisThreadGuard g(this);
-
- //RetainRef();
-
- for (;;) {
- TTempTlsVector<TSubtaskCompletion*> temp;
-
- temp.GetVector()->swap(Pending);
-
+
+ TRunningInThisThreadGuard g(this);
+
+ //RetainRef();
+
+ for (;;) {
+ TTempTlsVector<TSubtaskCompletion*> temp;
+
+ temp.GetVector()->swap(Pending);
+
for (auto& pending : *temp.GetVector()) {
if (pending->IsComplete()) {
pending->FireCompletionCallback(GetImplBase());
- } else {
+ } else {
Pending.push_back(pending);
- }
- }
-
- if (!Pending.empty()) {
- return;
- }
-
- if (!Done) {
- Done = !ReplyReceived();
- } else {
- if (Pending.empty()) {
- if (!SetDoneCalled) {
- ParentTask->SetDone();
- SetDoneCalled = true;
- }
- //ReleaseRef();
- return;
- }
- }
- }
-}
-
+ }
+ }
+
+ if (!Pending.empty()) {
+ return;
+ }
+
+ if (!Done) {
+ Done = !ReplyReceived();
+ } else {
+ if (Pending.empty()) {
+ if (!SetDoneCalled) {
+ ParentTask->SetDone();
+ SetDoneCalled = true;
+ }
+ //ReleaseRef();
+ return;
+ }
+ }
+ }
+}
+
bool TTaskRunnerBase::IsRunningInThisThread() const {
- return ThreadCurrentTask == this;
-}
-
+ return ThreadCurrentTask == this;
+}
+
TSubtaskCompletion::~TSubtaskCompletion() {
- ESubtaskState state = State.Get();
+ ESubtaskState state = State.Get();
Y_ASSERT(state == CREATED || state == DONE || state == CANCELED);
-}
-
+}
+
void TSubtaskCompletion::FireCompletionCallback(ITaskBase* task) {
Y_ASSERT(IsComplete());
-
- if (!!CompletionFunc) {
- TSubtaskCompletionFunc temp = CompletionFunc;
- // completion func must be reset before calling it,
- // because function may set it back
- CompletionFunc = TSubtaskCompletionFunc();
- (task->*(temp.Func))(this);
- }
-}
-
+
+ if (!!CompletionFunc) {
+ TSubtaskCompletionFunc temp = CompletionFunc;
+ // completion func must be reset before calling it,
+ // because function may set it back
+ CompletionFunc = TSubtaskCompletionFunc();
+ (task->*(temp.Func))(this);
+ }
+}
+
void NRainCheck::TSubtaskCompletion::Cancel() {
- for (;;) {
- ESubtaskState state = State.Get();
- if (state == CREATED && State.CompareAndSet(CREATED, CANCELED)) {
- return;
- }
- if (state == RUNNING && State.CompareAndSet(RUNNING, CANCEL_REQUESTED)) {
- return;
- }
- if (state == DONE && State.CompareAndSet(DONE, CANCELED)) {
- return;
- }
- if (state == CANCEL_REQUESTED || state == CANCELED) {
- return;
- }
- }
-}
-
+ for (;;) {
+ ESubtaskState state = State.Get();
+ if (state == CREATED && State.CompareAndSet(CREATED, CANCELED)) {
+ return;
+ }
+ if (state == RUNNING && State.CompareAndSet(RUNNING, CANCEL_REQUESTED)) {
+ return;
+ }
+ if (state == DONE && State.CompareAndSet(DONE, CANCELED)) {
+ return;
+ }
+ if (state == CANCEL_REQUESTED || state == CANCELED) {
+ return;
+ }
+ }
+}
+
void TSubtaskCompletion::SetRunning(TTaskRunnerBase* parent) {
Y_ASSERT(!TaskRunner);
Y_ASSERT(!!parent);
-
- TaskRunner = parent;
-
- parent->Pending.push_back(this);
-
- parent->RefV();
-
- for (;;) {
- ESubtaskState current = State.Get();
- if (current != CREATED && current != DONE) {
+
+ TaskRunner = parent;
+
+ parent->Pending.push_back(this);
+
+ parent->RefV();
+
+ for (;;) {
+ ESubtaskState current = State.Get();
+ if (current != CREATED && current != DONE) {
Y_FAIL("current state should be CREATED or DONE: %s", ToCString(current));
- }
- if (State.CompareAndSet(current, RUNNING)) {
- return;
- }
- }
-}
-
+ }
+ if (State.CompareAndSet(current, RUNNING)) {
+ return;
+ }
+ }
+}
+
void TSubtaskCompletion::SetDone() {
Y_ASSERT(!!TaskRunner);
- TTaskRunnerBase* temp = TaskRunner;
+ TTaskRunnerBase* temp = TaskRunner;
TaskRunner = nullptr;
-
- for (;;) {
- ESubtaskState state = State.Get();
- if (state == RUNNING) {
- if (State.CompareAndSet(RUNNING, DONE)) {
- break;
- }
- } else if (state == CANCEL_REQUESTED) {
- if (State.CompareAndSet(CANCEL_REQUESTED, CANCELED)) {
- break;
- }
- } else {
+
+ for (;;) {
+ ESubtaskState state = State.Get();
+ if (state == RUNNING) {
+ if (State.CompareAndSet(RUNNING, DONE)) {
+ break;
+ }
+ } else if (state == CANCEL_REQUESTED) {
+ if (State.CompareAndSet(CANCEL_REQUESTED, CANCELED)) {
+ break;
+ }
+ } else {
Y_FAIL("cannot SetDone: unknown state: %s", ToCString(state));
- }
- }
-
- temp->ScheduleV();
- temp->UnRefV();
-}
-
-#if 0
-void NRainCheck::TTaskRunnerBase::RetainRef()
-{
- if (HoldsSelfReference) {
- return;
- }
- HoldsSelfReference = true;
- Ref();
-}
-
-void NRainCheck::TTaskRunnerBase::ReleaseRef()
-{
- if (!HoldsSelfReference) {
- return;
- }
- HoldsSelfReference = false;
- DecRef();
-}
-#endif
-
+ }
+ }
+
+ temp->ScheduleV();
+ temp->UnRefV();
+}
+
+#if 0
+void NRainCheck::TTaskRunnerBase::RetainRef()
+{
+ if (HoldsSelfReference) {
+ return;
+ }
+ HoldsSelfReference = true;
+ Ref();
+}
+
+void NRainCheck::TTaskRunnerBase::ReleaseRef()
+{
+ if (!HoldsSelfReference) {
+ return;
+ }
+ HoldsSelfReference = false;
+ DecRef();
+}
+#endif
+
void TTaskRunnerBase::AssertInThisThread() const {
Y_ASSERT(IsRunningInThisThread());
-}
-
+}
+
TTaskRunnerBase* TTaskRunnerBase::CurrentTask() {
Y_VERIFY(!!ThreadCurrentTask);
- return ThreadCurrentTask;
-}
-
+ return ThreadCurrentTask;
+}
+
ITaskBase* TTaskRunnerBase::CurrentTaskImpl() {
return CurrentTask()->GetImplBase();
}
TString TTaskRunnerBase::GetStatusSingleLine() {
return TypeName(*Impl);
-}
-
+}
+
bool NRainCheck::AreWeInsideTask() {
return ThreadCurrentTask != nullptr;
}
diff --git a/library/cpp/messagebus/rain_check/core/task.h b/library/cpp/messagebus/rain_check/core/task.h
index 7d8778bcda..b84e62a1eb 100644
--- a/library/cpp/messagebus/rain_check/core/task.h
+++ b/library/cpp/messagebus/rain_check/core/task.h
@@ -1,5 +1,5 @@
-#pragma once
-
+#pragma once
+
#include "fwd.h"
#include <library/cpp/messagebus/actor/actor.h>
@@ -7,55 +7,55 @@
#include <library/cpp/deprecated/enum_codegen/enum_codegen.h>
-#include <util/generic/noncopyable.h>
-#include <util/generic/ptr.h>
-#include <util/thread/lfstack.h>
-
-namespace NRainCheck {
- struct ISubtaskListener {
- virtual void SetDone() = 0;
+#include <util/generic/noncopyable.h>
+#include <util/generic/ptr.h>
+#include <util/thread/lfstack.h>
+
+namespace NRainCheck {
+ struct ISubtaskListener {
+ virtual void SetDone() = 0;
virtual ~ISubtaskListener() {
}
- };
-
- struct TNopSubtaskListener: public ISubtaskListener {
+ };
+
+ struct TNopSubtaskListener: public ISubtaskListener {
void SetDone() override;
-
- static TNopSubtaskListener Instance;
- };
-
- class TSubtaskCompletionFunc {
- friend class TSubtaskCompletion;
-
- typedef void (ITaskBase::*TFunc)(TSubtaskCompletion*);
- TFunc Func;
-
- public:
- TSubtaskCompletionFunc()
+
+ static TNopSubtaskListener Instance;
+ };
+
+ class TSubtaskCompletionFunc {
+ friend class TSubtaskCompletion;
+
+ typedef void (ITaskBase::*TFunc)(TSubtaskCompletion*);
+ TFunc Func;
+
+ public:
+ TSubtaskCompletionFunc()
: Func(nullptr)
{
}
-
- TSubtaskCompletionFunc(void*)
+
+ TSubtaskCompletionFunc(void*)
: Func(nullptr)
{
}
-
- template <typename TTask>
- TSubtaskCompletionFunc(void (TTask::*func)(TSubtaskCompletion*))
+
+ template <typename TTask>
+ TSubtaskCompletionFunc(void (TTask::*func)(TSubtaskCompletion*))
: Func((TFunc)func)
- {
+ {
static_assert((std::is_base_of<ITaskBase, TTask>::value), "expect (std::is_base_of<ITaskBase, TTask>::value)");
- }
-
- bool operator!() const {
- return !Func;
- }
- };
-
- template <typename T>
- class TTaskFuture;
-
+ }
+
+ bool operator!() const {
+ return !Func;
+ }
+ };
+
+ template <typename T>
+ class TTaskFuture;
+
#define SUBTASK_STATE_MAP(XX) \
XX(CREATED, "Initial") \
XX(RUNNING, "Running") \
@@ -63,33 +63,33 @@ namespace NRainCheck {
XX(CANCEL_REQUESTED, "Cancel requested, but still executing") \
XX(CANCELED, "Canceled") \
/**/
-
- enum ESubtaskState {
- SUBTASK_STATE_MAP(ENUM_VALUE_GEN_NO_VALUE)
- };
-
- ENUM_TO_STRING(ESubtaskState, SUBTASK_STATE_MAP)
-
+
+ enum ESubtaskState {
+ SUBTASK_STATE_MAP(ENUM_VALUE_GEN_NO_VALUE)
+ };
+
+ ENUM_TO_STRING(ESubtaskState, SUBTASK_STATE_MAP)
+
class TSubtaskCompletion : TNonCopyable, public ISubtaskListener {
- friend struct TTaskAccessor;
+ friend struct TTaskAccessor;
- private:
- TAtomicBox<ESubtaskState> State;
- TTaskRunnerBase* volatile TaskRunner;
- TSubtaskCompletionFunc CompletionFunc;
+ private:
+ TAtomicBox<ESubtaskState> State;
+ TTaskRunnerBase* volatile TaskRunner;
+ TSubtaskCompletionFunc CompletionFunc;
- public:
+ public:
TSubtaskCompletion()
: State(CREATED)
, TaskRunner()
{
}
~TSubtaskCompletion() override;
-
- // Either done or cancel requested or cancelled
- bool IsComplete() const {
- ESubtaskState state = State.Get();
- switch (state) {
+
+ // Either done or cancel requested or cancelled
+ bool IsComplete() const {
+ ESubtaskState state = State.Get();
+ switch (state) {
case RUNNING:
return false;
case DONE:
@@ -102,82 +102,82 @@ namespace NRainCheck {
Y_FAIL("not started");
default:
Y_FAIL("unknown value: %u", (unsigned)state);
- }
- }
-
- void FireCompletionCallback(ITaskBase*);
-
- void SetCompletionCallback(TSubtaskCompletionFunc func) {
- CompletionFunc = func;
- }
-
- // Completed, but not cancelled
- bool IsDone() const {
- return State.Get() == DONE;
- }
-
- // Request cancel by actor
- // Does nothing but marks task cancelled,
- // and allows proceeding to next callback
- void Cancel();
-
- // called by service provider implementations
- // must not be called by actor
- void SetRunning(TTaskRunnerBase* parent);
+ }
+ }
+
+ void FireCompletionCallback(ITaskBase*);
+
+ void SetCompletionCallback(TSubtaskCompletionFunc func) {
+ CompletionFunc = func;
+ }
+
+ // Completed, but not cancelled
+ bool IsDone() const {
+ return State.Get() == DONE;
+ }
+
+ // Request cancel by actor
+ // Does nothing but marks task cancelled,
+ // and allows proceeding to next callback
+ void Cancel();
+
+ // called by service provider implementations
+ // must not be called by actor
+ void SetRunning(TTaskRunnerBase* parent);
void SetDone() override;
- };
-
- // See ISimpleTask, ICoroTask
+ };
+
+ // See ISimpleTask, ICoroTask
class TTaskRunnerBase: public TAtomicRefCount<TTaskRunnerBase>, public NActor::TActor<TTaskRunnerBase> {
- friend class NActor::TActor<TTaskRunnerBase>;
- friend class TContinueFunc;
- friend struct TTaskAccessor;
- friend class TSubtaskCompletion;
-
- private:
- THolder<ITaskBase> Impl;
-
- ISubtaskListener* const ParentTask;
- // While task is running, it holds extra reference to self.
- //bool HoldsSelfReference;
- bool Done;
- bool SetDoneCalled;
-
- // Subtasks currently executed.
+ friend class NActor::TActor<TTaskRunnerBase>;
+ friend class TContinueFunc;
+ friend struct TTaskAccessor;
+ friend class TSubtaskCompletion;
+
+ private:
+ THolder<ITaskBase> Impl;
+
+ ISubtaskListener* const ParentTask;
+ // While task is running, it holds extra reference to self.
+ //bool HoldsSelfReference;
+ bool Done;
+ bool SetDoneCalled;
+
+ // Subtasks currently executed.
TVector<TSubtaskCompletion*> Pending;
-
- void Act(NActor::TDefaultTag);
-
- public:
- // Construct task. Task is not automatically started.
- TTaskRunnerBase(IEnv*, ISubtaskListener* parent, TAutoPtr<ITaskBase> impl);
+
+ void Act(NActor::TDefaultTag);
+
+ public:
+ // Construct task. Task is not automatically started.
+ TTaskRunnerBase(IEnv*, ISubtaskListener* parent, TAutoPtr<ITaskBase> impl);
~TTaskRunnerBase() override;
-
- bool IsRunningInThisThread() const;
- void AssertInThisThread() const;
- static TTaskRunnerBase* CurrentTask();
+
+ bool IsRunningInThisThread() const;
+ void AssertInThisThread() const;
+ static TTaskRunnerBase* CurrentTask();
static ITaskBase* CurrentTaskImpl();
-
+
TString GetStatusSingleLine();
-
- protected:
- //void RetainRef();
- //void ReleaseRef();
+
+ protected:
+ //void RetainRef();
+ //void ReleaseRef();
ITaskBase* GetImplBase() {
return Impl.Get();
}
-
- private:
- // true if need to call again
- virtual bool ReplyReceived() = 0;
- };
-
- class ITaskBase {
- public:
+
+ private:
+ // true if need to call again
+ virtual bool ReplyReceived() = 0;
+ };
+
+ class ITaskBase {
+ public:
virtual ~ITaskBase() {
}
- };
-
+ };
+
// Check that current method executed inside some task.
bool AreWeInsideTask();
diff --git a/library/cpp/messagebus/rain_check/core/track.cpp b/library/cpp/messagebus/rain_check/core/track.cpp
index 092a51a214..cc3747b9f6 100644
--- a/library/cpp/messagebus/rain_check/core/track.cpp
+++ b/library/cpp/messagebus/rain_check/core/track.cpp
@@ -1,66 +1,66 @@
-#include "track.h"
-
-using namespace NRainCheck;
-using namespace NRainCheck::NPrivate;
-
+#include "track.h"
+
+using namespace NRainCheck;
+using namespace NRainCheck::NPrivate;
+
void TTaskTrackerReceipt::SetDone() {
- TaskTracker->GetQueue<TTaskTrackerReceipt*>()->EnqueueAndSchedule(this);
-}
-
+ TaskTracker->GetQueue<TTaskTrackerReceipt*>()->EnqueueAndSchedule(this);
+}
+
TString TTaskTrackerReceipt::GetStatusSingleLine() {
- return Task->GetStatusSingleLine();
-}
-
+ return Task->GetStatusSingleLine();
+}
+
TTaskTracker::TTaskTracker(NActor::TExecutor* executor)
: NActor::TActor<TTaskTracker>(executor)
-{
-}
-
+{
+}
+
TTaskTracker::~TTaskTracker() {
Y_ASSERT(Tasks.Empty());
-}
-
-void TTaskTracker::Shutdown() {
- ShutdownFlag.Set(true);
- Schedule();
- ShutdownEvent.WaitI();
-}
-
-void TTaskTracker::ProcessItem(NActor::TDefaultTag, NActor::TDefaultTag, ITaskFactory* taskFactory) {
- THolder<ITaskFactory> holder(taskFactory);
-
- THolder<TTaskTrackerReceipt> receipt(new TTaskTrackerReceipt(this));
- receipt->Task = taskFactory->NewTask(receipt.Get());
-
- Tasks.PushBack(receipt.Release());
-}
-
-void TTaskTracker::ProcessItem(NActor::TDefaultTag, NActor::TDefaultTag, TTaskTrackerReceipt* receipt) {
+}
+
+void TTaskTracker::Shutdown() {
+ ShutdownFlag.Set(true);
+ Schedule();
+ ShutdownEvent.WaitI();
+}
+
+void TTaskTracker::ProcessItem(NActor::TDefaultTag, NActor::TDefaultTag, ITaskFactory* taskFactory) {
+ THolder<ITaskFactory> holder(taskFactory);
+
+ THolder<TTaskTrackerReceipt> receipt(new TTaskTrackerReceipt(this));
+ receipt->Task = taskFactory->NewTask(receipt.Get());
+
+ Tasks.PushBack(receipt.Release());
+}
+
+void TTaskTracker::ProcessItem(NActor::TDefaultTag, NActor::TDefaultTag, TTaskTrackerReceipt* receipt) {
Y_ASSERT(!receipt->Empty());
- receipt->Unlink();
- delete receipt;
-}
-
-void TTaskTracker::ProcessItem(NActor::TDefaultTag, NActor::TDefaultTag, TAsyncResult<TTaskTrackerStatus>* status) {
- TTaskTrackerStatus s;
- s.Size = Tasks.Size();
- status->SetResult(s);
-}
-
+ receipt->Unlink();
+ delete receipt;
+}
+
+void TTaskTracker::ProcessItem(NActor::TDefaultTag, NActor::TDefaultTag, TAsyncResult<TTaskTrackerStatus>* status) {
+ TTaskTrackerStatus s;
+ s.Size = Tasks.Size();
+ status->SetResult(s);
+}
+
void TTaskTracker::Act(NActor::TDefaultTag) {
- GetQueue<TAsyncResult<TTaskTrackerStatus>*>()->DequeueAll();
- GetQueue<ITaskFactory*>()->DequeueAll();
- GetQueue<TTaskTrackerReceipt*>()->DequeueAll();
-
- if (ShutdownFlag.Get()) {
- if (Tasks.Empty()) {
- ShutdownEvent.Signal();
- }
- }
-}
-
-ui32 TTaskTracker::Size() {
- TAsyncResult<TTaskTrackerStatus> r;
- GetQueue<TAsyncResult<TTaskTrackerStatus>*>()->EnqueueAndSchedule(&r);
- return r.GetResult().Size;
-}
+ GetQueue<TAsyncResult<TTaskTrackerStatus>*>()->DequeueAll();
+ GetQueue<ITaskFactory*>()->DequeueAll();
+ GetQueue<TTaskTrackerReceipt*>()->DequeueAll();
+
+ if (ShutdownFlag.Get()) {
+ if (Tasks.Empty()) {
+ ShutdownEvent.Signal();
+ }
+ }
+}
+
+ui32 TTaskTracker::Size() {
+ TAsyncResult<TTaskTrackerStatus> r;
+ GetQueue<TAsyncResult<TTaskTrackerStatus>*>()->EnqueueAndSchedule(&r);
+ return r.GetResult().Size;
+}
diff --git a/library/cpp/messagebus/rain_check/core/track.h b/library/cpp/messagebus/rain_check/core/track.h
index d387de7574..a7f3d099f0 100644
--- a/library/cpp/messagebus/rain_check/core/track.h
+++ b/library/cpp/messagebus/rain_check/core/track.h
@@ -1,97 +1,97 @@
-#pragma once
-
+#pragma once
+
#include "spawn.h"
#include "task.h"
-
+
#include <library/cpp/messagebus/async_result.h>
#include <library/cpp/messagebus/actor/queue_in_actor.h>
#include <library/cpp/messagebus/misc/atomic_box.h>
-
+
#include <util/generic/intrlist.h>
#include <util/system/event.h>
-namespace NRainCheck {
- class TTaskTracker;
-
- namespace NPrivate {
- struct ITaskFactory {
- virtual TIntrusivePtr<TTaskRunnerBase> NewTask(ISubtaskListener*) = 0;
+namespace NRainCheck {
+ class TTaskTracker;
+
+ namespace NPrivate {
+ struct ITaskFactory {
+ virtual TIntrusivePtr<TTaskRunnerBase> NewTask(ISubtaskListener*) = 0;
virtual ~ITaskFactory() {
}
- };
-
- struct TTaskTrackerReceipt: public ISubtaskListener, public TIntrusiveListItem<TTaskTrackerReceipt> {
- TTaskTracker* const TaskTracker;
- TIntrusivePtr<TTaskRunnerBase> Task;
-
+ };
+
+ struct TTaskTrackerReceipt: public ISubtaskListener, public TIntrusiveListItem<TTaskTrackerReceipt> {
+ TTaskTracker* const TaskTracker;
+ TIntrusivePtr<TTaskRunnerBase> Task;
+
TTaskTrackerReceipt(TTaskTracker* taskTracker)
: TaskTracker(taskTracker)
{
}
-
+
void SetDone() override;
-
+
TString GetStatusSingleLine();
- };
-
- struct TTaskTrackerStatus {
- ui32 Size;
- };
-
- }
-
- class TTaskTracker
+ };
+
+ struct TTaskTrackerStatus {
+ ui32 Size;
+ };
+
+ }
+
+ class TTaskTracker
: public TAtomicRefCount<TTaskTracker>,
public NActor::TActor<TTaskTracker>,
public NActor::TQueueInActor<TTaskTracker, NPrivate::ITaskFactory*>,
public NActor::TQueueInActor<TTaskTracker, NPrivate::TTaskTrackerReceipt*>,
public NActor::TQueueInActor<TTaskTracker, TAsyncResult<NPrivate::TTaskTrackerStatus>*> {
- friend struct NPrivate::TTaskTrackerReceipt;
-
- private:
- TAtomicBox<bool> ShutdownFlag;
+ friend struct NPrivate::TTaskTrackerReceipt;
+
+ private:
+ TAtomicBox<bool> ShutdownFlag;
TSystemEvent ShutdownEvent;
-
- TIntrusiveList<NPrivate::TTaskTrackerReceipt> Tasks;
-
- template <typename TItem>
- NActor::TQueueInActor<TTaskTracker, TItem>* GetQueue() {
- return this;
- }
-
- public:
- TTaskTracker(NActor::TExecutor* executor);
+
+ TIntrusiveList<NPrivate::TTaskTrackerReceipt> Tasks;
+
+ template <typename TItem>
+ NActor::TQueueInActor<TTaskTracker, TItem>* GetQueue() {
+ return this;
+ }
+
+ public:
+ TTaskTracker(NActor::TExecutor* executor);
~TTaskTracker() override;
-
- void Shutdown();
-
- void ProcessItem(NActor::TDefaultTag, NActor::TDefaultTag, NPrivate::ITaskFactory*);
- void ProcessItem(NActor::TDefaultTag, NActor::TDefaultTag, NPrivate::TTaskTrackerReceipt*);
- void ProcessItem(NActor::TDefaultTag, NActor::TDefaultTag, TAsyncResult<NPrivate::TTaskTrackerStatus>*);
-
- void Act(NActor::TDefaultTag);
-
- template <typename TTask, typename TEnv, typename TParam>
- void Spawn(TEnv* env, TParam param) {
- struct TTaskFactory: public NPrivate::ITaskFactory {
- TEnv* const Env;
- TParam Param;
-
+
+ void Shutdown();
+
+ void ProcessItem(NActor::TDefaultTag, NActor::TDefaultTag, NPrivate::ITaskFactory*);
+ void ProcessItem(NActor::TDefaultTag, NActor::TDefaultTag, NPrivate::TTaskTrackerReceipt*);
+ void ProcessItem(NActor::TDefaultTag, NActor::TDefaultTag, TAsyncResult<NPrivate::TTaskTrackerStatus>*);
+
+ void Act(NActor::TDefaultTag);
+
+ template <typename TTask, typename TEnv, typename TParam>
+ void Spawn(TEnv* env, TParam param) {
+ struct TTaskFactory: public NPrivate::ITaskFactory {
+ TEnv* const Env;
+ TParam Param;
+
TTaskFactory(TEnv* env, TParam param)
: Env(env)
, Param(param)
{
}
-
+
TIntrusivePtr<TTaskRunnerBase> NewTask(ISubtaskListener* subtaskListener) override {
- return NRainCheck::SpawnTask<TTask>(Env, Param, subtaskListener).Get();
- }
- };
-
- GetQueue<NPrivate::ITaskFactory*>()->EnqueueAndSchedule(new TTaskFactory(env, param));
- }
-
- ui32 Size();
- };
-
-}
+ return NRainCheck::SpawnTask<TTask>(Env, Param, subtaskListener).Get();
+ }
+ };
+
+ GetQueue<NPrivate::ITaskFactory*>()->EnqueueAndSchedule(new TTaskFactory(env, param));
+ }
+
+ ui32 Size();
+ };
+
+}
diff --git a/library/cpp/messagebus/rain_check/core/track_ut.cpp b/library/cpp/messagebus/rain_check/core/track_ut.cpp
index 05f7de1319..f2ac90fa3c 100644
--- a/library/cpp/messagebus/rain_check/core/track_ut.cpp
+++ b/library/cpp/messagebus/rain_check/core/track_ut.cpp
@@ -1,45 +1,45 @@
#include <library/cpp/testing/unittest/registar.h>
-
+
#include "track.h"
#include <library/cpp/messagebus/rain_check/test/helper/misc.h>
#include <library/cpp/messagebus/rain_check/test/ut/test.h>
-
-using namespace NRainCheck;
-
+
+using namespace NRainCheck;
+
Y_UNIT_TEST_SUITE(TaskTracker) {
- struct TTaskForTracker: public ISimpleTask {
- TTestSync* const TestSync;
-
+ struct TTaskForTracker: public ISimpleTask {
+ TTestSync* const TestSync;
+
TTaskForTracker(TTestEnv*, TTestSync* testSync)
: TestSync(testSync)
{
}
-
+
TContinueFunc Start() override {
- TestSync->WaitForAndIncrement(0);
- TestSync->WaitForAndIncrement(2);
+ TestSync->WaitForAndIncrement(0);
+ TestSync->WaitForAndIncrement(2);
return nullptr;
- }
- };
-
+ }
+ };
+
Y_UNIT_TEST(Simple) {
- TTestEnv env;
-
- TIntrusivePtr<TTaskTracker> tracker(new TTaskTracker(env.GetExecutor()));
-
- TTestSync testSync;
-
- tracker->Spawn<TTaskForTracker>(&env, &testSync);
-
- testSync.WaitFor(1);
-
+ TTestEnv env;
+
+ TIntrusivePtr<TTaskTracker> tracker(new TTaskTracker(env.GetExecutor()));
+
+ TTestSync testSync;
+
+ tracker->Spawn<TTaskForTracker>(&env, &testSync);
+
+ testSync.WaitFor(1);
+
UNIT_ASSERT_VALUES_EQUAL(1u, tracker->Size());
-
- testSync.CheckAndIncrement(1);
-
- testSync.WaitForAndIncrement(3);
-
- tracker->Shutdown();
- }
-}
+
+ testSync.CheckAndIncrement(1);
+
+ testSync.WaitForAndIncrement(3);
+
+ tracker->Shutdown();
+ }
+}
diff --git a/library/cpp/messagebus/rain_check/core/ya.make b/library/cpp/messagebus/rain_check/core/ya.make
index c6fb5640d4..497e452729 100644
--- a/library/cpp/messagebus/rain_check/core/ya.make
+++ b/library/cpp/messagebus/rain_check/core/ya.make
@@ -1,25 +1,25 @@
-LIBRARY()
-
+LIBRARY()
+
OWNER(g:messagebus)
-
-PEERDIR(
+
+PEERDIR(
library/cpp/coroutine/engine
library/cpp/deprecated/enum_codegen
library/cpp/messagebus
library/cpp/messagebus/actor
library/cpp/messagebus/scheduler
-)
-
-SRCS(
- coro.cpp
- coro_stack.cpp
- env.cpp
- rain_check.cpp
- simple.cpp
- sleep.cpp
- spawn.cpp
- task.cpp
- track.cpp
-)
-
-END()
+)
+
+SRCS(
+ coro.cpp
+ coro_stack.cpp
+ env.cpp
+ rain_check.cpp
+ simple.cpp
+ sleep.cpp
+ spawn.cpp
+ task.cpp
+ track.cpp
+)
+
+END()
diff --git a/library/cpp/messagebus/rain_check/http/client_ut.cpp b/library/cpp/messagebus/rain_check/http/client_ut.cpp
index 1628114391..c6e4a151bd 100644
--- a/library/cpp/messagebus/rain_check/http/client_ut.cpp
+++ b/library/cpp/messagebus/rain_check/http/client_ut.cpp
@@ -25,7 +25,7 @@
#include <utility>
using namespace NRainCheck;
-using namespace NBus::NTest;
+using namespace NBus::NTest;
namespace {
class THttpClientEnv: public TTestEnvTemplate<THttpClientEnv> {
@@ -145,11 +145,11 @@ Y_UNIT_TEST_SUITE(RainCheckHttpClient) {
static const TIpPort SERVER_PORT = 4000;
Y_UNIT_TEST(Simple) {
- // TODO: randomize port
- if (!IsFixedPortTestAllowed()) {
- return;
- }
-
+ // TODO: randomize port
+ if (!IsFixedPortTestAllowed()) {
+ return;
+ }
+
TSimpleServer server;
NNeh::IServicesRef runner = RunServer(SERVER_PORT, server);
diff --git a/library/cpp/messagebus/rain_check/messagebus/messagebus_client.cpp b/library/cpp/messagebus/rain_check/messagebus/messagebus_client.cpp
index daac8d9a99..13d3132fb7 100644
--- a/library/cpp/messagebus/rain_check/messagebus/messagebus_client.cpp
+++ b/library/cpp/messagebus/rain_check/messagebus/messagebus_client.cpp
@@ -1,98 +1,98 @@
-#include "messagebus_client.h"
-
-using namespace NRainCheck;
-using namespace NBus;
-
-TBusClientService::TBusClientService(
+#include "messagebus_client.h"
+
+using namespace NRainCheck;
+using namespace NBus;
+
+TBusClientService::TBusClientService(
const NBus::TBusSessionConfig& config,
NBus::TBusProtocol* proto,
NBus::TBusMessageQueue* queue) {
- Session = queue->CreateSource(proto, this, config);
-}
-
+ Session = queue->CreateSource(proto, this, config);
+}
+
TBusClientService::~TBusClientService() {
- Session->Shutdown();
-}
-
+ Session->Shutdown();
+}
+
void TBusClientService::SendCommon(NBus::TBusMessage* message, const NBus::TNetAddr&, TBusFuture* future) {
- TTaskRunnerBase* current = TTaskRunnerBase::CurrentTask();
-
- future->SetRunning(current);
-
- future->Task = current;
-
- // after this statement message is owned by both messagebus and future
- future->Request.Reset(message);
-
- // TODO: allow cookie in messagebus
- message->Data = future;
-}
-
-void TBusClientService::ProcessResultCommon(NBus::TBusMessageAutoPtr message,
+ TTaskRunnerBase* current = TTaskRunnerBase::CurrentTask();
+
+ future->SetRunning(current);
+
+ future->Task = current;
+
+ // after this statement message is owned by both messagebus and future
+ future->Request.Reset(message);
+
+ // TODO: allow cookie in messagebus
+ message->Data = future;
+}
+
+void TBusClientService::ProcessResultCommon(NBus::TBusMessageAutoPtr message,
const NBus::TNetAddr&, TBusFuture* future,
NBus::EMessageStatus status) {
Y_UNUSED(message.Release());
-
- if (status == NBus::MESSAGE_OK) {
- return;
- }
-
+
+ if (status == NBus::MESSAGE_OK) {
+ return;
+ }
+
future->SetDoneAndSchedule(status, nullptr);
-}
-
-void TBusClientService::SendOneWay(
+}
+
+void TBusClientService::SendOneWay(
NBus::TBusMessageAutoPtr message, const NBus::TNetAddr& addr,
TBusFuture* future) {
- SendCommon(message.Get(), addr, future);
-
- EMessageStatus ok = Session->SendMessageOneWay(message.Get(), &addr, false);
- ProcessResultCommon(message, addr, future, ok);
-}
-
+ SendCommon(message.Get(), addr, future);
+
+ EMessageStatus ok = Session->SendMessageOneWay(message.Get(), &addr, false);
+ ProcessResultCommon(message, addr, future, ok);
+}
+
NBus::TBusClientSessionPtr TBusClientService::GetSessionForMonitoring() const {
return Session;
}
-void TBusClientService::Send(
+void TBusClientService::Send(
TBusMessageAutoPtr message, const TNetAddr& addr,
TBusFuture* future) {
- SendCommon(message.Get(), addr, future);
-
- EMessageStatus ok = Session->SendMessage(message.Get(), &addr, false);
- ProcessResultCommon(message, addr, future, ok);
-}
-
-void TBusClientService::OnReply(
+ SendCommon(message.Get(), addr, future);
+
+ EMessageStatus ok = Session->SendMessage(message.Get(), &addr, false);
+ ProcessResultCommon(message, addr, future, ok);
+}
+
+void TBusClientService::OnReply(
TAutoPtr<TBusMessage> request,
TAutoPtr<TBusMessage> response) {
TBusFuture* future = (TBusFuture*)request->Data;
Y_ASSERT(future->Request.Get() == request.Get());
Y_UNUSED(request.Release());
- future->SetDoneAndSchedule(MESSAGE_OK, response);
-}
-
-void NRainCheck::TBusClientService::OnMessageSentOneWay(
+ future->SetDoneAndSchedule(MESSAGE_OK, response);
+}
+
+void NRainCheck::TBusClientService::OnMessageSentOneWay(
TAutoPtr<NBus::TBusMessage> request) {
TBusFuture* future = (TBusFuture*)request->Data;
Y_ASSERT(future->Request.Get() == request.Get());
Y_UNUSED(request.Release());
future->SetDoneAndSchedule(MESSAGE_OK, nullptr);
-}
-
-void TBusClientService::OnError(
+}
+
+void TBusClientService::OnError(
TAutoPtr<TBusMessage> message, NBus::EMessageStatus status) {
if (message->Data == nullptr) {
- return;
- }
-
+ return;
+ }
+
TBusFuture* future = (TBusFuture*)message->Data;
Y_ASSERT(future->Request.Get() == message.Get());
Y_UNUSED(message.Release());
future->SetDoneAndSchedule(status, nullptr);
-}
-
+}
+
void TBusFuture::SetDoneAndSchedule(EMessageStatus status, TAutoPtr<TBusMessage> response) {
- Status = status;
- Response.Reset(response.Release());
- SetDone();
-}
+ Status = status;
+ Response.Reset(response.Release());
+ SetDone();
+}
diff --git a/library/cpp/messagebus/rain_check/messagebus/messagebus_client.h b/library/cpp/messagebus/rain_check/messagebus/messagebus_client.h
index 0a291cdea6..8bcc03b8d9 100644
--- a/library/cpp/messagebus/rain_check/messagebus/messagebus_client.h
+++ b/library/cpp/messagebus/rain_check/messagebus/messagebus_client.h
@@ -1,67 +1,67 @@
-#pragma once
-
+#pragma once
+
#include <library/cpp/messagebus/rain_check/core/task.h>
#include <library/cpp/messagebus/ybus.h>
-
-namespace NRainCheck {
- class TBusFuture: public TSubtaskCompletion {
- friend class TBusClientService;
-
- private:
- THolder<NBus::TBusMessage> Request;
- THolder<NBus::TBusMessage> Response;
- NBus::EMessageStatus Status;
-
- private:
- TTaskRunnerBase* Task;
-
- void SetDoneAndSchedule(NBus::EMessageStatus, TAutoPtr<NBus::TBusMessage>);
-
- public:
- // TODO: add MESSAGE_UNDEFINED
+
+namespace NRainCheck {
+ class TBusFuture: public TSubtaskCompletion {
+ friend class TBusClientService;
+
+ private:
+ THolder<NBus::TBusMessage> Request;
+ THolder<NBus::TBusMessage> Response;
+ NBus::EMessageStatus Status;
+
+ private:
+ TTaskRunnerBase* Task;
+
+ void SetDoneAndSchedule(NBus::EMessageStatus, TAutoPtr<NBus::TBusMessage>);
+
+ public:
+ // TODO: add MESSAGE_UNDEFINED
TBusFuture()
: Status(NBus::MESSAGE_DONT_ASK)
, Task(nullptr)
{
}
-
- NBus::TBusMessage* GetRequest() const {
- return Request.Get();
- }
-
- NBus::TBusMessage* GetResponse() const {
+
+ NBus::TBusMessage* GetRequest() const {
+ return Request.Get();
+ }
+
+ NBus::TBusMessage* GetResponse() const {
Y_ASSERT(IsDone());
- return Response.Get();
- }
-
- NBus::EMessageStatus GetStatus() const {
+ return Response.Get();
+ }
+
+ NBus::EMessageStatus GetStatus() const {
Y_ASSERT(IsDone());
- return Status;
- }
- };
-
- class TBusClientService: private NBus::IBusClientHandler {
- private:
- NBus::TBusClientSessionPtr Session;
-
- public:
- TBusClientService(const NBus::TBusSessionConfig&, NBus::TBusProtocol*, NBus::TBusMessageQueue*);
+ return Status;
+ }
+ };
+
+ class TBusClientService: private NBus::IBusClientHandler {
+ private:
+ NBus::TBusClientSessionPtr Session;
+
+ public:
+ TBusClientService(const NBus::TBusSessionConfig&, NBus::TBusProtocol*, NBus::TBusMessageQueue*);
~TBusClientService() override;
-
- void Send(NBus::TBusMessageAutoPtr, const NBus::TNetAddr&, TBusFuture* future);
- void SendOneWay(NBus::TBusMessageAutoPtr, const NBus::TNetAddr&, TBusFuture* future);
-
+
+ void Send(NBus::TBusMessageAutoPtr, const NBus::TNetAddr&, TBusFuture* future);
+ void SendOneWay(NBus::TBusMessageAutoPtr, const NBus::TNetAddr&, TBusFuture* future);
+
// Use it only for monitoring
NBus::TBusClientSessionPtr GetSessionForMonitoring() const;
- private:
- void SendCommon(NBus::TBusMessage*, const NBus::TNetAddr&, TBusFuture* future);
- void ProcessResultCommon(NBus::TBusMessageAutoPtr, const NBus::TNetAddr&, TBusFuture* future, NBus::EMessageStatus);
-
+ private:
+ void SendCommon(NBus::TBusMessage*, const NBus::TNetAddr&, TBusFuture* future);
+ void ProcessResultCommon(NBus::TBusMessageAutoPtr, const NBus::TNetAddr&, TBusFuture* future, NBus::EMessageStatus);
+
void OnReply(TAutoPtr<NBus::TBusMessage> pMessage, TAutoPtr<NBus::TBusMessage> pReply) override;
void OnError(TAutoPtr<NBus::TBusMessage> pMessage, NBus::EMessageStatus status) override;
void OnMessageSentOneWay(TAutoPtr<NBus::TBusMessage>) override;
- };
-
-}
+ };
+
+}
diff --git a/library/cpp/messagebus/rain_check/messagebus/messagebus_client_ut.cpp b/library/cpp/messagebus/rain_check/messagebus/messagebus_client_ut.cpp
index 1b3618558b..4571f6f74a 100644
--- a/library/cpp/messagebus/rain_check/messagebus/messagebus_client_ut.cpp
+++ b/library/cpp/messagebus/rain_check/messagebus/messagebus_client_ut.cpp
@@ -1,146 +1,146 @@
#include <library/cpp/testing/unittest/registar.h>
-
+
#include "messagebus_client.h"
#include <library/cpp/messagebus/rain_check/test/ut/test.h>
#include <library/cpp/messagebus/test/helper/example.h>
#include <library/cpp/messagebus/test/helper/object_count_check.h>
-
+
#include <util/generic/cast.h>
-
-using namespace NBus;
-using namespace NBus::NTest;
-using namespace NRainCheck;
-
-struct TMessageBusClientEnv: public TTestEnvTemplate<TMessageBusClientEnv> {
- // TODO: use same thread pool
- TBusMessageQueuePtr Queue;
- TExampleProtocol Proto;
- TBusClientService BusClientService;
-
- static TBusQueueConfig QueueConfig() {
- TBusQueueConfig r;
- r.NumWorkers = 4;
- return r;
- }
-
- TMessageBusClientEnv()
- : Queue(CreateMessageQueue(GetExecutor()))
- , BusClientService(TBusSessionConfig(), &Proto, Queue.Get())
+
+using namespace NBus;
+using namespace NBus::NTest;
+using namespace NRainCheck;
+
+struct TMessageBusClientEnv: public TTestEnvTemplate<TMessageBusClientEnv> {
+ // TODO: use same thread pool
+ TBusMessageQueuePtr Queue;
+ TExampleProtocol Proto;
+ TBusClientService BusClientService;
+
+ static TBusQueueConfig QueueConfig() {
+ TBusQueueConfig r;
+ r.NumWorkers = 4;
+ return r;
+ }
+
+ TMessageBusClientEnv()
+ : Queue(CreateMessageQueue(GetExecutor()))
+ , BusClientService(TBusSessionConfig(), &Proto, Queue.Get())
{
}
-};
-
+};
+
Y_UNIT_TEST_SUITE(RainCheckMessageBusClient) {
- struct TSimpleTask: public ISimpleTask {
- TMessageBusClientEnv* const Env;
-
- const unsigned ServerPort;
-
- TSimpleTask(TMessageBusClientEnv* env, unsigned serverPort)
- : Env(env)
- , ServerPort(serverPort)
- {
- }
-
+ struct TSimpleTask: public ISimpleTask {
+ TMessageBusClientEnv* const Env;
+
+ const unsigned ServerPort;
+
+ TSimpleTask(TMessageBusClientEnv* env, unsigned serverPort)
+ : Env(env)
+ , ServerPort(serverPort)
+ {
+ }
+
TVector<TSimpleSharedPtr<TBusFuture>> Requests;
-
+
TContinueFunc Start() override {
- for (unsigned i = 0; i < 3; ++i) {
- Requests.push_back(new TBusFuture);
- TNetAddr addr("localhost", ServerPort);
- Env->BusClientService.Send(new TExampleRequest(&Env->Proto.RequestCount), addr, Requests[i].Get());
- }
-
- return TContinueFunc(&TSimpleTask::GotReplies);
- }
-
- TContinueFunc GotReplies() {
- for (unsigned i = 0; i < Requests.size(); ++i) {
+ for (unsigned i = 0; i < 3; ++i) {
+ Requests.push_back(new TBusFuture);
+ TNetAddr addr("localhost", ServerPort);
+ Env->BusClientService.Send(new TExampleRequest(&Env->Proto.RequestCount), addr, Requests[i].Get());
+ }
+
+ return TContinueFunc(&TSimpleTask::GotReplies);
+ }
+
+ TContinueFunc GotReplies() {
+ for (unsigned i = 0; i < Requests.size(); ++i) {
Y_VERIFY(Requests[i]->GetStatus() == MESSAGE_OK);
- VerifyDynamicCast<TExampleResponse*>(Requests[i]->GetResponse());
- }
- Env->TestSync.CheckAndIncrement(0);
+ VerifyDynamicCast<TExampleResponse*>(Requests[i]->GetResponse());
+ }
+ Env->TestSync.CheckAndIncrement(0);
return nullptr;
- }
- };
-
+ }
+ };
+
Y_UNIT_TEST(Simple) {
- TObjectCountCheck objectCountCheck;
-
- TExampleServer server;
-
- TMessageBusClientEnv env;
-
- TIntrusivePtr<TSimpleTaskRunner> task = env.SpawnTask<TSimpleTask>(server.GetActualListenPort());
-
- env.TestSync.WaitForAndIncrement(1);
- }
-
- struct TOneWayServer: public NBus::IBusServerHandler {
- TTestSync* const TestSync;
- TExampleProtocol Proto;
- NBus::TBusMessageQueuePtr Queue;
- NBus::TBusServerSessionPtr Session;
-
- TOneWayServer(TTestSync* testSync)
- : TestSync(testSync)
- {
- Queue = CreateMessageQueue();
- Session = Queue->CreateDestination(&Proto, this, NBus::TBusSessionConfig());
- }
-
+ TObjectCountCheck objectCountCheck;
+
+ TExampleServer server;
+
+ TMessageBusClientEnv env;
+
+ TIntrusivePtr<TSimpleTaskRunner> task = env.SpawnTask<TSimpleTask>(server.GetActualListenPort());
+
+ env.TestSync.WaitForAndIncrement(1);
+ }
+
+ struct TOneWayServer: public NBus::IBusServerHandler {
+ TTestSync* const TestSync;
+ TExampleProtocol Proto;
+ NBus::TBusMessageQueuePtr Queue;
+ NBus::TBusServerSessionPtr Session;
+
+ TOneWayServer(TTestSync* testSync)
+ : TestSync(testSync)
+ {
+ Queue = CreateMessageQueue();
+ Session = Queue->CreateDestination(&Proto, this, NBus::TBusSessionConfig());
+ }
+
void OnMessage(NBus::TOnMessageContext& context) override {
- TestSync->CheckAndIncrement(1);
- context.ForgetRequest();
- }
- };
-
- struct TOneWayTask: public ISimpleTask {
- TMessageBusClientEnv* const Env;
-
- const unsigned ServerPort;
-
- TOneWayTask(TMessageBusClientEnv* env, unsigned serverPort)
- : Env(env)
- , ServerPort(serverPort)
- {
- }
-
+ TestSync->CheckAndIncrement(1);
+ context.ForgetRequest();
+ }
+ };
+
+ struct TOneWayTask: public ISimpleTask {
+ TMessageBusClientEnv* const Env;
+
+ const unsigned ServerPort;
+
+ TOneWayTask(TMessageBusClientEnv* env, unsigned serverPort)
+ : Env(env)
+ , ServerPort(serverPort)
+ {
+ }
+
TVector<TSimpleSharedPtr<TBusFuture>> Requests;
-
+
TContinueFunc Start() override {
- Env->TestSync.CheckAndIncrement(0);
-
- for (unsigned i = 0; i < 1; ++i) {
- Requests.push_back(new TBusFuture);
- TNetAddr addr("localhost", ServerPort);
- Env->BusClientService.SendOneWay(new TExampleRequest(&Env->Proto.RequestCount), addr, Requests[i].Get());
- }
-
- return TContinueFunc(&TOneWayTask::GotReplies);
- }
-
- TContinueFunc GotReplies() {
- for (unsigned i = 0; i < Requests.size(); ++i) {
+ Env->TestSync.CheckAndIncrement(0);
+
+ for (unsigned i = 0; i < 1; ++i) {
+ Requests.push_back(new TBusFuture);
+ TNetAddr addr("localhost", ServerPort);
+ Env->BusClientService.SendOneWay(new TExampleRequest(&Env->Proto.RequestCount), addr, Requests[i].Get());
+ }
+
+ return TContinueFunc(&TOneWayTask::GotReplies);
+ }
+
+ TContinueFunc GotReplies() {
+ for (unsigned i = 0; i < Requests.size(); ++i) {
Y_VERIFY(Requests[i]->GetStatus() == MESSAGE_OK);
Y_VERIFY(!Requests[i]->GetResponse());
- }
- Env->TestSync.WaitForAndIncrement(2);
+ }
+ Env->TestSync.WaitForAndIncrement(2);
return nullptr;
- }
- };
-
+ }
+ };
+
Y_UNIT_TEST(OneWay) {
- TObjectCountCheck objectCountCheck;
-
- TMessageBusClientEnv env;
-
- TOneWayServer server(&env.TestSync);
-
- TIntrusivePtr<TSimpleTaskRunner> task = env.SpawnTask<TOneWayTask>(server.Session->GetActualListenPort());
-
- env.TestSync.WaitForAndIncrement(3);
- }
-}
+ TObjectCountCheck objectCountCheck;
+
+ TMessageBusClientEnv env;
+
+ TOneWayServer server(&env.TestSync);
+
+ TIntrusivePtr<TSimpleTaskRunner> task = env.SpawnTask<TOneWayTask>(server.Session->GetActualListenPort());
+
+ env.TestSync.WaitForAndIncrement(3);
+ }
+}
diff --git a/library/cpp/messagebus/rain_check/messagebus/messagebus_server.cpp b/library/cpp/messagebus/rain_check/messagebus/messagebus_server.cpp
index 5d4b13d664..1346ef3243 100644
--- a/library/cpp/messagebus/rain_check/messagebus/messagebus_server.cpp
+++ b/library/cpp/messagebus/rain_check/messagebus/messagebus_server.cpp
@@ -1,17 +1,17 @@
#include "messagebus_server.h"
#include <library/cpp/messagebus/rain_check/core/spawn.h>
-
-using namespace NRainCheck;
-
-TBusTaskStarter::TBusTaskStarter(TAutoPtr<ITaskFactory> taskFactory)
- : TaskFactory(taskFactory)
-{
-}
-
+
+using namespace NRainCheck;
+
+TBusTaskStarter::TBusTaskStarter(TAutoPtr<ITaskFactory> taskFactory)
+ : TaskFactory(taskFactory)
+{
+}
+
void TBusTaskStarter::OnMessage(NBus::TOnMessageContext& onMessage) {
- TaskFactory->NewTask(onMessage);
-}
-
+ TaskFactory->NewTask(onMessage);
+}
+
TBusTaskStarter::~TBusTaskStarter() {
-}
+}
diff --git a/library/cpp/messagebus/rain_check/messagebus/messagebus_server.h b/library/cpp/messagebus/rain_check/messagebus/messagebus_server.h
index 1334f05fe4..28d016599a 100644
--- a/library/cpp/messagebus/rain_check/messagebus/messagebus_server.h
+++ b/library/cpp/messagebus/rain_check/messagebus/messagebus_server.h
@@ -1,46 +1,46 @@
-#pragma once
-
+#pragma once
+
#include <library/cpp/messagebus/rain_check/core/spawn.h>
#include <library/cpp/messagebus/rain_check/core/task.h>
-
+
#include <library/cpp/messagebus/ybus.h>
-
+
#include <util/system/yassert.h>
-
-namespace NRainCheck {
- class TBusTaskStarter: public NBus::IBusServerHandler {
- private:
- struct ITaskFactory {
- virtual void NewTask(NBus::TOnMessageContext&) = 0;
+
+namespace NRainCheck {
+ class TBusTaskStarter: public NBus::IBusServerHandler {
+ private:
+ struct ITaskFactory {
+ virtual void NewTask(NBus::TOnMessageContext&) = 0;
virtual ~ITaskFactory() {
}
- };
-
- THolder<ITaskFactory> TaskFactory;
-
+ };
+
+ THolder<ITaskFactory> TaskFactory;
+
void OnMessage(NBus::TOnMessageContext&) override;
- public:
- TBusTaskStarter(TAutoPtr<ITaskFactory>);
+ public:
+ TBusTaskStarter(TAutoPtr<ITaskFactory>);
~TBusTaskStarter() override;
-
- public:
- template <typename TTask, typename TEnv>
- static TAutoPtr<TBusTaskStarter> NewStarter(TEnv* env) {
- struct TTaskFactory: public ITaskFactory {
- TEnv* const Env;
-
+
+ public:
+ template <typename TTask, typename TEnv>
+ static TAutoPtr<TBusTaskStarter> NewStarter(TEnv* env) {
+ struct TTaskFactory: public ITaskFactory {
+ TEnv* const Env;
+
TTaskFactory(TEnv* env)
: Env(env)
{
}
-
+
void NewTask(NBus::TOnMessageContext& context) override {
- SpawnTask<TTask, TEnv, NBus::TOnMessageContext&>(Env, context);
- }
- };
-
- return new TBusTaskStarter(new TTaskFactory(env));
- }
- };
-}
+ SpawnTask<TTask, TEnv, NBus::TOnMessageContext&>(Env, context);
+ }
+ };
+
+ return new TBusTaskStarter(new TTaskFactory(env));
+ }
+ };
+}
diff --git a/library/cpp/messagebus/rain_check/messagebus/messagebus_server_ut.cpp b/library/cpp/messagebus/rain_check/messagebus/messagebus_server_ut.cpp
index 7c11399f1b..fcb718c3ba 100644
--- a/library/cpp/messagebus/rain_check/messagebus/messagebus_server_ut.cpp
+++ b/library/cpp/messagebus/rain_check/messagebus/messagebus_server_ut.cpp
@@ -1,51 +1,51 @@
#include <library/cpp/testing/unittest/registar.h>
-
+
#include "messagebus_server.h"
-
+
#include <library/cpp/messagebus/rain_check/test/ut/test.h>
-
+
#include <library/cpp/messagebus/test/helper/example.h>
-
-using namespace NBus;
-using namespace NBus::NTest;
-using namespace NRainCheck;
-
-struct TMessageBusServerEnv: public TTestEnvTemplate<TMessageBusServerEnv> {
- TExampleProtocol Proto;
-};
-
+
+using namespace NBus;
+using namespace NBus::NTest;
+using namespace NRainCheck;
+
+struct TMessageBusServerEnv: public TTestEnvTemplate<TMessageBusServerEnv> {
+ TExampleProtocol Proto;
+};
+
Y_UNIT_TEST_SUITE(RainCheckMessageBusServer) {
- struct TSimpleServerTask: public ISimpleTask {
- private:
- TMessageBusServerEnv* const Env;
- TOnMessageContext MessageContext;
-
- public:
- TSimpleServerTask(TMessageBusServerEnv* env, TOnMessageContext& messageContext)
- : Env(env)
- {
- MessageContext.Swap(messageContext);
- }
-
+ struct TSimpleServerTask: public ISimpleTask {
+ private:
+ TMessageBusServerEnv* const Env;
+ TOnMessageContext MessageContext;
+
+ public:
+ TSimpleServerTask(TMessageBusServerEnv* env, TOnMessageContext& messageContext)
+ : Env(env)
+ {
+ MessageContext.Swap(messageContext);
+ }
+
TContinueFunc Start() override {
- MessageContext.SendReplyMove(new TExampleResponse(&Env->Proto.ResponseCount));
+ MessageContext.SendReplyMove(new TExampleResponse(&Env->Proto.ResponseCount));
return nullptr;
- }
- };
-
+ }
+ };
+
Y_UNIT_TEST(Simple) {
- TMessageBusServerEnv env;
-
- THolder<TBusTaskStarter> starter(TBusTaskStarter::NewStarter<TSimpleServerTask>(&env));
-
- TBusMessageQueuePtr queue(CreateMessageQueue(env.GetExecutor()));
-
- TExampleProtocol proto;
-
- TBusServerSessionPtr session = queue->CreateDestination(&env.Proto, starter.Get(), TBusSessionConfig());
-
- TExampleClient client;
-
- client.SendMessagesWaitReplies(1, TNetAddr("localhost", session->GetActualListenPort()));
- }
-}
+ TMessageBusServerEnv env;
+
+ THolder<TBusTaskStarter> starter(TBusTaskStarter::NewStarter<TSimpleServerTask>(&env));
+
+ TBusMessageQueuePtr queue(CreateMessageQueue(env.GetExecutor()));
+
+ TExampleProtocol proto;
+
+ TBusServerSessionPtr session = queue->CreateDestination(&env.Proto, starter.Get(), TBusSessionConfig());
+
+ TExampleClient client;
+
+ client.SendMessagesWaitReplies(1, TNetAddr("localhost", session->GetActualListenPort()));
+ }
+}
diff --git a/library/cpp/messagebus/rain_check/messagebus/ya.make b/library/cpp/messagebus/rain_check/messagebus/ya.make
index defdac9a61..d7dc902ad1 100644
--- a/library/cpp/messagebus/rain_check/messagebus/ya.make
+++ b/library/cpp/messagebus/rain_check/messagebus/ya.make
@@ -1,15 +1,15 @@
-LIBRARY()
-
+LIBRARY()
+
OWNER(g:messagebus)
-
-PEERDIR(
+
+PEERDIR(
library/cpp/messagebus
library/cpp/messagebus/rain_check/core
-)
-
-SRCS(
- messagebus_client.cpp
- messagebus_server.cpp
-)
-
-END()
+)
+
+SRCS(
+ messagebus_client.cpp
+ messagebus_server.cpp
+)
+
+END()
diff --git a/library/cpp/messagebus/rain_check/test/helper/misc.cpp b/library/cpp/messagebus/rain_check/test/helper/misc.cpp
index c0fcb27252..2a75c42744 100644
--- a/library/cpp/messagebus/rain_check/test/helper/misc.cpp
+++ b/library/cpp/messagebus/rain_check/test/helper/misc.cpp
@@ -1,27 +1,27 @@
#include "misc.h"
-#include <util/system/yassert.h>
-
-using namespace NRainCheck;
-
+#include <util/system/yassert.h>
+
+using namespace NRainCheck;
+
void TSpawnNopTasksCoroTask::Run() {
Y_VERIFY(Count <= Completion.size());
- for (unsigned i = 0; i < Count; ++i) {
- SpawnSubtask<TNopCoroTask>(Env, &Completion[i], "");
- }
-
- WaitForSubtasks();
-}
-
+ for (unsigned i = 0; i < Count; ++i) {
+ SpawnSubtask<TNopCoroTask>(Env, &Completion[i], "");
+ }
+
+ WaitForSubtasks();
+}
+
TContinueFunc TSpawnNopTasksSimpleTask::Start() {
Y_VERIFY(Count <= Completion.size());
- for (unsigned i = 0; i < Count; ++i) {
- SpawnSubtask<TNopSimpleTask>(Env, &Completion[i], "");
- }
-
- return &TSpawnNopTasksSimpleTask::Join;
-}
-
+ for (unsigned i = 0; i < Count; ++i) {
+ SpawnSubtask<TNopSimpleTask>(Env, &Completion[i], "");
+ }
+
+ return &TSpawnNopTasksSimpleTask::Join;
+}
+
TContinueFunc TSpawnNopTasksSimpleTask::Join() {
return nullptr;
-}
+}
diff --git a/library/cpp/messagebus/rain_check/test/helper/misc.h b/library/cpp/messagebus/rain_check/test/helper/misc.h
index 9150be4d2f..dbcc04778d 100644
--- a/library/cpp/messagebus/rain_check/test/helper/misc.h
+++ b/library/cpp/messagebus/rain_check/test/helper/misc.h
@@ -1,57 +1,57 @@
-#pragma once
-
+#pragma once
+
#include <library/cpp/messagebus/rain_check/core/rain_check.h>
-
+
#include <array>
-
-namespace NRainCheck {
- struct TNopSimpleTask: public ISimpleTask {
+
+namespace NRainCheck {
+ struct TNopSimpleTask: public ISimpleTask {
TNopSimpleTask(IEnv*, const void*) {
}
-
+
TContinueFunc Start() override {
return nullptr;
- }
- };
-
- struct TNopCoroTask: public ICoroTask {
+ }
+ };
+
+ struct TNopCoroTask: public ICoroTask {
TNopCoroTask(IEnv*, const void*) {
}
-
+
void Run() override {
}
- };
-
- struct TSpawnNopTasksCoroTask: public ICoroTask {
- IEnv* const Env;
- unsigned const Count;
-
- TSpawnNopTasksCoroTask(IEnv* env, unsigned count)
- : Env(env)
- , Count(count)
+ };
+
+ struct TSpawnNopTasksCoroTask: public ICoroTask {
+ IEnv* const Env;
+ unsigned const Count;
+
+ TSpawnNopTasksCoroTask(IEnv* env, unsigned count)
+ : Env(env)
+ , Count(count)
{
}
-
+
std::array<TSubtaskCompletion, 2> Completion;
-
+
void Run() override;
- };
-
- struct TSpawnNopTasksSimpleTask: public ISimpleTask {
- IEnv* const Env;
- unsigned const Count;
-
- TSpawnNopTasksSimpleTask(IEnv* env, unsigned count)
- : Env(env)
- , Count(count)
+ };
+
+ struct TSpawnNopTasksSimpleTask: public ISimpleTask {
+ IEnv* const Env;
+ unsigned const Count;
+
+ TSpawnNopTasksSimpleTask(IEnv* env, unsigned count)
+ : Env(env)
+ , Count(count)
{
}
-
+
std::array<TSubtaskCompletion, 2> Completion;
-
+
TContinueFunc Start() override;
-
- TContinueFunc Join();
- };
-
-}
+
+ TContinueFunc Join();
+ };
+
+}
diff --git a/library/cpp/messagebus/rain_check/test/helper/ya.make b/library/cpp/messagebus/rain_check/test/helper/ya.make
index aa9e4e6d81..08265167a7 100644
--- a/library/cpp/messagebus/rain_check/test/helper/ya.make
+++ b/library/cpp/messagebus/rain_check/test/helper/ya.make
@@ -1,13 +1,13 @@
-LIBRARY(messagebus-rain_check-test-helper)
-
+LIBRARY(messagebus-rain_check-test-helper)
+
OWNER(g:messagebus)
-
+
PEERDIR(
library/cpp/messagebus/rain_check/core
)
-SRCS(
- misc.cpp
-)
-
-END()
+SRCS(
+ misc.cpp
+)
+
+END()
diff --git a/library/cpp/messagebus/rain_check/test/perftest/perftest.cpp b/library/cpp/messagebus/rain_check/test/perftest/perftest.cpp
index 22edbd8c6b..d0c6451f47 100644
--- a/library/cpp/messagebus/rain_check/test/perftest/perftest.cpp
+++ b/library/cpp/messagebus/rain_check/test/perftest/perftest.cpp
@@ -1,154 +1,154 @@
#include <library/cpp/messagebus/rain_check/test/helper/misc.h>
-
+
#include <library/cpp/messagebus/rain_check/core/rain_check.h>
-
+
#include <util/datetime/base.h>
#include <array>
-
-using namespace NRainCheck;
-
-static const unsigned SUBTASKS = 2;
-
-struct TRainCheckPerftestEnv: public TSimpleEnvTemplate<TRainCheckPerftestEnv> {
- unsigned SubtasksPerTask;
-
- TRainCheckPerftestEnv()
- : TSimpleEnvTemplate<TRainCheckPerftestEnv>(4)
- , SubtasksPerTask(1000)
+
+using namespace NRainCheck;
+
+static const unsigned SUBTASKS = 2;
+
+struct TRainCheckPerftestEnv: public TSimpleEnvTemplate<TRainCheckPerftestEnv> {
+ unsigned SubtasksPerTask;
+
+ TRainCheckPerftestEnv()
+ : TSimpleEnvTemplate<TRainCheckPerftestEnv>(4)
+ , SubtasksPerTask(1000)
{
}
-};
-
-struct TCoroOuter: public ICoroTask {
- TRainCheckPerftestEnv* const Env;
-
+};
+
+struct TCoroOuter: public ICoroTask {
+ TRainCheckPerftestEnv* const Env;
+
TCoroOuter(TRainCheckPerftestEnv* env)
: Env(env)
{
}
-
+
void Run() override {
- for (;;) {
- TInstant start = TInstant::Now();
-
- unsigned count = 0;
-
- unsigned current = 1000;
-
- do {
- for (unsigned i = 0; i < current; ++i) {
+ for (;;) {
+ TInstant start = TInstant::Now();
+
+ unsigned count = 0;
+
+ unsigned current = 1000;
+
+ do {
+ for (unsigned i = 0; i < current; ++i) {
std::array<TSubtaskCompletion, SUBTASKS> completion;
-
- for (unsigned j = 0; j < SUBTASKS; ++j) {
- //SpawnSubtask<TNopSimpleTask>(Env, &completion[j]);
- //SpawnSubtask<TSpawnNopTasksCoroTask>(Env, &completion[j], SUBTASKS);
- SpawnSubtask<TSpawnNopTasksSimpleTask>(Env, &completion[j], SUBTASKS);
- }
-
- WaitForSubtasks();
- }
-
- count += current;
- current *= 2;
- } while (TInstant::Now() - start < TDuration::Seconds(1));
-
- TDuration d = TInstant::Now() - start;
- unsigned dns = d.NanoSeconds() / count;
- Cerr << dns << "ns per spawn/join\n";
- }
- }
-};
-
-struct TSimpleOuter: public ISimpleTask {
- TRainCheckPerftestEnv* const Env;
-
+
+ for (unsigned j = 0; j < SUBTASKS; ++j) {
+ //SpawnSubtask<TNopSimpleTask>(Env, &completion[j]);
+ //SpawnSubtask<TSpawnNopTasksCoroTask>(Env, &completion[j], SUBTASKS);
+ SpawnSubtask<TSpawnNopTasksSimpleTask>(Env, &completion[j], SUBTASKS);
+ }
+
+ WaitForSubtasks();
+ }
+
+ count += current;
+ current *= 2;
+ } while (TInstant::Now() - start < TDuration::Seconds(1));
+
+ TDuration d = TInstant::Now() - start;
+ unsigned dns = d.NanoSeconds() / count;
+ Cerr << dns << "ns per spawn/join\n";
+ }
+ }
+};
+
+struct TSimpleOuter: public ISimpleTask {
+ TRainCheckPerftestEnv* const Env;
+
TSimpleOuter(TRainCheckPerftestEnv* env, const void*)
: Env(env)
{
}
-
- TInstant StartInstant;
- unsigned Count;
- unsigned Current;
- unsigned I;
-
+
+ TInstant StartInstant;
+ unsigned Count;
+ unsigned Current;
+ unsigned I;
+
TContinueFunc Start() override {
- StartInstant = TInstant::Now();
- Count = 0;
- Current = 1000;
- I = 0;
-
- return &TSimpleOuter::Spawn;
- }
-
+ StartInstant = TInstant::Now();
+ Count = 0;
+ Current = 1000;
+ I = 0;
+
+ return &TSimpleOuter::Spawn;
+ }
+
std::array<TSubtaskCompletion, SUBTASKS> Completion;
-
- TContinueFunc Spawn() {
- for (unsigned j = 0; j < SUBTASKS; ++j) {
- //SpawnSubtask<TNopSimpleTask>(Env, &Completion[j]);
- //SpawnSubtask<TSpawnNopTasksCoroTask>(Env, &Completion[j], SUBTASKS);
- SpawnSubtask<TSpawnNopTasksSimpleTask>(Env, &Completion[j], SUBTASKS);
- }
-
- return &TSimpleOuter::Join;
- }
-
- TContinueFunc Join() {
- I += 1;
- if (I != Current) {
- return &TSimpleOuter::Spawn;
- }
-
- I = 0;
- Count += Current;
- Current *= 2;
-
- TDuration d = TInstant::Now() - StartInstant;
- if (d < TDuration::Seconds(1)) {
- return &TSimpleOuter::Spawn;
- }
-
- unsigned dns = d.NanoSeconds() / Count;
- Cerr << dns << "ns per spawn/join\n";
-
- return &TSimpleOuter::Start;
- }
-};
-
-struct TReproduceCrashTask: public ISimpleTask {
- TRainCheckPerftestEnv* const Env;
-
+
+ TContinueFunc Spawn() {
+ for (unsigned j = 0; j < SUBTASKS; ++j) {
+ //SpawnSubtask<TNopSimpleTask>(Env, &Completion[j]);
+ //SpawnSubtask<TSpawnNopTasksCoroTask>(Env, &Completion[j], SUBTASKS);
+ SpawnSubtask<TSpawnNopTasksSimpleTask>(Env, &Completion[j], SUBTASKS);
+ }
+
+ return &TSimpleOuter::Join;
+ }
+
+ TContinueFunc Join() {
+ I += 1;
+ if (I != Current) {
+ return &TSimpleOuter::Spawn;
+ }
+
+ I = 0;
+ Count += Current;
+ Current *= 2;
+
+ TDuration d = TInstant::Now() - StartInstant;
+ if (d < TDuration::Seconds(1)) {
+ return &TSimpleOuter::Spawn;
+ }
+
+ unsigned dns = d.NanoSeconds() / Count;
+ Cerr << dns << "ns per spawn/join\n";
+
+ return &TSimpleOuter::Start;
+ }
+};
+
+struct TReproduceCrashTask: public ISimpleTask {
+ TRainCheckPerftestEnv* const Env;
+
TReproduceCrashTask(TRainCheckPerftestEnv* env)
: Env(env)
{
}
-
+
std::array<TSubtaskCompletion, SUBTASKS> Completion;
-
+
TContinueFunc Start() override {
- for (unsigned j = 0; j < 2; ++j) {
- //SpawnSubtask<TNopSimpleTask>(Env, &Completion[j]);
- SpawnSubtask<TSpawnNopTasksSimpleTask>(Env, &Completion[j], SUBTASKS);
- }
-
- return &TReproduceCrashTask::Start;
- }
-};
-
-int main(int argc, char** argv) {
+ for (unsigned j = 0; j < 2; ++j) {
+ //SpawnSubtask<TNopSimpleTask>(Env, &Completion[j]);
+ SpawnSubtask<TSpawnNopTasksSimpleTask>(Env, &Completion[j], SUBTASKS);
+ }
+
+ return &TReproduceCrashTask::Start;
+ }
+};
+
+int main(int argc, char** argv) {
Y_UNUSED(argc);
Y_UNUSED(argv);
-
- TRainCheckPerftestEnv env;
-
- env.SpawnTask<TSimpleOuter>("");
- //env.SpawnTask<TCoroOuter>();
- //env.SpawnTask<TReproduceCrashTask>();
-
- for (;;) {
- Sleep(TDuration::Hours(1));
- }
-
- return 0;
-}
+
+ TRainCheckPerftestEnv env;
+
+ env.SpawnTask<TSimpleOuter>("");
+ //env.SpawnTask<TCoroOuter>();
+ //env.SpawnTask<TReproduceCrashTask>();
+
+ for (;;) {
+ Sleep(TDuration::Hours(1));
+ }
+
+ return 0;
+}
diff --git a/library/cpp/messagebus/rain_check/test/perftest/ya.make b/library/cpp/messagebus/rain_check/test/perftest/ya.make
index 7330a71700..f80ddf2c05 100644
--- a/library/cpp/messagebus/rain_check/test/perftest/ya.make
+++ b/library/cpp/messagebus/rain_check/test/perftest/ya.make
@@ -1,14 +1,14 @@
-PROGRAM(messagebus_rain_check_perftest)
-
+PROGRAM(messagebus_rain_check_perftest)
+
OWNER(g:messagebus)
-
-PEERDIR(
+
+PEERDIR(
library/cpp/messagebus/rain_check/core
library/cpp/messagebus/rain_check/test/helper
-)
-
-SRCS(
- perftest.cpp
-)
-
-END()
+)
+
+SRCS(
+ perftest.cpp
+)
+
+END()
diff --git a/library/cpp/messagebus/rain_check/test/ut/test.h b/library/cpp/messagebus/rain_check/test/ut/test.h
index 724f6b7530..922f0f06cb 100644
--- a/library/cpp/messagebus/rain_check/test/ut/test.h
+++ b/library/cpp/messagebus/rain_check/test/ut/test.h
@@ -1,13 +1,13 @@
-#pragma once
-
+#pragma once
+
#include <library/cpp/messagebus/rain_check/core/rain_check.h>
#include <library/cpp/messagebus/misc/test_sync.h>
-
-template <typename TSelf>
-struct TTestEnvTemplate: public NRainCheck::TSimpleEnvTemplate<TSelf> {
- TTestSync TestSync;
-};
-
-struct TTestEnv: public TTestEnvTemplate<TTestEnv> {
-};
+
+template <typename TSelf>
+struct TTestEnvTemplate: public NRainCheck::TSimpleEnvTemplate<TSelf> {
+ TTestSync TestSync;
+};
+
+struct TTestEnv: public TTestEnvTemplate<TTestEnv> {
+};
diff --git a/library/cpp/messagebus/rain_check/test/ut/ya.make b/library/cpp/messagebus/rain_check/test/ut/ya.make
index 9f7a93417a..6191fe9fe0 100644
--- a/library/cpp/messagebus/rain_check/test/ut/ya.make
+++ b/library/cpp/messagebus/rain_check/test/ut/ya.make
@@ -1,24 +1,24 @@
PROGRAM(library-messagebus-rain_check-test-ut)
-
+
OWNER(g:messagebus)
-
-PEERDIR(
+
+PEERDIR(
library/cpp/testing/unittest_main
library/cpp/messagebus/rain_check/core
library/cpp/messagebus/rain_check/http
library/cpp/messagebus/rain_check/messagebus
library/cpp/messagebus/test/helper
-)
-
-SRCS(
- ../../core/coro_ut.cpp
- ../../core/simple_ut.cpp
- ../../core/sleep_ut.cpp
- ../../core/spawn_ut.cpp
- ../../core/track_ut.cpp
+)
+
+SRCS(
+ ../../core/coro_ut.cpp
+ ../../core/simple_ut.cpp
+ ../../core/sleep_ut.cpp
+ ../../core/spawn_ut.cpp
+ ../../core/track_ut.cpp
../../http/client_ut.cpp
- ../../messagebus/messagebus_client_ut.cpp
- ../../messagebus/messagebus_server_ut.cpp
-)
-
-END()
+ ../../messagebus/messagebus_client_ut.cpp
+ ../../messagebus/messagebus_server_ut.cpp
+)
+
+END()
diff --git a/library/cpp/messagebus/rain_check/test/ya.make b/library/cpp/messagebus/rain_check/test/ya.make
index 4c1d6f8161..83cdb16977 100644
--- a/library/cpp/messagebus/rain_check/test/ya.make
+++ b/library/cpp/messagebus/rain_check/test/ya.make
@@ -1,6 +1,6 @@
OWNER(g:messagebus)
-RECURSE(
+RECURSE(
perftest
ut
-)
+)
diff --git a/library/cpp/messagebus/rain_check/ya.make b/library/cpp/messagebus/rain_check/ya.make
index 966d54c232..c408615f42 100644
--- a/library/cpp/messagebus/rain_check/ya.make
+++ b/library/cpp/messagebus/rain_check/ya.make
@@ -1,8 +1,8 @@
OWNER(g:messagebus)
-RECURSE(
+RECURSE(
core
http
messagebus
test
-)
+)