#include "rain_check.h"
#include <library/cpp/messagebus/actor/temp_tls_vector.h>
#include <util/system/type_name.h>
#include <util/system/tls.h>
using namespace NRainCheck;
using namespace NRainCheck::NPrivate;
using namespace NActor;
namespace {
Y_POD_STATIC_THREAD(TTaskRunnerBase*)
ThreadCurrentTask;
}
void TNopSubtaskListener::SetDone() {
}
TNopSubtaskListener TNopSubtaskListener::Instance;
TTaskRunnerBase::TTaskRunnerBase(IEnv* env, ISubtaskListener* parentTask, TAutoPtr<ITaskBase> impl)
: TActor<TTaskRunnerBase>(env->GetExecutor())
, Impl(impl)
, ParentTask(parentTask)
//, HoldsSelfReference(false)
, Done(false)
, SetDoneCalled(false)
{
}
TTaskRunnerBase::~TTaskRunnerBase() {
Y_ASSERT(Done);
}
namespace {
struct TRunningInThisThreadGuard {
TTaskRunnerBase* const Task;
TRunningInThisThreadGuard(TTaskRunnerBase* task)
: Task(task)
{
Y_ASSERT(!ThreadCurrentTask);
ThreadCurrentTask = task;
}
~TRunningInThisThreadGuard() {
Y_ASSERT(ThreadCurrentTask == Task);
ThreadCurrentTask = nullptr;
}
};
}
void NRainCheck::TTaskRunnerBase::Act(NActor::TDefaultTag) {
Y_ASSERT(RefCount() > 0);
TRunningInThisThreadGuard g(this);
//RetainRef();
for (;;) {
TTempTlsVector<TSubtaskCompletion*> temp;
temp.GetVector()->swap(Pending);
for (auto& pending : *temp.GetVector()) {
if (pending->IsComplete()) {
pending->FireCompletionCallback(GetImplBase());
} else {
Pending.push_back(pending);
}
}
if (!Pending.empty()) {
return;
}
if (!Done) {
Done = !ReplyReceived();
} else {
if (Pending.empty()) {
if (!SetDoneCalled) {
ParentTask->SetDone();
SetDoneCalled = true;
}
//ReleaseRef();
return;
}
}
}
}
bool TTaskRunnerBase::IsRunningInThisThread() const {
return ThreadCurrentTask == this;
}
TSubtaskCompletion::~TSubtaskCompletion() {
ESubtaskState state = State.Get();
Y_ASSERT(state == CREATED || state == DONE || state == CANCELED);
}
void TSubtaskCompletion::FireCompletionCallback(ITaskBase* task) {
Y_ASSERT(IsComplete());
if (!!CompletionFunc) {
TSubtaskCompletionFunc temp = CompletionFunc;
// completion func must be reset before calling it,
// because function may set it back
CompletionFunc = TSubtaskCompletionFunc();
(task->*(temp.Func))(this);
}
}
void NRainCheck::TSubtaskCompletion::Cancel() {
for (;;) {
ESubtaskState state = State.Get();
if (state == CREATED && State.CompareAndSet(CREATED, CANCELED)) {
return;
}
if (state == RUNNING && State.CompareAndSet(RUNNING, CANCEL_REQUESTED)) {
return;
}
if (state == DONE && State.CompareAndSet(DONE, CANCELED)) {
return;
}
if (state == CANCEL_REQUESTED || state == CANCELED) {
return;
}
}
}
void TSubtaskCompletion::SetRunning(TTaskRunnerBase* parent) {
Y_ASSERT(!TaskRunner);
Y_ASSERT(!!parent);
TaskRunner = parent;
parent->Pending.push_back(this);
parent->RefV();
for (;;) {
ESubtaskState current = State.Get();
if (current != CREATED && current != DONE) {
Y_ABORT("current state should be CREATED or DONE: %s", ToCString(current));
}
if (State.CompareAndSet(current, RUNNING)) {
return;
}
}
}
void TSubtaskCompletion::SetDone() {
Y_ASSERT(!!TaskRunner);
TTaskRunnerBase* temp = TaskRunner;
TaskRunner = nullptr;
for (;;) {
ESubtaskState state = State.Get();
if (state == RUNNING) {
if (State.CompareAndSet(RUNNING, DONE)) {
break;
}
} else if (state == CANCEL_REQUESTED) {
if (State.CompareAndSet(CANCEL_REQUESTED, CANCELED)) {
break;
}
} else {
Y_ABORT("cannot SetDone: unknown state: %s", ToCString(state));
}
}
temp->ScheduleV();
temp->UnRefV();
}
#if 0
void NRainCheck::TTaskRunnerBase::RetainRef()
{
if (HoldsSelfReference) {
return;
}
HoldsSelfReference = true;
Ref();
}
void NRainCheck::TTaskRunnerBase::ReleaseRef()
{
if (!HoldsSelfReference) {
return;
}
HoldsSelfReference = false;
DecRef();
}
#endif
void TTaskRunnerBase::AssertInThisThread() const {
Y_ASSERT(IsRunningInThisThread());
}
TTaskRunnerBase* TTaskRunnerBase::CurrentTask() {
Y_ABORT_UNLESS(!!ThreadCurrentTask);
return ThreadCurrentTask;
}
ITaskBase* TTaskRunnerBase::CurrentTaskImpl() {
return CurrentTask()->GetImplBase();
}
TString TTaskRunnerBase::GetStatusSingleLine() {
return TypeName(*Impl);
}
bool NRainCheck::AreWeInsideTask() {
return ThreadCurrentTask != nullptr;
}