diff options
author | nga <nga@yandex-team.ru> | 2022-02-10 16:48:09 +0300 |
---|---|---|
committer | Daniil Cherednik <dcherednik@yandex-team.ru> | 2022-02-10 16:48:09 +0300 |
commit | 1f553f46fb4f3c5eec631352cdd900a0709016af (patch) | |
tree | a231fba2c03b440becaea6c86a2702d0bfb0336e /library/cpp/messagebus/rain_check/core/task.cpp | |
parent | c4de7efdedc25b49cbea74bd589eecb61b55b60a (diff) | |
download | ydb-1f553f46fb4f3c5eec631352cdd900a0709016af.tar.gz |
Restoring authorship annotation for <nga@yandex-team.ru>. Commit 1 of 2.
Diffstat (limited to 'library/cpp/messagebus/rain_check/core/task.cpp')
-rw-r--r-- | library/cpp/messagebus/rain_check/core/task.cpp | 342 |
1 files changed, 171 insertions, 171 deletions
diff --git a/library/cpp/messagebus/rain_check/core/task.cpp b/library/cpp/messagebus/rain_check/core/task.cpp index a098437d53..d20ae30402 100644 --- a/library/cpp/messagebus/rain_check/core/task.cpp +++ b/library/cpp/messagebus/rain_check/core/task.cpp @@ -1,216 +1,216 @@ #include "rain_check.h" - + #include <library/cpp/messagebus/actor/temp_tls_vector.h> - + #include <util/system/type_name.h> #include <util/system/tls.h> - -using namespace NRainCheck; -using namespace NRainCheck::NPrivate; - -using namespace NActor; - -namespace { + +using namespace NRainCheck; +using namespace NRainCheck::NPrivate; + +using namespace NActor; + +namespace { Y_POD_STATIC_THREAD(TTaskRunnerBase*) ThreadCurrentTask; -} - +} + void TNopSubtaskListener::SetDone() { } - -TNopSubtaskListener TNopSubtaskListener::Instance; - -TTaskRunnerBase::TTaskRunnerBase(IEnv* env, ISubtaskListener* parentTask, TAutoPtr<ITaskBase> impl) - : TActor<TTaskRunnerBase>(env->GetExecutor()) - , Impl(impl) - , ParentTask(parentTask) - //, HoldsSelfReference(false) - , Done(false) - , SetDoneCalled(false) -{ -} - -TTaskRunnerBase::~TTaskRunnerBase() { + +TNopSubtaskListener TNopSubtaskListener::Instance; + +TTaskRunnerBase::TTaskRunnerBase(IEnv* env, ISubtaskListener* parentTask, TAutoPtr<ITaskBase> impl) + : TActor<TTaskRunnerBase>(env->GetExecutor()) + , Impl(impl) + , ParentTask(parentTask) + //, HoldsSelfReference(false) + , Done(false) + , SetDoneCalled(false) +{ +} + +TTaskRunnerBase::~TTaskRunnerBase() { Y_ASSERT(Done); -} - -namespace { - struct TRunningInThisThreadGuard { - TTaskRunnerBase* const Task; - TRunningInThisThreadGuard(TTaskRunnerBase* task) - : Task(task) - { +} + +namespace { + struct TRunningInThisThreadGuard { + TTaskRunnerBase* const Task; + TRunningInThisThreadGuard(TTaskRunnerBase* task) + : Task(task) + { Y_ASSERT(!ThreadCurrentTask); - ThreadCurrentTask = task; - } - - ~TRunningInThisThreadGuard() { + ThreadCurrentTask = task; + } + + ~TRunningInThisThreadGuard() { Y_ASSERT(ThreadCurrentTask == Task); ThreadCurrentTask = nullptr; - } - }; -} - + } + }; +} + void NRainCheck::TTaskRunnerBase::Act(NActor::TDefaultTag) { Y_ASSERT(RefCount() > 0); - - TRunningInThisThreadGuard g(this); - - //RetainRef(); - - for (;;) { - TTempTlsVector<TSubtaskCompletion*> temp; - - temp.GetVector()->swap(Pending); - + + TRunningInThisThreadGuard g(this); + + //RetainRef(); + + for (;;) { + TTempTlsVector<TSubtaskCompletion*> temp; + + temp.GetVector()->swap(Pending); + for (auto& pending : *temp.GetVector()) { if (pending->IsComplete()) { pending->FireCompletionCallback(GetImplBase()); - } else { + } else { Pending.push_back(pending); - } - } - - if (!Pending.empty()) { - return; - } - - if (!Done) { - Done = !ReplyReceived(); - } else { - if (Pending.empty()) { - if (!SetDoneCalled) { - ParentTask->SetDone(); - SetDoneCalled = true; - } - //ReleaseRef(); - return; - } - } - } -} - + } + } + + if (!Pending.empty()) { + return; + } + + if (!Done) { + Done = !ReplyReceived(); + } else { + if (Pending.empty()) { + if (!SetDoneCalled) { + ParentTask->SetDone(); + SetDoneCalled = true; + } + //ReleaseRef(); + return; + } + } + } +} + bool TTaskRunnerBase::IsRunningInThisThread() const { - return ThreadCurrentTask == this; -} - + return ThreadCurrentTask == this; +} + TSubtaskCompletion::~TSubtaskCompletion() { - ESubtaskState state = State.Get(); + ESubtaskState state = State.Get(); Y_ASSERT(state == CREATED || state == DONE || state == CANCELED); -} - +} + void TSubtaskCompletion::FireCompletionCallback(ITaskBase* task) { Y_ASSERT(IsComplete()); - - if (!!CompletionFunc) { - TSubtaskCompletionFunc temp = CompletionFunc; - // completion func must be reset before calling it, - // because function may set it back - CompletionFunc = TSubtaskCompletionFunc(); - (task->*(temp.Func))(this); - } -} - + + if (!!CompletionFunc) { + TSubtaskCompletionFunc temp = CompletionFunc; + // completion func must be reset before calling it, + // because function may set it back + CompletionFunc = TSubtaskCompletionFunc(); + (task->*(temp.Func))(this); + } +} + void NRainCheck::TSubtaskCompletion::Cancel() { - for (;;) { - ESubtaskState state = State.Get(); - if (state == CREATED && State.CompareAndSet(CREATED, CANCELED)) { - return; - } - if (state == RUNNING && State.CompareAndSet(RUNNING, CANCEL_REQUESTED)) { - return; - } - if (state == DONE && State.CompareAndSet(DONE, CANCELED)) { - return; - } - if (state == CANCEL_REQUESTED || state == CANCELED) { - return; - } - } -} - + for (;;) { + ESubtaskState state = State.Get(); + if (state == CREATED && State.CompareAndSet(CREATED, CANCELED)) { + return; + } + if (state == RUNNING && State.CompareAndSet(RUNNING, CANCEL_REQUESTED)) { + return; + } + if (state == DONE && State.CompareAndSet(DONE, CANCELED)) { + return; + } + if (state == CANCEL_REQUESTED || state == CANCELED) { + return; + } + } +} + void TSubtaskCompletion::SetRunning(TTaskRunnerBase* parent) { Y_ASSERT(!TaskRunner); Y_ASSERT(!!parent); - - TaskRunner = parent; - - parent->Pending.push_back(this); - - parent->RefV(); - - for (;;) { - ESubtaskState current = State.Get(); - if (current != CREATED && current != DONE) { + + TaskRunner = parent; + + parent->Pending.push_back(this); + + parent->RefV(); + + for (;;) { + ESubtaskState current = State.Get(); + if (current != CREATED && current != DONE) { Y_FAIL("current state should be CREATED or DONE: %s", ToCString(current)); - } - if (State.CompareAndSet(current, RUNNING)) { - return; - } - } -} - + } + if (State.CompareAndSet(current, RUNNING)) { + return; + } + } +} + void TSubtaskCompletion::SetDone() { Y_ASSERT(!!TaskRunner); - TTaskRunnerBase* temp = TaskRunner; + TTaskRunnerBase* temp = TaskRunner; TaskRunner = nullptr; - - for (;;) { - ESubtaskState state = State.Get(); - if (state == RUNNING) { - if (State.CompareAndSet(RUNNING, DONE)) { - break; - } - } else if (state == CANCEL_REQUESTED) { - if (State.CompareAndSet(CANCEL_REQUESTED, CANCELED)) { - break; - } - } else { + + for (;;) { + ESubtaskState state = State.Get(); + if (state == RUNNING) { + if (State.CompareAndSet(RUNNING, DONE)) { + break; + } + } else if (state == CANCEL_REQUESTED) { + if (State.CompareAndSet(CANCEL_REQUESTED, CANCELED)) { + break; + } + } else { Y_FAIL("cannot SetDone: unknown state: %s", ToCString(state)); - } - } - - temp->ScheduleV(); - temp->UnRefV(); -} - -#if 0 -void NRainCheck::TTaskRunnerBase::RetainRef() -{ - if (HoldsSelfReference) { - return; - } - HoldsSelfReference = true; - Ref(); -} - -void NRainCheck::TTaskRunnerBase::ReleaseRef() -{ - if (!HoldsSelfReference) { - return; - } - HoldsSelfReference = false; - DecRef(); -} -#endif - + } + } + + temp->ScheduleV(); + temp->UnRefV(); +} + +#if 0 +void NRainCheck::TTaskRunnerBase::RetainRef() +{ + if (HoldsSelfReference) { + return; + } + HoldsSelfReference = true; + Ref(); +} + +void NRainCheck::TTaskRunnerBase::ReleaseRef() +{ + if (!HoldsSelfReference) { + return; + } + HoldsSelfReference = false; + DecRef(); +} +#endif + void TTaskRunnerBase::AssertInThisThread() const { Y_ASSERT(IsRunningInThisThread()); -} - +} + TTaskRunnerBase* TTaskRunnerBase::CurrentTask() { Y_VERIFY(!!ThreadCurrentTask); - return ThreadCurrentTask; -} - + return ThreadCurrentTask; +} + ITaskBase* TTaskRunnerBase::CurrentTaskImpl() { return CurrentTask()->GetImplBase(); } TString TTaskRunnerBase::GetStatusSingleLine() { return TypeName(*Impl); -} - +} + bool NRainCheck::AreWeInsideTask() { return ThreadCurrentTask != nullptr; } |