blob: 68b5fdc7d96813c281bf80cdd7353b96a7b7bd30 (
plain) (
blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
|
#pragma once
#include "scheduler_cookie.h"
#include <library/cpp/actors/util/queue_chunk.h>
namespace NActors {
class IEventHandle;
class ISchedulerCookie;
namespace NSchedulerQueue {
struct TEntry {
ui64 InstantMicroseconds;
IEventHandle* Ev;
ISchedulerCookie* Cookie;
};
struct TChunk : TQueueChunkDerived<TEntry, 512, TChunk> {};
class TReader;
class TWriter;
class TWriterWithPadding;
class TReader : ::TNonCopyable {
TChunk* ReadFrom;
ui32 ReadPosition;
friend class TWriter;
public:
TReader()
: ReadFrom(new TChunk())
, ReadPosition(0)
{
}
~TReader() {
while (TEntry* x = Pop()) {
if (x->Cookie)
x->Cookie->Detach();
delete x->Ev;
}
delete ReadFrom;
}
TEntry* Pop() {
TChunk* head = ReadFrom;
if (ReadPosition != TChunk::EntriesCount) {
if (AtomicLoad(&head->Entries[ReadPosition].InstantMicroseconds) != 0)
return const_cast<TEntry*>(&head->Entries[ReadPosition++]);
else
return nullptr;
} else if (TChunk* next = AtomicLoad(&head->Next)) {
ReadFrom = next;
delete head;
ReadPosition = 0;
return Pop();
}
return nullptr;
}
};
class TWriter : ::TNonCopyable {
TChunk* WriteTo;
ui32 WritePosition;
public:
TWriter()
: WriteTo(nullptr)
, WritePosition(0)
{
}
void Init(const TReader& reader) {
WriteTo = reader.ReadFrom;
WritePosition = 0;
}
void Push(ui64 instantMicrosends, IEventHandle* ev, ISchedulerCookie* cookie) {
if (Y_UNLIKELY(instantMicrosends == 0)) {
// Protect against Pop() getting stuck forever
instantMicrosends = 1;
}
if (WritePosition != TChunk::EntriesCount) {
volatile TEntry& entry = WriteTo->Entries[WritePosition];
entry.Cookie = cookie;
entry.Ev = ev;
AtomicStore(&entry.InstantMicroseconds, instantMicrosends);
++WritePosition;
} else {
TChunk* next = new TChunk();
volatile TEntry& entry = next->Entries[0];
entry.Cookie = cookie;
entry.Ev = ev;
entry.InstantMicroseconds = instantMicrosends;
AtomicStore(&WriteTo->Next, next);
WriteTo = next;
WritePosition = 1;
}
}
};
class TWriterWithPadding: public TWriter {
private:
ui8 CacheLinePadding[64 - sizeof(TWriter)];
void UnusedCacheLinePadding() {
Y_UNUSED(CacheLinePadding);
}
};
struct TQueueType {
TReader Reader;
TWriter Writer;
TQueueType() {
Writer.Init(Reader);
}
};
}
}
|