aboutsummaryrefslogblamecommitdiffstats
path: root/library/cpp/messagebus/rain_check/core/task.cpp
blob: 0d4dd198a578d09c5cfbf32b096c1268f90c1c14 (plain) (tree)
1
2
3
4
5
6
                       
 
                                                         
 
                                  
                            





                                     
                                         
 
                                     












                                                                                                   
                   






                                                        
                                         


                                      
                                                
                                        


         
                                                            
                             








                                                 

                                                               
                    
                                           




















                                          
                                                     

                                     
                                           
                                      
                                                                     
 
                                                                  
                           








                                                           
                                               















                                                                                 
                                                              
                          








                                                    
                                                                                       





                                                    
                                    
                           
                                       
                         










                                                                  
                                                                           

























                                              
                                                  
                                      
 
                                                 
                                        

                             


                                               
                                                
                           
 
                                    
                                        
 
#include "rain_check.h"

#include <library/cpp/messagebus/actor/temp_tls_vector.h>

#include <util/system/type_name.h>
#include <util/system/tls.h>

using namespace NRainCheck;
using namespace NRainCheck::NPrivate;

using namespace NActor;

namespace {
    // Pointer to the task runner whose Act() is currently executing on the
    // calling thread, or null when no task is running here. It is declared
    // through Y_POD_STATIC_THREAD, so each thread is expected to observe its
    // own value. It is assigned and reset by TRunningInThisThreadGuard and
    // read by IsRunningInThisThread(), CurrentTask() and AreWeInsideTask().
    Y_POD_STATIC_THREAD(TTaskRunnerBase*)
    ThreadCurrentTask;
}

// Completion notification for the no-op listener: deliberately does nothing.
void TNopSubtaskListener::SetDone() {
}

// Out-of-class definition of the static Instance member of TNopSubtaskListener.
TNopSubtaskListener TNopSubtaskListener::Instance;

// Constructs a task runner.
//
// env        - environment; its GetExecutor() result is handed to the TActor base.
// parentTask - listener whose SetDone() is called from Act() once this task
//              has finished.
// impl       - the task implementation stored in Impl.
//              NOTE(review): passed as TAutoPtr by value, so ownership is
//              presumably transferred to the runner - confirm.
//
// The runner starts in the "not done, parent not yet notified" state.
TTaskRunnerBase::TTaskRunnerBase(IEnv* env, ISubtaskListener* parentTask, TAutoPtr<ITaskBase> impl)
    : TActor<TTaskRunnerBase>(env->GetExecutor())
    , Impl(impl)
    , ParentTask(parentTask)
    //, HoldsSelfReference(false)
    , Done(false)
    , SetDoneCalled(false)
{
}

// Destroys the runner. Asserts that the Done flag has been set, i.e. the
// runner is not expected to be destroyed before Act() marked it as done.
TTaskRunnerBase::~TTaskRunnerBase() {
    Y_ASSERT(Done);
}

namespace {
    // Scope guard that records a task runner in ThreadCurrentTask for the
    // lifetime of the guard and resets the slot to null on destruction.
    struct TRunningInThisThreadGuard {
        // Runner recorded by this guard.
        TTaskRunnerBase* const Task;

        TRunningInThisThreadGuard(TTaskRunnerBase* runner)
            : Task(runner)
        {
            // The slot must be free: no other task may already be recorded
            // for this thread.
            Y_ASSERT(!ThreadCurrentTask);
            ThreadCurrentTask = Task;
        }

        ~TRunningInThisThreadGuard() {
            // Whatever ran inside the guarded scope must have left the slot
            // pointing at our runner.
            Y_ASSERT(ThreadCurrentTask == Task);
            ThreadCurrentTask = nullptr;
        }
    };
}

// Actor entry point: advances the task as far as possible.
//
// Each pass:
//   1. moves Pending into a temporary vector, fires the completion callback
//      of every subtask that reports IsComplete() and puts the others back
//      into Pending;
//   2. returns if any subtask is still pending;
//   3. if the task is not Done yet, calls ReplyReceived() and sets Done when
//      it returns false, then starts another pass;
//   4. once Done, calls ParentTask->SetDone() (at most once, tracked by
//      SetDoneCalled) and returns.
//
// For the whole call the runner is recorded as the current task of this
// thread via TRunningInThisThreadGuard.
void NRainCheck::TTaskRunnerBase::Act(NActor::TDefaultTag) {
    Y_ASSERT(RefCount() > 0);

    TRunningInThisThreadGuard threadGuard(this);

    //RetainRef();

    while (true) {
        TTempTlsVector<TSubtaskCompletion*> drained;

        // Take everything queued so far; callbacks may queue new subtasks
        // into Pending while we iterate over the drained copy.
        drained.GetVector()->swap(Pending);

        for (TSubtaskCompletion* const subtask : *drained.GetVector()) {
            if (!subtask->IsComplete()) {
                Pending.push_back(subtask);
                continue;
            }
            subtask->FireCompletionCallback(GetImplBase());
        }

        if (!Pending.empty()) {
            // Still waiting for at least one subtask.
            return;
        }

        if (!Done) {
            Done = !ReplyReceived();
            continue;
        }

        // Pending was found empty just above; the check is kept as-is.
        if (!Pending.empty()) {
            continue;
        }

        if (!SetDoneCalled) {
            ParentTask->SetDone();
            SetDoneCalled = true;
        }
        //ReleaseRef();
        return;
    }
}

