diff options
author | Anton Samokhvalov <pg83@yandex.ru> | 2022-02-10 16:45:17 +0300 |
---|---|---|
committer | Daniil Cherednik <dcherednik@yandex-team.ru> | 2022-02-10 16:45:17 +0300 |
commit | d3a398281c6fd1d3672036cb2d63f842d2cb28c5 (patch) | |
tree | dd4bd3ca0f36b817e96812825ffaf10d645803f2 /library/cpp/messagebus/rain_check | |
parent | 72cb13b4aff9bc9cf22e49251bc8fd143f82538f (diff) | |
download | ydb-d3a398281c6fd1d3672036cb2d63f842d2cb28c5.tar.gz |
Restoring authorship annotation for Anton Samokhvalov <pg83@yandex.ru>. Commit 2 of 2.
Diffstat (limited to 'library/cpp/messagebus/rain_check')
34 files changed, 523 insertions, 523 deletions
diff --git a/library/cpp/messagebus/rain_check/core/coro.cpp b/library/cpp/messagebus/rain_check/core/coro.cpp index 8dda6c9f97..500841dd5b 100644 --- a/library/cpp/messagebus/rain_check/core/coro.cpp +++ b/library/cpp/messagebus/rain_check/core/coro.cpp @@ -26,12 +26,12 @@ TCoroTaskRunner::~TCoroTaskRunner() { Y_ASSERT(CoroDone); } -Y_POD_STATIC_THREAD(TContMachineContext*) -CallerContext; -Y_POD_STATIC_THREAD(TCoroTaskRunner*) -Task; +Y_POD_STATIC_THREAD(TContMachineContext*) +CallerContext; +Y_POD_STATIC_THREAD(TCoroTaskRunner*) +Task; -bool TCoroTaskRunner::ReplyReceived() { +bool TCoroTaskRunner::ReplyReceived() { Y_ASSERT(!CoroDone); TContMachineContext me; @@ -49,12 +49,12 @@ bool TCoroTaskRunner::ReplyReceived() { return !CoroDone; } -void NRainCheck::TCoroTaskRunner::DoRun() { +void NRainCheck::TCoroTaskRunner::DoRun() { GetImpl()->Run(); CoroDone = true; ContMachineContext.SwitchTo(CallerContext); } -void NRainCheck::ICoroTask::WaitForSubtasks() { +void NRainCheck::ICoroTask::WaitForSubtasks() { Task->ContMachineContext.SwitchTo(CallerContext); } diff --git a/library/cpp/messagebus/rain_check/core/coro.h b/library/cpp/messagebus/rain_check/core/coro.h index c62bef5ebc..95e2a30f9b 100644 --- a/library/cpp/messagebus/rain_check/core/coro.h +++ b/library/cpp/messagebus/rain_check/core/coro.h @@ -14,7 +14,7 @@ namespace NRainCheck { class TCoroTaskRunner: public TTaskRunnerBase, private ITrampoLine { friend class ICoroTask; - + private: NPrivate::TCoroStack Stack; TContMachineContext ContMachineContext; @@ -31,9 +31,9 @@ namespace NRainCheck { void DoRun() override /* override */; - ICoroTask* GetImpl() { - return (ICoroTask*)GetImplBase(); - } + ICoroTask* GetImpl() { + return (ICoroTask*)GetImplBase(); + } }; class ICoroTask: public ITaskBase { @@ -46,10 +46,10 @@ namespace NRainCheck { typedef TCoroTaskRunner TTaskRunner; typedef ICoroTask ITask; - ICoroTask(size_t stackSize = 0x2000) - : StackSize(stackSize) - { - } + ICoroTask(size_t stackSize = 0x2000) + : StackSize(stackSize) + { + } virtual void Run() = 0; static void WaitForSubtasks(); diff --git a/library/cpp/messagebus/rain_check/core/coro_stack.cpp b/library/cpp/messagebus/rain_check/core/coro_stack.cpp index be69939415..83b984ca6e 100644 --- a/library/cpp/messagebus/rain_check/core/coro_stack.cpp +++ b/library/cpp/messagebus/rain_check/core/coro_stack.cpp @@ -22,11 +22,11 @@ TCoroStack::TCoroStack(size_t size) *MagicNumberLocation() = MAGIC_NUMBER; #if defined(WITH_VALGRIND) - ValgrindStackId = VALGRIND_STACK_REGISTER(Data(), (char*)Data() + Size()); + ValgrindStackId = VALGRIND_STACK_REGISTER(Data(), (char*)Data() + Size()); #endif } -TCoroStack::~TCoroStack() { +TCoroStack::~TCoroStack() { #if defined(WITH_VALGRIND) VALGRIND_STACK_DEREGISTER(ValgrindStackId); #endif diff --git a/library/cpp/messagebus/rain_check/core/coro_stack.h b/library/cpp/messagebus/rain_check/core/coro_stack.h index 6d40da1ea1..2f3520e6e4 100644 --- a/library/cpp/messagebus/rain_check/core/coro_stack.h +++ b/library/cpp/messagebus/rain_check/core/coro_stack.h @@ -4,51 +4,51 @@ #include <util/generic/ptr.h> #include <util/system/valgrind.h> -namespace NRainCheck { - namespace NPrivate { - struct TCoroStack { - THolder<void, TFree> DataHolder; - size_t SizeValue; +namespace NRainCheck { + namespace NPrivate { + struct TCoroStack { + THolder<void, TFree> DataHolder; + size_t SizeValue; #if defined(WITH_VALGRIND) - size_t ValgrindStackId; + size_t ValgrindStackId; #endif - TCoroStack(size_t size); - ~TCoroStack(); + TCoroStack(size_t size); + ~TCoroStack(); - void* Data() { - return DataHolder.Get(); - } + void* Data() { + return DataHolder.Get(); + } - size_t Size() { - return SizeValue; - } + size_t Size() { + return SizeValue; + } TArrayRef<char> MemRegion() { return TArrayRef((char*)Data(), Size()); - } + } - ui32* MagicNumberLocation() { + ui32* MagicNumberLocation() { #if STACK_GROW_DOWN == 1 - return (ui32*)Data(); + return (ui32*)Data(); #elif STACK_GROW_DOWN == 0 - return ((ui32*)(((char*)Data()) + Size())) - 1; + return ((ui32*)(((char*)Data()) + Size())) - 1; #else #error "unknown" #endif - } + } - static void FailStackOverflow(); + static void FailStackOverflow(); - inline void VerifyNoStackOverflow() noexcept { - if (Y_UNLIKELY(*MagicNumberLocation() != MAGIC_NUMBER)) { - FailStackOverflow(); - } + inline void VerifyNoStackOverflow() noexcept { + if (Y_UNLIKELY(*MagicNumberLocation() != MAGIC_NUMBER)) { + FailStackOverflow(); + } } - static const ui32 MAGIC_NUMBER = 0xAB4D15FE; - }; + static const ui32 MAGIC_NUMBER = 0xAB4D15FE; + }; - } -} + } +} diff --git a/library/cpp/messagebus/rain_check/core/coro_ut.cpp b/library/cpp/messagebus/rain_check/core/coro_ut.cpp index 95edce680b..61a33584a5 100644 --- a/library/cpp/messagebus/rain_check/core/coro_ut.cpp +++ b/library/cpp/messagebus/rain_check/core/coro_ut.cpp @@ -8,7 +8,7 @@ using namespace NRainCheck; Y_UNIT_TEST_SUITE(RainCheckCoro) { - struct TSimpleCoroTask : ICoroTask { + struct TSimpleCoroTask : ICoroTask { TTestSync* const TestSync; TSimpleCoroTask(TTestEnv*, TTestSync* testSync) @@ -30,7 +30,7 @@ Y_UNIT_TEST_SUITE(RainCheckCoro) { testSync.WaitForAndIncrement(1); } - struct TSleepCoroTask : ICoroTask { + struct TSleepCoroTask : ICoroTask { TTestEnv* const Env; TTestSync* const TestSync; @@ -59,7 +59,7 @@ Y_UNIT_TEST_SUITE(RainCheckCoro) { testSync.WaitForAndIncrement(1); } - struct TSubtask : ICoroTask { + struct TSubtask : ICoroTask { TTestEnv* const Env; TTestSync* const TestSync; @@ -74,7 +74,7 @@ Y_UNIT_TEST_SUITE(RainCheckCoro) { } }; - struct TSpawnCoroTask : ICoroTask { + struct TSpawnCoroTask : ICoroTask { TTestEnv* const Env; TTestSync* const TestSync; diff --git a/library/cpp/messagebus/rain_check/core/env.h b/library/cpp/messagebus/rain_check/core/env.h index 945364c3b3..f6dd7fceb6 100644 --- a/library/cpp/messagebus/rain_check/core/env.h +++ b/library/cpp/messagebus/rain_check/core/env.h @@ -10,15 +10,15 @@ namespace NRainCheck { struct IEnv { virtual ::NActor::TExecutor* GetExecutor() = 0; - virtual ~IEnv() { - } + virtual ~IEnv() { + } }; template <typename TSelf> struct TEnvTemplate: public IEnv { template <typename TTask, typename TParam> TIntrusivePtr<typename TTask::TTaskRunner> SpawnTask(TParam param) { - return ::NRainCheck::SpawnTask<TTask, TSelf>((TSelf*)this, param); + return ::NRainCheck::SpawnTask<TTask, TSelf>((TSelf*)this, param); } }; @@ -29,19 +29,19 @@ namespace NRainCheck { TSimpleEnvTemplate(unsigned threadCount = 0) : Executor(new ::NActor::TExecutor(threadCount != 0 ? threadCount : 4)) - { - } + { + } - ::NActor::TExecutor* GetExecutor() override { - return Executor.Get(); - } + ::NActor::TExecutor* GetExecutor() override { + return Executor.Get(); + } }; struct TSimpleEnv: public TSimpleEnvTemplate<TSimpleEnv> { - TSimpleEnv(unsigned threadCount = 0) - : TSimpleEnvTemplate<TSimpleEnv>(threadCount) - { - } + TSimpleEnv(unsigned threadCount = 0) + : TSimpleEnvTemplate<TSimpleEnv>(threadCount) + { + } }; } diff --git a/library/cpp/messagebus/rain_check/core/simple.cpp b/library/cpp/messagebus/rain_check/core/simple.cpp index 46a1d0e8aa..70182b2f93 100644 --- a/library/cpp/messagebus/rain_check/core/simple.cpp +++ b/library/cpp/messagebus/rain_check/core/simple.cpp @@ -12,7 +12,7 @@ TSimpleTaskRunner::~TSimpleTaskRunner() { Y_ASSERT(!ContinueFunc); } -bool TSimpleTaskRunner::ReplyReceived() { +bool TSimpleTaskRunner::ReplyReceived() { ContinueFunc = (GetImpl()->*(ContinueFunc.Func))(); return !!ContinueFunc; } diff --git a/library/cpp/messagebus/rain_check/core/simple.h b/library/cpp/messagebus/rain_check/core/simple.h index 82664cabab..20e1bf19f5 100644 --- a/library/cpp/messagebus/rain_check/core/simple.h +++ b/library/cpp/messagebus/rain_check/core/simple.h @@ -15,17 +15,17 @@ namespace NRainCheck { public: TContinueFunc() : Func(nullptr) - { - } + { + } TContinueFunc(void*) : Func(nullptr) - { - } + { + } template <typename TTask> TContinueFunc(TContinueFunc (TTask::*func)()) - : Func((TFunc)func) + : Func((TFunc)func) { static_assert((std::is_base_of<ISimpleTask, TTask>::value), "expect (std::is_base_of<ISimpleTask, TTask>::value)"); } @@ -46,9 +46,9 @@ namespace NRainCheck { bool ReplyReceived() override /* override */; - ISimpleTask* GetImpl() { - return (ISimpleTask*)GetImplBase(); - } + ISimpleTask* GetImpl() { + return (ISimpleTask*)GetImplBase(); + } }; class ISimpleTask: public ITaskBase { diff --git a/library/cpp/messagebus/rain_check/core/simple_ut.cpp b/library/cpp/messagebus/rain_check/core/simple_ut.cpp index 820996d11a..d4545e05aa 100644 --- a/library/cpp/messagebus/rain_check/core/simple_ut.cpp +++ b/library/cpp/messagebus/rain_check/core/simple_ut.cpp @@ -16,8 +16,8 @@ Y_UNIT_TEST_SUITE(RainCheckSimple) { TTaskWithCompletionCallback(TTestEnv* env, TTestSync* testSync) : Env(env) , TestSync(testSync) - { - } + { + } TSubtaskCompletion SleepCompletion; @@ -38,7 +38,7 @@ Y_UNIT_TEST_SUITE(RainCheckSimple) { SleepCompletion.SetCompletionCallback(&TTaskWithCompletionCallback::NextSleepCompletionCallback); } - void NextSleepCompletionCallback(TSubtaskCompletion*) { + void NextSleepCompletionCallback(TSubtaskCompletion*) { TestSync->CheckAndIncrement(2); } diff --git a/library/cpp/messagebus/rain_check/core/sleep.cpp b/library/cpp/messagebus/rain_check/core/sleep.cpp index 4fb645b7d6..f5d0b4cac9 100644 --- a/library/cpp/messagebus/rain_check/core/sleep.cpp +++ b/library/cpp/messagebus/rain_check/core/sleep.cpp @@ -18,7 +18,7 @@ NRainCheck::TSleepService::TSleepService() { } -NRainCheck::TSleepService::~TSleepService() { +NRainCheck::TSleepService::~TSleepService() { if (!!SchedulerHolder) { Scheduler->Stop(); } @@ -40,7 +40,7 @@ namespace { }; } -void TSleepService::Sleep(TSubtaskCompletion* r, TDuration duration) { +void TSleepService::Sleep(TSubtaskCompletion* r, TDuration duration) { TTaskRunnerBase* current = TTaskRunnerBase::CurrentTask(); r->SetRunning(current); Scheduler->Schedule(new TSleepServiceScheduleItem(r, TInstant::Now() + duration)); diff --git a/library/cpp/messagebus/rain_check/core/sleep.h b/library/cpp/messagebus/rain_check/core/sleep.h index dc63c098df..1a7a1f8674 100644 --- a/library/cpp/messagebus/rain_check/core/sleep.h +++ b/library/cpp/messagebus/rain_check/core/sleep.h @@ -11,7 +11,7 @@ namespace NRainCheck { private: THolder< ::NBus::NPrivate::TScheduler> SchedulerHolder; ::NBus::NPrivate::TScheduler* const Scheduler; - + public: TSleepService(::NBus::NPrivate::TScheduler*); TSleepService(); diff --git a/library/cpp/messagebus/rain_check/core/sleep_ut.cpp b/library/cpp/messagebus/rain_check/core/sleep_ut.cpp index cddcb2b44a..2ae85a87b1 100644 --- a/library/cpp/messagebus/rain_check/core/sleep_ut.cpp +++ b/library/cpp/messagebus/rain_check/core/sleep_ut.cpp @@ -15,8 +15,8 @@ Y_UNIT_TEST_SUITE(Sleep) { TTestTask(TSimpleEnv* env, TTestSync* testSync) : Env(env) , TestSync(testSync) - { - } + { + } TSubtaskCompletion Sleep; diff --git a/library/cpp/messagebus/rain_check/core/spawn.cpp b/library/cpp/messagebus/rain_check/core/spawn.cpp index 879192e26a..c570355fbe 100644 --- a/library/cpp/messagebus/rain_check/core/spawn.cpp +++ b/library/cpp/messagebus/rain_check/core/spawn.cpp @@ -1,5 +1,5 @@ #include "spawn.h" -void NRainCheck::NPrivate::SpawnTaskImpl(TTaskRunnerBase* task) { +void NRainCheck::NPrivate::SpawnTaskImpl(TTaskRunnerBase* task) { task->Schedule(); } diff --git a/library/cpp/messagebus/rain_check/core/spawn.h b/library/cpp/messagebus/rain_check/core/spawn.h index f61b8cc8c2..f2b146bf29 100644 --- a/library/cpp/messagebus/rain_check/core/spawn.h +++ b/library/cpp/messagebus/rain_check/core/spawn.h @@ -38,7 +38,7 @@ namespace NRainCheck { template <typename TTask, typename TEnv, typename TParam> TIntrusivePtr<typename TTask::TTaskRunner> SpawnTask(TEnv* env, TParam param1, ISubtaskListener* subtaskListener = &TNopSubtaskListener::Instance) { return NPrivate::SpawnTaskWithRunner< - TTask, typename TTask::ITask, typename TTask::TTaskRunner, TEnv, TParam>(env, param1, subtaskListener); + TTask, typename TTask::ITask, typename TTask::TTaskRunner, TEnv, TParam>(env, param1, subtaskListener); } // Instantiate and start subtask of given task. diff --git a/library/cpp/messagebus/rain_check/core/spawn_ut.cpp b/library/cpp/messagebus/rain_check/core/spawn_ut.cpp index a0391953e9..ba5a5e41cf 100644 --- a/library/cpp/messagebus/rain_check/core/spawn_ut.cpp +++ b/library/cpp/messagebus/rain_check/core/spawn_ut.cpp @@ -19,8 +19,8 @@ Y_UNIT_TEST_SUITE(Spawn) { TTestTask(TSimpleEnv*, TTestSync* testSync) : TestSync(testSync) , I(0) - { - } + { + } TSystemEvent Started; @@ -60,8 +60,8 @@ Y_UNIT_TEST_SUITE(Spawn) { TSubtask(TTestEnv* env, TTestSync* testSync) : Env(env) , TestSync(testSync) - { - } + { + } TContinueFunc Start() override { Sleep(TDuration::MilliSeconds(1)); @@ -77,8 +77,8 @@ Y_UNIT_TEST_SUITE(Spawn) { TSpawnTask(TTestEnv* env, TTestSync* testSync) : Env(env) , TestSync(testSync) - { - } + { + } TSubtaskCompletion SubtaskCompletion; @@ -109,12 +109,12 @@ Y_UNIT_TEST_SUITE(Spawn) { TTestSync* const TestSync; unsigned I; - TSpawnLongTask(TTestEnv* env, TTestSync* testSync) - : Env(env) - , TestSync(testSync) - , I(0) - { - } + TSpawnLongTask(TTestEnv* env, TTestSync* testSync) + : Env(env) + , TestSync(testSync) + , I(0) + { + } std::array<TSubtaskCompletion, 3> Subtasks; diff --git a/library/cpp/messagebus/rain_check/core/task.cpp b/library/cpp/messagebus/rain_check/core/task.cpp index 3c5c609d07..a098437d53 100644 --- a/library/cpp/messagebus/rain_check/core/task.cpp +++ b/library/cpp/messagebus/rain_check/core/task.cpp @@ -11,12 +11,12 @@ using namespace NRainCheck::NPrivate; using namespace NActor; namespace { - Y_POD_STATIC_THREAD(TTaskRunnerBase*) - ThreadCurrentTask; + Y_POD_STATIC_THREAD(TTaskRunnerBase*) + ThreadCurrentTask; } -void TNopSubtaskListener::SetDone() { -} +void TNopSubtaskListener::SetDone() { +} TNopSubtaskListener TNopSubtaskListener::Instance; @@ -46,12 +46,12 @@ namespace { ~TRunningInThisThreadGuard() { Y_ASSERT(ThreadCurrentTask == Task); - ThreadCurrentTask = nullptr; + ThreadCurrentTask = nullptr; } }; } -void NRainCheck::TTaskRunnerBase::Act(NActor::TDefaultTag) { +void NRainCheck::TTaskRunnerBase::Act(NActor::TDefaultTag) { Y_ASSERT(RefCount() > 0); TRunningInThisThreadGuard g(this); @@ -90,16 +90,16 @@ void NRainCheck::TTaskRunnerBase::Act(NActor::TDefaultTag) { } } -bool TTaskRunnerBase::IsRunningInThisThread() const { +bool TTaskRunnerBase::IsRunningInThisThread() const { return ThreadCurrentTask == this; } -TSubtaskCompletion::~TSubtaskCompletion() { +TSubtaskCompletion::~TSubtaskCompletion() { ESubtaskState state = State.Get(); Y_ASSERT(state == CREATED || state == DONE || state == CANCELED); } -void TSubtaskCompletion::FireCompletionCallback(ITaskBase* task) { +void TSubtaskCompletion::FireCompletionCallback(ITaskBase* task) { Y_ASSERT(IsComplete()); if (!!CompletionFunc) { @@ -111,7 +111,7 @@ void TSubtaskCompletion::FireCompletionCallback(ITaskBase* task) { } } -void NRainCheck::TSubtaskCompletion::Cancel() { +void NRainCheck::TSubtaskCompletion::Cancel() { for (;;) { ESubtaskState state = State.Get(); if (state == CREATED && State.CompareAndSet(CREATED, CANCELED)) { @@ -129,7 +129,7 @@ void NRainCheck::TSubtaskCompletion::Cancel() { } } -void TSubtaskCompletion::SetRunning(TTaskRunnerBase* parent) { +void TSubtaskCompletion::SetRunning(TTaskRunnerBase* parent) { Y_ASSERT(!TaskRunner); Y_ASSERT(!!parent); @@ -150,7 +150,7 @@ void TSubtaskCompletion::SetRunning(TTaskRunnerBase* parent) { } } -void TSubtaskCompletion::SetDone() { +void TSubtaskCompletion::SetDone() { Y_ASSERT(!!TaskRunner); TTaskRunnerBase* temp = TaskRunner; TaskRunner = nullptr; @@ -194,11 +194,11 @@ void NRainCheck::TTaskRunnerBase::ReleaseRef() } #endif -void TTaskRunnerBase::AssertInThisThread() const { +void TTaskRunnerBase::AssertInThisThread() const { Y_ASSERT(IsRunningInThisThread()); } -TTaskRunnerBase* TTaskRunnerBase::CurrentTask() { +TTaskRunnerBase* TTaskRunnerBase::CurrentTask() { Y_VERIFY(!!ThreadCurrentTask); return ThreadCurrentTask; } diff --git a/library/cpp/messagebus/rain_check/core/task.h b/library/cpp/messagebus/rain_check/core/task.h index 16a0b3cb19..7d8778bcda 100644 --- a/library/cpp/messagebus/rain_check/core/task.h +++ b/library/cpp/messagebus/rain_check/core/task.h @@ -14,8 +14,8 @@ namespace NRainCheck { struct ISubtaskListener { virtual void SetDone() = 0; - virtual ~ISubtaskListener() { - } + virtual ~ISubtaskListener() { + } }; struct TNopSubtaskListener: public ISubtaskListener { @@ -33,17 +33,17 @@ namespace NRainCheck { public: TSubtaskCompletionFunc() : Func(nullptr) - { - } + { + } TSubtaskCompletionFunc(void*) : Func(nullptr) - { - } + { + } template <typename TTask> TSubtaskCompletionFunc(void (TTask::*func)(TSubtaskCompletion*)) - : Func((TFunc)func) + : Func((TFunc)func) { static_assert((std::is_base_of<ITaskBase, TTask>::value), "expect (std::is_base_of<ITaskBase, TTask>::value)"); } @@ -56,13 +56,13 @@ namespace NRainCheck { template <typename T> class TTaskFuture; -#define SUBTASK_STATE_MAP(XX) \ - XX(CREATED, "Initial") \ - XX(RUNNING, "Running") \ - XX(DONE, "Completed") \ - XX(CANCEL_REQUESTED, "Cancel requested, but still executing") \ - XX(CANCELED, "Canceled") \ - /**/ +#define SUBTASK_STATE_MAP(XX) \ + XX(CREATED, "Initial") \ + XX(RUNNING, "Running") \ + XX(DONE, "Completed") \ + XX(CANCEL_REQUESTED, "Cancel requested, but still executing") \ + XX(CANCELED, "Canceled") \ + /**/ enum ESubtaskState { SUBTASK_STATE_MAP(ENUM_VALUE_GEN_NO_VALUE) @@ -70,38 +70,38 @@ namespace NRainCheck { ENUM_TO_STRING(ESubtaskState, SUBTASK_STATE_MAP) - class TSubtaskCompletion : TNonCopyable, public ISubtaskListener { + class TSubtaskCompletion : TNonCopyable, public ISubtaskListener { friend struct TTaskAccessor; - + private: TAtomicBox<ESubtaskState> State; TTaskRunnerBase* volatile TaskRunner; TSubtaskCompletionFunc CompletionFunc; - + public: - TSubtaskCompletion() - : State(CREATED) - , TaskRunner() - { - } + TSubtaskCompletion() + : State(CREATED) + , TaskRunner() + { + } ~TSubtaskCompletion() override; // Either done or cancel requested or cancelled bool IsComplete() const { ESubtaskState state = State.Get(); switch (state) { - case RUNNING: - return false; - case DONE: - return true; - case CANCEL_REQUESTED: - return false; - case CANCELED: - return true; - case CREATED: - Y_FAIL("not started"); - default: - Y_FAIL("unknown value: %u", (unsigned)state); + case RUNNING: + return false; + case DONE: + return true; + case CANCEL_REQUESTED: + return false; + case CANCELED: + return true; + case CREATED: + Y_FAIL("not started"); + default: + Y_FAIL("unknown value: %u", (unsigned)state); } } @@ -128,7 +128,7 @@ namespace NRainCheck { }; // See ISimpleTask, ICoroTask - class TTaskRunnerBase: public TAtomicRefCount<TTaskRunnerBase>, public NActor::TActor<TTaskRunnerBase> { + class TTaskRunnerBase: public TAtomicRefCount<TTaskRunnerBase>, public NActor::TActor<TTaskRunnerBase> { friend class NActor::TActor<TTaskRunnerBase>; friend class TContinueFunc; friend struct TTaskAccessor; @@ -163,9 +163,9 @@ namespace NRainCheck { protected: //void RetainRef(); //void ReleaseRef(); - ITaskBase* GetImplBase() { - return Impl.Get(); - } + ITaskBase* GetImplBase() { + return Impl.Get(); + } private: // true if need to call again @@ -174,11 +174,11 @@ namespace NRainCheck { class ITaskBase { public: - virtual ~ITaskBase() { - } + virtual ~ITaskBase() { + } }; // Check that current method executed inside some task. bool AreWeInsideTask(); -} +} diff --git a/library/cpp/messagebus/rain_check/core/track.cpp b/library/cpp/messagebus/rain_check/core/track.cpp index d704ee022a..092a51a214 100644 --- a/library/cpp/messagebus/rain_check/core/track.cpp +++ b/library/cpp/messagebus/rain_check/core/track.cpp @@ -3,7 +3,7 @@ using namespace NRainCheck; using namespace NRainCheck::NPrivate; -void TTaskTrackerReceipt::SetDone() { +void TTaskTrackerReceipt::SetDone() { TaskTracker->GetQueue<TTaskTrackerReceipt*>()->EnqueueAndSchedule(this); } @@ -11,12 +11,12 @@ TString TTaskTrackerReceipt::GetStatusSingleLine() { return Task->GetStatusSingleLine(); } -TTaskTracker::TTaskTracker(NActor::TExecutor* executor) - : NActor::TActor<TTaskTracker>(executor) +TTaskTracker::TTaskTracker(NActor::TExecutor* executor) + : NActor::TActor<TTaskTracker>(executor) { } -TTaskTracker::~TTaskTracker() { +TTaskTracker::~TTaskTracker() { Y_ASSERT(Tasks.Empty()); } @@ -47,7 +47,7 @@ void TTaskTracker::ProcessItem(NActor::TDefaultTag, NActor::TDefaultTag, TAsyncR status->SetResult(s); } -void TTaskTracker::Act(NActor::TDefaultTag) { +void TTaskTracker::Act(NActor::TDefaultTag) { GetQueue<TAsyncResult<TTaskTrackerStatus>*>()->DequeueAll(); GetQueue<ITaskFactory*>()->DequeueAll(); GetQueue<TTaskTrackerReceipt*>()->DequeueAll(); diff --git a/library/cpp/messagebus/rain_check/core/track.h b/library/cpp/messagebus/rain_check/core/track.h index 88a4e6fd9e..d387de7574 100644 --- a/library/cpp/messagebus/rain_check/core/track.h +++ b/library/cpp/messagebus/rain_check/core/track.h @@ -16,18 +16,18 @@ namespace NRainCheck { namespace NPrivate { struct ITaskFactory { virtual TIntrusivePtr<TTaskRunnerBase> NewTask(ISubtaskListener*) = 0; - virtual ~ITaskFactory() { - } + virtual ~ITaskFactory() { + } }; struct TTaskTrackerReceipt: public ISubtaskListener, public TIntrusiveListItem<TTaskTrackerReceipt> { TTaskTracker* const TaskTracker; TIntrusivePtr<TTaskRunnerBase> Task; - TTaskTrackerReceipt(TTaskTracker* taskTracker) - : TaskTracker(taskTracker) - { - } + TTaskTrackerReceipt(TTaskTracker* taskTracker) + : TaskTracker(taskTracker) + { + } void SetDone() override; @@ -41,11 +41,11 @@ namespace NRainCheck { } class TTaskTracker - : public TAtomicRefCount<TTaskTracker>, - public NActor::TActor<TTaskTracker>, - public NActor::TQueueInActor<TTaskTracker, NPrivate::ITaskFactory*>, - public NActor::TQueueInActor<TTaskTracker, NPrivate::TTaskTrackerReceipt*>, - public NActor::TQueueInActor<TTaskTracker, TAsyncResult<NPrivate::TTaskTrackerStatus>*> { + : public TAtomicRefCount<TTaskTracker>, + public NActor::TActor<TTaskTracker>, + public NActor::TQueueInActor<TTaskTracker, NPrivate::ITaskFactory*>, + public NActor::TQueueInActor<TTaskTracker, NPrivate::TTaskTrackerReceipt*>, + public NActor::TQueueInActor<TTaskTracker, TAsyncResult<NPrivate::TTaskTrackerStatus>*> { friend struct NPrivate::TTaskTrackerReceipt; private: @@ -77,11 +77,11 @@ namespace NRainCheck { TEnv* const Env; TParam Param; - TTaskFactory(TEnv* env, TParam param) - : Env(env) - , Param(param) - { - } + TTaskFactory(TEnv* env, TParam param) + : Env(env) + , Param(param) + { + } TIntrusivePtr<TTaskRunnerBase> NewTask(ISubtaskListener* subtaskListener) override { return NRainCheck::SpawnTask<TTask>(Env, Param, subtaskListener).Get(); diff --git a/library/cpp/messagebus/rain_check/core/track_ut.cpp b/library/cpp/messagebus/rain_check/core/track_ut.cpp index ef76a7a2af..05f7de1319 100644 --- a/library/cpp/messagebus/rain_check/core/track_ut.cpp +++ b/library/cpp/messagebus/rain_check/core/track_ut.cpp @@ -11,10 +11,10 @@ Y_UNIT_TEST_SUITE(TaskTracker) { struct TTaskForTracker: public ISimpleTask { TTestSync* const TestSync; - TTaskForTracker(TTestEnv*, TTestSync* testSync) - : TestSync(testSync) - { - } + TTaskForTracker(TTestEnv*, TTestSync* testSync) + : TestSync(testSync) + { + } TContinueFunc Start() override { TestSync->WaitForAndIncrement(0); diff --git a/library/cpp/messagebus/rain_check/http/client.cpp b/library/cpp/messagebus/rain_check/http/client.cpp index 0fb8a93420..5ef5ceeece 100644 --- a/library/cpp/messagebus/rain_check/http/client.cpp +++ b/library/cpp/messagebus/rain_check/http/client.cpp @@ -14,141 +14,141 @@ #include <util/stream/str.h> namespace NRainCheck { - class THttpCallback: public NNeh::IOnRecv { - public: - THttpCallback(NRainCheck::THttpFuture* future) - : Future(future) - { - Y_VERIFY(!!future, "future is NULL"); - } - - void OnRecv(NNeh::THandle& handle) override { - THolder<THttpCallback> self(this); - NNeh::TResponseRef response = handle.Get(); - Future->SetDoneAndSchedule(response); - } - - private: - NRainCheck::THttpFuture* const Future; - }; - - THttpFuture::THttpFuture() - : Task(nullptr) - , ErrorCode(THttpFuture::NoError) + class THttpCallback: public NNeh::IOnRecv { + public: + THttpCallback(NRainCheck::THttpFuture* future) + : Future(future) + { + Y_VERIFY(!!future, "future is NULL"); + } + + void OnRecv(NNeh::THandle& handle) override { + THolder<THttpCallback> self(this); + NNeh::TResponseRef response = handle.Get(); + Future->SetDoneAndSchedule(response); + } + + private: + NRainCheck::THttpFuture* const Future; + }; + + THttpFuture::THttpFuture() + : Task(nullptr) + , ErrorCode(THttpFuture::NoError) { } - THttpFuture::~THttpFuture() { + THttpFuture::~THttpFuture() { } - bool THttpFuture::HasError() const { - return (ErrorCode != THttpFuture::NoError); - } + bool THttpFuture::HasError() const { + return (ErrorCode != THttpFuture::NoError); + } - THttpFuture::EError THttpFuture::GetErrorCode() const { - return ErrorCode; - } + THttpFuture::EError THttpFuture::GetErrorCode() const { + return ErrorCode; + } - TString THttpFuture::GetErrorDescription() const { - return ErrorDescription; - } + TString THttpFuture::GetErrorDescription() const { + return ErrorDescription; + } - THttpClientService::THttpClientService() - : GetProtocol(NNeh::ProtocolFactory()->Protocol("http")) - , FullProtocol(NNeh::ProtocolFactory()->Protocol("full")) - { - Y_VERIFY(!!GetProtocol, "GET protocol is NULL."); - Y_VERIFY(!!FullProtocol, "POST protocol is NULL."); - } + THttpClientService::THttpClientService() + : GetProtocol(NNeh::ProtocolFactory()->Protocol("http")) + , FullProtocol(NNeh::ProtocolFactory()->Protocol("full")) + { + Y_VERIFY(!!GetProtocol, "GET protocol is NULL."); + Y_VERIFY(!!FullProtocol, "POST protocol is NULL."); + } - THttpClientService::~THttpClientService() { - } + THttpClientService::~THttpClientService() { + } - void THttpClientService::SendPost(TString addr, const TString& data, const THttpHeaders& headers, THttpFuture* future) { - Y_VERIFY(!!future, "future is NULL."); + void THttpClientService::SendPost(TString addr, const TString& data, const THttpHeaders& headers, THttpFuture* future) { + Y_VERIFY(!!future, "future is NULL."); - TTaskRunnerBase* current = TTaskRunnerBase::CurrentTask(); - future->SetRunning(current); - future->Task = current; + TTaskRunnerBase* current = TTaskRunnerBase::CurrentTask(); + future->SetRunning(current); + future->Task = current; - THolder<THttpCallback> callback(new THttpCallback(future)); - NNeh::TServiceStatRef stat; - try { + THolder<THttpCallback> callback(new THttpCallback(future)); + NNeh::TServiceStatRef stat; + try { NNeh::TMessage msg(addr.replace(0, NNeh::TParsedLocation(addr).Scheme.size(), "post"), data); - TStringStream headersText; - headers.OutTo(&headersText); - NNeh::NHttp::MakeFullRequest(msg, headersText.Str(), TString()); - FullProtocol->ScheduleRequest(msg, callback.Get(), stat); + TStringStream headersText; + headers.OutTo(&headersText); + NNeh::NHttp::MakeFullRequest(msg, headersText.Str(), TString()); + FullProtocol->ScheduleRequest(msg, callback.Get(), stat); Y_UNUSED(callback.Release()); - } catch (const TNetworkResolutionError& err) { - future->SetFail(THttpFuture::CantResolveNameError, err.AsStrBuf()); - } catch (const yexception& err) { - future->SetFail(THttpFuture::OtherError, err.AsStrBuf()); - } - } - - void THttpClientService::Send(const TString& request, THttpFuture* future) { - Y_VERIFY(!!future, "future is NULL."); - - TTaskRunnerBase* current = TTaskRunnerBase::CurrentTask(); - future->SetRunning(current); - future->Task = current; - - THolder<THttpCallback> callback(new THttpCallback(future)); - NNeh::TServiceStatRef stat; - try { - GetProtocol->ScheduleRequest(NNeh::TMessage::FromString(request), - callback.Get(), - stat); + } catch (const TNetworkResolutionError& err) { + future->SetFail(THttpFuture::CantResolveNameError, err.AsStrBuf()); + } catch (const yexception& err) { + future->SetFail(THttpFuture::OtherError, err.AsStrBuf()); + } + } + + void THttpClientService::Send(const TString& request, THttpFuture* future) { + Y_VERIFY(!!future, "future is NULL."); + + TTaskRunnerBase* current = TTaskRunnerBase::CurrentTask(); + future->SetRunning(current); + future->Task = current; + + THolder<THttpCallback> callback(new THttpCallback(future)); + NNeh::TServiceStatRef stat; + try { + GetProtocol->ScheduleRequest(NNeh::TMessage::FromString(request), + callback.Get(), + stat); Y_UNUSED(callback.Release()); - } catch (const TNetworkResolutionError& err) { - future->SetFail(THttpFuture::CantResolveNameError, err.AsStrBuf()); - } catch (const yexception& err) { - future->SetFail(THttpFuture::OtherError, err.AsStrBuf()); - } - } - - bool THttpFuture::HasHttpCode() const { - return !!HttpCode; + } catch (const TNetworkResolutionError& err) { + future->SetFail(THttpFuture::CantResolveNameError, err.AsStrBuf()); + } catch (const yexception& err) { + future->SetFail(THttpFuture::OtherError, err.AsStrBuf()); + } + } + + bool THttpFuture::HasHttpCode() const { + return !!HttpCode; + } + + bool THttpFuture::HasResponseBody() const { + return !!Response; } - bool THttpFuture::HasResponseBody() const { - return !!Response; - } + ui32 THttpFuture::GetHttpCode() const { + Y_ASSERT(IsDone()); + Y_ASSERT(HasHttpCode()); - ui32 THttpFuture::GetHttpCode() const { - Y_ASSERT(IsDone()); - Y_ASSERT(HasHttpCode()); + return static_cast<ui32>(*HttpCode); + } + + TString THttpFuture::GetResponseBody() const { + Y_ASSERT(IsDone()); + Y_ASSERT(HasResponseBody()); + + return Response->Data; + } - return static_cast<ui32>(*HttpCode); + void THttpFuture::SetDoneAndSchedule(TAutoPtr<NNeh::TResponse> response) { + if (!response->IsError()) { + ErrorCode = THttpFuture::NoError; + HttpCode = HttpCodes::HTTP_OK; + } else { + ErrorCode = THttpFuture::BadHttpCodeError; + ErrorDescription = response->GetErrorText(); + + HttpCode = TryGetHttpCodeFromErrorDescription(ErrorDescription); + } + Response.Reset(response); + SetDone(); } - TString THttpFuture::GetResponseBody() const { - Y_ASSERT(IsDone()); - Y_ASSERT(HasResponseBody()); - - return Response->Data; - } - - void THttpFuture::SetDoneAndSchedule(TAutoPtr<NNeh::TResponse> response) { - if (!response->IsError()) { - ErrorCode = THttpFuture::NoError; - HttpCode = HttpCodes::HTTP_OK; - } else { - ErrorCode = THttpFuture::BadHttpCodeError; - ErrorDescription = response->GetErrorText(); - - HttpCode = TryGetHttpCodeFromErrorDescription(ErrorDescription); - } - Response.Reset(response); - SetDone(); - } - - void THttpFuture::SetFail(THttpFuture::EError errorCode, const TStringBuf& errorDescription) { - ErrorCode = errorCode; - ErrorDescription = errorDescription; - Response.Destroy(); - SetDone(); + void THttpFuture::SetFail(THttpFuture::EError errorCode, const TStringBuf& errorDescription) { + ErrorCode = errorCode; + ErrorDescription = errorDescription; + Response.Destroy(); + SetDone(); } } diff --git a/library/cpp/messagebus/rain_check/http/client.h b/library/cpp/messagebus/rain_check/http/client.h index b7f822ae10..d4199c4c98 100644 --- a/library/cpp/messagebus/rain_check/http/client.h +++ b/library/cpp/messagebus/rain_check/http/client.h @@ -13,66 +13,66 @@ class THttpHeaders; namespace NNeh { - class IProtocol; - struct TResponse; -} + class IProtocol; + struct TResponse; +} namespace NRainCheck { - class THttpCallback; - class THttpClientService; + class THttpCallback; + class THttpClientService; - class THttpFuture: public TSubtaskCompletion { - public: - enum EError { - NoError = 0, + class THttpFuture: public TSubtaskCompletion { + public: + enum EError { + NoError = 0, - CantResolveNameError = 1, - BadHttpCodeError = 2, + CantResolveNameError = 1, + BadHttpCodeError = 2, - OtherError = 100 - }; + OtherError = 100 + }; - private: - friend class THttpCallback; - friend class THttpClientService; + private: + friend class THttpCallback; + friend class THttpClientService; - public: - THttpFuture(); - ~THttpFuture() override; + public: + THttpFuture(); + ~THttpFuture() override; - bool HasHttpCode() const; - bool HasResponseBody() const; + bool HasHttpCode() const; + bool HasResponseBody() const; - ui32 GetHttpCode() const; - TString GetResponseBody() const; + ui32 GetHttpCode() const; + TString GetResponseBody() const; - bool HasError() const; - EError GetErrorCode() const; - TString GetErrorDescription() const; + bool HasError() const; + EError GetErrorCode() const; + TString GetErrorDescription() const; - private: - void SetDoneAndSchedule(TAutoPtr<NNeh::TResponse> response); - void SetFail(EError errorCode, const TStringBuf& errorDescription); + private: + void SetDoneAndSchedule(TAutoPtr<NNeh::TResponse> response); + void SetFail(EError errorCode, const TStringBuf& errorDescription); - private: - TTaskRunnerBase* Task; - TMaybe<HttpCodes> HttpCode; - THolder<NNeh::TResponse> Response; - EError ErrorCode; - TString ErrorDescription; - }; + private: + TTaskRunnerBase* Task; + TMaybe<HttpCodes> HttpCode; + THolder<NNeh::TResponse> Response; + EError ErrorCode; + TString ErrorDescription; + }; - class THttpClientService { - public: - THttpClientService(); - virtual ~THttpClientService(); + class THttpClientService { + public: + THttpClientService(); + virtual ~THttpClientService(); - void Send(const TString& request, THttpFuture* future); - void SendPost(TString addr, const TString& data, const THttpHeaders& headers, THttpFuture* future); + void Send(const TString& request, THttpFuture* future); + void SendPost(TString addr, const TString& data, const THttpHeaders& headers, THttpFuture* future); - private: - NNeh::IProtocol* const GetProtocol; - NNeh::IProtocol* const FullProtocol; - }; + private: + NNeh::IProtocol* const GetProtocol; + NNeh::IProtocol* const FullProtocol; + }; -} +} diff --git a/library/cpp/messagebus/rain_check/http/client_ut.cpp b/library/cpp/messagebus/rain_check/http/client_ut.cpp index 51d0296a37..1628114391 100644 --- a/library/cpp/messagebus/rain_check/http/client_ut.cpp +++ b/library/cpp/messagebus/rain_check/http/client_ut.cpp @@ -28,116 +28,116 @@ using namespace NRainCheck; using namespace NBus::NTest; namespace { - class THttpClientEnv: public TTestEnvTemplate<THttpClientEnv> { - public: - THttpClientService HttpClientService; - }; - - const TString TEST_SERVICE = "test-service"; - const TString TEST_GET_PARAMS = "p=GET"; - const TString TEST_POST_PARAMS = "p=POST"; - const TString TEST_POST_HEADERS = "Content-Type: application/json\r\n"; - const TString TEST_GET_RECV = "GET was ok."; - const TString TEST_POST_RECV = "POST was ok."; - - TString BuildServiceLocation(ui32 port) { + class THttpClientEnv: public TTestEnvTemplate<THttpClientEnv> { + public: + THttpClientService HttpClientService; + }; + + const TString TEST_SERVICE = "test-service"; + const TString TEST_GET_PARAMS = "p=GET"; + const TString TEST_POST_PARAMS = "p=POST"; + const TString TEST_POST_HEADERS = "Content-Type: application/json\r\n"; + const TString TEST_GET_RECV = "GET was ok."; + const TString TEST_POST_RECV = "POST was ok."; + + TString BuildServiceLocation(ui32 port) { return Sprintf("http://*:%" PRIu32 "/%s", port, TEST_SERVICE.data()); - } + } - TString BuildPostServiceLocation(ui32 port) { + TString BuildPostServiceLocation(ui32 port) { return Sprintf("post://*:%" PRIu32 "/%s", port + 1, TEST_SERVICE.data()); - } - - TString BuildGetTestRequest(ui32 port) { - return BuildServiceLocation(port) + "?" + TEST_GET_PARAMS; - } - - class TSimpleServer { - public: - inline void ServeRequest(const NNeh::IRequestRef& req) { - NNeh::TData response; - if (req->Data() == TEST_GET_PARAMS) { - response.assign(TEST_GET_RECV.begin(), TEST_GET_RECV.end()); - } else { - response.assign(TEST_POST_RECV.begin(), TEST_POST_RECV.end()); - } - req->SendReply(response); - } - }; - - NNeh::IServicesRef RunServer(ui32 port, TSimpleServer& server) { - NNeh::IServicesRef runner = NNeh::CreateLoop(); - runner->Add(BuildServiceLocation(port), server); - runner->Add(BuildPostServiceLocation(port), server); - - try { - const int THR_POOL_SIZE = 2; - runner->ForkLoop(THR_POOL_SIZE); - } catch (...) { + } + + TString BuildGetTestRequest(ui32 port) { + return BuildServiceLocation(port) + "?" + TEST_GET_PARAMS; + } + + class TSimpleServer { + public: + inline void ServeRequest(const NNeh::IRequestRef& req) { + NNeh::TData response; + if (req->Data() == TEST_GET_PARAMS) { + response.assign(TEST_GET_RECV.begin(), TEST_GET_RECV.end()); + } else { + response.assign(TEST_POST_RECV.begin(), TEST_POST_RECV.end()); + } + req->SendReply(response); + } + }; + + NNeh::IServicesRef RunServer(ui32 port, TSimpleServer& server) { + NNeh::IServicesRef runner = NNeh::CreateLoop(); + runner->Add(BuildServiceLocation(port), server); + runner->Add(BuildPostServiceLocation(port), server); + + try { + const int THR_POOL_SIZE = 2; + runner->ForkLoop(THR_POOL_SIZE); + } catch (...) { Y_FAIL("Can't run server: %s", CurrentExceptionMessage().data()); } - return runner; + return runner; } - enum ERequestType { - RT_HTTP_GET = 0, - RT_HTTP_POST = 1 - }; - - using TTaskParam = std::pair<TIpPort, ERequestType>; - - class THttpClientTask: public ISimpleTask { - public: - THttpClientTask(THttpClientEnv* env, TTaskParam param) - : Env(env) - , ServerPort(param.first) - , ReqType(param.second) - { - } - - TContinueFunc Start() override { - switch (ReqType) { - case RT_HTTP_GET: { - TString getRequest = BuildGetTestRequest(ServerPort); - for (size_t i = 0; i < 3; ++i) { - Requests.push_back(new THttpFuture()); - Env->HttpClientService.Send(getRequest, Requests[i].Get()); - } - break; + enum ERequestType { + RT_HTTP_GET = 0, + RT_HTTP_POST = 1 + }; + + using TTaskParam = std::pair<TIpPort, ERequestType>; + + class THttpClientTask: public ISimpleTask { + public: + THttpClientTask(THttpClientEnv* env, TTaskParam param) + : Env(env) + , ServerPort(param.first) + , ReqType(param.second) + { + } + + TContinueFunc Start() override { + switch (ReqType) { + case RT_HTTP_GET: { + TString getRequest = BuildGetTestRequest(ServerPort); + for (size_t i = 0; i < 3; ++i) { + Requests.push_back(new THttpFuture()); + Env->HttpClientService.Send(getRequest, Requests[i].Get()); + } + break; } - case RT_HTTP_POST: { - TString servicePath = BuildPostServiceLocation(ServerPort); - TStringInput headersText(TEST_POST_HEADERS); - THttpHeaders headers(&headersText); - for (size_t i = 0; i < 3; ++i) { - Requests.push_back(new THttpFuture()); - Env->HttpClientService.SendPost(servicePath, TEST_POST_PARAMS, headers, Requests[i].Get()); - } - break; + case RT_HTTP_POST: { + TString servicePath = BuildPostServiceLocation(ServerPort); + TStringInput headersText(TEST_POST_HEADERS); + THttpHeaders headers(&headersText); + for (size_t i = 0; i < 3; ++i) { + Requests.push_back(new THttpFuture()); + Env->HttpClientService.SendPost(servicePath, TEST_POST_PARAMS, headers, Requests[i].Get()); + } + break; } } - - return &THttpClientTask::GotReplies; + + return &THttpClientTask::GotReplies; } - TContinueFunc GotReplies() { - const TString& TEST_OK_RECV = (ReqType == RT_HTTP_GET) ? TEST_GET_RECV : TEST_POST_RECV; - for (size_t i = 0; i < Requests.size(); ++i) { - UNIT_ASSERT_EQUAL(Requests[i]->GetHttpCode(), 200); - UNIT_ASSERT_EQUAL(Requests[i]->GetResponseBody(), TEST_OK_RECV); - } + TContinueFunc GotReplies() { + const TString& TEST_OK_RECV = (ReqType == RT_HTTP_GET) ? TEST_GET_RECV : TEST_POST_RECV; + for (size_t i = 0; i < Requests.size(); ++i) { + UNIT_ASSERT_EQUAL(Requests[i]->GetHttpCode(), 200); + UNIT_ASSERT_EQUAL(Requests[i]->GetResponseBody(), TEST_OK_RECV); + } + + Env->TestSync.CheckAndIncrement(0); - Env->TestSync.CheckAndIncrement(0); - - return nullptr; + return nullptr; } - THttpClientEnv* const Env; - const TIpPort ServerPort; - const ERequestType ReqType; + THttpClientEnv* const Env; + const TIpPort ServerPort; + const ERequestType ReqType; - TVector<TSimpleSharedPtr<THttpFuture>> Requests; - }; + TVector<TSimpleSharedPtr<THttpFuture>> Requests; + }; } // anonymous namespace @@ -175,12 +175,12 @@ Y_UNIT_TEST_SUITE(RainCheckHttpClient) { } Y_UNIT_TEST(HttpCodeExtraction) { - // Find "request failed(" string, then copy len("HTTP/1.X NNN") chars and try to convert NNN to HTTP code. + // Find "request failed(" string, then copy len("HTTP/1.X NNN") chars and try to convert NNN to HTTP code. -#define CHECK_VALID_LINE(line, code) \ - UNIT_ASSERT_NO_EXCEPTION(TryGetHttpCodeFromErrorDescription(line)); \ - UNIT_ASSERT(!!TryGetHttpCodeFromErrorDescription(line)); \ - UNIT_ASSERT_EQUAL(*TryGetHttpCodeFromErrorDescription(line), code) +#define CHECK_VALID_LINE(line, code) \ + UNIT_ASSERT_NO_EXCEPTION(TryGetHttpCodeFromErrorDescription(line)); \ + UNIT_ASSERT(!!TryGetHttpCodeFromErrorDescription(line)); \ + UNIT_ASSERT_EQUAL(*TryGetHttpCodeFromErrorDescription(line), code) CHECK_VALID_LINE(TStringBuf("library/cpp/neh/http.cpp:<LINE>: request failed(HTTP/1.0 200 Some random message"), 200); CHECK_VALID_LINE(TStringBuf("library/cpp/neh/http.cpp:<LINE>: request failed(HTTP/1.0 404 Some random message"), 404); @@ -189,9 +189,9 @@ Y_UNIT_TEST_SUITE(RainCheckHttpClient) { CHECK_VALID_LINE(TStringBuf("request failed(HTTP/1.1 2004 Some random message"), 200); #undef CHECK_VALID_LINE -#define CHECK_INVALID_LINE(line) \ - UNIT_ASSERT_NO_EXCEPTION(TryGetHttpCodeFromErrorDescription(line)); \ - UNIT_ASSERT(!TryGetHttpCodeFromErrorDescription(line)) +#define CHECK_INVALID_LINE(line) \ + UNIT_ASSERT_NO_EXCEPTION(TryGetHttpCodeFromErrorDescription(line)); \ + UNIT_ASSERT(!TryGetHttpCodeFromErrorDescription(line)) CHECK_INVALID_LINE(TStringBuf("library/cpp/neh/http.cpp:<LINE>: request failed(HTTP/1.1 1 Some random message")); CHECK_INVALID_LINE(TStringBuf("request failed(HTTP/1.0 asdf Some random message")); diff --git a/library/cpp/messagebus/rain_check/http/http_code_extractor.cpp b/library/cpp/messagebus/rain_check/http/http_code_extractor.cpp index b0925c5a6b..51d75762f6 100644 --- a/library/cpp/messagebus/rain_check/http/http_code_extractor.cpp +++ b/library/cpp/messagebus/rain_check/http/http_code_extractor.cpp @@ -8,32 +8,32 @@ #include <util/string/cast.h> namespace NRainCheck { - TMaybe<HttpCodes> TryGetHttpCodeFromErrorDescription(const TStringBuf& errorMessage) { + TMaybe<HttpCodes> TryGetHttpCodeFromErrorDescription(const TStringBuf& errorMessage) { // Try to get HttpCode from library/cpp/neh response. // If response has HttpCode and it is not 200 OK, library/cpp/neh will send a message // "library/cpp/neh/http.cpp:<LINE>: request failed(<FIRST-HTTP-RESPONSE-LINE>)" // (see library/cpp/neh/http.cpp:625). So, we will try to parse this message and - // find out HttpCode in it. It is bad temporary solution, but we have no choice. + // find out HttpCode in it. It is bad temporary solution, but we have no choice. const TStringBuf SUBSTR = "request failed("; - const size_t SUBSTR_LEN = SUBSTR.size(); + const size_t SUBSTR_LEN = SUBSTR.size(); const size_t FIRST_LINE_LEN = TStringBuf("HTTP/1.X NNN").size(); - TMaybe<HttpCodes> httpCode; + TMaybe<HttpCodes> httpCode; - const size_t substrPos = errorMessage.find(SUBSTR); - if (substrPos != TStringBuf::npos) { - const TStringBuf firstLineStart = errorMessage.SubStr(substrPos + SUBSTR_LEN, FIRST_LINE_LEN); - try { - httpCode = static_cast<HttpCodes>(ParseHttpRetCode(firstLineStart)); - if (*httpCode < HTTP_CONTINUE || *httpCode >= HTTP_CODE_MAX) { - httpCode = Nothing(); - } - } catch (const TFromStringException& ex) { - // Can't parse HttpCode: it is OK, because ErrorDescription can be random string. + const size_t substrPos = errorMessage.find(SUBSTR); + if (substrPos != TStringBuf::npos) { + const TStringBuf firstLineStart = errorMessage.SubStr(substrPos + SUBSTR_LEN, FIRST_LINE_LEN); + try { + httpCode = static_cast<HttpCodes>(ParseHttpRetCode(firstLineStart)); + if (*httpCode < HTTP_CONTINUE || *httpCode >= HTTP_CODE_MAX) { + httpCode = Nothing(); + } + } catch (const TFromStringException& ex) { + // Can't parse HttpCode: it is OK, because ErrorDescription can be random string. } } - - return httpCode; + + return httpCode; } } diff --git a/library/cpp/messagebus/rain_check/http/http_code_extractor.h b/library/cpp/messagebus/rain_check/http/http_code_extractor.h index 322fe24fa0..33b565fa1c 100644 --- a/library/cpp/messagebus/rain_check/http/http_code_extractor.h +++ b/library/cpp/messagebus/rain_check/http/http_code_extractor.h @@ -10,7 +10,7 @@ namespace NRainCheck { // If response has HttpCode and it is not 200 OK, library/cpp/neh will send a message // "library/cpp/neh/http.cpp:<LINE>: request failed(<FIRST-HTTP-RESPONSE-LINE>)" // (see library/cpp/neh/http.cpp:625). So, we will try to parse this message and - // find out HttpCode in it. It is bad temporary solution, but we have no choice. - TMaybe<HttpCodes> TryGetHttpCodeFromErrorDescription(const TStringBuf& errorMessage); + // find out HttpCode in it. It is bad temporary solution, but we have no choice. + TMaybe<HttpCodes> TryGetHttpCodeFromErrorDescription(const TStringBuf& errorMessage); -} +} diff --git a/library/cpp/messagebus/rain_check/messagebus/messagebus_client.cpp b/library/cpp/messagebus/rain_check/messagebus/messagebus_client.cpp index d5d20ab2d6..daac8d9a99 100644 --- a/library/cpp/messagebus/rain_check/messagebus/messagebus_client.cpp +++ b/library/cpp/messagebus/rain_check/messagebus/messagebus_client.cpp @@ -4,17 +4,17 @@ using namespace NRainCheck; using namespace NBus; TBusClientService::TBusClientService( - const NBus::TBusSessionConfig& config, - NBus::TBusProtocol* proto, - NBus::TBusMessageQueue* queue) { + const NBus::TBusSessionConfig& config, + NBus::TBusProtocol* proto, + NBus::TBusMessageQueue* queue) { Session = queue->CreateSource(proto, this, config); } -TBusClientService::~TBusClientService() { +TBusClientService::~TBusClientService() { Session->Shutdown(); } -void TBusClientService::SendCommon(NBus::TBusMessage* message, const NBus::TNetAddr&, TBusFuture* future) { +void TBusClientService::SendCommon(NBus::TBusMessage* message, const NBus::TNetAddr&, TBusFuture* future) { TTaskRunnerBase* current = TTaskRunnerBase::CurrentTask(); future->SetRunning(current); @@ -29,8 +29,8 @@ void TBusClientService::SendCommon(NBus::TBusMessage* message, const NBus::TNetA } void TBusClientService::ProcessResultCommon(NBus::TBusMessageAutoPtr message, - const NBus::TNetAddr&, TBusFuture* future, - NBus::EMessageStatus status) { + const NBus::TNetAddr&, TBusFuture* future, + NBus::EMessageStatus status) { Y_UNUSED(message.Release()); if (status == NBus::MESSAGE_OK) { @@ -41,8 +41,8 @@ void TBusClientService::ProcessResultCommon(NBus::TBusMessageAutoPtr message, } void TBusClientService::SendOneWay( - NBus::TBusMessageAutoPtr message, const NBus::TNetAddr& addr, - TBusFuture* future) { + NBus::TBusMessageAutoPtr message, const NBus::TNetAddr& addr, + TBusFuture* future) { SendCommon(message.Get(), addr, future); EMessageStatus ok = Session->SendMessageOneWay(message.Get(), &addr, false); @@ -54,8 +54,8 @@ NBus::TBusClientSessionPtr TBusClientService::GetSessionForMonitoring() const { } void TBusClientService::Send( - TBusMessageAutoPtr message, const TNetAddr& addr, - TBusFuture* future) { + TBusMessageAutoPtr message, const TNetAddr& addr, + TBusFuture* future) { SendCommon(message.Get(), addr, future); EMessageStatus ok = Session->SendMessage(message.Get(), &addr, false); @@ -63,35 +63,35 @@ void TBusClientService::Send( } void TBusClientService::OnReply( - TAutoPtr<TBusMessage> request, - TAutoPtr<TBusMessage> response) { - TBusFuture* future = (TBusFuture*)request->Data; + TAutoPtr<TBusMessage> request, + TAutoPtr<TBusMessage> response) { + TBusFuture* future = (TBusFuture*)request->Data; Y_ASSERT(future->Request.Get() == request.Get()); Y_UNUSED(request.Release()); future->SetDoneAndSchedule(MESSAGE_OK, response); } void NRainCheck::TBusClientService::OnMessageSentOneWay( - TAutoPtr<NBus::TBusMessage> request) { - TBusFuture* future = (TBusFuture*)request->Data; + TAutoPtr<NBus::TBusMessage> request) { + TBusFuture* future = (TBusFuture*)request->Data; Y_ASSERT(future->Request.Get() == request.Get()); Y_UNUSED(request.Release()); future->SetDoneAndSchedule(MESSAGE_OK, nullptr); } void TBusClientService::OnError( - TAutoPtr<TBusMessage> message, NBus::EMessageStatus status) { + TAutoPtr<TBusMessage> message, NBus::EMessageStatus status) { if (message->Data == nullptr) { return; } - TBusFuture* future = (TBusFuture*)message->Data; + TBusFuture* future = (TBusFuture*)message->Data; Y_ASSERT(future->Request.Get() == message.Get()); Y_UNUSED(message.Release()); future->SetDoneAndSchedule(status, nullptr); } -void TBusFuture::SetDoneAndSchedule(EMessageStatus status, TAutoPtr<TBusMessage> response) { +void TBusFuture::SetDoneAndSchedule(EMessageStatus status, TAutoPtr<TBusMessage> response) { Status = status; Response.Reset(response.Release()); SetDone(); diff --git a/library/cpp/messagebus/rain_check/messagebus/messagebus_client.h b/library/cpp/messagebus/rain_check/messagebus/messagebus_client.h index b47ec13408..0a291cdea6 100644 --- a/library/cpp/messagebus/rain_check/messagebus/messagebus_client.h +++ b/library/cpp/messagebus/rain_check/messagebus/messagebus_client.h @@ -7,24 +7,24 @@ namespace NRainCheck { class TBusFuture: public TSubtaskCompletion { friend class TBusClientService; - + private: THolder<NBus::TBusMessage> Request; THolder<NBus::TBusMessage> Response; NBus::EMessageStatus Status; - + private: TTaskRunnerBase* Task; void SetDoneAndSchedule(NBus::EMessageStatus, TAutoPtr<NBus::TBusMessage>); - + public: // TODO: add MESSAGE_UNDEFINED - TBusFuture() - : Status(NBus::MESSAGE_DONT_ASK) - , Task(nullptr) - { - } + TBusFuture() + : Status(NBus::MESSAGE_DONT_ASK) + , Task(nullptr) + { + } NBus::TBusMessage* GetRequest() const { return Request.Get(); @@ -44,7 +44,7 @@ namespace NRainCheck { class TBusClientService: private NBus::IBusClientHandler { private: NBus::TBusClientSessionPtr Session; - + public: TBusClientService(const NBus::TBusSessionConfig&, NBus::TBusProtocol*, NBus::TBusMessageQueue*); ~TBusClientService() override; diff --git a/library/cpp/messagebus/rain_check/messagebus/messagebus_client_ut.cpp b/library/cpp/messagebus/rain_check/messagebus/messagebus_client_ut.cpp index e63ba86be7..1b3618558b 100644 --- a/library/cpp/messagebus/rain_check/messagebus/messagebus_client_ut.cpp +++ b/library/cpp/messagebus/rain_check/messagebus/messagebus_client_ut.cpp @@ -28,8 +28,8 @@ struct TMessageBusClientEnv: public TTestEnvTemplate<TMessageBusClientEnv> { TMessageBusClientEnv() : Queue(CreateMessageQueue(GetExecutor())) , BusClientService(TBusSessionConfig(), &Proto, Queue.Get()) - { - } + { + } }; Y_UNIT_TEST_SUITE(RainCheckMessageBusClient) { @@ -44,7 +44,7 @@ Y_UNIT_TEST_SUITE(RainCheckMessageBusClient) { { } - TVector<TSimpleSharedPtr<TBusFuture>> Requests; + TVector<TSimpleSharedPtr<TBusFuture>> Requests; TContinueFunc Start() override { for (unsigned i = 0; i < 3; ++i) { @@ -108,7 +108,7 @@ Y_UNIT_TEST_SUITE(RainCheckMessageBusClient) { { } - TVector<TSimpleSharedPtr<TBusFuture>> Requests; + TVector<TSimpleSharedPtr<TBusFuture>> Requests; TContinueFunc Start() override { Env->TestSync.CheckAndIncrement(0); diff --git a/library/cpp/messagebus/rain_check/messagebus/messagebus_server.cpp b/library/cpp/messagebus/rain_check/messagebus/messagebus_server.cpp index 1868cfa06e..5d4b13d664 100644 --- a/library/cpp/messagebus/rain_check/messagebus/messagebus_server.cpp +++ b/library/cpp/messagebus/rain_check/messagebus/messagebus_server.cpp @@ -9,9 +9,9 @@ TBusTaskStarter::TBusTaskStarter(TAutoPtr<ITaskFactory> taskFactory) { } -void TBusTaskStarter::OnMessage(NBus::TOnMessageContext& onMessage) { +void TBusTaskStarter::OnMessage(NBus::TOnMessageContext& onMessage) { TaskFactory->NewTask(onMessage); } -TBusTaskStarter::~TBusTaskStarter() { +TBusTaskStarter::~TBusTaskStarter() { } diff --git a/library/cpp/messagebus/rain_check/messagebus/messagebus_server.h b/library/cpp/messagebus/rain_check/messagebus/messagebus_server.h index 10fc8b0dc7..1334f05fe4 100644 --- a/library/cpp/messagebus/rain_check/messagebus/messagebus_server.h +++ b/library/cpp/messagebus/rain_check/messagebus/messagebus_server.h @@ -12,14 +12,14 @@ namespace NRainCheck { private: struct ITaskFactory { virtual void NewTask(NBus::TOnMessageContext&) = 0; - virtual ~ITaskFactory() { - } + virtual ~ITaskFactory() { + } }; THolder<ITaskFactory> TaskFactory; void OnMessage(NBus::TOnMessageContext&) override; - + public: TBusTaskStarter(TAutoPtr<ITaskFactory>); ~TBusTaskStarter() override; @@ -30,10 +30,10 @@ namespace NRainCheck { struct TTaskFactory: public ITaskFactory { TEnv* const Env; - TTaskFactory(TEnv* env) - : Env(env) - { - } + TTaskFactory(TEnv* env) + : Env(env) + { + } void NewTask(NBus::TOnMessageContext& context) override { SpawnTask<TTask, TEnv, NBus::TOnMessageContext&>(Env, context); diff --git a/library/cpp/messagebus/rain_check/messagebus/messagebus_server_ut.cpp b/library/cpp/messagebus/rain_check/messagebus/messagebus_server_ut.cpp index 9b3bd58ad7..7c11399f1b 100644 --- a/library/cpp/messagebus/rain_check/messagebus/messagebus_server_ut.cpp +++ b/library/cpp/messagebus/rain_check/messagebus/messagebus_server_ut.cpp @@ -19,7 +19,7 @@ Y_UNIT_TEST_SUITE(RainCheckMessageBusServer) { private: TMessageBusServerEnv* const Env; TOnMessageContext MessageContext; - + public: TSimpleServerTask(TMessageBusServerEnv* env, TOnMessageContext& messageContext) : Env(env) diff --git a/library/cpp/messagebus/rain_check/test/helper/misc.cpp b/library/cpp/messagebus/rain_check/test/helper/misc.cpp index b176878cab..c0fcb27252 100644 --- a/library/cpp/messagebus/rain_check/test/helper/misc.cpp +++ b/library/cpp/messagebus/rain_check/test/helper/misc.cpp @@ -4,7 +4,7 @@ using namespace NRainCheck; -void TSpawnNopTasksCoroTask::Run() { +void TSpawnNopTasksCoroTask::Run() { Y_VERIFY(Count <= Completion.size()); for (unsigned i = 0; i < Count; ++i) { SpawnSubtask<TNopCoroTask>(Env, &Completion[i], ""); @@ -13,7 +13,7 @@ void TSpawnNopTasksCoroTask::Run() { WaitForSubtasks(); } -TContinueFunc TSpawnNopTasksSimpleTask::Start() { +TContinueFunc TSpawnNopTasksSimpleTask::Start() { Y_VERIFY(Count <= Completion.size()); for (unsigned i = 0; i < Count; ++i) { SpawnSubtask<TNopSimpleTask>(Env, &Completion[i], ""); @@ -22,6 +22,6 @@ TContinueFunc TSpawnNopTasksSimpleTask::Start() { return &TSpawnNopTasksSimpleTask::Join; } -TContinueFunc TSpawnNopTasksSimpleTask::Join() { +TContinueFunc TSpawnNopTasksSimpleTask::Join() { return nullptr; } diff --git a/library/cpp/messagebus/rain_check/test/helper/misc.h b/library/cpp/messagebus/rain_check/test/helper/misc.h index 5d8648462d..9150be4d2f 100644 --- a/library/cpp/messagebus/rain_check/test/helper/misc.h +++ b/library/cpp/messagebus/rain_check/test/helper/misc.h @@ -6,8 +6,8 @@ namespace NRainCheck { struct TNopSimpleTask: public ISimpleTask { - TNopSimpleTask(IEnv*, const void*) { - } + TNopSimpleTask(IEnv*, const void*) { + } TContinueFunc Start() override { return nullptr; @@ -15,11 +15,11 @@ namespace NRainCheck { }; struct TNopCoroTask: public ICoroTask { - TNopCoroTask(IEnv*, const void*) { - } + TNopCoroTask(IEnv*, const void*) { + } - void Run() override { - } + void Run() override { + } }; struct TSpawnNopTasksCoroTask: public ICoroTask { @@ -29,8 +29,8 @@ namespace NRainCheck { TSpawnNopTasksCoroTask(IEnv* env, unsigned count) : Env(env) , Count(count) - { - } + { + } std::array<TSubtaskCompletion, 2> Completion; @@ -44,8 +44,8 @@ namespace NRainCheck { TSpawnNopTasksSimpleTask(IEnv* env, unsigned count) : Env(env) , Count(count) - { - } + { + } std::array<TSubtaskCompletion, 2> Completion; diff --git a/library/cpp/messagebus/rain_check/test/perftest/perftest.cpp b/library/cpp/messagebus/rain_check/test/perftest/perftest.cpp index c71499e84a..22edbd8c6b 100644 --- a/library/cpp/messagebus/rain_check/test/perftest/perftest.cpp +++ b/library/cpp/messagebus/rain_check/test/perftest/perftest.cpp @@ -16,17 +16,17 @@ struct TRainCheckPerftestEnv: public TSimpleEnvTemplate<TRainCheckPerftestEnv> { TRainCheckPerftestEnv() : TSimpleEnvTemplate<TRainCheckPerftestEnv>(4) , SubtasksPerTask(1000) - { - } + { + } }; struct TCoroOuter: public ICoroTask { TRainCheckPerftestEnv* const Env; - TCoroOuter(TRainCheckPerftestEnv* env) - : Env(env) - { - } + TCoroOuter(TRainCheckPerftestEnv* env) + : Env(env) + { + } void Run() override { for (;;) { @@ -63,10 +63,10 @@ struct TCoroOuter: public ICoroTask { struct TSimpleOuter: public ISimpleTask { TRainCheckPerftestEnv* const Env; - TSimpleOuter(TRainCheckPerftestEnv* env, const void*) - : Env(env) - { - } + TSimpleOuter(TRainCheckPerftestEnv* env, const void*) + : Env(env) + { + } TInstant StartInstant; unsigned Count; @@ -119,10 +119,10 @@ struct TSimpleOuter: public ISimpleTask { struct TReproduceCrashTask: public ISimpleTask { TRainCheckPerftestEnv* const Env; - TReproduceCrashTask(TRainCheckPerftestEnv* env) - : Env(env) - { - } + TReproduceCrashTask(TRainCheckPerftestEnv* env) + : Env(env) + { + } std::array<TSubtaskCompletion, SUBTASKS> Completion; |