aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/actors/core/scheduler_queue.h
blob: 3b8fac28f0d0e65cd005b28f91c5b60cbc70fa0a (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
#pragma once

#include <library/cpp/actors/util/queue_chunk.h>

namespace NActors {
    class IEventHandle;
    class ISchedulerCookie;

    namespace NSchedulerQueue {
        struct TEntry {
            ui64 InstantMicroseconds;
            IEventHandle* Ev;
            ISchedulerCookie* Cookie;
        };

        struct TChunk : TQueueChunkDerived<TEntry, 512, TChunk> {};

        class TReader;
        class TWriter;
        class TWriterWithPadding;

        class TReader : ::TNonCopyable {
            TChunk* ReadFrom;
            ui32 ReadPosition;

            friend class TWriter;

        public:
            TReader()
                : ReadFrom(new TChunk())
                , ReadPosition(0)
            {
            }

            ~TReader() {
                while (TEntry* x = Pop()) {
                    if (x->Cookie)
                        x->Cookie->Detach();
                    delete x->Ev;
                }
                delete ReadFrom;
            }

            TEntry* Pop() {
                TChunk* head = ReadFrom;
                if (ReadPosition != TChunk::EntriesCount) {
                    if (AtomicLoad(&head->Entries[ReadPosition].InstantMicroseconds) != 0)
                        return const_cast<TEntry*>(&head->Entries[ReadPosition++]);
                    else
                        return nullptr;
                } else if (TChunk* next = AtomicLoad(&head->Next)) {
                    ReadFrom = next;
                    delete head;
                    ReadPosition = 0;
                    return Pop();
                }

                return nullptr;
            }
        };

        class TWriter : ::TNonCopyable {
            TChunk* WriteTo;
            ui32 WritePosition;

        public:
            TWriter()
                : WriteTo(nullptr)
                , WritePosition(0)
            {
            }

            void Init(const TReader& reader) {
                WriteTo = reader.ReadFrom;
                WritePosition = 0;
            }

            void Push(ui64 instantMicrosends, IEventHandle* ev, ISchedulerCookie* cookie) {
                if (Y_UNLIKELY(instantMicrosends == 0)) {
                    // Protect against Pop() getting stuck forever
                    instantMicrosends = 1;
                }
                if (WritePosition != TChunk::EntriesCount) {
                    volatile TEntry& entry = WriteTo->Entries[WritePosition];
                    entry.Cookie = cookie;
                    entry.Ev = ev;
                    AtomicStore(&entry.InstantMicroseconds, instantMicrosends);
                    ++WritePosition;
                } else {
                    TChunk* next = new TChunk();
                    volatile TEntry& entry = next->Entries[0];
                    entry.Cookie = cookie;
                    entry.Ev = ev;
                    entry.InstantMicroseconds = instantMicrosends;
                    AtomicStore(&WriteTo->Next, next);
                    WriteTo = next;
                    WritePosition = 1;
                }
            }
        };

        class TWriterWithPadding: public TWriter {
        private:
            ui8 CacheLinePadding[64 - sizeof(TWriter)];

            void UnusedCacheLinePadding() {
                Y_UNUSED(CacheLinePadding);
            }
        };

        struct TQueueType {
            TReader Reader;
            TWriter Writer;

            TQueueType() {
                Writer.Init(Reader);
            }
        };
    }
}