bool TTaskRunnerBase::IsRunningInThisThread() const {
    return ThreadCurrentTask == this;
}

// Destroys the completion object. Asserts that the subtask is not in flight:
// the only states accepted here are CREATED, DONE and CANCELED.
TSubtaskCompletion::~TSubtaskCompletion() {
    ESubtaskState state = State.Get();
    Y_ASSERT(state == CREATED || state == DONE || state == CANCELED);
}

// Invokes the registered completion function, if any, on the given task.
//
// task - object on which the stored member-function pointer is invoked; this
//        completion is passed to it as the argument.
//
// Must only be called once the subtask reports IsComplete().
void TSubtaskCompletion::FireCompletionCallback(ITaskBase* task) {
    Y_ASSERT(IsComplete());

    if (!CompletionFunc) {
        // Nothing registered.
        return;
    }

    // Detach the callback before running it: the callee is allowed to install
    // a new completion function on this object, which must not be wiped out
    // afterwards.
    TSubtaskCompletionFunc callback = CompletionFunc;
    CompletionFunc = TSubtaskCompletionFunc();

    (task->*(callback.Func))(this);
}

void NRainCheck::TSubtaskCompletion::Cancel() {
    for (;;) {
        ESubtaskState state = State.Get();
        if (state == CREATED && State.CompareAndSet(CREATED, CANCELED)) {
            return;
        }
        if (state == RUNNING && State.CompareAndSet(RUNNING, CANCEL_REQUESTED)) {
            return;
        }
        if (state == DONE && State.CompareAndSet(DONE, CANCELED)) {
            return;
        }
        if (state == CANCEL_REQUESTED || state == CANCELED) {
            return;
        }
    }
}

// Attaches this completion to a parent task runner and marks it RUNNING.
//
// parent - runner that waits for this subtask; must be non-null. This object
//          is appended to parent->Pending and parent->RefV() is called; the
//          matching UnRefV() is issued by SetDone().
//
// The completion must not already be attached to a runner. The state must be
// CREATED or DONE when the transition is attempted; any other state aborts
// the process.
void TSubtaskCompletion::SetRunning(TTaskRunnerBase* parent) {
    Y_ASSERT(!TaskRunner);
    Y_ASSERT(!!parent);

    TaskRunner = parent;
    parent->Pending.push_back(this);
    parent->RefV();

    // Retry until the compare-and-set from the freshly read state succeeds.
    ESubtaskState current;
    do {
        current = State.Get();
        if (!(current == CREATED || current == DONE)) {
            Y_ABORT("current state should be CREATED or DONE: %s", ToCString(current));
        }
    } while (!State.CompareAndSet(current, RUNNING));
}

void TSubtaskCompletion::SetDone() {
    Y_ASSERT(!!TaskRunner);
    TTaskRunnerBase* temp = TaskRunner;
    TaskRunner = nullptr;

    for (;;) {
        ESubtaskState state = State.Get();
        if (state == RUNNING) {
            if (State.CompareAndSet(RUNNING, DONE)) {
                break;
            }
        } else if (state == CANCEL_REQUESTED) {
            if (State.CompareAndSet(CANCEL_REQUESTED, CANCELED)) {
                break;
            }
        } else {
            Y_ABORT("cannot SetDone: unknown state: %s", ToCString(state));
        }
    }

    temp->ScheduleV();
    temp->UnRefV();
}

// Disabled code: everything between #if 0 and #endif is excluded from the
// build. The two helpers toggle a HoldsSelfReference flag so that the runner
// takes (Ref) and drops (DecRef) a reference to itself at most once.
#if 0
void NRainCheck::TTaskRunnerBase::RetainRef()
{
    // Already holding the self-reference: nothing to do.
    if (HoldsSelfReference) {
        return;
    }
    HoldsSelfReference = true;
    Ref();
}

void NRainCheck::TTaskRunnerBase::ReleaseRef()
{
    // No self-reference held: nothing to release.
    if (!HoldsSelfReference) {
        return;
    }
    HoldsSelfReference = false;
    DecRef();
}
#endif

// Asserts that this runner is the task currently executing on the calling
// thread (see IsRunningInThisThread()).
void TTaskRunnerBase::AssertInThisThread() const {
    Y_ASSERT(IsRunningInThisThread());
}

// Returns the runner recorded in ThreadCurrentTask for the calling thread.
// Aborts the process if no task is recorded, so the result is never null.
TTaskRunnerBase* TTaskRunnerBase::CurrentTask() {
    Y_ABORT_UNLESS(!!ThreadCurrentTask);
    TTaskRunnerBase* const running = ThreadCurrentTask;
    return running;
}

// Returns GetImplBase() of the runner obtained from CurrentTask(); like
// CurrentTask(), aborts when no task is recorded for the calling thread.
ITaskBase* TTaskRunnerBase::CurrentTaskImpl() {
    TTaskRunnerBase* const runner = CurrentTask();
    return runner->GetImplBase();
}

// Returns a one-line status string for this runner: the result of TypeName()
// applied to the task implementation object held in Impl.
TString TTaskRunnerBase::GetStatusSingleLine() {
    auto& implementation = *Impl;
    return TypeName(implementation);
}

bool NRainCheck::AreWeInsideTask() {
    return ThreadCurrentTask != nullptr;
}