#pragma once

#include "defs.h"
#include "queue_chunk.h"

// TUnorderedCache is a multi-producer/multi-consumer cache of non-zero
// integral or pointer values, returned in no particular order. The value 0 is
// reserved to mean "empty". Contention is spread over several cache-line-sized
// slots; callers pass a rotation counter to Push/Pop so that different threads
// tend to start probing at different slots.
template <typename T, ui32 Size = 512, ui32 ConcurrencyFactor = 1, typename TChunk = TQueueChunk<T, Size>>
class TUnorderedCache : TNonCopyable {
    static_assert(std::is_integral<T>::value || std::is_pointer<T>::value, "expect std::is_integral<T>::value || std::is_pointer<T>::value");

public:
    static constexpr ui32 Concurrency = ConcurrencyFactor * 4;

private:
    // A slot is locked by swapping its chunk pointer to nullptr and released
    // by storing a (possibly different) chunk pointer back.
    struct TReadSlot {
        TChunk* volatile ReadFrom;
        volatile ui32 ReadPosition;
        char Padding[64 - sizeof(TChunk*) - sizeof(ui32)]; // 1 slot per cache line
    };

    struct TWriteSlot {
        TChunk* volatile WriteTo;
        volatile ui32 WritePosition;
        char Padding[64 - sizeof(TChunk*) - sizeof(ui32)]; // 1 slot per cache line
    };

    static_assert(sizeof(TReadSlot) == 64, "expect sizeof(TReadSlot) == 64");
    static_assert(sizeof(TWriteSlot) == 64, "expect sizeof(TWriteSlot) == 64");

private:
    TReadSlot ReadSlots[Concurrency];
    TWriteSlot WriteSlots[Concurrency];

    static_assert(sizeof(TChunk*) == sizeof(TAtomic), "expect sizeof(TChunk*) == sizeof(TAtomic)");

private:
    // RAII guard over a locked write slot: the destructor (via Drop) stores
    // the current chunk pointer back, releasing the slot.
    struct TLockedWriter {
        TWriteSlot* Slot;
        TChunk* WriteTo;

        TLockedWriter()
            : Slot(nullptr)
            , WriteTo(nullptr)
        { }

        TLockedWriter(TWriteSlot* slot, TChunk* writeTo)
            : Slot(slot)
            , WriteTo(writeTo)
        { }

        ~TLockedWriter() noexcept {
            Drop();
        }

        void Drop() {
            if (Slot) {
                AtomicStore(&Slot->WriteTo, WriteTo);
                Slot = nullptr;
            }
        }

        TLockedWriter(const TLockedWriter&) = delete;
        TLockedWriter& operator=(const TLockedWriter&) = delete;

        TLockedWriter(TLockedWriter&& rhs)
            : Slot(rhs.Slot)
            , WriteTo(rhs.WriteTo)
        {
            rhs.Slot = nullptr;
        }

        TLockedWriter& operator=(TLockedWriter&& rhs) {
            if (Y_LIKELY(this != &rhs)) {
                Drop();
                Slot = rhs.Slot;
                WriteTo = rhs.WriteTo;
                rhs.Slot = nullptr;
            }
            return *this;
        }
    };

private:
    // Spins over write slots, starting at writerRotation, until one is locked.
    TLockedWriter LockWriter(ui64 writerRotation) {
        ui32 cycle = 0;
        for (;;) {
            TWriteSlot* slot = &WriteSlots[writerRotation % Concurrency];
            if (AtomicLoad(&slot->WriteTo) != nullptr) {
                if (TChunk* writeTo = AtomicSwap(&slot->WriteTo, nullptr)) {
                    return TLockedWriter(slot, writeTo);
                }
            }
            ++writerRotation;

            // Do a spinlock pause after a full cycle
            if (++cycle == Concurrency) {
                SpinLockPause();
                cycle = 0;
            }
        }
    }

    // Appends one value to the locked slot's chunk, allocating and linking a
    // fresh chunk when the current one is full.
    void WriteOne(TLockedWriter& lock, T x) {
        Y_VERIFY_DEBUG(x != 0);

        const ui32 pos = AtomicLoad(&lock.Slot->WritePosition);
        if (pos != TChunk::EntriesCount) {
            AtomicStore(&lock.Slot->WritePosition, pos + 1);
            AtomicStore(&lock.WriteTo->Entries[pos], x);
        } else {
            TChunk* next = new TChunk();
            AtomicStore(&next->Entries[0], x);
            AtomicStore(&lock.Slot->WritePosition, 1u);
            AtomicStore(&lock.WriteTo->Next, next);
            lock.WriteTo = next;
        }
    }

public:
    TUnorderedCache() {
        for (ui32 i = 0; i < Concurrency; ++i) {
            ReadSlots[i].ReadFrom = new TChunk();
            ReadSlots[i].ReadPosition = 0;
            WriteSlots[i].WriteTo = ReadSlots[i].ReadFrom;
            WriteSlots[i].WritePosition = 0;
        }
    }

    ~TUnorderedCache() {
        Y_VERIFY(!Pop(0)); // the cache must be drained before destruction
        for (ui64 i = 0; i < Concurrency; ++i) {
            if (ReadSlots[i].ReadFrom) {
                delete ReadSlots[i].ReadFrom;
                ReadSlots[i].ReadFrom = nullptr;
            }
            WriteSlots[i].WriteTo = nullptr;
        }
    }

    // Scans up to Concurrency read slots once, starting at readerRotation;
    // returns 0 if no value was found.
    T Pop(ui64 readerRotation) noexcept {
        ui64 readerIndex = readerRotation;
        const ui64 endIndex = readerIndex + Concurrency;
        for (; readerIndex != endIndex; ++readerIndex) {
            TReadSlot* slot = &ReadSlots[readerIndex % Concurrency];
            if (AtomicLoad(&slot->ReadFrom) != nullptr) {
                if (TChunk* readFrom = AtomicSwap(&slot->ReadFrom, nullptr)) {
                    const ui32 pos = AtomicLoad(&slot->ReadPosition);
                    if (pos != TChunk::EntriesCount) {
                        if (T ret = AtomicLoad(&readFrom->Entries[pos])) {
                            AtomicStore(&slot->ReadPosition, pos + 1);
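                            // The slot is still locked here (its ReadFrom is nullptr),
                            // so the position update is private; the store below
                            // publishes it together with the unlock.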
                            AtomicStore(&slot->ReadFrom, readFrom); // release lock with same chunk
                            return ret; // found, return
                        } else {
                            AtomicStore(&slot->ReadFrom, readFrom); // release lock with same chunk
                        }
                    } else if (TChunk* next = AtomicLoad(&readFrom->Next)) {
                        if (T ret = AtomicLoad(&next->Entries[0])) {
                            AtomicStore(&slot->ReadPosition, 1u);
                            AtomicStore(&slot->ReadFrom, next); // release lock with next chunk
                            delete readFrom;
                            return ret;
                        } else {
                            AtomicStore(&slot->ReadPosition, 0u);
                            AtomicStore(&slot->ReadFrom, next); // release lock with new chunk
                            delete readFrom;
                        }
                    } else {
                        // nothing in old chunk and no next chunk, just release lock with old chunk
                        AtomicStore(&slot->ReadFrom, readFrom);
                    }
                }
            }
        }
        return 0; // got nothing after full cycle, return
    }

    void Push(T x, ui64 writerRotation) {
        TLockedWriter lock = LockWriter(writerRotation);
        WriteOne(lock, x);
    }

    void PushBulk(T* x, ui32 xcount, ui64 writerRotation) {
        for (;;) {
            // Fill no more than one queue chunk per round
            const ui32 xround = Min(xcount, (ui32)TChunk::EntriesCount);
            {
                TLockedWriter lock = LockWriter(writerRotation++);
                for (T* end = x + xround; x != end; ++x)
                    WriteOne(lock, *x);
            }
            if (xcount <= TChunk::EntriesCount)
                break;
            xcount -= TChunk::EntriesCount;
        }
    }
};
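// Usage sketch (illustrative, not part of this header): producers and
// consumers keep their own rotation counters so that successive calls start
// probing at different slots, spreading contention across the Concurrency
// cache lines. The thread-local counters and the GCache name below are
// assumptions made for the example, not an API defined here. Remember that 0
// is reserved: Pop() returns 0 for "nothing found" and WriteOne() asserts
// that pushed values are non-zero.
//
//     TUnorderedCache<void*> GCache;
//
//     void Producer(void* value) {
//         static thread_local ui64 writerRotation = 0;
//         GCache.Push(value, writerRotation++);
//     }
//
//     void* Consumer() {
//         static thread_local ui64 readerRotation = 0;
//         return GCache.Pop(readerRotation++); // nullptr if nothing was found
//     }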