diff options
author | Devtools Arcadia <arcadia-devtools@yandex-team.ru> | 2022-02-07 18:08:42 +0300 |
---|---|---|
committer | Devtools Arcadia <arcadia-devtools@mous.vla.yp-c.yandex.net> | 2022-02-07 18:08:42 +0300 |
commit | 1110808a9d39d4b808aef724c861a2e1a38d2a69 (patch) | |
tree | e26c9fed0de5d9873cce7e00bc214573dc2195b7 /library/cpp/messagebus/rain_check/core/task.cpp | |
download | ydb-1110808a9d39d4b808aef724c861a2e1a38d2a69.tar.gz |
intermediate changes
ref:cde9a383711a11544ce7e107a78147fb96cc4029
Diffstat (limited to 'library/cpp/messagebus/rain_check/core/task.cpp')
-rw-r--r-- | library/cpp/messagebus/rain_check/core/task.cpp | 216 |
1 files changed, 216 insertions, 0 deletions
diff --git a/library/cpp/messagebus/rain_check/core/task.cpp b/library/cpp/messagebus/rain_check/core/task.cpp new file mode 100644 index 0000000000..a098437d53 --- /dev/null +++ b/library/cpp/messagebus/rain_check/core/task.cpp @@ -0,0 +1,216 @@ +#include "rain_check.h" + +#include <library/cpp/messagebus/actor/temp_tls_vector.h> + +#include <util/system/type_name.h> +#include <util/system/tls.h> + +using namespace NRainCheck; +using namespace NRainCheck::NPrivate; + +using namespace NActor; + +namespace { + Y_POD_STATIC_THREAD(TTaskRunnerBase*) + ThreadCurrentTask; +} + +void TNopSubtaskListener::SetDone() { +} + +TNopSubtaskListener TNopSubtaskListener::Instance; + +TTaskRunnerBase::TTaskRunnerBase(IEnv* env, ISubtaskListener* parentTask, TAutoPtr<ITaskBase> impl) + : TActor<TTaskRunnerBase>(env->GetExecutor()) + , Impl(impl) + , ParentTask(parentTask) + //, HoldsSelfReference(false) + , Done(false) + , SetDoneCalled(false) +{ +} + +TTaskRunnerBase::~TTaskRunnerBase() { + Y_ASSERT(Done); +} + +namespace { + struct TRunningInThisThreadGuard { + TTaskRunnerBase* const Task; + TRunningInThisThreadGuard(TTaskRunnerBase* task) + : Task(task) + { + Y_ASSERT(!ThreadCurrentTask); + ThreadCurrentTask = task; + } + + ~TRunningInThisThreadGuard() { + Y_ASSERT(ThreadCurrentTask == Task); + ThreadCurrentTask = nullptr; + } + }; +} + +void NRainCheck::TTaskRunnerBase::Act(NActor::TDefaultTag) { + Y_ASSERT(RefCount() > 0); + + TRunningInThisThreadGuard g(this); + + //RetainRef(); + + for (;;) { + TTempTlsVector<TSubtaskCompletion*> temp; + + temp.GetVector()->swap(Pending); + + for (auto& pending : *temp.GetVector()) { + if (pending->IsComplete()) { + pending->FireCompletionCallback(GetImplBase()); + } else { + Pending.push_back(pending); + } + } + + if (!Pending.empty()) { + return; + } + + if (!Done) { + Done = !ReplyReceived(); + } else { + if (Pending.empty()) { + if (!SetDoneCalled) { + ParentTask->SetDone(); + SetDoneCalled = true; + } + //ReleaseRef(); + return; + } + } + } +} + +bool TTaskRunnerBase::IsRunningInThisThread() const { + return ThreadCurrentTask == this; +} + +TSubtaskCompletion::~TSubtaskCompletion() { + ESubtaskState state = State.Get(); + Y_ASSERT(state == CREATED || state == DONE || state == CANCELED); +} + +void TSubtaskCompletion::FireCompletionCallback(ITaskBase* task) { + Y_ASSERT(IsComplete()); + + if (!!CompletionFunc) { + TSubtaskCompletionFunc temp = CompletionFunc; + // completion func must be reset before calling it, + // because function may set it back + CompletionFunc = TSubtaskCompletionFunc(); + (task->*(temp.Func))(this); + } +} + +void NRainCheck::TSubtaskCompletion::Cancel() { + for (;;) { + ESubtaskState state = State.Get(); + if (state == CREATED && State.CompareAndSet(CREATED, CANCELED)) { + return; + } + if (state == RUNNING && State.CompareAndSet(RUNNING, CANCEL_REQUESTED)) { + return; + } + if (state == DONE && State.CompareAndSet(DONE, CANCELED)) { + return; + } + if (state == CANCEL_REQUESTED || state == CANCELED) { + return; + } + } +} + +void TSubtaskCompletion::SetRunning(TTaskRunnerBase* parent) { + Y_ASSERT(!TaskRunner); + Y_ASSERT(!!parent); + + TaskRunner = parent; + + parent->Pending.push_back(this); + + parent->RefV(); + + for (;;) { + ESubtaskState current = State.Get(); + if (current != CREATED && current != DONE) { + Y_FAIL("current state should be CREATED or DONE: %s", ToCString(current)); + } + if (State.CompareAndSet(current, RUNNING)) { + return; + } + } +} + +void TSubtaskCompletion::SetDone() { + Y_ASSERT(!!TaskRunner); + TTaskRunnerBase* temp = TaskRunner; + TaskRunner = nullptr; + + for (;;) { + ESubtaskState state = State.Get(); + if (state == RUNNING) { + if (State.CompareAndSet(RUNNING, DONE)) { + break; + } + } else if (state == CANCEL_REQUESTED) { + if (State.CompareAndSet(CANCEL_REQUESTED, CANCELED)) { + break; + } + } else { + Y_FAIL("cannot SetDone: unknown state: %s", ToCString(state)); + } + } + + temp->ScheduleV(); + temp->UnRefV(); +} + +#if 0 +void NRainCheck::TTaskRunnerBase::RetainRef() +{ + if (HoldsSelfReference) { + return; + } + HoldsSelfReference = true; + Ref(); +} + +void NRainCheck::TTaskRunnerBase::ReleaseRef() +{ + if (!HoldsSelfReference) { + return; + } + HoldsSelfReference = false; + DecRef(); +} +#endif + +void TTaskRunnerBase::AssertInThisThread() const { + Y_ASSERT(IsRunningInThisThread()); +} + +TTaskRunnerBase* TTaskRunnerBase::CurrentTask() { + Y_VERIFY(!!ThreadCurrentTask); + return ThreadCurrentTask; +} + +ITaskBase* TTaskRunnerBase::CurrentTaskImpl() { + return CurrentTask()->GetImplBase(); +} + +TString TTaskRunnerBase::GetStatusSingleLine() { + return TypeName(*Impl); +} + +bool NRainCheck::AreWeInsideTask() { + return ThreadCurrentTask != nullptr; +} |