aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/messagebus/rain_check/core/task.cpp
diff options
context:
space:
mode:
authornga <nga@yandex-team.ru>2022-02-10 16:48:09 +0300
committerDaniil Cherednik <dcherednik@yandex-team.ru>2022-02-10 16:48:09 +0300
commit1f553f46fb4f3c5eec631352cdd900a0709016af (patch)
treea231fba2c03b440becaea6c86a2702d0bfb0336e /library/cpp/messagebus/rain_check/core/task.cpp
parentc4de7efdedc25b49cbea74bd589eecb61b55b60a (diff)
downloadydb-1f553f46fb4f3c5eec631352cdd900a0709016af.tar.gz
Restoring authorship annotation for <nga@yandex-team.ru>. Commit 1 of 2.
Diffstat (limited to 'library/cpp/messagebus/rain_check/core/task.cpp')
-rw-r--r--library/cpp/messagebus/rain_check/core/task.cpp342
1 files changed, 171 insertions, 171 deletions
diff --git a/library/cpp/messagebus/rain_check/core/task.cpp b/library/cpp/messagebus/rain_check/core/task.cpp
index a098437d53..d20ae30402 100644
--- a/library/cpp/messagebus/rain_check/core/task.cpp
+++ b/library/cpp/messagebus/rain_check/core/task.cpp
@@ -1,216 +1,216 @@
#include "rain_check.h"
-
+
#include <library/cpp/messagebus/actor/temp_tls_vector.h>
-
+
#include <util/system/type_name.h>
#include <util/system/tls.h>
-
-using namespace NRainCheck;
-using namespace NRainCheck::NPrivate;
-
-using namespace NActor;
-
-namespace {
+
+using namespace NRainCheck;
+using namespace NRainCheck::NPrivate;
+
+using namespace NActor;
+
+namespace {
Y_POD_STATIC_THREAD(TTaskRunnerBase*)
ThreadCurrentTask;
-}
-
+}
+
void TNopSubtaskListener::SetDone() {
}
-
-TNopSubtaskListener TNopSubtaskListener::Instance;
-
-TTaskRunnerBase::TTaskRunnerBase(IEnv* env, ISubtaskListener* parentTask, TAutoPtr<ITaskBase> impl)
- : TActor<TTaskRunnerBase>(env->GetExecutor())
- , Impl(impl)
- , ParentTask(parentTask)
- //, HoldsSelfReference(false)
- , Done(false)
- , SetDoneCalled(false)
-{
-}
-
-TTaskRunnerBase::~TTaskRunnerBase() {
+
+TNopSubtaskListener TNopSubtaskListener::Instance;
+
+TTaskRunnerBase::TTaskRunnerBase(IEnv* env, ISubtaskListener* parentTask, TAutoPtr<ITaskBase> impl)
+ : TActor<TTaskRunnerBase>(env->GetExecutor())
+ , Impl(impl)
+ , ParentTask(parentTask)
+ //, HoldsSelfReference(false)
+ , Done(false)
+ , SetDoneCalled(false)
+{
+}
+
+TTaskRunnerBase::~TTaskRunnerBase() {
Y_ASSERT(Done);
-}
-
-namespace {
- struct TRunningInThisThreadGuard {
- TTaskRunnerBase* const Task;
- TRunningInThisThreadGuard(TTaskRunnerBase* task)
- : Task(task)
- {
+}
+
+namespace {
+ struct TRunningInThisThreadGuard {
+ TTaskRunnerBase* const Task;
+ TRunningInThisThreadGuard(TTaskRunnerBase* task)
+ : Task(task)
+ {
Y_ASSERT(!ThreadCurrentTask);
- ThreadCurrentTask = task;
- }
-
- ~TRunningInThisThreadGuard() {
+ ThreadCurrentTask = task;
+ }
+
+ ~TRunningInThisThreadGuard() {
Y_ASSERT(ThreadCurrentTask == Task);
ThreadCurrentTask = nullptr;
- }
- };
-}
-
+ }
+ };
+}
+
void NRainCheck::TTaskRunnerBase::Act(NActor::TDefaultTag) {
Y_ASSERT(RefCount() > 0);
-
- TRunningInThisThreadGuard g(this);
-
- //RetainRef();
-
- for (;;) {
- TTempTlsVector<TSubtaskCompletion*> temp;
-
- temp.GetVector()->swap(Pending);
-
+
+ TRunningInThisThreadGuard g(this);
+
+ //RetainRef();
+
+ for (;;) {
+ TTempTlsVector<TSubtaskCompletion*> temp;
+
+ temp.GetVector()->swap(Pending);
+
for (auto& pending : *temp.GetVector()) {
if (pending->IsComplete()) {
pending->FireCompletionCallback(GetImplBase());
- } else {
+ } else {
Pending.push_back(pending);
- }
- }
-
- if (!Pending.empty()) {
- return;
- }
-
- if (!Done) {
- Done = !ReplyReceived();
- } else {
- if (Pending.empty()) {
- if (!SetDoneCalled) {
- ParentTask->SetDone();
- SetDoneCalled = true;
- }
- //ReleaseRef();
- return;
- }
- }
- }
-}
-
+ }
+ }
+
+ if (!Pending.empty()) {
+ return;
+ }
+
+ if (!Done) {
+ Done = !ReplyReceived();
+ } else {
+ if (Pending.empty()) {
+ if (!SetDoneCalled) {
+ ParentTask->SetDone();
+ SetDoneCalled = true;
+ }
+ //ReleaseRef();
+ return;
+ }
+ }
+ }
+}
+
bool TTaskRunnerBase::IsRunningInThisThread() const {
- return ThreadCurrentTask == this;
-}
-
+ return ThreadCurrentTask == this;
+}
+
TSubtaskCompletion::~TSubtaskCompletion() {
- ESubtaskState state = State.Get();
+ ESubtaskState state = State.Get();
Y_ASSERT(state == CREATED || state == DONE || state == CANCELED);
-}
-
+}
+
void TSubtaskCompletion::FireCompletionCallback(ITaskBase* task) {
Y_ASSERT(IsComplete());
-
- if (!!CompletionFunc) {
- TSubtaskCompletionFunc temp = CompletionFunc;
- // completion func must be reset before calling it,
- // because function may set it back
- CompletionFunc = TSubtaskCompletionFunc();
- (task->*(temp.Func))(this);
- }
-}
-
+
+ if (!!CompletionFunc) {
+ TSubtaskCompletionFunc temp = CompletionFunc;
+ // completion func must be reset before calling it,
+ // because function may set it back
+ CompletionFunc = TSubtaskCompletionFunc();
+ (task->*(temp.Func))(this);
+ }
+}
+
void NRainCheck::TSubtaskCompletion::Cancel() {
- for (;;) {
- ESubtaskState state = State.Get();
- if (state == CREATED && State.CompareAndSet(CREATED, CANCELED)) {
- return;
- }
- if (state == RUNNING && State.CompareAndSet(RUNNING, CANCEL_REQUESTED)) {
- return;
- }
- if (state == DONE && State.CompareAndSet(DONE, CANCELED)) {
- return;
- }
- if (state == CANCEL_REQUESTED || state == CANCELED) {
- return;
- }
- }
-}
-
+ for (;;) {
+ ESubtaskState state = State.Get();
+ if (state == CREATED && State.CompareAndSet(CREATED, CANCELED)) {
+ return;
+ }
+ if (state == RUNNING && State.CompareAndSet(RUNNING, CANCEL_REQUESTED)) {
+ return;
+ }
+ if (state == DONE && State.CompareAndSet(DONE, CANCELED)) {
+ return;
+ }
+ if (state == CANCEL_REQUESTED || state == CANCELED) {
+ return;
+ }
+ }
+}
+
void TSubtaskCompletion::SetRunning(TTaskRunnerBase* parent) {
Y_ASSERT(!TaskRunner);
Y_ASSERT(!!parent);
-
- TaskRunner = parent;
-
- parent->Pending.push_back(this);
-
- parent->RefV();
-
- for (;;) {
- ESubtaskState current = State.Get();
- if (current != CREATED && current != DONE) {
+
+ TaskRunner = parent;
+
+ parent->Pending.push_back(this);
+
+ parent->RefV();
+
+ for (;;) {
+ ESubtaskState current = State.Get();
+ if (current != CREATED && current != DONE) {
Y_FAIL("current state should be CREATED or DONE: %s", ToCString(current));
- }
- if (State.CompareAndSet(current, RUNNING)) {
- return;
- }
- }
-}
-
+ }
+ if (State.CompareAndSet(current, RUNNING)) {
+ return;
+ }
+ }
+}
+
void TSubtaskCompletion::SetDone() {
Y_ASSERT(!!TaskRunner);
- TTaskRunnerBase* temp = TaskRunner;
+ TTaskRunnerBase* temp = TaskRunner;
TaskRunner = nullptr;
-
- for (;;) {
- ESubtaskState state = State.Get();
- if (state == RUNNING) {
- if (State.CompareAndSet(RUNNING, DONE)) {
- break;
- }
- } else if (state == CANCEL_REQUESTED) {
- if (State.CompareAndSet(CANCEL_REQUESTED, CANCELED)) {
- break;
- }
- } else {
+
+ for (;;) {
+ ESubtaskState state = State.Get();
+ if (state == RUNNING) {
+ if (State.CompareAndSet(RUNNING, DONE)) {
+ break;
+ }
+ } else if (state == CANCEL_REQUESTED) {
+ if (State.CompareAndSet(CANCEL_REQUESTED, CANCELED)) {
+ break;
+ }
+ } else {
Y_FAIL("cannot SetDone: unknown state: %s", ToCString(state));
- }
- }
-
- temp->ScheduleV();
- temp->UnRefV();
-}
-
-#if 0
-void NRainCheck::TTaskRunnerBase::RetainRef()
-{
- if (HoldsSelfReference) {
- return;
- }
- HoldsSelfReference = true;
- Ref();
-}
-
-void NRainCheck::TTaskRunnerBase::ReleaseRef()
-{
- if (!HoldsSelfReference) {
- return;
- }
- HoldsSelfReference = false;
- DecRef();
-}
-#endif
-
+ }
+ }
+
+ temp->ScheduleV();
+ temp->UnRefV();
+}
+
+#if 0
+void NRainCheck::TTaskRunnerBase::RetainRef()
+{
+ if (HoldsSelfReference) {
+ return;
+ }
+ HoldsSelfReference = true;
+ Ref();
+}
+
+void NRainCheck::TTaskRunnerBase::ReleaseRef()
+{
+ if (!HoldsSelfReference) {
+ return;
+ }
+ HoldsSelfReference = false;
+ DecRef();
+}
+#endif
+
void TTaskRunnerBase::AssertInThisThread() const {
Y_ASSERT(IsRunningInThisThread());
-}
-
+}
+
TTaskRunnerBase* TTaskRunnerBase::CurrentTask() {
Y_VERIFY(!!ThreadCurrentTask);
- return ThreadCurrentTask;
-}
-
+ return ThreadCurrentTask;
+}
+
ITaskBase* TTaskRunnerBase::CurrentTaskImpl() {
return CurrentTask()->GetImplBase();
}
TString TTaskRunnerBase::GetStatusSingleLine() {
return TypeName(*Impl);
-}
-
+}
+
bool NRainCheck::AreWeInsideTask() {
return ThreadCurrentTask != nullptr;
}