author    Aleksei Borzenkov <snaury@ydb.tech>    2024-11-20 19:17:36 +0300
committer GitHub <noreply@github.com>            2024-11-20 19:17:36 +0300
commit    4644e090d713dd56dd5734b49ad80400f1a21414 (patch)
tree      c01fd44c8caf0ea47583072143715f62ca5f13c0
parent    dd01e456bccfc62249474d734efb54aeec53f28a (diff)
download  ydb-4644e090d713dd56dd5734b49ad80400f1a21414.tar.gz
Restore mailbox_queue_revolving.h (#11809)
-rw-r--r--    ydb/library/actors/core/mailbox_queue_revolving.h    214
1 file changed, 214 insertions(+), 0 deletions(-)
diff --git a/ydb/library/actors/core/mailbox_queue_revolving.h b/ydb/library/actors/core/mailbox_queue_revolving.h
new file mode 100644
index 0000000000..99388d9f2d
--- /dev/null
+++ b/ydb/library/actors/core/mailbox_queue_revolving.h
@@ -0,0 +1,214 @@
+#pragma once
+
+#include "defs.h"
+#include <ydb/library/actors/util/queue_chunk.h>
+
+namespace NActors {
+ // Adds some write concurrency to the basic queue to avoid hangs under contention (we pay with memory, so use it only when contention is really expected).
+ // Ordering: every completed push is guaranteed to be seen before any push that has not yet started; concurrent pushes may be observed out of order (which is natural for concurrent queues).
+ // Reader and writers are kept on different cache lines to avoid contention between them.
+ // If strict ordering does not matter, consider TManyOneQueue instead.
+
+ template <typename T, ui32 TWriteConcurrency = 3, ui32 TSize = 128>
+ class TRevolvingMailboxQueue {
+ static_assert(std::is_integral<T>::value || std::is_pointer<T>::value, "expect std::is_integral<T>::value || std::is_pointer<T>::value");
+
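+ // Each stored value carries a monotonically increasing tag, so the reader can merge the per-writer chunk chains back into push order.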
+ struct TValTagPair {
+ volatile T Value;
+ volatile ui64 Tag;
+ };
+
+ typedef TQueueChunk<TValTagPair, TSize> TChunk;
+
+ static_assert(sizeof(TAtomic) == sizeof(TChunk*), "expect sizeof(TAtomic) == sizeof(TChunk*)");
+ static_assert(sizeof(TAtomic) == sizeof(ui64), "expect sizeof(TAtomic) == sizeof(ui64)");
+
+ public:
+ class TWriter;
+
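+ // Single-consumer reader; keeps an independent read cursor (chunk pointer + position) for each writer lane.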
+ class TReader {
+ TChunk* ReadFrom[TWriteConcurrency];
+ ui32 ReadPosition[TWriteConcurrency];
+
+ friend class TRevolvingMailboxQueue<T, TWriteConcurrency, TSize>::TWriter; // for access to ReadFrom in constructor
+
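+ // Peeks at the head entry of lane idx: if it holds a value with a tag older than *tag, records it in *value/*tag. When the current chunk is exhausted, advances to the next chunk, frees the old one, and retries.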
+ bool ChunkHead(ui32 idx, ui64* tag, T* value) {
+ TChunk* head = ReadFrom[idx];
+ const ui32 pos = ReadPosition[idx];
+ if (pos != TChunk::EntriesCount) {
+ if (const T xval = AtomicLoad(&head->Entries[pos].Value)) {
+ const ui64 xtag = head->Entries[pos].Tag;
+ if (xtag < *tag) {
+ *value = xval;
+ *tag = xtag;
+ return true;
+ }
+ }
+ } else if (TChunk* next = AtomicLoad(&head->Next)) {
+ ReadFrom[idx] = next;
+ delete head;
+ ReadPosition[idx] = 0;
+ return ChunkHead(idx, tag, value);
+ }
+
+ return false;
+ }
+
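+ // Returns the value with the smallest tag across all lanes, optionally advancing the matching cursor past it.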
+ T Head(bool pop) {
+ ui64 tag = Max<ui64>();
+ T ret = T{};
+ ui32 idx = 0;
+
+ for (ui32 i = 0; i < TWriteConcurrency; ++i)
+ if (ChunkHead(i, &tag, &ret))
+ idx = i;
+
+ // a second pass catches an older-tagged push that became visible in an already-scanned lane during the first pass
+ if (ret) {
+ for (ui32 i = 0; i < TWriteConcurrency; ++i)
+ if (ChunkHead(i, &tag, &ret))
+ idx = i;
+ }
+
+ if (pop && ret)
+ ++ReadPosition[idx];
+
+ return ret;
+ }
+
+ public:
+ TReader() {
+ for (ui32 i = 0; i != TWriteConcurrency; ++i) {
+ ReadFrom[i] = new TChunk();
+ ReadPosition[i] = 0;
+ }
+ }
+
+ ~TReader() {
+ Y_DEBUG_ABORT_UNLESS(Head() == 0);
+ for (ui32 i = 0; i < TWriteConcurrency; ++i)
+ delete ReadFrom[i];
+ }
+
+ T Pop() {
+ return Head(true);
+ }
+
+ T Head() {
+ return Head(false);
+ }
+
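+ // Iterates over a snapshot of the read cursors without consuming the queue; unlike the reader it never frees exhausted chunks.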
+ class TReadIterator {
+ TChunk* ReadFrom[TWriteConcurrency];
+ ui32 ReadPosition[TWriteConcurrency];
+
+ bool ChunkHead(ui32 idx, ui64* tag, T* value) {
+ TChunk* head = ReadFrom[idx];
+ const ui32 pos = ReadPosition[idx];
+ if (pos != TChunk::EntriesCount) {
+ if (const T xval = AtomicLoad(&head->Entries[pos].Value)) {
+ const ui64 xtag = head->Entries[pos].Tag;
+ if (xtag < *tag) {
+ *value = xval;
+ *tag = xtag;
+ return true;
+ }
+ }
+ } else if (TChunk* next = AtomicLoad(&head->Next)) {
+ ReadFrom[idx] = next;
+ ReadPosition[idx] = 0;
+ return ChunkHead(idx, tag, value);
+ }
+
+ return false;
+ }
+
+ public:
+ TReadIterator(TChunk* const* readFrom, const ui32* readPosition) {
+ memcpy(ReadFrom, readFrom, TWriteConcurrency * sizeof(TChunk*));
+ memcpy(ReadPosition, readPosition, TWriteConcurrency * sizeof(ui32));
+ }
+
+ T Next() {
+ ui64 tag = Max<ui64>();
+ T ret = T{};
+ ui32 idx = 0;
+
+ for (ui32 i = 0; i < TWriteConcurrency; ++i)
+ if (ChunkHead(i, &tag, &ret))
+ idx = i;
+
+ // a second pass catches an older-tagged push that became visible in an already-scanned lane during the first pass
+ if (ret) {
+ for (ui32 i = 0; i < TWriteConcurrency; ++i)
+ if (ChunkHead(i, &tag, &ret))
+ idx = i;
+ }
+
+ if (ret)
+ ++ReadPosition[idx];
+
+ return ret;
+ }
+ };
+
+ TReadIterator Iterator() const {
+ return TReadIterator(ReadFrom, ReadPosition);
+ }
+ };
+
+ class TWriter {
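+ // A lane with WriteTo[i] == nullptr is currently claimed by an in-flight push.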
+ TChunk* volatile WriteTo[TWriteConcurrency];
+ volatile ui64 Tag;
+ ui32 WritePosition[TWriteConcurrency];
+
+ public:
+ TWriter(const TReader& reader)
+ : Tag(0)
+ {
+ for (ui32 i = 0; i != TWriteConcurrency; ++i) {
+ WriteTo[i] = reader.ReadFrom[i];
+ WritePosition[i] = 0;
+ }
+ }
+
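+ // Claims a free lane by swapping its chunk pointer with nullptr, appends the value under a fresh tag, then publishes the lane back; fails when every lane is claimed by a concurrent push.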
+ bool TryPush(T x) {
+ Y_ABORT_UNLESS(x != 0);
+
+ for (ui32 i = 0; i != TWriteConcurrency; ++i) {
+ if (RelaxedLoad(&WriteTo[i]) != nullptr) {
+ if (TChunk* writeTo = AtomicSwap(&WriteTo[i], nullptr)) {
+ const ui64 nextTag = AtomicIncrement(Tag);
+ Y_DEBUG_ABORT_UNLESS(nextTag < Max<ui64>());
+ const ui32 writePosition = WritePosition[i];
+ if (writePosition != TChunk::EntriesCount) {
+ writeTo->Entries[writePosition].Tag = nextTag;
+ AtomicStore(&writeTo->Entries[writePosition].Value, x);
+ ++WritePosition[i];
+ } else {
+ TChunk* next = new TChunk();
+ next->Entries[0].Tag = nextTag;
+ next->Entries[0].Value = x;
+ AtomicStore(&writeTo->Next, next);
+ writeTo = next;
+ WritePosition[i] = 1;
+ }
+ AtomicStore(WriteTo + i, writeTo);
+ return true;
+ }
+ }
+ }
+ return false;
+ }
+
+ ui32 Push(T x) {
+ ui32 spins = 0;
+ while (!TryPush(x)) {
+ ++spins;
+ SpinLockPause();
+ }
+ return spins;
+ }
+ };
+ };
+}
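
A minimal usage sketch (not part of the commit), assuming one reader thread and a few producer threads sharing a single TWriter; the thread counts and value encoding below are illustrative assumptions only:

#include <ydb/library/actors/core/mailbox_queue_revolving.h>
#include <thread>
#include <vector>

using namespace NActors;

int main() {
    // The writer is constructed from the reader so both start on the same chunk chains.
    TRevolvingMailboxQueue<ui64>::TReader reader;
    TRevolvingMailboxQueue<ui64>::TWriter writer(reader);

    std::vector<std::thread> producers;
    for (ui64 w = 0; w < 3; ++w) {
        producers.emplace_back([&writer, w] {
            for (ui64 i = 1; i <= 100; ++i) {
                // Values must be non-zero: zero marks an empty slot.
                writer.Push(w * 1000 + i); // spins while all TWriteConcurrency lanes are claimed
            }
        });
    }
    for (auto& t : producers) {
        t.join();
    }

    // Single-threaded drain after all pushes completed; Pop() returns 0 once empty.
    ui64 drained = 0;
    while (reader.Pop()) {
        ++drained;
    }
    // drained == 300
    return 0;
}

TryPush fails only while all TWriteConcurrency lanes are simultaneously claimed, so Push degrades to a spin loop under heavy write contention; raising TWriteConcurrency trades extra memory for fewer collisions.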