aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/messagebus/rain_check/core/track.h
blob: 88a4e6fd9ebc00cb1f5c83be61b69de86e726223 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
#pragma once

#include "spawn.h"
#include "task.h"

#include <library/cpp/messagebus/async_result.h>
#include <library/cpp/messagebus/actor/queue_in_actor.h>
#include <library/cpp/messagebus/misc/atomic_box.h>

#include <util/generic/intrlist.h>
#include <util/system/event.h>

namespace NRainCheck {
    class TTaskTracker;

    namespace NPrivate {
        struct ITaskFactory {
            virtual TIntrusivePtr<TTaskRunnerBase> NewTask(ISubtaskListener*) = 0;
            virtual ~ITaskFactory() { 
            } 
        };

        struct TTaskTrackerReceipt: public ISubtaskListener, public TIntrusiveListItem<TTaskTrackerReceipt> {
            TTaskTracker* const TaskTracker;
            TIntrusivePtr<TTaskRunnerBase> Task;

            TTaskTrackerReceipt(TTaskTracker* taskTracker) 
                : TaskTracker(taskTracker) 
            { 
            } 

            void SetDone() override;

            TString GetStatusSingleLine();
        };

        struct TTaskTrackerStatus {
            ui32 Size;
        };

    }

    class TTaskTracker
       : public TAtomicRefCount<TTaskTracker>, 
          public NActor::TActor<TTaskTracker>, 
          public NActor::TQueueInActor<TTaskTracker, NPrivate::ITaskFactory*>, 
          public NActor::TQueueInActor<TTaskTracker, NPrivate::TTaskTrackerReceipt*>, 
          public NActor::TQueueInActor<TTaskTracker, TAsyncResult<NPrivate::TTaskTrackerStatus>*> { 
        friend struct NPrivate::TTaskTrackerReceipt;

    private:
        TAtomicBox<bool> ShutdownFlag;
        TSystemEvent ShutdownEvent;

        TIntrusiveList<NPrivate::TTaskTrackerReceipt> Tasks;

        template <typename TItem>
        NActor::TQueueInActor<TTaskTracker, TItem>* GetQueue() {
            return this;
        }

    public:
        TTaskTracker(NActor::TExecutor* executor);
        ~TTaskTracker() override;

        void Shutdown();

        void ProcessItem(NActor::TDefaultTag, NActor::TDefaultTag, NPrivate::ITaskFactory*);
        void ProcessItem(NActor::TDefaultTag, NActor::TDefaultTag, NPrivate::TTaskTrackerReceipt*);
        void ProcessItem(NActor::TDefaultTag, NActor::TDefaultTag, TAsyncResult<NPrivate::TTaskTrackerStatus>*);

        void Act(NActor::TDefaultTag);

        template <typename TTask, typename TEnv, typename TParam>
        void Spawn(TEnv* env, TParam param) {
            struct TTaskFactory: public NPrivate::ITaskFactory {
                TEnv* const Env;
                TParam Param;

                TTaskFactory(TEnv* env, TParam param) 
                    : Env(env) 
                    , Param(param) 
                { 
                } 

                TIntrusivePtr<TTaskRunnerBase> NewTask(ISubtaskListener* subtaskListener) override {
                    return NRainCheck::SpawnTask<TTask>(Env, Param, subtaskListener).Get();
                }
            };

            GetQueue<NPrivate::ITaskFactory*>()->EnqueueAndSchedule(new TTaskFactory(env, param));
        }

        ui32 Size();
    };

}