blob: 092a51a214353b9e5f6317cfb10d535dc658a8f0 (
plain) (
blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
|
#include "track.h"
using namespace NRainCheck;
using namespace NRainCheck::NPrivate;
// Reports this receipt back to its tracker: the receipt pointer is put on the
// tracker's TTaskTrackerReceipt* queue and the tracker is scheduled in the
// same call.
void TTaskTrackerReceipt::SetDone() {
    auto receiptQueue = TaskTracker->GetQueue<TTaskTrackerReceipt*>();
    receiptQueue->EnqueueAndSchedule(this);
}
// Returns the status line of the task attached to this receipt; the call is
// forwarded to Task unchanged.
TString TTaskTrackerReceipt::GetStatusSingleLine() {
    TString statusLine = Task->GetStatusSingleLine();
    return statusLine;
}
// Constructs the tracker and passes `executor` to the NActor::TActor base.
// The constructor does no other work.
TTaskTracker::TTaskTracker(NActor::TExecutor* executor)
: NActor::TActor<TTaskTracker>(executor)
{
}
// Asserts that the Tasks list is empty when the tracker is destroyed.
// NOTE(review): presumably callers are expected to call Shutdown() first so
// that every receipt has been removed — confirm against callers.
TTaskTracker::~TTaskTracker() {
Y_ASSERT(Tasks.Empty());
}
// Requests shutdown and waits for it to complete.
// Order matters: the flag is set first, then the actor is scheduled, and only
// then does the caller wait on ShutdownEvent. Act() signals that event when
// it observes the flag set and Tasks empty.
void TTaskTracker::Shutdown() {
ShutdownFlag.Set(true);
// NOTE(review): Schedule() presumably causes Act() to run so the flag is
// noticed even if no queue item arrives — confirm in NActor::TActor.
Schedule();
ShutdownEvent.WaitI();
}
// Handles an item of the ITaskFactory* queue: creates a receipt bound to this
// tracker, asks the factory for a task tied to that receipt, and appends the
// receipt to Tasks.
//
// `taskFactory` is wrapped in a THolder for the duration of the call; the new
// receipt is held by a THolder until it is released into Tasks.
void TTaskTracker::ProcessItem(NActor::TDefaultTag, NActor::TDefaultTag, ITaskFactory* taskFactory) {
    THolder<ITaskFactory> factoryGuard(taskFactory);
    THolder<TTaskTrackerReceipt> receiptGuard(new TTaskTrackerReceipt(this));

    TTaskTrackerReceipt* const newReceipt = receiptGuard.Get();
    newReceipt->Task = taskFactory->NewTask(newReceipt);

    // The holder gives up the pointer only after the task has been created.
    Tasks.PushBack(receiptGuard.Release());
}
// Handles an item of the TTaskTrackerReceipt* queue (receipts are put there by
// TTaskTrackerReceipt::SetDone()): unlinks the receipt and deletes it.
void TTaskTracker::ProcessItem(NActor::TDefaultTag, NActor::TDefaultTag, TTaskTrackerReceipt* receipt) {
// NOTE(review): Empty() on the receipt presumably means "not linked into any
// list", so this asserts the receipt is still linked — confirm.
Y_ASSERT(!receipt->Empty());
receipt->Unlink();
delete receipt;
}
// Handles an item of the TAsyncResult<TTaskTrackerStatus>* queue: fills a
// status object with the current size of Tasks and stores it into `status`.
void TTaskTracker::ProcessItem(NActor::TDefaultTag, NActor::TDefaultTag, TAsyncResult<TTaskTrackerStatus>* status) {
    TTaskTrackerStatus snapshot;
    snapshot.Size = Tasks.Size();

    status->SetResult(snapshot);
}
// Drains the three item queues in a fixed order (status requests, task
// factories, finished receipts). Afterwards, if shutdown has been requested
// and Tasks is empty, it signals ShutdownEvent, which Shutdown() waits on.
void TTaskTracker::Act(NActor::TDefaultTag) {
    GetQueue<TAsyncResult<TTaskTrackerStatus>*>()->DequeueAll();
    GetQueue<ITaskFactory*>()->DequeueAll();
    GetQueue<TTaskTrackerReceipt*>()->DequeueAll();

    // Short-circuit keeps the original evaluation order: Tasks is inspected
    // only when the shutdown flag is set.
    if (ShutdownFlag.Get() && Tasks.Empty()) {
        ShutdownEvent.Signal();
    }
}
// Returns the Size field of a status object obtained through the actor's
// status queue: a stack-allocated TAsyncResult is enqueued (scheduling the
// actor) and its result is then read.
// NOTE(review): GetResult() presumably blocks until SetResult() has been
// called on the request, which is what keeps the stack object valid — confirm.
ui32 TTaskTracker::Size() {
    TAsyncResult<TTaskTrackerStatus> request;

    auto statusQueue = GetQueue<TAsyncResult<TTaskTrackerStatus>*>();
    statusQueue->EnqueueAndSchedule(&request);

    const auto& answer = request.GetResult();
    return answer.Size;
}
|