aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/messagebus/rain_check/core/track.cpp
blob: 092a51a214353b9e5f6317cfb10d535dc658a8f0 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
#include "track.h"

using namespace NRainCheck;
using namespace NRainCheck::NPrivate;

void TTaskTrackerReceipt::SetDone() {
    TaskTracker->GetQueue<TTaskTrackerReceipt*>()->EnqueueAndSchedule(this);
}

TString TTaskTrackerReceipt::GetStatusSingleLine() {
    return Task->GetStatusSingleLine();
}

TTaskTracker::TTaskTracker(NActor::TExecutor* executor)
    : NActor::TActor<TTaskTracker>(executor)
{
}

TTaskTracker::~TTaskTracker() {
    Y_ASSERT(Tasks.Empty());
}

void TTaskTracker::Shutdown() {
    ShutdownFlag.Set(true);
    Schedule();
    ShutdownEvent.WaitI();
}

void TTaskTracker::ProcessItem(NActor::TDefaultTag, NActor::TDefaultTag, ITaskFactory* taskFactory) {
    THolder<ITaskFactory> holder(taskFactory);

    THolder<TTaskTrackerReceipt> receipt(new TTaskTrackerReceipt(this));
    receipt->Task = taskFactory->NewTask(receipt.Get());

    Tasks.PushBack(receipt.Release());
}

void TTaskTracker::ProcessItem(NActor::TDefaultTag, NActor::TDefaultTag, TTaskTrackerReceipt* receipt) {
    Y_ASSERT(!receipt->Empty());
    receipt->Unlink();
    delete receipt;
}

void TTaskTracker::ProcessItem(NActor::TDefaultTag, NActor::TDefaultTag, TAsyncResult<TTaskTrackerStatus>* status) {
    TTaskTrackerStatus s;
    s.Size = Tasks.Size();
    status->SetResult(s);
}

void TTaskTracker::Act(NActor::TDefaultTag) {
    GetQueue<TAsyncResult<TTaskTrackerStatus>*>()->DequeueAll();
    GetQueue<ITaskFactory*>()->DequeueAll();
    GetQueue<TTaskTrackerReceipt*>()->DequeueAll();

    if (ShutdownFlag.Get()) {
        if (Tasks.Empty()) {
            ShutdownEvent.Signal();
        }
    }
}

ui32 TTaskTracker::Size() {
    TAsyncResult<TTaskTrackerStatus> r;
    GetQueue<TAsyncResult<TTaskTrackerStatus>*>()->EnqueueAndSchedule(&r);
    return r.GetResult().Size;
}