aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/messagebus/actor/queue_in_actor.h
blob: 9865996532ca43fcc2376f836b24776b56881046 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
#pragma once

#include "actor.h"
#include "queue_for_actor.h"

#include <functional>

namespace NActor {
    // Type-erased view of a queue of TItem: exposes only virtual ("V"-suffixed)
    // entry points, so callers can use it without knowing the concrete class.
    template <typename TItem>
    class IQueueInActor {
    public:
        virtual ~IQueueInActor() = default;

        // Virtual entry point for adding an item.
        virtual void EnqueueAndScheduleV(const TItem& item) = 0;

        // Virtual entry points for draining; what happens to the drained items
        // is defined by the implementation.
        virtual void DequeueAllV() = 0;
        virtual void DequeueAllLikelyEmptyV() = 0;
    };

    // Mixin that gives TThis a queue of TItem plus helpers to fill and drain it.
    //
    // GetThis() downcasts `this` to TThis*, so TThis is expected to derive from
    // this class; GetActor() converts that same pointer to
    // TActor<TThis, TActorTag>*. Every dequeued item is forwarded to
    // TThis::ProcessItem(TActorTag(), TQueueTag(), item).
    template <typename TThis, typename TItem, typename TActorTag = TDefaultTag, typename TQueueTag = TDefaultTag>
    class TQueueInActor: public IQueueInActor<TItem> {
        using TSelf = TQueueInActor<TThis, TItem, TActorTag, TQueueTag>;

    public:
        // TODO: make protected
        TQueueForActor<TItem> QueueInActor;

    private:
        // `this` viewed as the most-derived TThis object.
        TThis* GetThis() {
            return static_cast<TThis*>(this);
        }

        // The TThis object viewed through its TActor<TThis, TActorTag> type.
        TActor<TThis, TActorTag>* GetActor() {
            TThis* const derived = GetThis();
            return derived;
        }

        // Forwards one item to the tag-dispatched handler on TThis.
        void ProcessItem(const TItem& item) {
            TThis* const derived = GetThis();
            derived->ProcessItem(TActorTag(), TQueueTag(), item);
        }

    public:
        // Puts the item into the queue; does not call Schedule().
        void EnqueueAndNoSchedule(const TItem& item) {
            QueueInActor.Enqueue(item);
        }

        // Puts the item into the queue, then calls Schedule() on the actor.
        void EnqueueAndSchedule(const TItem& item) {
            QueueInActor.Enqueue(item);
            GetActor()->Schedule();
        }

        // Virtual entry point; same as EnqueueAndSchedule().
        void EnqueueAndScheduleV(const TItem& item) override {
            EnqueueAndSchedule(item);
        }

        // Delegates to the underlying queue's Clear().
        void Clear() {
            QueueInActor.Clear();
        }

        // Asks the underlying queue to dequeue everything, passing each item
        // to ProcessItem().
        void DequeueAll() {
            TSelf* const self = this;
            QueueInActor.DequeueAll([self](const TItem& item) {
                self->ProcessItem(item);
            });
        }

        // Virtual entry point; same as DequeueAll().
        void DequeueAllV() override {
            DequeueAll();
        }

        // Same callback as DequeueAll(), but goes through the queue's
        // DequeueAllLikelyEmpty() entry point.
        void DequeueAllLikelyEmpty() {
            TSelf* const self = this;
            QueueInActor.DequeueAllLikelyEmpty([self](const TItem& item) {
                self->ProcessItem(item);
            });
        }

        // Virtual entry point; same as DequeueAllLikelyEmpty().
        void DequeueAllLikelyEmptyV() override {
            DequeueAllLikelyEmpty();
        }

        // Delegates to the underlying queue's IsEmpty().
        bool IsEmpty() {
            return QueueInActor.IsEmpty();
        }
    };

}