diff options
author | Devtools Arcadia <arcadia-devtools@yandex-team.ru> | 2022-02-07 18:08:42 +0300 |
---|---|---|
committer | Devtools Arcadia <arcadia-devtools@mous.vla.yp-c.yandex.net> | 2022-02-07 18:08:42 +0300 |
commit | 1110808a9d39d4b808aef724c861a2e1a38d2a69 (patch) | |
tree | e26c9fed0de5d9873cce7e00bc214573dc2195b7 /library/cpp/actors/util | |
download | ydb-1110808a9d39d4b808aef724c861a2e1a38d2a69.tar.gz |
intermediate changes
ref:cde9a383711a11544ce7e107a78147fb96cc4029
Diffstat (limited to 'library/cpp/actors/util')
28 files changed, 3578 insertions, 0 deletions
diff --git a/library/cpp/actors/util/affinity.cpp b/library/cpp/actors/util/affinity.cpp new file mode 100644 index 0000000000..cc1b6e70ec --- /dev/null +++ b/library/cpp/actors/util/affinity.cpp @@ -0,0 +1,93 @@ +#include "affinity.h" + +#ifdef _linux_ +#include <sched.h> +#endif + +class TAffinity::TImpl { +#ifdef _linux_ + cpu_set_t Mask; +#endif +public: + TImpl() { +#ifdef _linux_ + int ar = sched_getaffinity(0, sizeof(cpu_set_t), &Mask); + Y_VERIFY_DEBUG(ar == 0); +#endif + } + + explicit TImpl(const ui8* cpus, ui32 size) { +#ifdef _linux_ + CPU_ZERO(&Mask); + for (ui32 i = 0; i != size; ++i) { + if (cpus[i]) { + CPU_SET(i, &Mask); + } + } +#else + Y_UNUSED(cpus); + Y_UNUSED(size); +#endif + } + + void Set() const { +#ifdef _linux_ + int ar = sched_setaffinity(0, sizeof(cpu_set_t), &Mask); + Y_VERIFY_DEBUG(ar == 0); +#endif + } + + operator TCpuMask() const { + TCpuMask result; +#ifdef _linux_ + for (ui32 i = 0; i != CPU_SETSIZE; ++i) { + result.Cpus.emplace_back(CPU_ISSET(i, &Mask)); + } + result.RemoveTrailingZeros(); +#endif + return result; + } + +}; + +TAffinity::TAffinity() { +} + +TAffinity::~TAffinity() { +} + +TAffinity::TAffinity(const ui8* x, ui32 sz) { + if (x && sz) { + Impl.Reset(new TImpl(x, sz)); + } +} + +TAffinity::TAffinity(const TCpuMask& mask) { + if (!mask.IsEmpty()) { + static_assert(sizeof(ui8) == sizeof(mask.Cpus[0])); + const ui8* x = reinterpret_cast<const ui8*>(&mask.Cpus[0]); + const ui32 sz = mask.Size(); + Impl.Reset(new TImpl(x, sz)); + } +} + +void TAffinity::Current() { + Impl.Reset(new TImpl()); +} + +void TAffinity::Set() const { + if (!!Impl) { + Impl->Set(); + } +} + +bool TAffinity::Empty() const { + return !Impl; +} + +TAffinity::operator TCpuMask() const { + if (!!Impl) { + return *Impl; + } + return TCpuMask(); +} diff --git a/library/cpp/actors/util/affinity.h b/library/cpp/actors/util/affinity.h new file mode 100644 index 0000000000..ae106ed180 --- /dev/null +++ b/library/cpp/actors/util/affinity.h @@ -0,0 +1,49 @@ +#pragma once + +#include "defs.h" +#include "cpumask.h" + +// Platform-specific class to set or get thread affinity +class TAffinity: public TThrRefBase, TNonCopyable { + class TImpl; + THolder<TImpl> Impl; + +public: + TAffinity(); + TAffinity(const ui8* cpus, ui32 size); + explicit TAffinity(const TCpuMask& mask); + ~TAffinity(); + + void Current(); + void Set() const; + bool Empty() const; + + operator TCpuMask() const; +}; + +// Scoped affinity setter +class TAffinityGuard : TNonCopyable { + bool Stacked; + TAffinity OldAffinity; + +public: + TAffinityGuard(const TAffinity* affinity) { + Stacked = false; + if (affinity && !affinity->Empty()) { + OldAffinity.Current(); + affinity->Set(); + Stacked = true; + } + } + + ~TAffinityGuard() { + Release(); + } + + void Release() { + if (Stacked) { + OldAffinity.Set(); + Stacked = false; + } + } +}; diff --git a/library/cpp/actors/util/cpumask.h b/library/cpp/actors/util/cpumask.h new file mode 100644 index 0000000000..29741aa1d6 --- /dev/null +++ b/library/cpp/actors/util/cpumask.h @@ -0,0 +1,133 @@ +#pragma once + +#include "defs.h" + +#include <library/cpp/containers/stack_vector/stack_vec.h> + +#include <util/string/split.h> +#include <util/generic/yexception.h> + +using TCpuId = ui32; + +// Simple data structure to operate with set of cpus +struct TCpuMask { + TStackVec<bool, 1024> Cpus; + + // Creates empty mask + TCpuMask() {} + + // Creates mask with single cpu set + explicit TCpuMask(TCpuId cpuId) { + Set(cpuId); + } + + // Initialize mask from raw boolean array + template <class T> + TCpuMask(const T* cpus, TCpuId size) { + Cpus.reserve(size); + for (TCpuId i = 0; i != size; ++i) { + Cpus.emplace_back(bool(cpus[i])); + } + } + + // Parse a numerical list of processors. The numbers are separated by commas and may include ranges. For example: 0,5,7,9-11 + explicit TCpuMask(const TString& cpuList) { + try { + for (TStringBuf s : StringSplitter(cpuList).Split(',')) { + TCpuId l, r; + if (s.find('-') != TString::npos) { + StringSplitter(s).Split('-').CollectInto(&l, &r); + } else { + l = r = FromString<TCpuId>(s); + } + if (r >= Cpus.size()) { + Cpus.resize(r + 1, false); + } + for (TCpuId cpu = l; cpu <= r; cpu++) { + Cpus[cpu] = true; + } + } + } catch (...) { + ythrow TWithBackTrace<yexception>() << "Exception occured while parsing cpu list '" << cpuList << "': " << CurrentExceptionMessage(); + } + } + + // Returns size of underlying vector + TCpuId Size() const { + return Cpus.size(); + } + + // Returns number of set bits in mask + TCpuId CpuCount() const { + TCpuId result = 0; + for (bool value : Cpus) { + result += value; + } + return result; + } + + bool IsEmpty() const { + for (bool value : Cpus) { + if (value) { + return false; + } + } + return true; + } + + bool IsSet(TCpuId cpu) const { + return cpu < Cpus.size() && Cpus[cpu]; + } + + void Set(TCpuId cpu) { + if (cpu >= Cpus.size()) { + Cpus.resize(cpu + 1, false); + } + Cpus[cpu] = true; + } + + void Reset(TCpuId cpu) { + if (cpu < Cpus.size()) { + Cpus[cpu] = false; + } + } + + void RemoveTrailingZeros() { + while (!Cpus.empty() && !Cpus.back()) { + Cpus.pop_back(); + } + } + + explicit operator bool() const { + return !IsEmpty(); + } + + TCpuMask operator &(const TCpuMask& rhs) const { + TCpuMask result; + TCpuId size = Max(Size(), rhs.Size()); + result.Cpus.reserve(size); + for (TCpuId cpu = 0; cpu < size; cpu++) { + result.Cpus.emplace_back(IsSet(cpu) && rhs.IsSet(cpu)); + } + return result; + } + + TCpuMask operator |(const TCpuMask& rhs) const { + TCpuMask result; + TCpuId size = Max(Size(), rhs.Size()); + result.Cpus.reserve(size); + for (TCpuId cpu = 0; cpu < size; cpu++) { + result.Cpus.emplace_back(IsSet(cpu) || rhs.IsSet(cpu)); + } + return result; + } + + TCpuMask operator -(const TCpuMask& rhs) const { + TCpuMask result; + result.Cpus.reserve(Size()); + for (TCpuId cpu = 0; cpu < Size(); cpu++) { + result.Cpus.emplace_back(IsSet(cpu) && !rhs.IsSet(cpu)); + } + return result; + } +}; diff --git a/library/cpp/actors/util/datetime.h b/library/cpp/actors/util/datetime.h new file mode 100644 index 0000000000..cbec5965d6 --- /dev/null +++ b/library/cpp/actors/util/datetime.h @@ -0,0 +1,82 @@ +#pragma once + +#include <util/system/defaults.h> +#include <util/system/hp_timer.h> +#include <util/system/platform.h> + +#if defined(_win_) +#include <intrin.h> +#pragma intrinsic(__rdtsc) +#endif // _win_ + +#if defined(_darwin_) && !defined(_x86_) +#include <mach/mach_time.h> +#endif + +// GetCycleCount() from util/system/datetime.h uses rdtscp, which is more accurate than rdtsc, +// but rdtscp disables processor's out-of-order execution, so it can be slow +Y_FORCE_INLINE ui64 GetCycleCountFast() { +#if defined(_MSC_VER) + // Generates the rdtsc instruction, which returns the processor time stamp. + // The processor time stamp records the number of clock cycles since the last reset. + return __rdtsc(); +#elif defined(__clang__) && !defined(_arm64_) + return __builtin_readcyclecounter(); +#elif defined(_x86_64_) + unsigned hi, lo; + __asm__ __volatile__("rdtsc" + : "=a"(lo), "=d"(hi)); + return ((unsigned long long)lo) | (((unsigned long long)hi) << 32); +#elif defined(_i386_) + ui64 x; + __asm__ volatile("rdtsc\n\t" + : "=A"(x)); + return x; +#elif defined(_darwin_) + return mach_absolute_time(); +#elif defined(_arm32_) + return MicroSeconds(); +#elif defined(_arm64_) + ui64 x; + + __asm__ __volatile__("isb; mrs %0, cntvct_el0" + : "=r"(x)); + + return x; +#else +#error "unsupported arch" +#endif +} + +// NHPTimer::GetTime fast analog +Y_FORCE_INLINE void GetTimeFast(NHPTimer::STime* pTime) noexcept { + *pTime = GetCycleCountFast(); +} + +namespace NActors { + inline double Ts2Ns(ui64 ts) { + return NHPTimer::GetSeconds(ts) * 1e9; + } + + inline double Ts2Us(ui64 ts) { + return NHPTimer::GetSeconds(ts) * 1e6; + } + + inline double Ts2Ms(ui64 ts) { + return NHPTimer::GetSeconds(ts) * 1e3; + } + + inline ui64 Us2Ts(double us) { + return ui64(NHPTimer::GetClockRate() * us / 1e6); + } + + struct TTimeTracker { + ui64 Ts; + TTimeTracker(): Ts(GetCycleCountFast()) {} + ui64 Elapsed() { + ui64 ts = GetCycleCountFast(); + std::swap(Ts, ts); + return Ts - ts; + } + }; +} diff --git a/library/cpp/actors/util/defs.h b/library/cpp/actors/util/defs.h new file mode 100644 index 0000000000..5c3b57665b --- /dev/null +++ b/library/cpp/actors/util/defs.h @@ -0,0 +1,16 @@ +#pragma once + +// unique tag to fix pragma once gcc glueing: ./library/actors/util/defs.h + +#include <util/system/defaults.h> +#include <util/generic/bt_exception.h> +#include <util/generic/noncopyable.h> +#include <util/generic/ptr.h> +#include <util/generic/string.h> +#include <util/generic/yexception.h> +#include <util/system/atomic.h> +#include <util/system/align.h> +#include <util/generic/vector.h> +#include <util/datetime/base.h> +#include <util/generic/ylimits.h> +#include "intrinsics.h" diff --git a/library/cpp/actors/util/funnel_queue.h b/library/cpp/actors/util/funnel_queue.h new file mode 100644 index 0000000000..0e21e2617c --- /dev/null +++ b/library/cpp/actors/util/funnel_queue.h @@ -0,0 +1,240 @@ +#pragma once + +#include <util/system/atomic.h> +#include <util/generic/noncopyable.h> + +template <typename ElementType> +class TFunnelQueue: private TNonCopyable { +public: + TFunnelQueue() noexcept + : Front(nullptr) + , Back(nullptr) + { + } + + virtual ~TFunnelQueue() noexcept { + for (auto entry = Front; entry; entry = DeleteEntry(entry)) + continue; + } + + /// Push element. Can be used from many threads. Return true if is first element. + bool + Push(ElementType&& element) noexcept { + TEntry* const next = NewEntry(static_cast<ElementType&&>(element)); + TEntry* const prev = AtomicSwap(&Back, next); + AtomicSet(prev ? prev->Next : Front, next); + return !prev; + } + + /// Extract top element. Must be used only from one thread. Return true if have more. + bool + Pop() noexcept { + if (TEntry* const top = AtomicGet(Front)) { + const auto last = AtomicCas(&Back, nullptr, top); + if (last) // This is last element in queue. Queue is empty now. + AtomicCas(&Front, nullptr, top); + else // This element is not last. + for (;;) { + if (const auto next = AtomicGet(top->Next)) { + AtomicSet(Front, next); + break; + } + // But Next is null. Wait next assignment in spin lock. + } + + DeleteEntry(top); + return !last; + } + + return false; + } + + /// Peek top element. Must be used only from one thread. + ElementType& + Top() const noexcept { + return AtomicGet(Front)->Data; + } + + bool + IsEmpty() const noexcept { + return !AtomicGet(Front); + } + +protected: + class TEntry: private TNonCopyable { + friend class TFunnelQueue; + + private: + explicit TEntry(ElementType&& element) noexcept + : Data(static_cast<ElementType&&>(element)) + , Next(nullptr) + { + } + + ~TEntry() noexcept { + } + + public: + ElementType Data; + TEntry* volatile Next; + }; + + TEntry* volatile Front; + TEntry* volatile Back; + + virtual TEntry* NewEntry(ElementType&& element) noexcept { + return new TEntry(static_cast<ElementType&&>(element)); + } + + virtual TEntry* DeleteEntry(TEntry* entry) noexcept { + const auto next = entry->Next; + delete entry; + return next; + } + +protected: + struct TEntryIter { + TEntry* ptr; + + ElementType& operator*() { + return ptr->Data; + } + + ElementType* operator->() { + return &ptr->Data; + } + + TEntryIter& operator++() { + ptr = AtomicGet(ptr->Next); + return *this; + } + + bool operator!=(const TEntryIter& other) const { + return ptr != other.ptr; + } + + bool operator==(const TEntryIter& other) const { + return ptr == other.ptr; + } + }; + + struct TConstEntryIter { + const TEntry* ptr; + + const ElementType& operator*() { + return ptr->Data; + } + + const ElementType* operator->() { + return &ptr->Data; + } + + TEntryIter& operator++() { + ptr = AtomicGet(ptr->Next); + return *this; + } + + bool operator!=(const TConstEntryIter& other) const { + return ptr != other.ptr; + } + + bool operator==(const TConstEntryIter& other) const { + return ptr == other.ptr; + } + }; + +public: + using const_iterator = TConstEntryIter; + using iterator = TEntryIter; + + iterator begin() { + return {AtomicGet(Front)}; + } + const_iterator cbegin() { + return {AtomicGet(Front)}; + } + const_iterator begin() const { + return {AtomicGet(Front)}; + } + + iterator end() { + return {nullptr}; + } + const_iterator cend() { + return {nullptr}; + } + const_iterator end() const { + return {nullptr}; + } +}; + +template <typename ElementType> +class TPooledFunnelQueue: public TFunnelQueue<ElementType> { +public: + TPooledFunnelQueue() noexcept + : Stack(nullptr) + { + } + + virtual ~TPooledFunnelQueue() noexcept override { + for (auto entry = TBase::Front; entry; entry = TBase::DeleteEntry(entry)) + continue; + for (auto entry = Stack; entry; entry = TBase::DeleteEntry(entry)) + continue; + TBase::Back = TBase::Front = Stack = nullptr; + } + +private: + typedef TFunnelQueue<ElementType> TBase; + + typename TBase::TEntry* volatile Stack; + +protected: + virtual typename TBase::TEntry* NewEntry(ElementType&& element) noexcept override { + while (const auto top = AtomicGet(Stack)) + if (AtomicCas(&Stack, top->Next, top)) { + top->Data = static_cast<ElementType&&>(element); + AtomicSet(top->Next, nullptr); + return top; + } + + return TBase::NewEntry(static_cast<ElementType&&>(element)); + } + + virtual typename TBase::TEntry* DeleteEntry(typename TBase::TEntry* entry) noexcept override { + entry->Data = ElementType(); + const auto next = entry->Next; + do + AtomicSet(entry->Next, AtomicGet(Stack)); + while (!AtomicCas(&Stack, entry, entry->Next)); + return next; + } +}; + +template <typename ElementType, template <typename T> class TQueueType = TFunnelQueue> +class TCountedFunnelQueue: public TQueueType<ElementType> { +public: + TCountedFunnelQueue() noexcept + : Count(0) + { + } + + TAtomicBase GetSize() const noexcept { + return AtomicGet(Count); + } + +private: + typedef TQueueType<ElementType> TBase; + + virtual typename TBase::TEntry* NewEntry(ElementType&& element) noexcept override { + AtomicAdd(Count, 1); + return TBase::NewEntry(static_cast<ElementType&&>(element)); + } + + virtual typename TBase::TEntry* DeleteEntry(typename TBase::TEntry* entry) noexcept override { + AtomicSub(Count, 1); + return TBase::DeleteEntry(entry); + } + + TAtomic Count; +}; diff --git a/library/cpp/actors/util/futex.h b/library/cpp/actors/util/futex.h new file mode 100644 index 0000000000..c193f8d128 --- /dev/null +++ b/library/cpp/actors/util/futex.h @@ -0,0 +1,13 @@ +#pragma once + +#ifdef _linux_ + +#include <linux/futex.h> +#include <unistd.h> +#include <sys/syscall.h> + +static long SysFutex(void* addr1, int op, int val1, struct timespec* timeout, void* addr2, int val3) { + return syscall(SYS_futex, addr1, op, val1, timeout, addr2, val3); +} + +#endif diff --git a/library/cpp/actors/util/intrinsics.h b/library/cpp/actors/util/intrinsics.h new file mode 100644 index 0000000000..df07e36896 --- /dev/null +++ b/library/cpp/actors/util/intrinsics.h @@ -0,0 +1,97 @@ +#pragma once + +#include <util/system/defaults.h> +#include <util/system/atomic.h> +#include <util/system/spinlock.h> + +#include <library/cpp/sse/sse.h> // The header chooses appropriate SSE support + +static_assert(sizeof(TAtomic) == 8, "expect sizeof(TAtomic) == 8"); + +// we need explicit 32 bit operations to keep cache-line friendly packs +// so have to define some atomics additionaly to arcadia one +#ifdef _win_ +#pragma intrinsic(_InterlockedCompareExchange) +#pragma intrinsic(_InterlockedExchangeAdd) +#pragma intrinsic(_InterlockedIncrement) +#pragma intrinsic(_InterlockedDecrement) +#endif + +inline bool AtomicUi32Cas(volatile ui32* a, ui32 exchange, ui32 compare) { +#ifdef _win_ + return _InterlockedCompareExchange((volatile long*)a, exchange, compare) == (long)compare; +#else + ui32 expected = compare; + return __atomic_compare_exchange_n(a, &expected, exchange, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST); +#endif +} + +inline ui32 AtomicUi32Add(volatile ui32* a, ui32 add) { +#ifdef _win_ + return _InterlockedExchangeAdd((volatile long*)a, add) + add; +#else + return __atomic_add_fetch(a, add, __ATOMIC_SEQ_CST); +#endif +} + +inline ui32 AtomicUi32Sub(volatile ui32* a, ui32 sub) { +#ifdef _win_ + return _InterlockedExchangeAdd((volatile long*)a, -(long)sub) - sub; +#else + return __atomic_sub_fetch(a, sub, __ATOMIC_SEQ_CST); +#endif +} + +inline ui32 AtomicUi32Increment(volatile ui32* a) { +#ifdef _win_ + return _InterlockedIncrement((volatile long*)a); +#else + return __atomic_add_fetch(a, 1, __ATOMIC_SEQ_CST); +#endif +} + +inline ui32 AtomicUi32Decrement(volatile ui32* a) { +#ifdef _win_ + return _InterlockedDecrement((volatile long*)a); +#else + return __atomic_sub_fetch(a, 1, __ATOMIC_SEQ_CST); +#endif +} + +template <typename T> +inline void AtomicStore(volatile T* a, T x) { + static_assert(std::is_integral<T>::value || std::is_pointer<T>::value, "expect std::is_integral<T>::value || std::is_pointer<T>::value"); +#ifdef _win_ + *a = x; +#else + __atomic_store_n(a, x, __ATOMIC_RELEASE); +#endif +} + +template <typename T> +inline void RelaxedStore(volatile T* a, T x) { + static_assert(std::is_integral<T>::value || std::is_pointer<T>::value, "expect std::is_integral<T>::value || std::is_pointer<T>::value"); +#ifdef _win_ + *a = x; +#else + __atomic_store_n(a, x, __ATOMIC_RELAXED); +#endif +} + +template <typename T> +inline T AtomicLoad(volatile T* a) { +#ifdef _win_ + return *a; +#else + return __atomic_load_n(a, __ATOMIC_ACQUIRE); +#endif +} + +template <typename T> +inline T RelaxedLoad(volatile T* a) { +#ifdef _win_ + return *a; +#else + return __atomic_load_n(a, __ATOMIC_RELAXED); +#endif +} diff --git a/library/cpp/actors/util/local_process_key.h b/library/cpp/actors/util/local_process_key.h new file mode 100644 index 0000000000..172f08fc73 --- /dev/null +++ b/library/cpp/actors/util/local_process_key.h @@ -0,0 +1,132 @@ +#pragma once + +#include <util/string/builder.h> +#include <util/generic/strbuf.h> +#include <util/generic/vector.h> +#include <util/generic/hash.h> +#include <util/generic/singleton.h> +#include <util/generic/serialized_enum.h> + +template <typename T> +class TLocalProcessKeyState { + +template <typename U, const char* Name> +friend class TLocalProcessKey; +template <typename U, typename EnumT> +friend class TEnumProcessKey; + +public: + static TLocalProcessKeyState& GetInstance() { + return *Singleton<TLocalProcessKeyState<T>>(); + } + + size_t GetCount() const { + return StartIndex + Names.size(); + } + + TStringBuf GetNameByIndex(size_t index) const { + if (index < StartIndex) { + return StaticNames[index]; + } else { + index -= StartIndex; + Y_ENSURE(index < Names.size()); + return Names[index]; + } + } + + size_t GetIndexByName(TStringBuf name) const { + auto it = Map.find(name); + Y_ENSURE(it != Map.end()); + return it->second; + } + +private: + size_t Register(TStringBuf name) { + auto x = Map.emplace(name, Names.size()+StartIndex); + if (x.second) { + Names.emplace_back(name); + } + + return x.first->second; + } + + size_t Register(TStringBuf name, ui32 index) { + Y_VERIFY(index < StartIndex); + auto x = Map.emplace(name, index); + Y_VERIFY(x.second || x.first->second == index); + StaticNames[index] = name; + return x.first->second; + } + +private: + static constexpr ui32 StartIndex = 2000; + + TVector<TString> FillStaticNames() { + TVector<TString> staticNames; + staticNames.reserve(StartIndex); + for (ui32 i = 0; i < StartIndex; i++) { + staticNames.push_back(TStringBuilder() << "Activity_" << i); + } + return staticNames; + } + + TVector<TString> StaticNames = FillStaticNames(); + TVector<TString> Names; + THashMap<TString, size_t> Map; +}; + +template <typename T, const char* Name> +class TLocalProcessKey { +public: + static TStringBuf GetName() { + return Name; + } + + static size_t GetIndex() { + return Index; + } + +private: + inline static size_t Index = TLocalProcessKeyState<T>::GetInstance().Register(Name); +}; + +template <typename T, typename EnumT> +class TEnumProcessKey { +public: + static TStringBuf GetName(EnumT key) { + return TLocalProcessKeyState<T>::GetInstance().GetNameByIndex(GetIndex(key)); + } + + static size_t GetIndex(EnumT key) { + ui32 index = static_cast<ui32>(key); + if (index < TLocalProcessKeyState<T>::StartIndex) { + return index; + } + Y_VERIFY(index < Enum2Index.size()); + return Enum2Index[index]; + } + +private: + inline static TVector<size_t> RegisterAll() { + static_assert(std::is_enum<EnumT>::value, "Enum is required"); + + TVector<size_t> enum2Index; + auto names = GetEnumNames<EnumT>(); + ui32 maxId = 0; + for (const auto& [k, v] : names) { + maxId = Max(maxId, static_cast<ui32>(k)); + } + enum2Index.resize(maxId+1); + for (ui32 i = 0; i <= maxId && i < TLocalProcessKeyState<T>::StartIndex; i++) { + enum2Index[i] = i; + } + + for (const auto& [k, v] : names) { + ui32 enumId = static_cast<ui32>(k); + enum2Index[enumId] = TLocalProcessKeyState<T>::GetInstance().Register(v, enumId); + } + return enum2Index; + } + + inline static TVector<size_t> Enum2Index = RegisterAll(); +}; diff --git a/library/cpp/actors/util/named_tuple.h b/library/cpp/actors/util/named_tuple.h new file mode 100644 index 0000000000..67f185bba8 --- /dev/null +++ b/library/cpp/actors/util/named_tuple.h @@ -0,0 +1,30 @@ +#pragma once + +#include "defs.h" + +template <typename TDerived> +struct TNamedTupleBase { + friend bool operator==(const TDerived& x, const TDerived& y) { + return x.ConvertToTuple() == y.ConvertToTuple(); + } + + friend bool operator!=(const TDerived& x, const TDerived& y) { + return x.ConvertToTuple() != y.ConvertToTuple(); + } + + friend bool operator<(const TDerived& x, const TDerived& y) { + return x.ConvertToTuple() < y.ConvertToTuple(); + } + + friend bool operator<=(const TDerived& x, const TDerived& y) { + return x.ConvertToTuple() <= y.ConvertToTuple(); + } + + friend bool operator>(const TDerived& x, const TDerived& y) { + return x.ConvertToTuple() > y.ConvertToTuple(); + } + + friend bool operator>=(const TDerived& x, const TDerived& y) { + return x.ConvertToTuple() >= y.ConvertToTuple(); + } +}; diff --git a/library/cpp/actors/util/queue_chunk.h b/library/cpp/actors/util/queue_chunk.h new file mode 100644 index 0000000000..8a4e02d8cb --- /dev/null +++ b/library/cpp/actors/util/queue_chunk.h @@ -0,0 +1,29 @@ +#pragma once + +#include "defs.h" + +template <typename T, ui32 TSize, typename TDerived> +struct TQueueChunkDerived { + static const ui32 EntriesCount = (TSize - sizeof(TQueueChunkDerived*)) / sizeof(T); + static_assert(EntriesCount > 0, "expect EntriesCount > 0"); + + volatile T Entries[EntriesCount]; + TDerived* volatile Next; + + TQueueChunkDerived() { + memset(this, 0, sizeof(TQueueChunkDerived)); + } +}; + +template <typename T, ui32 TSize> +struct TQueueChunk { + static const ui32 EntriesCount = (TSize - sizeof(TQueueChunk*)) / sizeof(T); + static_assert(EntriesCount > 0, "expect EntriesCount > 0"); + + volatile T Entries[EntriesCount]; + TQueueChunk* volatile Next; + + TQueueChunk() { + memset(this, 0, sizeof(TQueueChunk)); + } +}; diff --git a/library/cpp/actors/util/queue_oneone_inplace.h b/library/cpp/actors/util/queue_oneone_inplace.h new file mode 100644 index 0000000000..d7ec8bb21c --- /dev/null +++ b/library/cpp/actors/util/queue_oneone_inplace.h @@ -0,0 +1,118 @@ +#pragma once + +#include "defs.h" +#include "queue_chunk.h" + +template <typename T, ui32 TSize, typename TChunk = TQueueChunk<T, TSize>> +class TOneOneQueueInplace : TNonCopyable { + static_assert(std::is_integral<T>::value || std::is_pointer<T>::value, "expect std::is_integral<T>::value || std::is_pointer<T>::valuer"); + + TChunk* ReadFrom; + ui32 ReadPosition; + ui32 WritePosition; + TChunk* WriteTo; + + friend class TReadIterator; + +public: + class TReadIterator { + TChunk* ReadFrom; + ui32 ReadPosition; + + public: + TReadIterator(TChunk* readFrom, ui32 readPosition) + : ReadFrom(readFrom) + , ReadPosition(readPosition) + { + } + + inline T Next() { + TChunk* head = ReadFrom; + if (ReadPosition != TChunk::EntriesCount) { + return AtomicLoad(&head->Entries[ReadPosition++]); + } else if (TChunk* next = AtomicLoad(&head->Next)) { + ReadFrom = next; + ReadPosition = 0; + return Next(); + } + return T{}; + } + }; + + TOneOneQueueInplace() + : ReadFrom(new TChunk()) + , ReadPosition(0) + , WritePosition(0) + , WriteTo(ReadFrom) + { + } + + ~TOneOneQueueInplace() { + Y_VERIFY_DEBUG(Head() == 0); + delete ReadFrom; + } + + struct TPtrCleanDestructor { + static inline void Destroy(TOneOneQueueInplace<T, TSize>* x) noexcept { + while (T head = x->Pop()) + delete head; + delete x; + } + }; + + struct TCleanDestructor { + static inline void Destroy(TOneOneQueueInplace<T, TSize>* x) noexcept { + while (x->Pop() != nullptr) + continue; + delete x; + } + }; + + struct TPtrCleanInplaceMallocDestructor { + template <typename TPtrVal> + static inline void Destroy(TOneOneQueueInplace<TPtrVal*, TSize>* x) noexcept { + while (TPtrVal* head = x->Pop()) { + head->~TPtrVal(); + free(head); + } + delete x; + } + }; + + void Push(T x) noexcept { + if (WritePosition != TChunk::EntriesCount) { + AtomicStore(&WriteTo->Entries[WritePosition], x); + ++WritePosition; + } else { + TChunk* next = new TChunk(); + next->Entries[0] = x; + AtomicStore(&WriteTo->Next, next); + WriteTo = next; + WritePosition = 1; + } + } + + T Head() { + TChunk* head = ReadFrom; + if (ReadPosition != TChunk::EntriesCount) { + return AtomicLoad(&head->Entries[ReadPosition]); + } else if (TChunk* next = AtomicLoad(&head->Next)) { + ReadFrom = next; + delete head; + ReadPosition = 0; + return Head(); + } + return T{}; + } + + T Pop() { + T ret = Head(); + if (ret) + ++ReadPosition; + return ret; + } + + TReadIterator Iterator() { + return TReadIterator(ReadFrom, ReadPosition); + } +}; diff --git a/library/cpp/actors/util/recentwnd.h b/library/cpp/actors/util/recentwnd.h new file mode 100644 index 0000000000..ba1ede6f29 --- /dev/null +++ b/library/cpp/actors/util/recentwnd.h @@ -0,0 +1,67 @@ +#pragma once + +#include <util/generic/deque.h> + +template <typename TElem, + template <typename, typename...> class TContainer = TDeque> +class TRecentWnd { +public: + TRecentWnd(ui32 wndSize) + : MaxWndSize_(wndSize) + { + } + + void Push(const TElem& elem) { + if (Window_.size() == MaxWndSize_) + Window_.erase(Window_.begin()); + Window_.emplace_back(elem); + } + + void Push(TElem&& elem) { + if (Window_.size() == MaxWndSize_) + Window_.erase(Window_.begin()); + Window_.emplace_back(std::move(elem)); + } + + TElem& Last() { + return Window_.back(); + } + const TElem& Last() const { + return Window_.back(); + } + bool Full() const { + return Window_.size() == MaxWndSize_; + } + ui64 Size() const { + return Window_.size(); + } + + using const_iterator = typename TContainer<TElem>::const_iterator; + + const_iterator begin() { + return Window_.begin(); + } + const_iterator end() { + return Window_.end(); + } + + void Reset(ui32 wndSize = 0) { + Window_.clear(); + if (wndSize != 0) { + MaxWndSize_ = wndSize; + } + } + + void ResetWnd(ui32 wndSize) { + Y_VERIFY(wndSize != 0); + MaxWndSize_ = wndSize; + if (Window_.size() > MaxWndSize_) { + Window_.erase(Window_.begin(), + Window_.begin() + Window_.size() - MaxWndSize_); + } + } + +private: + TContainer<TElem> Window_; + ui32 MaxWndSize_; +}; diff --git a/library/cpp/actors/util/rope.h b/library/cpp/actors/util/rope.h new file mode 100644 index 0000000000..f5595efbaa --- /dev/null +++ b/library/cpp/actors/util/rope.h @@ -0,0 +1,1161 @@ +#pragma once + +#include <util/generic/ptr.h> +#include <util/generic/string.h> +#include <util/generic/hash_set.h> +#include <util/stream/str.h> +#include <util/system/sanitizers.h> +#include <util/system/valgrind.h> + +// exactly one of them must be included +#include "rope_cont_list.h" +//#include "rope_cont_deque.h" + +struct IRopeChunkBackend : TThrRefBase { + using TData = std::tuple<const char*, size_t>; + virtual ~IRopeChunkBackend() = default; + virtual TData GetData() const = 0; + virtual size_t GetCapacity() const = 0; + using TPtr = TIntrusivePtr<IRopeChunkBackend>; +}; + +class TRopeAlignedBuffer : public IRopeChunkBackend { + static constexpr size_t Alignment = 16; + static constexpr size_t MallocAlignment = sizeof(size_t); + + ui32 Size; + const ui32 Capacity; + const ui32 Offset; + alignas(Alignment) char Data[]; + + TRopeAlignedBuffer(size_t size) + : Size(size) + , Capacity(size) + , Offset((Alignment - reinterpret_cast<uintptr_t>(Data)) & (Alignment - 1)) + { + Y_VERIFY(Offset <= Alignment - MallocAlignment); + } + +public: + static TIntrusivePtr<TRopeAlignedBuffer> Allocate(size_t size) { + return new(malloc(sizeof(TRopeAlignedBuffer) + size + Alignment - MallocAlignment)) TRopeAlignedBuffer(size); + } + + void *operator new(size_t) { + Y_FAIL(); + } + + void *operator new(size_t, void *ptr) { + return ptr; + } + + void operator delete(void *ptr) { + free(ptr); + } + + void operator delete(void* p, void* ptr) { + Y_UNUSED(p); + Y_UNUSED(ptr); + } + + TData GetData() const override { + return {Data + Offset, Size}; + } + + size_t GetCapacity() const override { + return Capacity; + } + + char *GetBuffer() { + return Data + Offset; + } + + void AdjustSize(size_t size) { + Y_VERIFY(size <= Capacity); + Size = size; + } +}; + +namespace NRopeDetails { + + template<bool IsConst, typename TRope, typename TList> + struct TIteratorTraits; + + template<typename TRope, typename TList> + struct TIteratorTraits<true, TRope, TList> { + using TRopePtr = const TRope*; + using TListIterator = typename TList::const_iterator; + }; + + template<typename TRope, typename TList> + struct TIteratorTraits<false, TRope, TList> { + using TRopePtr = TRope*; + using TListIterator = typename TList::iterator; + }; + +} // NRopeDetails + +class TRopeArena; + +template<typename T> +struct always_false : std::false_type {}; + +class TRope { + friend class TRopeArena; + + struct TChunk + { + class TBackend { + enum class EType : uintptr_t { + STRING, + ROPE_CHUNK_BACKEND, + }; + + uintptr_t Owner = 0; // lower bits contain type of the owner + + public: + TBackend() = delete; + + TBackend(const TBackend& other) + : Owner(Clone(other.Owner)) + {} + + TBackend(TBackend&& other) + : Owner(std::exchange(other.Owner, 0)) + {} + + TBackend(TString s) + : Owner(Construct<TString>(EType::STRING, std::move(s))) + {} + + TBackend(IRopeChunkBackend::TPtr backend) + : Owner(Construct<IRopeChunkBackend::TPtr>(EType::ROPE_CHUNK_BACKEND, std::move(backend))) + {} + + ~TBackend() { + if (Owner) { + Destroy(Owner); + } + } + + TBackend& operator =(const TBackend& other) { + if (Owner) { + Destroy(Owner); + } + Owner = Clone(other.Owner); + return *this; + } + + TBackend& operator =(TBackend&& other) { + if (Owner) { + Destroy(Owner); + } + Owner = std::exchange(other.Owner, 0); + return *this; + } + + bool operator ==(const TBackend& other) const { + return Owner == other.Owner; + } + + const void *UniqueId() const { + return reinterpret_cast<const void*>(Owner); + } + + const IRopeChunkBackend::TData GetData() const { + return Visit(Owner, [](EType, auto& value) -> IRopeChunkBackend::TData { + using T = std::decay_t<decltype(value)>; + if constexpr (std::is_same_v<T, TString>) { + return {value.data(), value.size()}; + } else if constexpr (std::is_same_v<T, IRopeChunkBackend::TPtr>) { + return value->GetData(); + } else { + return {}; + } + }); + } + + size_t GetCapacity() const { + return Visit(Owner, [](EType, auto& value) { + using T = std::decay_t<decltype(value)>; + if constexpr (std::is_same_v<T, TString>) { + return value.capacity(); + } else if constexpr (std::is_same_v<T, IRopeChunkBackend::TPtr>) { + return value->GetCapacity(); + } else { + Y_FAIL(); + } + }); + } + + private: + static constexpr uintptr_t TypeMask = (1 << 3) - 1; + static constexpr uintptr_t ValueMask = ~TypeMask; + + template<typename T> + struct TObjectHolder { + struct TWrappedObject : TThrRefBase { + T Value; + TWrappedObject(T&& value) + : Value(std::move(value)) + {} + }; + TIntrusivePtr<TWrappedObject> Object; + + TObjectHolder(T&& object) + : Object(MakeIntrusive<TWrappedObject>(std::move(object))) + {} + }; + + template<typename TObject> + static uintptr_t Construct(EType type, TObject object) { + if constexpr (sizeof(TObject) <= sizeof(uintptr_t)) { + uintptr_t res = 0; + new(&res) TObject(std::move(object)); + Y_VERIFY_DEBUG((res & ValueMask) == res); + return res | static_cast<uintptr_t>(type); + } else { + return Construct<TObjectHolder<TObject>>(type, TObjectHolder<TObject>(std::move(object))); + } + } + + template<typename TCallback> + static std::invoke_result_t<TCallback, EType, TString&> VisitRaw(uintptr_t value, TCallback&& callback) { + Y_VERIFY_DEBUG(value); + const EType type = static_cast<EType>(value & TypeMask); + value &= ValueMask; + auto caller = [&](auto& value) { return std::invoke(std::forward<TCallback>(callback), type, value); }; + auto wrapper = [&](auto& value) { + using T = std::decay_t<decltype(value)>; + if constexpr (sizeof(T) <= sizeof(uintptr_t)) { + return caller(value); + } else { + return caller(reinterpret_cast<TObjectHolder<T>&>(value)); + } + }; + switch (type) { + case EType::STRING: return wrapper(reinterpret_cast<TString&>(value)); + case EType::ROPE_CHUNK_BACKEND: return wrapper(reinterpret_cast<IRopeChunkBackend::TPtr&>(value)); + } + Y_FAIL("Unexpected type# %" PRIu64, static_cast<ui64>(type)); + } + + template<typename TCallback> + static std::invoke_result_t<TCallback, EType, TString&> Visit(uintptr_t value, TCallback&& callback) { + return VisitRaw(value, [&](EType type, auto& value) { + return std::invoke(std::forward<TCallback>(callback), type, Unwrap(value)); + }); + } + + template<typename T> static T& Unwrap(T& object) { return object; } + template<typename T> static T& Unwrap(TObjectHolder<T>& holder) { return holder.Object->Value; } + + static uintptr_t Clone(uintptr_t value) { + return VisitRaw(value, [](EType type, auto& value) { return Construct(type, value); }); + } + + static void Destroy(uintptr_t value) { + VisitRaw(value, [](EType, auto& value) { CallDtor(value); }); + } + + template<typename T> + static void CallDtor(T& value) { + value.~T(); + } + }; + + TBackend Backend; // who actually holds the data + const char *Begin; // data start + const char *End; // data end + + static constexpr struct TSlice {} Slice{}; + + template<typename T> + TChunk(T&& backend, const IRopeChunkBackend::TData& data) + : Backend(std::move(backend)) + , Begin(std::get<0>(data)) + , End(Begin + std::get<1>(data)) + { + Y_VERIFY_DEBUG(Begin != End); + } + + TChunk(TString s) + : Backend(std::move(s)) + { + size_t size; + std::tie(Begin, size) = Backend.GetData(); + End = Begin + size; + } + + TChunk(IRopeChunkBackend::TPtr backend) + : TChunk(backend, backend->GetData()) + {} + + TChunk(TSlice, const char *data, size_t size, const TChunk& from) + : TChunk(from.Backend, {data, size}) + {} + + TChunk(TSlice, const char *begin, const char *end, const TChunk& from) + : TChunk(Slice, begin, end - begin, from) + {} + + explicit TChunk(const TChunk& other) + : Backend(other.Backend) + , Begin(other.Begin) + , End(other.End) + {} + + TChunk(TChunk&& other) + : Backend(std::move(other.Backend)) + , Begin(other.Begin) + , End(other.End) + {} + + TChunk& operator =(const TChunk&) = default; + TChunk& operator =(TChunk&&) = default; + + size_t GetSize() const { + return End - Begin; + } + + static void Clear(TChunk& chunk) { + chunk.Begin = nullptr; + } + + static bool IsInUse(const TChunk& chunk) { + return chunk.Begin != nullptr; + } + + size_t GetCapacity() const { + return Backend.GetCapacity(); + } + }; + + using TChunkList = NRopeDetails::TChunkList<TChunk>; + +private: + // we use list here to store chain items as we have to keep valid iterators when erase/insert operations are invoked; + // iterator uses underlying container's iterator, so we have to use container that keeps valid iterators on delete, + // thus, the list + TChunkList Chain; + size_t Size = 0; + +private: + template<bool IsConst> + class TIteratorImpl { + using TTraits = NRopeDetails::TIteratorTraits<IsConst, TRope, TChunkList>; + + typename TTraits::TRopePtr Rope; + typename TTraits::TListIterator Iter; + const char *Ptr; // ptr is always nullptr when iterator is positioned at the rope end + +#ifndef NDEBUG + ui32 ValidityToken; +#endif + + private: + TIteratorImpl(typename TTraits::TRopePtr rope, typename TTraits::TListIterator iter, const char *ptr = nullptr) + : Rope(rope) + , Iter(iter) + , Ptr(ptr) +#ifndef NDEBUG + , ValidityToken(Rope->GetValidityToken()) +#endif + {} + + public: + TIteratorImpl() + : Rope(nullptr) + , Ptr(nullptr) + {} + + template<bool IsOtherConst> + TIteratorImpl(const TIteratorImpl<IsOtherConst>& other) + : Rope(other.Rope) + , Iter(other.Iter) + , Ptr(other.Ptr) +#ifndef NDEBUG + , ValidityToken(other.ValidityToken) +#endif + {} + + void CheckValid() const { +#ifndef NDEBUG + Y_VERIFY(ValidityToken == Rope->GetValidityToken()); +#endif + } + + TIteratorImpl& operator +=(size_t amount) { + CheckValid(); + + while (amount) { + Y_VERIFY_DEBUG(Valid()); + const size_t max = ContiguousSize(); + const size_t num = std::min(amount, max); + amount -= num; + Ptr += num; + if (Ptr == Iter->End) { + AdvanceToNextContiguousBlock(); + } + } + + return *this; + } + + TIteratorImpl operator +(size_t amount) const { + CheckValid(); + + return TIteratorImpl(*this) += amount; + } + + TIteratorImpl& operator -=(size_t amount) { + CheckValid(); + + while (amount) { + const size_t num = Ptr ? std::min<size_t>(amount, Ptr - Iter->Begin) : 0; + amount -= num; + Ptr -= num; + if (amount) { + Y_VERIFY_DEBUG(Iter != GetChainBegin()); + --Iter; + Ptr = Iter->End; + } + } + + return *this; + } + + TIteratorImpl operator -(size_t amount) const { + CheckValid(); + return TIteratorImpl(*this) -= amount; + } + + std::pair<const char*, size_t> operator *() const { + return {ContiguousData(), ContiguousSize()}; + } + + TIteratorImpl& operator ++() { + AdvanceToNextContiguousBlock(); + return *this; + } + + TIteratorImpl operator ++(int) const { + auto it(*this); + it.AdvanceToNextContiguousBlock(); + return it; + } + + //////////////////////////////////////////////////////////////////////////////////////////////////////////////// + // Operation with contiguous data + //////////////////////////////////////////////////////////////////////////////////////////////////////////////// + + // Get the pointer to the contiguous block of data; valid locations are [Data; Data + Size). + const char *ContiguousData() const { + CheckValid(); + return Ptr; + } + + // Get the amount of contiguous block. + size_t ContiguousSize() const { + CheckValid(); + return Ptr ? Iter->End - Ptr : 0; + } + + size_t ChunkOffset() const { + return Ptr ? Ptr - Iter->Begin : 0; + } + + // Advance to next contiguous block of data. + void AdvanceToNextContiguousBlock() { + CheckValid(); + Y_VERIFY_DEBUG(Valid()); + ++Iter; + Ptr = Iter != GetChainEnd() ? Iter->Begin : nullptr; + } + + // Extract some data and advance. Size is not checked here, to it must be provided valid. + void ExtractPlainDataAndAdvance(void *buffer, size_t len) { + CheckValid(); + + while (len) { + Y_VERIFY_DEBUG(Ptr); + + // calculate amount of bytes we need to move + const size_t max = ContiguousSize(); + const size_t num = std::min(len, max); + + // copy data to the buffer and advance buffer pointers + memcpy(buffer, Ptr, num); + buffer = static_cast<char*>(buffer) + num; + len -= num; + + // advance iterator itself + Ptr += num; + if (Ptr == Iter->End) { + AdvanceToNextContiguousBlock(); + } + } + } + + // Checks if the iterator points to the end of the rope or not. + bool Valid() const { + CheckValid(); + return Ptr; + } + + template<bool IsOtherConst> + bool operator ==(const TIteratorImpl<IsOtherConst>& other) const { + Y_VERIFY_DEBUG(Rope == other.Rope); + CheckValid(); + other.CheckValid(); + return Iter == other.Iter && Ptr == other.Ptr; + } + + template<bool IsOtherConst> + bool operator !=(const TIteratorImpl<IsOtherConst>& other) const { + CheckValid(); + other.CheckValid(); + return !(*this == other); + } + + private: + friend class TRope; + + typename TTraits::TListIterator operator ->() const { + CheckValid(); + return Iter; + } + + const TChunk& GetChunk() const { + CheckValid(); + return *Iter; + } + + typename TTraits::TListIterator GetChainBegin() const { + CheckValid(); + return Rope->Chain.begin(); + } + + typename TTraits::TListIterator GetChainEnd() const { + CheckValid(); + return Rope->Chain.end(); + } + + bool PointsToChunkMiddle() const { + CheckValid(); + return Ptr && Ptr != Iter->Begin; + } + }; + +public: +#ifndef NDEBUG + ui32 ValidityToken = 0; + ui32 GetValidityToken() const { return ValidityToken; } + void InvalidateIterators() { ++ValidityToken; } +#else + void InvalidateIterators() {} +#endif + +public: + using TConstIterator = TIteratorImpl<true>; + using TIterator = TIteratorImpl<false>; + +public: + TRope() = default; + TRope(const TRope& rope) = default; + + TRope(TRope&& rope) + : Chain(std::move(rope.Chain)) + , Size(std::exchange(rope.Size, 0)) + { + rope.InvalidateIterators(); + } + + TRope(TString s) { + if (s) { + Size = s.size(); + s.reserve(32); + Chain.PutToEnd(std::move(s)); + } + } + + TRope(IRopeChunkBackend::TPtr item) { + std::tie(std::ignore, Size) = item->GetData(); + Chain.PutToEnd(std::move(item)); + } + + TRope(TConstIterator begin, TConstIterator end) { + Y_VERIFY_DEBUG(begin.Rope == end.Rope); + if (begin.Rope == this) { + TRope temp(begin, end); + *this = std::move(temp); + return; + } + + while (begin.Iter != end.Iter) { + const size_t size = begin.ContiguousSize(); + Chain.PutToEnd(TChunk::Slice, begin.ContiguousData(), size, begin.GetChunk()); + begin.AdvanceToNextContiguousBlock(); + Size += size; + } + + if (begin != end && end.PointsToChunkMiddle()) { + Chain.PutToEnd(TChunk::Slice, begin.Ptr, end.Ptr, begin.GetChunk()); + Size += end.Ptr - begin.Ptr; + } + } + + ~TRope() { + } + + // creates a copy of rope with chunks with inefficient storage ratio being copied with arena allocator + static TRope CopySpaceOptimized(TRope&& origin, size_t worstRatioPer1k, TRopeArena& arena); + + TRope& operator=(const TRope& other) { + Chain = other.Chain; + Size = other.Size; + return *this; + } + + TRope& operator=(TRope&& other) { + Chain = std::move(other.Chain); + Size = std::exchange(other.Size, 0); + InvalidateIterators(); + other.InvalidateIterators(); + return *this; + } + + size_t GetSize() const { + return Size; + } + + bool IsEmpty() const { + return !Size; + } + + operator bool() const { + return Chain; + } + + TIterator Begin() { + return *this ? TIterator(this, Chain.begin(), Chain.GetFirstChunk().Begin) : End(); + } + + TIterator End() { + return TIterator(this, Chain.end()); + } + + TIterator Iterator(TChunkList::iterator it) { + return TIterator(this, it, it != Chain.end() ? it->Begin : nullptr); + } + + TIterator Position(size_t index) { + return Begin() + index; + } + + TConstIterator Begin() const { + return *this ? TConstIterator(this, Chain.begin(), Chain.GetFirstChunk().Begin) : End(); + } + + TConstIterator End() const { + return TConstIterator(this, Chain.end()); + } + + TConstIterator Position(size_t index) const { + return Begin() + index; + } + + TConstIterator begin() const { return Begin(); } + TConstIterator end() const { return End(); } + + void Erase(TIterator begin, TIterator end) { + Cut(begin, end, nullptr); + } + + TRope Extract(TIterator begin, TIterator end) { + TRope res; + Cut(begin, end, &res); + return res; + } + + void ExtractFront(size_t num, TRope *dest) { + Y_VERIFY(Size >= num); + if (num == Size && !*dest) { + *dest = std::move(*this); + return; + } + Size -= num; + dest->Size += num; + TChunkList::iterator it, first = Chain.begin(); + for (it = first; num && num >= it->GetSize(); ++it) { + num -= it->GetSize(); + } + if (it != first) { + if (dest->Chain) { + auto& last = dest->Chain.GetLastChunk(); + if (last.Backend == first->Backend && last.End == first->Begin) { + last.End = first->End; + first = Chain.Erase(first); // TODO(alexvru): "it" gets invalidated here on some containers + } + } + dest->Chain.Splice(dest->Chain.end(), Chain, first, it); + } + if (num) { + auto it = Chain.begin(); + if (dest->Chain) { + auto& last = dest->Chain.GetLastChunk(); + if (last.Backend == first->Backend && last.End == first->Begin) { + first->Begin += num; + last.End = first->Begin; + return; + } + } + dest->Chain.PutToEnd(TChunk::Slice, it->Begin, it->Begin + num, *it); + it->Begin += num; + } + } + + void Insert(TIterator pos, TRope&& rope) { + Y_VERIFY_DEBUG(this == pos.Rope); + Y_VERIFY_DEBUG(this != &rope); + + if (!rope) { + return; // do nothing for empty rope + } + + // adjust size + Size += std::exchange(rope.Size, 0); + + // check if we have to split the block + if (pos.PointsToChunkMiddle()) { + pos.Iter = Chain.InsertBefore(pos.Iter, TChunk::Slice, pos->Begin, pos.Ptr, pos.GetChunk()); + ++pos.Iter; + pos->Begin = pos.Ptr; + } + + // perform glueing if possible + TChunk *ropeLeft = &rope.Chain.GetFirstChunk(); + TChunk *ropeRight = &rope.Chain.GetLastChunk(); + bool gluedLeft = false, gluedRight = false; + if (pos.Iter != Chain.begin()) { // glue left part whenever possible + // obtain iterator to previous chunk + auto prev(pos.Iter); + --prev; + if (prev->End == ropeLeft->Begin && prev->Backend == ropeLeft->Backend) { // it is glueable + prev->End = ropeLeft->End; + gluedLeft = true; + } + } + if (pos.Iter != Chain.end() && ropeRight->End == pos->Begin && ropeRight->Backend == pos->Backend) { + pos->Begin = ropeRight->Begin; + gluedRight = true; + } + if (gluedLeft) { + rope.Chain.EraseFront(); + } + if (gluedRight) { + if (rope) { + rope.Chain.EraseBack(); + } else { // it looks like double-glueing for the same chunk, we have to drop previous one + auto prev(pos.Iter); + --prev; + pos->Begin = prev->Begin; + pos.Iter = Chain.Erase(prev); + } + } + if (rope) { // insert remains + Chain.Splice(pos.Iter, rope.Chain, rope.Chain.begin(), rope.Chain.end()); + } + Y_VERIFY_DEBUG(!rope); + InvalidateIterators(); + } + + void EraseFront(size_t len) { + Y_VERIFY_DEBUG(Size >= len); + Size -= len; + + while (len) { + Y_VERIFY_DEBUG(Chain); + TChunk& item = Chain.GetFirstChunk(); + const size_t itemSize = item.GetSize(); + if (len >= itemSize) { + Chain.EraseFront(); + len -= itemSize; + } else { + item.Begin += len; + break; + } + } + + InvalidateIterators(); + } + + void EraseBack(size_t len) { + Y_VERIFY_DEBUG(Size >= len); + Size -= len; + + while (len) { + Y_VERIFY_DEBUG(Chain); + TChunk& item = Chain.GetLastChunk(); + const size_t itemSize = item.GetSize(); + if (len >= itemSize) { + Chain.EraseBack(); + len -= itemSize; + } else { + item.End -= len; + break; + } + } + + InvalidateIterators(); + } + + bool ExtractFrontPlain(void *buffer, size_t len) { + // check if we have enough data in the rope + if (Size < len) { + return false; + } + Size -= len; + while (len) { + auto& chunk = Chain.GetFirstChunk(); + const size_t num = Min(len, chunk.GetSize()); + memcpy(buffer, chunk.Begin, num); + buffer = static_cast<char*>(buffer) + num; + len -= num; + chunk.Begin += num; + if (chunk.Begin == chunk.End) { + Chain.Erase(Chain.begin()); + } + } + InvalidateIterators(); + return true; + } + + bool FetchFrontPlain(char **ptr, size_t *remain) { + const size_t num = Min(*remain, Size); + ExtractFrontPlain(*ptr, num); + *ptr += num; + *remain -= num; + return !*remain; + } + + static int Compare(const TRope& x, const TRope& y) { + TConstIterator xIter = x.Begin(), yIter = y.Begin(); + while (xIter.Valid() && yIter.Valid()) { + const size_t step = std::min(xIter.ContiguousSize(), yIter.ContiguousSize()); + if (int res = memcmp(xIter.ContiguousData(), yIter.ContiguousData(), step)) { + return res; + } + xIter += step; + yIter += step; + } + return xIter.Valid() - yIter.Valid(); + } + + // Use this method carefully -- it may significantly reduce performance when misused. + TString ConvertToString() const { + TString res = TString::Uninitialized(GetSize()); + Begin().ExtractPlainDataAndAdvance(res.Detach(), res.size()); + return res; + } + + TString DebugString() const { + TStringStream s; + s << "{Size# " << Size; + for (const auto& chunk : Chain) { + const char *data; + std::tie(data, std::ignore) = chunk.Backend.GetData(); + s << " [" << chunk.Begin - data << ", " << chunk.End - data << ")@" << chunk.Backend.UniqueId(); + } + s << "}"; + return s.Str(); + } + + friend bool operator==(const TRope& x, const TRope& y) { return Compare(x, y) == 0; } + friend bool operator!=(const TRope& x, const TRope& y) { return Compare(x, y) != 0; } + friend bool operator< (const TRope& x, const TRope& y) { return Compare(x, y) < 0; } + friend bool operator<=(const TRope& x, const TRope& y) { return Compare(x, y) <= 0; } + friend bool operator> (const TRope& x, const TRope& y) { return Compare(x, y) > 0; } + friend bool operator>=(const TRope& x, const TRope& y) { return Compare(x, y) >= 0; } + +private: + void Cut(TIterator begin, TIterator end, TRope *target) { + // ensure all iterators are belong to us + Y_VERIFY_DEBUG(this == begin.Rope && this == end.Rope); + + // if begin and end are equal, we do nothing -- checking this case allows us to find out that begin does not + // point to End(), for example + if (begin == end) { + return; + } + + auto addBlock = [&](const TChunk& from, const char *begin, const char *end) { + if (target) { + target->Chain.PutToEnd(TChunk::Slice, begin, end, from); + target->Size += end - begin; + } + Size -= end - begin; + }; + + // consider special case -- when begin and end point to the same block; in this case we have to split up this + // block into two parts + if (begin.Iter == end.Iter) { + addBlock(begin.GetChunk(), begin.Ptr, end.Ptr); + const char *firstChunkBegin = begin.PointsToChunkMiddle() ? begin->Begin : nullptr; + begin->Begin = end.Ptr; // this affects both begin and end iterator pointed values + if (firstChunkBegin) { + Chain.InsertBefore(begin.Iter, TChunk::Slice, firstChunkBegin, begin.Ptr, begin.GetChunk()); + } + } else { + // check the first iterator -- if it starts not from the begin of the block, we have to adjust end of the + // first block to match begin iterator and switch to next block + if (begin.PointsToChunkMiddle()) { + addBlock(begin.GetChunk(), begin.Ptr, begin->End); + begin->End = begin.Ptr; + begin.AdvanceToNextContiguousBlock(); + } + + // now drop full blocks + size_t rangeSize = 0; + for (auto it = begin.Iter; it != end.Iter; ++it) { + Y_VERIFY_DEBUG(it->GetSize()); + rangeSize += it->GetSize(); + } + if (rangeSize) { + if (target) { + end.Iter = target->Chain.Splice(target->Chain.end(), Chain, begin.Iter, end.Iter); + target->Size += rangeSize; + } else { + end.Iter = Chain.Erase(begin.Iter, end.Iter); + } + Size -= rangeSize; + } + + // and cut the last block if necessary + if (end.PointsToChunkMiddle()) { + addBlock(end.GetChunk(), end->Begin, end.Ptr); + end->Begin = end.Ptr; + } + } + + InvalidateIterators(); + } +}; + +class TRopeArena { + using TAllocateCallback = std::function<TIntrusivePtr<IRopeChunkBackend>()>; + + TAllocateCallback Allocator; + TRope Arena; + size_t Size = 0; + THashSet<const void*> AccountedBuffers; + +public: + TRopeArena(TAllocateCallback&& allocator) + : Allocator(std::move(allocator)) + {} + + TRope CreateRope(const void *buffer, size_t len) { + TRope res; + + while (len) { + if (Arena) { + auto iter = Arena.Begin(); + Y_VERIFY_DEBUG(iter.Valid()); + char *dest = const_cast<char*>(iter.ContiguousData()); + const size_t bytesToCopy = std::min(len, iter.ContiguousSize()); + memcpy(dest, buffer, bytesToCopy); + buffer = static_cast<const char*>(buffer) + bytesToCopy; + len -= bytesToCopy; + res.Insert(res.End(), Arena.Extract(Arena.Begin(), Arena.Position(bytesToCopy))); + } else { + Arena.Insert(Arena.End(), TRope(Allocator())); + } + } + + // align arena on 8-byte boundary + const size_t align = 8; + if (const size_t padding = Arena.GetSize() % align) { + Arena.EraseFront(padding); + } + + return res; + } + + size_t GetSize() const { + return Size; + } + + void AccountChunk(const TRope::TChunk& chunk) { + if (AccountedBuffers.insert(chunk.Backend.UniqueId()).second) { + Size += chunk.GetCapacity(); + } + } +}; + +struct TRopeUtils { + static void Memset(TRope::TConstIterator dst, char c, size_t size) { + while (size) { + Y_VERIFY_DEBUG(dst.Valid()); + size_t len = std::min(size, dst.ContiguousSize()); + memset(const_cast<char*>(dst.ContiguousData()), c, len); + dst += len; + size -= len; + } + } + + static void Memcpy(TRope::TConstIterator dst, TRope::TConstIterator src, size_t size) { + while (size) { + Y_VERIFY_DEBUG(dst.Valid() && src.Valid(), + "Invalid iterator in memcpy: dst.Valid() - %" PRIu32 ", src.Valid() - %" PRIu32, + (ui32)dst.Valid(), (ui32)src.Valid()); + size_t len = std::min(size, std::min(dst.ContiguousSize(), src.ContiguousSize())); + memcpy(const_cast<char*>(dst.ContiguousData()), src.ContiguousData(), len); + dst += len; + src += len; + size -= len; + } + } + + static void Memcpy(TRope::TConstIterator dst, const char* src, size_t size) { + while (size) { + Y_VERIFY_DEBUG(dst.Valid()); + size_t len = std::min(size, dst.ContiguousSize()); + memcpy(const_cast<char*>(dst.ContiguousData()), src, len); + size -= len; + dst += len; + src += len; + } + } + + static void Memcpy(char* dst, TRope::TConstIterator src, size_t size) { + while (size) { + Y_VERIFY_DEBUG(src.Valid()); + size_t len = std::min(size, src.ContiguousSize()); + memcpy(dst, src.ContiguousData(), len); + size -= len; + dst += len; + src += len; + } + } + + // copy less or equal to sizeBound bytes, until src is valid + static size_t SafeMemcpy(char* dst, TRope::TIterator src, size_t sizeBound) { + size_t origSize = sizeBound; + while (sizeBound && src.Valid()) { + size_t len = Min(sizeBound, src.ContiguousSize()); + memcpy(dst, src.ContiguousData(), len); + sizeBound -= len; + dst += len; + src += len; + } + return origSize - sizeBound; + } +}; + +template<size_t BLOCK, size_t ALIGN = 16> +class TRopeSlideView { + alignas(ALIGN) char Slide[BLOCK]; // use if distance from current point and next chunk is less than BLOCK + TRope::TIterator Position; // current position at rope + size_t Size; + char* Head; // points to data, it might be current rope chunk or Slide + +private: + void FillBlock() { + size_t chunkSize = Position.ContiguousSize(); + if (chunkSize >= BLOCK) { + Size = chunkSize; + Head = const_cast<char*>(Position.ContiguousData()); + } else { + Size = TRopeUtils::SafeMemcpy(Slide, Position, BLOCK); + Head = Slide; + } + } + +public: + TRopeSlideView(TRope::TIterator position) + : Position(position) + { + FillBlock(); + } + + TRopeSlideView(TRope &rope) + : TRopeSlideView(rope.Begin()) + {} + + // if view on slide then copy slide to rope + void FlushBlock() { + if (Head == Slide) { + TRopeUtils::Memcpy(Position, Head, Size); + } + } + + TRope::TIterator operator+=(size_t amount) { + Position += amount; + FillBlock(); + return Position; + } + + TRope::TIterator GetPosition() const { + return Position; + } + + char* GetHead() const { + return Head; + } + + ui8* GetUi8Head() const { + return reinterpret_cast<ui8*>(Head); + } + + size_t ContiguousSize() const { + return Size; + } + + bool IsOnChunk() const { + return Head != Slide; + } +}; + +inline TRope TRope::CopySpaceOptimized(TRope&& origin, size_t worstRatioPer1k, TRopeArena& arena) { + TRope res; + for (TChunk& chunk : origin.Chain) { + size_t ratio = chunk.GetSize() * 1024 / chunk.GetCapacity(); + if (ratio < 1024 - worstRatioPer1k) { + res.Insert(res.End(), arena.CreateRope(chunk.Begin, chunk.GetSize())); + } else { + res.Chain.PutToEnd(std::move(chunk)); + } + } + res.Size = origin.Size; + origin = TRope(); + for (const TChunk& chunk : res.Chain) { + arena.AccountChunk(chunk); + } + return res; +} + + +#if defined(WITH_VALGRIND) || defined(_msan_enabled_) + +inline void CheckRopeIsDefined(TRope::TConstIterator begin, ui64 size) { + while (size) { + ui64 contiguousSize = Min(size, begin.ContiguousSize()); +# if defined(WITH_VALGRIND) + VALGRIND_CHECK_MEM_IS_DEFINED(begin.ContiguousData(), contiguousSize); +# endif +# if defined(_msan_enabled_) + NSan::CheckMemIsInitialized(begin.ContiguousData(), contiguousSize); +# endif + size -= contiguousSize; + begin += contiguousSize; + } +} + +# define CHECK_ROPE_IS_DEFINED(begin, size) CheckRopeIsDefined(begin, size) + +#else + +# define CHECK_ROPE_IS_DEFINED(begin, size) do {} while (false) + +#endif diff --git a/library/cpp/actors/util/rope_cont_deque.h b/library/cpp/actors/util/rope_cont_deque.h new file mode 100644 index 0000000000..d1d122c49c --- /dev/null +++ b/library/cpp/actors/util/rope_cont_deque.h @@ -0,0 +1,181 @@ +#pragma once + +#include <library/cpp/containers/stack_vector/stack_vec.h> +#include <deque> + +namespace NRopeDetails { + +template<typename TChunk> +class TChunkList { + std::deque<TChunk> Chunks; + + static constexpr size_t MaxInplaceItems = 4; + using TInplace = TStackVec<TChunk, MaxInplaceItems>; + TInplace Inplace; + +private: + template<typename TChunksIt, typename TInplaceIt, typename TValue> + struct TIterator { + TChunksIt ChunksIt; + TInplaceIt InplaceIt; + + TIterator() = default; + + TIterator(TChunksIt chunksIt, TInplaceIt inplaceIt) + : ChunksIt(std::move(chunksIt)) + , InplaceIt(inplaceIt) + {} + + template<typename A, typename B, typename C> + TIterator(const TIterator<A, B, C>& other) + : ChunksIt(other.ChunksIt) + , InplaceIt(other.InplaceIt) + {} + + TIterator(const TIterator&) = default; + TIterator(TIterator&&) = default; + TIterator& operator =(const TIterator&) = default; + TIterator& operator =(TIterator&&) = default; + + TValue& operator *() const { return InplaceIt != TInplaceIt() ? *InplaceIt : *ChunksIt; } + TValue* operator ->() const { return InplaceIt != TInplaceIt() ? &*InplaceIt : &*ChunksIt; } + + TIterator& operator ++() { + if (InplaceIt != TInplaceIt()) { + ++InplaceIt; + } else { + ++ChunksIt; + } + return *this; + } + + TIterator& operator --() { + if (InplaceIt != TInplaceIt()) { + --InplaceIt; + } else { + --ChunksIt; + } + return *this; + } + + template<typename A, typename B, typename C> + bool operator ==(const TIterator<A, B, C>& other) const { + return ChunksIt == other.ChunksIt && InplaceIt == other.InplaceIt; + } + + template<typename A, typename B, typename C> + bool operator !=(const TIterator<A, B, C>& other) const { + return ChunksIt != other.ChunksIt || InplaceIt != other.InplaceIt; + } + }; + +public: + using iterator = TIterator<typename std::deque<TChunk>::iterator, typename TInplace::iterator, TChunk>; + using const_iterator = TIterator<typename std::deque<TChunk>::const_iterator, typename TInplace::const_iterator, const TChunk>; + +public: + TChunkList() = default; + TChunkList(const TChunkList& other) = default; + TChunkList(TChunkList&& other) = default; + TChunkList& operator=(const TChunkList& other) = default; + TChunkList& operator=(TChunkList&& other) = default; + + template<typename... TArgs> + void PutToEnd(TArgs&&... args) { + InsertBefore(end(), std::forward<TArgs>(args)...); + } + + template<typename... TArgs> + iterator InsertBefore(iterator pos, TArgs&&... args) { + if (!Inplace) { + pos.InplaceIt = Inplace.end(); + } + if (Chunks.empty() && Inplace.size() < MaxInplaceItems) { + return {{}, Inplace.emplace(pos.InplaceIt, std::forward<TArgs>(args)...)}; + } else { + if (Inplace) { + Y_VERIFY_DEBUG(Chunks.empty()); + for (auto& item : Inplace) { + Chunks.push_back(std::move(item)); + } + pos.ChunksIt = pos.InplaceIt - Inplace.begin() + Chunks.begin(); + Inplace.clear(); + } + return {Chunks.emplace(pos.ChunksIt, std::forward<TArgs>(args)...), {}}; + } + } + + iterator Erase(iterator pos) { + if (Inplace) { + return {{}, Inplace.erase(pos.InplaceIt)}; + } else { + return {Chunks.erase(pos.ChunksIt), {}}; + } + } + + iterator Erase(iterator first, iterator last) { + if (Inplace) { + return {{}, Inplace.erase(first.InplaceIt, last.InplaceIt)}; + } else { + return {Chunks.erase(first.ChunksIt, last.ChunksIt), {}}; + } + } + + void EraseFront() { + if (Inplace) { + Inplace.erase(Inplace.begin()); + } else { + Chunks.pop_front(); + } + } + + void EraseBack() { + if (Inplace) { + Inplace.pop_back(); + } else { + Chunks.pop_back(); + } + } + + iterator Splice(iterator pos, TChunkList& from, iterator first, iterator last) { + if (!Inplace) { + pos.InplaceIt = Inplace.end(); + } + size_t n = 0; + for (auto it = first; it != last; ++it, ++n) + {} + if (Chunks.empty() && Inplace.size() + n <= MaxInplaceItems) { + if (first.InplaceIt != typename TInplace::iterator()) { + Inplace.insert(pos.InplaceIt, first.InplaceIt, last.InplaceIt); + } else { + Inplace.insert(pos.InplaceIt, first.ChunksIt, last.ChunksIt); + } + } else { + if (Inplace) { + Y_VERIFY_DEBUG(Chunks.empty()); + for (auto& item : Inplace) { + Chunks.push_back(std::move(item)); + } + pos.ChunksIt = pos.InplaceIt - Inplace.begin() + Chunks.begin(); + Inplace.clear(); + } + if (first.InplaceIt != typename TInplace::iterator()) { + Chunks.insert(pos.ChunksIt, first.InplaceIt, last.InplaceIt); + } else { + Chunks.insert(pos.ChunksIt, first.ChunksIt, last.ChunksIt); + } + } + return from.Erase(first, last); + } + + operator bool() const { return !Inplace.empty() || !Chunks.empty(); } + TChunk& GetFirstChunk() { return Inplace ? Inplace.front() : Chunks.front(); } + const TChunk& GetFirstChunk() const { return Inplace ? Inplace.front() : Chunks.front(); } + TChunk& GetLastChunk() { return Inplace ? Inplace.back() : Chunks.back(); } + iterator begin() { return {Chunks.begin(), Inplace ? Inplace.begin() : typename TInplace::iterator()}; } + const_iterator begin() const { return {Chunks.begin(), Inplace ? Inplace.begin() : typename TInplace::const_iterator()}; } + iterator end() { return {Chunks.end(), Inplace ? Inplace.end() : typename TInplace::iterator()}; } + const_iterator end() const { return {Chunks.end(), Inplace ? Inplace.end() : typename TInplace::const_iterator()}; } +}; + +} // NRopeDetails diff --git a/library/cpp/actors/util/rope_cont_list.h b/library/cpp/actors/util/rope_cont_list.h new file mode 100644 index 0000000000..18c136284e --- /dev/null +++ b/library/cpp/actors/util/rope_cont_list.h @@ -0,0 +1,159 @@ +#pragma once + +#include <util/generic/intrlist.h> + +namespace NRopeDetails { + +template<typename TChunk> +class TChunkList { + struct TItem : TIntrusiveListItem<TItem>, TChunk { + // delegating constructor + template<typename... TArgs> TItem(TArgs&&... args) : TChunk(std::forward<TArgs>(args)...) {} + }; + + using TList = TIntrusiveList<TItem>; + TList List; + + static constexpr size_t NumInplaceItems = 2; + char InplaceItems[sizeof(TItem) * NumInplaceItems]; + + template<typename... TArgs> + TItem *AllocateItem(TArgs&&... args) { + for (size_t index = 0; index < NumInplaceItems; ++index) { + TItem *chunk = GetInplaceItemPtr(index); + if (!TItem::IsInUse(*chunk)) { + return new(chunk) TItem(std::forward<TArgs>(args)...); + } + } + return new TItem(std::forward<TArgs>(args)...); + } + + void ReleaseItem(TItem *chunk) { + if (IsInplaceItem(chunk)) { + chunk->~TItem(); + TItem::Clear(*chunk); + } else { + delete chunk; + } + } + + void ReleaseItems(TList& list) { + while (list) { + ReleaseItem(list.Front()); + } + } + + void Prepare() { + for (size_t index = 0; index < NumInplaceItems; ++index) { + TItem::Clear(*GetInplaceItemPtr(index)); + } + } + + TItem *GetInplaceItemPtr(size_t index) { return reinterpret_cast<TItem*>(InplaceItems + index * sizeof(TItem)); } + bool IsInplaceItem(TItem *chunk) { return chunk >= GetInplaceItemPtr(0) && chunk < GetInplaceItemPtr(NumInplaceItems); } + +public: + using iterator = typename TList::iterator; + using const_iterator = typename TList::const_iterator; + +public: + TChunkList() { + Prepare(); + } + + ~TChunkList() { + ReleaseItems(List); +#ifndef NDEBUG + for (size_t index = 0; index < NumInplaceItems; ++index) { + Y_VERIFY(!TItem::IsInUse(*GetInplaceItemPtr(index))); + } +#endif + } + + TChunkList(const TChunkList& other) { + Prepare(); + for (const TItem& chunk : other.List) { + PutToEnd(TChunk(chunk)); + } + } + + TChunkList(TChunkList&& other) { + Prepare(); + Splice(end(), other, other.begin(), other.end()); + } + + TChunkList& operator=(const TChunkList& other) { + if (this != &other) { + ReleaseItems(List); + for (const TItem& chunk : other.List) { + PutToEnd(TChunk(chunk)); + } + } + return *this; + } + + TChunkList& operator=(TChunkList&& other) { + if (this != &other) { + ReleaseItems(List); + Splice(end(), other, other.begin(), other.end()); + } + return *this; + } + + template<typename... TArgs> + void PutToEnd(TArgs&&... args) { + InsertBefore(end(), std::forward<TArgs>(args)...); + } + + template<typename... TArgs> + iterator InsertBefore(iterator pos, TArgs&&... args) { + TItem *item = AllocateItem<TArgs...>(std::forward<TArgs>(args)...); + item->LinkBefore(pos.Item()); + return item; + } + + iterator Erase(iterator pos) { + ReleaseItem(&*pos++); + return pos; + } + + iterator Erase(iterator first, iterator last) { + TList temp; + TList::Cut(first, last, temp.end()); + ReleaseItems(temp); + return last; + } + + void EraseFront() { + ReleaseItem(List.PopFront()); + } + + void EraseBack() { + ReleaseItem(List.PopBack()); + } + + iterator Splice(iterator pos, TChunkList& from, iterator first, iterator last) { + for (auto it = first; it != last; ) { + if (from.IsInplaceItem(&*it)) { + TList::Cut(first, it, pos); + InsertBefore(pos, std::move(*it)); + it = first = from.Erase(it); + } else { + ++it; + } + } + TList::Cut(first, last, pos); + return last; + } + + operator bool() const { return static_cast<bool>(List); } + TChunk& GetFirstChunk() { return *List.Front(); } + const TChunk& GetFirstChunk() const { return *List.Front(); } + TChunk& GetLastChunk() { return *List.Back(); } + iterator begin() { return List.begin(); } + const_iterator begin() const { return List.begin(); } + iterator end() { return List.end(); } + const_iterator end() const { return List.end(); } +}; + +} // NRopeDetails diff --git a/library/cpp/actors/util/rope_ut.cpp b/library/cpp/actors/util/rope_ut.cpp new file mode 100644 index 0000000000..cabeed9230 --- /dev/null +++ b/library/cpp/actors/util/rope_ut.cpp @@ -0,0 +1,231 @@ +#include "rope.h" +#include <library/cpp/testing/unittest/registar.h> +#include <util/random/random.h> + +class TRopeStringBackend : public IRopeChunkBackend { + TString Buffer; + +public: + TRopeStringBackend(TString buffer) + : Buffer(std::move(buffer)) + {} + + TData GetData() const override { + return {Buffer.data(), Buffer.size()}; + } + + size_t GetCapacity() const override { + return Buffer.capacity(); + } +}; + +TRope CreateRope(TString s, size_t sliceSize) { + TRope res; + for (size_t i = 0; i < s.size(); ) { + size_t len = std::min(sliceSize, s.size() - i); + if (i % 2) { + res.Insert(res.End(), TRope(MakeIntrusive<TRopeStringBackend>(s.substr(i, len)))); + } else { + res.Insert(res.End(), TRope(s.substr(i, len))); + } + i += len; + } + return res; +} + +TString RopeToString(const TRope& rope) { + TString res; + auto iter = rope.Begin(); + while (iter != rope.End()) { + res.append(iter.ContiguousData(), iter.ContiguousSize()); + iter.AdvanceToNextContiguousBlock(); + } + + UNIT_ASSERT_VALUES_EQUAL(rope.GetSize(), res.size()); + + TString temp = TString::Uninitialized(rope.GetSize()); + rope.Begin().ExtractPlainDataAndAdvance(temp.Detach(), temp.size()); + UNIT_ASSERT_VALUES_EQUAL(temp, res); + + return res; +} + +TString Text = "No elements are copied or moved, only the internal pointers of the list nodes are re-pointed."; + +Y_UNIT_TEST_SUITE(TRope) { + + Y_UNIT_TEST(Leak) { + const size_t begin = 10, end = 20; + TRope rope = CreateRope(Text, 10); + rope.Erase(rope.Begin() + begin, rope.Begin() + end); + } + + Y_UNIT_TEST(BasicRange) { + TRope rope = CreateRope(Text, 10); + for (size_t begin = 0; begin < Text.size(); ++begin) { + for (size_t end = begin; end <= Text.size(); ++end) { + TRope::TIterator rBegin = rope.Begin() + begin; + TRope::TIterator rEnd = rope.Begin() + end; + UNIT_ASSERT_VALUES_EQUAL(RopeToString(TRope(rBegin, rEnd)), Text.substr(begin, end - begin)); + } + } + } + + Y_UNIT_TEST(Erase) { + for (size_t begin = 0; begin < Text.size(); ++begin) { + for (size_t end = begin; end <= Text.size(); ++end) { + TRope rope = CreateRope(Text, 10); + rope.Erase(rope.Begin() + begin, rope.Begin() + end); + TString text = Text; + text.erase(text.begin() + begin, text.begin() + end); + UNIT_ASSERT_VALUES_EQUAL(RopeToString(rope), text); + } + } + } + + Y_UNIT_TEST(Insert) { + TRope rope = CreateRope(Text, 10); + for (size_t begin = 0; begin < Text.size(); ++begin) { + for (size_t end = begin; end <= Text.size(); ++end) { + TRope part = TRope(rope.Begin() + begin, rope.Begin() + end); + for (size_t where = 0; where <= Text.size(); ++where) { + TRope x(rope); + x.Insert(x.Begin() + where, TRope(part)); + UNIT_ASSERT_VALUES_EQUAL(x.GetSize(), rope.GetSize() + part.GetSize()); + TString text = Text; + text.insert(text.begin() + where, Text.begin() + begin, Text.begin() + end); + UNIT_ASSERT_VALUES_EQUAL(RopeToString(x), text); + } + } + } + } + + Y_UNIT_TEST(Extract) { + for (size_t begin = 0; begin < Text.size(); ++begin) { + for (size_t end = begin; end <= Text.size(); ++end) { + TRope rope = CreateRope(Text, 10); + TRope part = rope.Extract(rope.Begin() + begin, rope.Begin() + end); + TString text = Text; + text.erase(text.begin() + begin, text.begin() + end); + UNIT_ASSERT_VALUES_EQUAL(RopeToString(rope), text); + UNIT_ASSERT_VALUES_EQUAL(RopeToString(part), Text.substr(begin, end - begin)); + } + } + } + + Y_UNIT_TEST(EraseFront) { + for (size_t pos = 0; pos <= Text.size(); ++pos) { + TRope rope = CreateRope(Text, 10); + rope.EraseFront(pos); + UNIT_ASSERT_VALUES_EQUAL(RopeToString(rope), Text.substr(pos)); + } + } + + Y_UNIT_TEST(EraseBack) { + for (size_t pos = 0; pos <= Text.size(); ++pos) { + TRope rope = CreateRope(Text, 10); + rope.EraseBack(pos); + UNIT_ASSERT_VALUES_EQUAL(RopeToString(rope), Text.substr(0, Text.size() - pos)); + } + } + + Y_UNIT_TEST(ExtractFront) { + for (size_t step = 1; step <= Text.size(); ++step) { + TRope rope = CreateRope(Text, 10); + TRope out; + while (const size_t len = Min(step, rope.GetSize())) { + rope.ExtractFront(len, &out); + UNIT_ASSERT(rope.GetSize() + out.GetSize() == Text.size()); + UNIT_ASSERT_VALUES_EQUAL(RopeToString(out), Text.substr(0, out.GetSize())); + } + } + } + + Y_UNIT_TEST(ExtractFrontPlain) { + for (size_t step = 1; step <= Text.size(); ++step) { + TRope rope = CreateRope(Text, 10); + TString buffer = Text; + auto it = rope.Begin(); + size_t remain = rope.GetSize(); + while (const size_t len = Min(step, remain)) { + TString data = TString::Uninitialized(len); + it.ExtractPlainDataAndAdvance(data.Detach(), data.size()); + UNIT_ASSERT_VALUES_EQUAL(data, buffer.substr(0, len)); + UNIT_ASSERT_VALUES_EQUAL(RopeToString(TRope(it, rope.End())), buffer.substr(len)); + buffer = buffer.substr(len); + remain -= len; + } + } + } + + Y_UNIT_TEST(FetchFrontPlain) { + char s[10]; + char *data = s; + size_t remain = sizeof(s); + TRope rope = TRope(TString("HELLO")); + UNIT_ASSERT(!rope.FetchFrontPlain(&data, &remain)); + UNIT_ASSERT(!rope); + rope.Insert(rope.End(), TRope(TString("WORLD!!!"))); + UNIT_ASSERT(rope.FetchFrontPlain(&data, &remain)); + UNIT_ASSERT(!remain); + UNIT_ASSERT(rope.GetSize() == 3); + UNIT_ASSERT_VALUES_EQUAL(rope.ConvertToString(), "!!!"); + UNIT_ASSERT(!strncmp(s, "HELLOWORLD", 10)); + } + + Y_UNIT_TEST(Glueing) { + TRope rope = CreateRope(Text, 10); + for (size_t begin = 0; begin <= Text.size(); ++begin) { + for (size_t end = begin; end <= Text.size(); ++end) { + TString repr = rope.DebugString(); + TRope temp = rope.Extract(rope.Position(begin), rope.Position(end)); + rope.Insert(rope.Position(begin), std::move(temp)); + UNIT_ASSERT_VALUES_EQUAL(repr, rope.DebugString()); + UNIT_ASSERT_VALUES_EQUAL(RopeToString(rope), Text); + } + } + } + + Y_UNIT_TEST(IterWalk) { + TRope rope = CreateRope(Text, 10); + for (size_t step1 = 0; step1 <= rope.GetSize(); ++step1) { + for (size_t step2 = 0; step2 <= step1; ++step2) { + TRope::TConstIterator iter = rope.Begin(); + iter += step1; + iter -= step2; + UNIT_ASSERT(iter == rope.Position(step1 - step2)); + } + } + } + + Y_UNIT_TEST(Compare) { + auto check = [](const TString& x, const TString& y) { + const TRope xRope = CreateRope(x, 7); + const TRope yRope = CreateRope(y, 11); + UNIT_ASSERT_VALUES_EQUAL(xRope == yRope, x == y); + UNIT_ASSERT_VALUES_EQUAL(xRope != yRope, x != y); + UNIT_ASSERT_VALUES_EQUAL(xRope < yRope, x < y); + UNIT_ASSERT_VALUES_EQUAL(xRope <= yRope, x <= y); + UNIT_ASSERT_VALUES_EQUAL(xRope > yRope, x > y); + UNIT_ASSERT_VALUES_EQUAL(xRope >= yRope, x >= y); + }; + + TVector<TString> pool; + for (size_t k = 0; k < 10; ++k) { + size_t len = RandomNumber<size_t>(100) + 100; + TString s = TString::Uninitialized(len); + char *p = s.Detach(); + for (size_t j = 0; j < len; ++j) { + *p++ = RandomNumber<unsigned char>(); + } + pool.push_back(std::move(s)); + } + + for (const TString& x : pool) { + for (const TString& y : pool) { + check(x, y); + } + } + } + +} diff --git a/library/cpp/actors/util/should_continue.cpp b/library/cpp/actors/util/should_continue.cpp new file mode 100644 index 0000000000..258e6a0aff --- /dev/null +++ b/library/cpp/actors/util/should_continue.cpp @@ -0,0 +1,23 @@ +#include "should_continue.h" + +void TProgramShouldContinue::ShouldRestart() { + AtomicSet(State, Restart); +} + +void TProgramShouldContinue::ShouldStop(int returnCode) { + AtomicSet(ReturnCode, returnCode); + AtomicSet(State, Stop); +} + +TProgramShouldContinue::EState TProgramShouldContinue::PollState() { + return static_cast<EState>(AtomicGet(State)); +} + +int TProgramShouldContinue::GetReturnCode() { + return static_cast<int>(AtomicGet(ReturnCode)); +} + +void TProgramShouldContinue::Reset() { + AtomicSet(ReturnCode, 0); + AtomicSet(State, Continue); +} diff --git a/library/cpp/actors/util/should_continue.h b/library/cpp/actors/util/should_continue.h new file mode 100644 index 0000000000..76acc40dc4 --- /dev/null +++ b/library/cpp/actors/util/should_continue.h @@ -0,0 +1,22 @@ +#pragma once +#include "defs.h" + +class TProgramShouldContinue { +public: + enum EState { + Continue, + Stop, + Restart, + }; + + void ShouldRestart(); + void ShouldStop(int returnCode = 0); + + EState PollState(); + int GetReturnCode(); + + void Reset(); +private: + TAtomic ReturnCode = 0; + TAtomic State = Continue; +}; diff --git a/library/cpp/actors/util/thread.h b/library/cpp/actors/util/thread.h new file mode 100644 index 0000000000..d742c8c585 --- /dev/null +++ b/library/cpp/actors/util/thread.h @@ -0,0 +1,26 @@ +#pragma once + +#include <util/generic/strbuf.h> +#include <util/stream/str.h> +#include <util/system/execpath.h> +#include <util/system/thread.h> +#include <util/system/thread.h> +#include <time.h> + +inline void SetCurrentThreadName(const TString& name, + const ui32 maxCharsFromProcessName = 8) { +#if defined(_linux_) + // linux limits threadname by 15 + \0 + + TStringBuf procName(GetExecPath()); + procName = procName.RNextTok('/'); + procName = procName.SubStr(0, maxCharsFromProcessName); + + TStringStream linuxName; + linuxName << procName << "." << name; + TThread::SetCurrentThreadName(linuxName.Str().data()); +#else + Y_UNUSED(maxCharsFromProcessName); + TThread::SetCurrentThreadName(name.data()); +#endif +} diff --git a/library/cpp/actors/util/threadparkpad.cpp b/library/cpp/actors/util/threadparkpad.cpp new file mode 100644 index 0000000000..74069ff15b --- /dev/null +++ b/library/cpp/actors/util/threadparkpad.cpp @@ -0,0 +1,148 @@ +#include "threadparkpad.h" +#include <util/system/winint.h> + +#ifdef _linux_ + +#include "futex.h" + +namespace NActors { + class TThreadParkPad::TImpl { + volatile bool Interrupted; + int Futex; + + public: + TImpl() + : Interrupted(false) + , Futex(0) + { + } + ~TImpl() { + } + + bool Park() noexcept { + __atomic_fetch_sub(&Futex, 1, __ATOMIC_SEQ_CST); + while (__atomic_load_n(&Futex, __ATOMIC_ACQUIRE) == -1) + SysFutex(&Futex, FUTEX_WAIT_PRIVATE, -1, nullptr, nullptr, 0); + return IsInterrupted(); + } + + void Unpark() noexcept { + const int old = __atomic_fetch_add(&Futex, 1, __ATOMIC_SEQ_CST); + if (old == -1) + SysFutex(&Futex, FUTEX_WAKE_PRIVATE, -1, nullptr, nullptr, 0); + } + + void Interrupt() noexcept { + __atomic_store_n(&Interrupted, true, __ATOMIC_SEQ_CST); + Unpark(); + } + + bool IsInterrupted() const noexcept { + return __atomic_load_n(&Interrupted, __ATOMIC_ACQUIRE); + } + }; + +#elif defined _win32_ +#include <util/generic/bt_exception.h> +#include <util/generic/yexception.h> + +namespace NActors { + class TThreadParkPad::TImpl { + TAtomic Interrupted; + HANDLE EvHandle; + + public: + TImpl() + : Interrupted(false) + { + EvHandle = ::CreateEvent(0, false, false, 0); + if (!EvHandle) + ythrow TWithBackTrace<yexception>() << "::CreateEvent failed"; + } + ~TImpl() { + if (EvHandle) + ::CloseHandle(EvHandle); + } + + bool Park() noexcept { + ::WaitForSingleObject(EvHandle, INFINITE); + return AtomicGet(Interrupted); + } + + void Unpark() noexcept { + ::SetEvent(EvHandle); + } + + void Interrupt() noexcept { + AtomicSet(Interrupted, true); + Unpark(); + } + + bool IsInterrupted() const noexcept { + return AtomicGet(Interrupted); + } + }; + +#else + +#include <util/system/event.h> + +namespace NActors { + class TThreadParkPad::TImpl { + TAtomic Interrupted; + TSystemEvent Ev; + + public: + TImpl() + : Interrupted(false) + , Ev(TSystemEvent::rAuto) + { + } + ~TImpl() { + } + + bool Park() noexcept { + Ev.Wait(); + return AtomicGet(Interrupted); + } + + void Unpark() noexcept { + Ev.Signal(); + } + + void Interrupt() noexcept { + AtomicSet(Interrupted, true); + Unpark(); + } + + bool IsInterrupted() const noexcept { + return AtomicGet(Interrupted); + } + }; +#endif + + TThreadParkPad::TThreadParkPad() + : Impl(new TThreadParkPad::TImpl()) + { + } + + TThreadParkPad::~TThreadParkPad() { + } + + bool TThreadParkPad::Park() noexcept { + return Impl->Park(); + } + + void TThreadParkPad::Unpark() noexcept { + Impl->Unpark(); + } + + void TThreadParkPad::Interrupt() noexcept { + Impl->Interrupt(); + } + + bool TThreadParkPad::Interrupted() const noexcept { + return Impl->IsInterrupted(); + } + +} diff --git a/library/cpp/actors/util/threadparkpad.h b/library/cpp/actors/util/threadparkpad.h new file mode 100644 index 0000000000..5b574ccf34 --- /dev/null +++ b/library/cpp/actors/util/threadparkpad.h @@ -0,0 +1,21 @@ +#pragma once + +#include <util/generic/ptr.h> + +namespace NActors { + class TThreadParkPad { + private: + class TImpl; + THolder<TImpl> Impl; + + public: + TThreadParkPad(); + ~TThreadParkPad(); + + bool Park() noexcept; + void Unpark() noexcept; + void Interrupt() noexcept; + bool Interrupted() const noexcept; + }; + +} diff --git a/library/cpp/actors/util/ticket_lock.h b/library/cpp/actors/util/ticket_lock.h new file mode 100644 index 0000000000..3b1fa80393 --- /dev/null +++ b/library/cpp/actors/util/ticket_lock.h @@ -0,0 +1,48 @@ +#pragma once + +#include "intrinsics.h" +#include <util/system/guard.h> +#include <util/system/yassert.h> + +class TTicketLock : TNonCopyable { + ui32 TicketIn; + ui32 TicketOut; + +public: + TTicketLock() + : TicketIn(0) + , TicketOut(0) + { + } + + void Release() noexcept { + AtomicUi32Increment(&TicketOut); + } + + ui32 Acquire() noexcept { + ui32 revolves = 0; + const ui32 ticket = AtomicUi32Increment(&TicketIn) - 1; + while (ticket != AtomicLoad(&TicketOut)) { + Y_VERIFY_DEBUG(ticket >= AtomicLoad(&TicketOut)); + SpinLockPause(); + ++revolves; + } + return revolves; + } + + bool TryAcquire() noexcept { + const ui32 x = AtomicLoad(&TicketOut); + if (x == AtomicLoad(&TicketIn) && AtomicUi32Cas(&TicketIn, x + 1, x)) + return true; + else + return false; + } + + bool IsLocked() noexcept { + const ui32 ticketIn = AtomicLoad(&TicketIn); + const ui32 ticketOut = AtomicLoad(&TicketOut); + return (ticketIn != ticketOut); + } + + typedef ::TGuard<TTicketLock> TGuard; +}; diff --git a/library/cpp/actors/util/timerfd.h b/library/cpp/actors/util/timerfd.h new file mode 100644 index 0000000000..3189e2a672 --- /dev/null +++ b/library/cpp/actors/util/timerfd.h @@ -0,0 +1,65 @@ +#pragma once + +#include "datetime.h" + +#include <util/generic/noncopyable.h> + +#ifdef _linux_ + +#include <util/system/yassert.h> +#include <errno.h> +#include <sys/timerfd.h> + +struct TTimerFd: public TNonCopyable { + int Fd; + + TTimerFd() { + Fd = timerfd_create(CLOCK_MONOTONIC, 0); + Y_VERIFY(Fd != -1, "timerfd_create(CLOCK_MONOTONIC, 0) -> -1; errno:%d: %s", int(errno), strerror(errno)); + } + + ~TTimerFd() { + close(Fd); + } + + void Set(ui64 ts) { + ui64 now = GetCycleCountFast(); + Arm(now >= ts? 1: NHPTimer::GetSeconds(ts - now) * 1e9); + } + + void Reset() { + Arm(0); // disarm timer + } + + void Wait() { + ui64 expirations; + ssize_t s = read(Fd, &expirations, sizeof(ui64)); + Y_UNUSED(s); // Y_VERIFY(s == sizeof(ui64)); + } + + void Wake() { + Arm(1); + } +private: + void Arm(ui64 ns) { + struct itimerspec spec; + spec.it_value.tv_sec = ns / 1'000'000'000; + spec.it_value.tv_nsec = ns % 1'000'000'000; + spec.it_interval.tv_sec = 0; + spec.it_interval.tv_nsec = 0; + int ret = timerfd_settime(Fd, 0, &spec, nullptr); + Y_VERIFY(ret != -1, "timerfd_settime(%d, 0, %" PRIu64 "ns, 0) -> %d; errno:%d: %s", Fd, ns, ret, int(errno), strerror(errno)); + } +}; + +#else + +struct TTimerFd: public TNonCopyable { + int Fd = 0; + void Set(ui64) {} + void Reset() {} + void Wait() {} + void Wake() {} +}; + +#endif diff --git a/library/cpp/actors/util/unordered_cache.h b/library/cpp/actors/util/unordered_cache.h new file mode 100644 index 0000000000..76f036c0cf --- /dev/null +++ b/library/cpp/actors/util/unordered_cache.h @@ -0,0 +1,201 @@ +#pragma once + +#include "defs.h" +#include "queue_chunk.h" + +template <typename T, ui32 Size = 512, ui32 ConcurrencyFactor = 1, typename TChunk = TQueueChunk<T, Size>> +class TUnorderedCache : TNonCopyable { + static_assert(std::is_integral<T>::value || std::is_pointer<T>::value, "expect std::is_integral<T>::value || std::is_pointer<T>::value"); + +public: + static constexpr ui32 Concurrency = ConcurrencyFactor * 4; + +private: + struct TReadSlot { + TChunk* volatile ReadFrom; + volatile ui32 ReadPosition; + char Padding[64 - sizeof(TChunk*) - sizeof(ui32)]; // 1 slot per cache line + }; + + struct TWriteSlot { + TChunk* volatile WriteTo; + volatile ui32 WritePosition; + char Padding[64 - sizeof(TChunk*) - sizeof(ui32)]; // 1 slot per cache line + }; + + static_assert(sizeof(TReadSlot) == 64, "expect sizeof(TReadSlot) == 64"); + static_assert(sizeof(TWriteSlot) == 64, "expect sizeof(TWriteSlot) == 64"); + +private: + TReadSlot ReadSlots[Concurrency]; + TWriteSlot WriteSlots[Concurrency]; + + static_assert(sizeof(TChunk*) == sizeof(TAtomic), "expect sizeof(TChunk*) == sizeof(TAtomic)"); + +private: + struct TLockedWriter { + TWriteSlot* Slot; + TChunk* WriteTo; + + TLockedWriter() + : Slot(nullptr) + , WriteTo(nullptr) + { } + + TLockedWriter(TWriteSlot* slot, TChunk* writeTo) + : Slot(slot) + , WriteTo(writeTo) + { } + + ~TLockedWriter() noexcept { + Drop(); + } + + void Drop() { + if (Slot) { + AtomicStore(&Slot->WriteTo, WriteTo); + Slot = nullptr; + } + } + + TLockedWriter(const TLockedWriter&) = delete; + TLockedWriter& operator=(const TLockedWriter&) = delete; + + TLockedWriter(TLockedWriter&& rhs) + : Slot(rhs.Slot) + , WriteTo(rhs.WriteTo) + { + rhs.Slot = nullptr; + } + + TLockedWriter& operator=(TLockedWriter&& rhs) { + if (Y_LIKELY(this != &rhs)) { + Drop(); + Slot = rhs.Slot; + WriteTo = rhs.WriteTo; + rhs.Slot = nullptr; + } + return *this; + } + }; + +private: + TLockedWriter LockWriter(ui64 writerRotation) { + ui32 cycle = 0; + for (;;) { + TWriteSlot* slot = &WriteSlots[writerRotation % Concurrency]; + if (AtomicLoad(&slot->WriteTo) != nullptr) { + if (TChunk* writeTo = AtomicSwap(&slot->WriteTo, nullptr)) { + return TLockedWriter(slot, writeTo); + } + } + ++writerRotation; + + // Do a spinlock pause after a full cycle + if (++cycle == Concurrency) { + SpinLockPause(); + cycle = 0; + } + } + } + + void WriteOne(TLockedWriter& lock, T x) { + Y_VERIFY_DEBUG(x != 0); + + const ui32 pos = AtomicLoad(&lock.Slot->WritePosition); + if (pos != TChunk::EntriesCount) { + AtomicStore(&lock.Slot->WritePosition, pos + 1); + AtomicStore(&lock.WriteTo->Entries[pos], x); + } else { + TChunk* next = new TChunk(); + AtomicStore(&next->Entries[0], x); + AtomicStore(&lock.Slot->WritePosition, 1u); + AtomicStore(&lock.WriteTo->Next, next); + lock.WriteTo = next; + } + } + +public: + TUnorderedCache() { + for (ui32 i = 0; i < Concurrency; ++i) { + ReadSlots[i].ReadFrom = new TChunk(); + ReadSlots[i].ReadPosition = 0; + + WriteSlots[i].WriteTo = ReadSlots[i].ReadFrom; + WriteSlots[i].WritePosition = 0; + } + } + + ~TUnorderedCache() { + Y_VERIFY(!Pop(0)); + + for (ui64 i = 0; i < Concurrency; ++i) { + if (ReadSlots[i].ReadFrom) { + delete ReadSlots[i].ReadFrom; + ReadSlots[i].ReadFrom = nullptr; + } + WriteSlots[i].WriteTo = nullptr; + } + } + + T Pop(ui64 readerRotation) noexcept { + ui64 readerIndex = readerRotation; + const ui64 endIndex = readerIndex + Concurrency; + for (; readerIndex != endIndex; ++readerIndex) { + TReadSlot* slot = &ReadSlots[readerIndex % Concurrency]; + if (AtomicLoad(&slot->ReadFrom) != nullptr) { + if (TChunk* readFrom = AtomicSwap(&slot->ReadFrom, nullptr)) { + const ui32 pos = AtomicLoad(&slot->ReadPosition); + if (pos != TChunk::EntriesCount) { + if (T ret = AtomicLoad(&readFrom->Entries[pos])) { + AtomicStore(&slot->ReadPosition, pos + 1); + AtomicStore(&slot->ReadFrom, readFrom); // release lock with same chunk + return ret; // found, return + } else { + AtomicStore(&slot->ReadFrom, readFrom); // release lock with same chunk + } + } else if (TChunk* next = AtomicLoad(&readFrom->Next)) { + if (T ret = AtomicLoad(&next->Entries[0])) { + AtomicStore(&slot->ReadPosition, 1u); + AtomicStore(&slot->ReadFrom, next); // release lock with next chunk + delete readFrom; + return ret; + } else { + AtomicStore(&slot->ReadPosition, 0u); + AtomicStore(&slot->ReadFrom, next); // release lock with new chunk + delete readFrom; + } + } else { + // nothing in old chunk and no next chunk, just release lock with old chunk + AtomicStore(&slot->ReadFrom, readFrom); + } + } + } + } + + return 0; // got nothing after full cycle, return + } + + void Push(T x, ui64 writerRotation) { + TLockedWriter lock = LockWriter(writerRotation); + WriteOne(lock, x); + } + + void PushBulk(T* x, ui32 xcount, ui64 writerRotation) { + for (;;) { + // Fill no more then one queue chunk per round + const ui32 xround = Min(xcount, (ui32)TChunk::EntriesCount); + + { + TLockedWriter lock = LockWriter(writerRotation++); + for (T* end = x + xround; x != end; ++x) + WriteOne(lock, *x); + } + + if (xcount <= TChunk::EntriesCount) + break; + + xcount -= TChunk::EntriesCount; + } + } +}; diff --git a/library/cpp/actors/util/unordered_cache_ut.cpp b/library/cpp/actors/util/unordered_cache_ut.cpp new file mode 100644 index 0000000000..37865f2f91 --- /dev/null +++ b/library/cpp/actors/util/unordered_cache_ut.cpp @@ -0,0 +1,138 @@ +#include "unordered_cache.h" + +#include <library/cpp/testing/unittest/registar.h> +#include <util/random/random.h> +#include <util/system/hp_timer.h> +#include <util/system/sanitizers.h> +#include <util/system/thread.h> + +Y_UNIT_TEST_SUITE(UnorderedCache) { + + void DoOnePushOnePop(ui64 count) { + TUnorderedCache<ui64> queue; + + ui64 readRotation = 0; + ui64 writeRotation = 0; + + auto popped = queue.Pop(readRotation++); + UNIT_ASSERT_VALUES_EQUAL(popped, 0u); + + for (ui64 i = 0; i < count; ++i) { + queue.Push(i + 1, writeRotation++); + popped = queue.Pop(readRotation++); + UNIT_ASSERT_VALUES_EQUAL(popped, i + 1); + + popped = queue.Pop(readRotation++); + UNIT_ASSERT_VALUES_EQUAL(popped, 0u); + } + } + + Y_UNIT_TEST(OnePushOnePop) { + DoOnePushOnePop(1); + } + + Y_UNIT_TEST(OnePushOnePop_Repeat1M) { + DoOnePushOnePop(1000000); + } + + /** + * Simplified thread spawning for testing + */ + class TWorkerThread : public ISimpleThread { + private: + std::function<void()> Func; + double Time = 0.0; + + public: + TWorkerThread(std::function<void()> func) + : Func(std::move(func)) + { } + + double GetTime() const { + return Time; + } + + static THolder<TWorkerThread> Spawn(std::function<void()> func) { + THolder<TWorkerThread> thread = MakeHolder<TWorkerThread>(std::move(func)); + thread->Start(); + return thread; + } + + private: + void* ThreadProc() noexcept override { + THPTimer timer; + Func(); + Time = timer.Passed(); + return nullptr; + } + }; + + void DoConcurrentPushPop(size_t threads, ui64 perThreadCount) { + // Concurrency factor 4 is up to 16 threads + TUnorderedCache<ui64, 512, 4> queue; + + auto workerFunc = [&](size_t threadIndex) { + ui64 readRotation = 0; + ui64 writeRotation = 0; + ui64 readsDone = 0; + ui64 writesDone = 0; + for (;;) { + bool canRead = readsDone < writesDone; + bool canWrite = writesDone < perThreadCount; + if (!canRead && !canWrite) { + break; + } + if (canRead && canWrite) { + // Randomly choose between read and write + if (RandomNumber<ui64>(2)) { + canRead = false; + } else { + canWrite = false; + } + } + if (canRead) { + ui64 popped = queue.Pop(readRotation++); + if (popped) { + ++readsDone; + } + } + if (canWrite) { + queue.Push(1 + writesDone * threads + threadIndex, writeRotation++); + ++writesDone; + } + } + }; + + TVector<THolder<TWorkerThread>> workers(threads); + for (size_t i = 0; i < threads; ++i) { + workers[i] = TWorkerThread::Spawn([workerFunc, i]() { + workerFunc(i); + }); + } + + double maxTime = 0; + for (size_t i = 0; i < threads; ++i) { + workers[i]->Join(); + maxTime = Max(maxTime, workers[i]->GetTime()); + } + + auto popped = queue.Pop(0); + UNIT_ASSERT_VALUES_EQUAL(popped, 0u); + + Cerr << "Concurrent with " << threads << " threads: " << maxTime << " seconds" << Endl; + } + + void DoConcurrentPushPop_3times(size_t threads, ui64 perThreadCount) { + for (size_t i = 0; i < 3; ++i) { + DoConcurrentPushPop(threads, perThreadCount); + } + } + + static constexpr ui64 PER_THREAD_COUNT = NSan::PlainOrUnderSanitizer(1000000, 100000); + + Y_UNIT_TEST(ConcurrentPushPop_1thread) { DoConcurrentPushPop_3times(1, PER_THREAD_COUNT); } + Y_UNIT_TEST(ConcurrentPushPop_2threads) { DoConcurrentPushPop_3times(2, PER_THREAD_COUNT); } + Y_UNIT_TEST(ConcurrentPushPop_4threads) { DoConcurrentPushPop_3times(4, PER_THREAD_COUNT); } + Y_UNIT_TEST(ConcurrentPushPop_8threads) { DoConcurrentPushPop_3times(8, PER_THREAD_COUNT); } + Y_UNIT_TEST(ConcurrentPushPop_16threads) { DoConcurrentPushPop_3times(16, PER_THREAD_COUNT); } +} diff --git a/library/cpp/actors/util/ut/ya.make b/library/cpp/actors/util/ut/ya.make new file mode 100644 index 0000000000..3b08b77984 --- /dev/null +++ b/library/cpp/actors/util/ut/ya.make @@ -0,0 +1,18 @@ +UNITTEST_FOR(library/cpp/actors/util) + +IF (WITH_VALGRIND) + TIMEOUT(600) + SIZE(MEDIUM) +ENDIF() + +OWNER( + alexvru + g:kikimr +) + +SRCS( + rope_ut.cpp + unordered_cache_ut.cpp +) + +END() diff --git a/library/cpp/actors/util/ya.make b/library/cpp/actors/util/ya.make new file mode 100644 index 0000000000..37488c3962 --- /dev/null +++ b/library/cpp/actors/util/ya.make @@ -0,0 +1,37 @@ +LIBRARY() + +OWNER( + ddoarn + g:kikimr +) + +SRCS( + affinity.cpp + affinity.h + cpumask.h + datetime.h + defs.h + funnel_queue.h + futex.h + intrinsics.h + local_process_key.h + named_tuple.h + queue_chunk.h + queue_oneone_inplace.h + recentwnd.h + rope.h + should_continue.cpp + should_continue.h + thread.h + threadparkpad.cpp + threadparkpad.h + ticket_lock.h + timerfd.h + unordered_cache.h +) + +PEERDIR( + util +) + +END() |