aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/messagebus/rain_check/core/task.cpp
diff options
context:
space:
mode:
authorDevtools Arcadia <arcadia-devtools@yandex-team.ru>2022-02-07 18:08:42 +0300
committerDevtools Arcadia <arcadia-devtools@mous.vla.yp-c.yandex.net>2022-02-07 18:08:42 +0300
commit1110808a9d39d4b808aef724c861a2e1a38d2a69 (patch)
treee26c9fed0de5d9873cce7e00bc214573dc2195b7 /library/cpp/messagebus/rain_check/core/task.cpp
downloadydb-1110808a9d39d4b808aef724c861a2e1a38d2a69.tar.gz
intermediate changes
ref:cde9a383711a11544ce7e107a78147fb96cc4029
Diffstat (limited to 'library/cpp/messagebus/rain_check/core/task.cpp')
-rw-r--r--library/cpp/messagebus/rain_check/core/task.cpp216
1 files changed, 216 insertions, 0 deletions
diff --git a/library/cpp/messagebus/rain_check/core/task.cpp b/library/cpp/messagebus/rain_check/core/task.cpp
new file mode 100644
index 0000000000..a098437d53
--- /dev/null
+++ b/library/cpp/messagebus/rain_check/core/task.cpp
@@ -0,0 +1,216 @@
+#include "rain_check.h"
+
+#include <library/cpp/messagebus/actor/temp_tls_vector.h>
+
+#include <util/system/type_name.h>
+#include <util/system/tls.h>
+
+using namespace NRainCheck;
+using namespace NRainCheck::NPrivate;
+
+using namespace NActor;
+
+namespace {
+ Y_POD_STATIC_THREAD(TTaskRunnerBase*)
+ ThreadCurrentTask;
+}
+
+void TNopSubtaskListener::SetDone() {
+}
+
+TNopSubtaskListener TNopSubtaskListener::Instance;
+
+TTaskRunnerBase::TTaskRunnerBase(IEnv* env, ISubtaskListener* parentTask, TAutoPtr<ITaskBase> impl)
+ : TActor<TTaskRunnerBase>(env->GetExecutor())
+ , Impl(impl)
+ , ParentTask(parentTask)
+ //, HoldsSelfReference(false)
+ , Done(false)
+ , SetDoneCalled(false)
+{
+}
+
+TTaskRunnerBase::~TTaskRunnerBase() {
+ Y_ASSERT(Done);
+}
+
+namespace {
+ struct TRunningInThisThreadGuard {
+ TTaskRunnerBase* const Task;
+ TRunningInThisThreadGuard(TTaskRunnerBase* task)
+ : Task(task)
+ {
+ Y_ASSERT(!ThreadCurrentTask);
+ ThreadCurrentTask = task;
+ }
+
+ ~TRunningInThisThreadGuard() {
+ Y_ASSERT(ThreadCurrentTask == Task);
+ ThreadCurrentTask = nullptr;
+ }
+ };
+}
+
+void NRainCheck::TTaskRunnerBase::Act(NActor::TDefaultTag) {
+ Y_ASSERT(RefCount() > 0);
+
+ TRunningInThisThreadGuard g(this);
+
+ //RetainRef();
+
+ for (;;) {
+ TTempTlsVector<TSubtaskCompletion*> temp;
+
+ temp.GetVector()->swap(Pending);
+
+ for (auto& pending : *temp.GetVector()) {
+ if (pending->IsComplete()) {
+ pending->FireCompletionCallback(GetImplBase());
+ } else {
+ Pending.push_back(pending);
+ }
+ }
+
+ if (!Pending.empty()) {
+ return;
+ }
+
+ if (!Done) {
+ Done = !ReplyReceived();
+ } else {
+ if (Pending.empty()) {
+ if (!SetDoneCalled) {
+ ParentTask->SetDone();
+ SetDoneCalled = true;
+ }
+ //ReleaseRef();
+ return;
+ }
+ }
+ }
+}
+
+bool TTaskRunnerBase::IsRunningInThisThread() const {
+ return ThreadCurrentTask == this;
+}
+
+TSubtaskCompletion::~TSubtaskCompletion() {
+ ESubtaskState state = State.Get();
+ Y_ASSERT(state == CREATED || state == DONE || state == CANCELED);
+}
+
+void TSubtaskCompletion::FireCompletionCallback(ITaskBase* task) {
+ Y_ASSERT(IsComplete());
+
+ if (!!CompletionFunc) {
+ TSubtaskCompletionFunc temp = CompletionFunc;
+ // completion func must be reset before calling it,
+ // because function may set it back
+ CompletionFunc = TSubtaskCompletionFunc();
+ (task->*(temp.Func))(this);
+ }
+}
+
+void NRainCheck::TSubtaskCompletion::Cancel() {
+ for (;;) {
+ ESubtaskState state = State.Get();
+ if (state == CREATED && State.CompareAndSet(CREATED, CANCELED)) {
+ return;
+ }
+ if (state == RUNNING && State.CompareAndSet(RUNNING, CANCEL_REQUESTED)) {
+ return;
+ }
+ if (state == DONE && State.CompareAndSet(DONE, CANCELED)) {
+ return;
+ }
+ if (state == CANCEL_REQUESTED || state == CANCELED) {
+ return;
+ }
+ }
+}
+
+void TSubtaskCompletion::SetRunning(TTaskRunnerBase* parent) {
+ Y_ASSERT(!TaskRunner);
+ Y_ASSERT(!!parent);
+
+ TaskRunner = parent;
+
+ parent->Pending.push_back(this);
+
+ parent->RefV();
+
+ for (;;) {
+ ESubtaskState current = State.Get();
+ if (current != CREATED && current != DONE) {
+ Y_FAIL("current state should be CREATED or DONE: %s", ToCString(current));
+ }
+ if (State.CompareAndSet(current, RUNNING)) {
+ return;
+ }
+ }
+}
+
+void TSubtaskCompletion::SetDone() {
+ Y_ASSERT(!!TaskRunner);
+ TTaskRunnerBase* temp = TaskRunner;
+ TaskRunner = nullptr;
+
+ for (;;) {
+ ESubtaskState state = State.Get();
+ if (state == RUNNING) {
+ if (State.CompareAndSet(RUNNING, DONE)) {
+ break;
+ }
+ } else if (state == CANCEL_REQUESTED) {
+ if (State.CompareAndSet(CANCEL_REQUESTED, CANCELED)) {
+ break;
+ }
+ } else {
+ Y_FAIL("cannot SetDone: unknown state: %s", ToCString(state));
+ }
+ }
+
+ temp->ScheduleV();
+ temp->UnRefV();
+}
+
+#if 0
+void NRainCheck::TTaskRunnerBase::RetainRef()
+{
+ if (HoldsSelfReference) {
+ return;
+ }
+ HoldsSelfReference = true;
+ Ref();
+}
+
+void NRainCheck::TTaskRunnerBase::ReleaseRef()
+{
+ if (!HoldsSelfReference) {
+ return;
+ }
+ HoldsSelfReference = false;
+ DecRef();
+}
+#endif
+
+void TTaskRunnerBase::AssertInThisThread() const {
+ Y_ASSERT(IsRunningInThisThread());
+}
+
+TTaskRunnerBase* TTaskRunnerBase::CurrentTask() {
+ Y_VERIFY(!!ThreadCurrentTask);
+ return ThreadCurrentTask;
+}
+
+ITaskBase* TTaskRunnerBase::CurrentTaskImpl() {
+ return CurrentTask()->GetImplBase();
+}
+
+TString TTaskRunnerBase::GetStatusSingleLine() {
+ return TypeName(*Impl);
+}
+
+bool NRainCheck::AreWeInsideTask() {
+ return ThreadCurrentTask != nullptr;
+}