author     Devtools Arcadia <arcadia-devtools@yandex-team.ru>           2022-02-07 18:08:42 +0300
committer  Devtools Arcadia <arcadia-devtools@mous.vla.yp-c.yandex.net> 2022-02-07 18:08:42 +0300
commit     1110808a9d39d4b808aef724c861a2e1a38d2a69 (patch)
tree       e26c9fed0de5d9873cce7e00bc214573dc2195b7 /library/cpp/actors/util

intermediate changes
ref: cde9a383711a11544ce7e107a78147fb96cc4029
Diffstat (limited to 'library/cpp/actors/util')
-rw-r--r--  library/cpp/actors/util/affinity.cpp               93
-rw-r--r--  library/cpp/actors/util/affinity.h                 49
-rw-r--r--  library/cpp/actors/util/cpumask.h                 133
-rw-r--r--  library/cpp/actors/util/datetime.h                 82
-rw-r--r--  library/cpp/actors/util/defs.h                     16
-rw-r--r--  library/cpp/actors/util/funnel_queue.h            240
-rw-r--r--  library/cpp/actors/util/futex.h                    13
-rw-r--r--  library/cpp/actors/util/intrinsics.h               97
-rw-r--r--  library/cpp/actors/util/local_process_key.h       132
-rw-r--r--  library/cpp/actors/util/named_tuple.h              30
-rw-r--r--  library/cpp/actors/util/queue_chunk.h              29
-rw-r--r--  library/cpp/actors/util/queue_oneone_inplace.h    118
-rw-r--r--  library/cpp/actors/util/recentwnd.h                67
-rw-r--r--  library/cpp/actors/util/rope.h                   1161
-rw-r--r--  library/cpp/actors/util/rope_cont_deque.h         181
-rw-r--r--  library/cpp/actors/util/rope_cont_list.h          159
-rw-r--r--  library/cpp/actors/util/rope_ut.cpp               231
-rw-r--r--  library/cpp/actors/util/should_continue.cpp        23
-rw-r--r--  library/cpp/actors/util/should_continue.h          22
-rw-r--r--  library/cpp/actors/util/thread.h                   26
-rw-r--r--  library/cpp/actors/util/threadparkpad.cpp         148
-rw-r--r--  library/cpp/actors/util/threadparkpad.h            21
-rw-r--r--  library/cpp/actors/util/ticket_lock.h              48
-rw-r--r--  library/cpp/actors/util/timerfd.h                  65
-rw-r--r--  library/cpp/actors/util/unordered_cache.h         201
-rw-r--r--  library/cpp/actors/util/unordered_cache_ut.cpp    138
-rw-r--r--  library/cpp/actors/util/ut/ya.make                 18
-rw-r--r--  library/cpp/actors/util/ya.make                    37
28 files changed, 3578 insertions(+), 0 deletions(-)
diff --git a/library/cpp/actors/util/affinity.cpp b/library/cpp/actors/util/affinity.cpp
new file mode 100644
index 0000000000..cc1b6e70ec
--- /dev/null
+++ b/library/cpp/actors/util/affinity.cpp
@@ -0,0 +1,93 @@
+#include "affinity.h"
+
+#ifdef _linux_
+#include <sched.h>
+#endif
+
+class TAffinity::TImpl {
+#ifdef _linux_
+ cpu_set_t Mask;
+#endif
+public:
+ TImpl() {
+#ifdef _linux_
+ int ar = sched_getaffinity(0, sizeof(cpu_set_t), &Mask);
+ Y_VERIFY_DEBUG(ar == 0);
+#endif
+ }
+
+ explicit TImpl(const ui8* cpus, ui32 size) {
+#ifdef _linux_
+ CPU_ZERO(&Mask);
+ for (ui32 i = 0; i != size; ++i) {
+ if (cpus[i]) {
+ CPU_SET(i, &Mask);
+ }
+ }
+#else
+ Y_UNUSED(cpus);
+ Y_UNUSED(size);
+#endif
+ }
+
+ void Set() const {
+#ifdef _linux_
+ int ar = sched_setaffinity(0, sizeof(cpu_set_t), &Mask);
+ Y_VERIFY_DEBUG(ar == 0);
+#endif
+ }
+
+ operator TCpuMask() const {
+ TCpuMask result;
+#ifdef _linux_
+ for (ui32 i = 0; i != CPU_SETSIZE; ++i) {
+ result.Cpus.emplace_back(CPU_ISSET(i, &Mask));
+ }
+ result.RemoveTrailingZeros();
+#endif
+ return result;
+ }
+
+};
+
+TAffinity::TAffinity() {
+}
+
+TAffinity::~TAffinity() {
+}
+
+TAffinity::TAffinity(const ui8* x, ui32 sz) {
+ if (x && sz) {
+ Impl.Reset(new TImpl(x, sz));
+ }
+}
+
+TAffinity::TAffinity(const TCpuMask& mask) {
+ if (!mask.IsEmpty()) {
+ static_assert(sizeof(ui8) == sizeof(mask.Cpus[0]));
+ const ui8* x = reinterpret_cast<const ui8*>(&mask.Cpus[0]);
+ const ui32 sz = mask.Size();
+ Impl.Reset(new TImpl(x, sz));
+ }
+}
+
+void TAffinity::Current() {
+ Impl.Reset(new TImpl());
+}
+
+void TAffinity::Set() const {
+ if (!!Impl) {
+ Impl->Set();
+ }
+}
+
+bool TAffinity::Empty() const {
+ return !Impl;
+}
+
+TAffinity::operator TCpuMask() const {
+ if (!!Impl) {
+ return *Impl;
+ }
+ return TCpuMask();
+}
diff --git a/library/cpp/actors/util/affinity.h b/library/cpp/actors/util/affinity.h
new file mode 100644
index 0000000000..ae106ed180
--- /dev/null
+++ b/library/cpp/actors/util/affinity.h
@@ -0,0 +1,49 @@
+#pragma once
+
+#include "defs.h"
+#include "cpumask.h"
+
+// Platform-specific class to set or get thread affinity
+class TAffinity: public TThrRefBase, TNonCopyable {
+ class TImpl;
+ THolder<TImpl> Impl;
+
+public:
+ TAffinity();
+ TAffinity(const ui8* cpus, ui32 size);
+ explicit TAffinity(const TCpuMask& mask);
+ ~TAffinity();
+
+ void Current();
+ void Set() const;
+ bool Empty() const;
+
+ operator TCpuMask() const;
+};
+
+// Scoped affinity setter
+class TAffinityGuard : TNonCopyable {
+ bool Stacked;
+ TAffinity OldAffinity;
+
+public:
+ TAffinityGuard(const TAffinity* affinity) {
+ Stacked = false;
+ if (affinity && !affinity->Empty()) {
+ OldAffinity.Current();
+ affinity->Set();
+ Stacked = true;
+ }
+ }
+
+ ~TAffinityGuard() {
+ Release();
+ }
+
+ void Release() {
+ if (Stacked) {
+ OldAffinity.Set();
+ Stacked = false;
+ }
+ }
+};
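
Usage sketch for the classes above (illustrative only, not part of this commit; RunPinned is a hypothetical helper): pin the calling thread to a set of cpus for the duration of a scope.

    #include <library/cpp/actors/util/affinity.h>

    void RunPinned() {
        TCpuMask mask("0-3");            // cpus 0,1,2,3
        TAffinity affinity(mask);
        TAffinityGuard guard(&affinity); // saves the current affinity, applies the new one
        // ... cpu-bound work runs on cpus 0-3 here ...
    }                                    // guard's destructor restores the old affinity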
diff --git a/library/cpp/actors/util/cpumask.h b/library/cpp/actors/util/cpumask.h
new file mode 100644
index 0000000000..29741aa1d6
--- /dev/null
+++ b/library/cpp/actors/util/cpumask.h
@@ -0,0 +1,133 @@
+#pragma once
+
+#include "defs.h"
+
+#include <library/cpp/containers/stack_vector/stack_vec.h>
+
+#include <util/string/split.h>
+#include <util/generic/yexception.h>
+
+using TCpuId = ui32;
+
+// Simple data structure to operate with set of cpus
+struct TCpuMask {
+ TStackVec<bool, 1024> Cpus;
+
+ // Creates empty mask
+ TCpuMask() {}
+
+ // Creates mask with single cpu set
+ explicit TCpuMask(TCpuId cpuId) {
+ Set(cpuId);
+ }
+
+ // Initialize mask from raw boolean array
+ template <class T>
+ TCpuMask(const T* cpus, TCpuId size) {
+ Cpus.reserve(size);
+ for (TCpuId i = 0; i != size; ++i) {
+ Cpus.emplace_back(bool(cpus[i]));
+ }
+ }
+
+ // Parse a numerical list of processors. The numbers are separated by commas and may include ranges. For example: 0,5,7,9-11
+ explicit TCpuMask(const TString& cpuList) {
+ try {
+ for (TStringBuf s : StringSplitter(cpuList).Split(',')) {
+ TCpuId l, r;
+ if (s.find('-') != TString::npos) {
+ StringSplitter(s).Split('-').CollectInto(&l, &r);
+ } else {
+ l = r = FromString<TCpuId>(s);
+ }
+ if (r >= Cpus.size()) {
+ Cpus.resize(r + 1, false);
+ }
+ for (TCpuId cpu = l; cpu <= r; cpu++) {
+ Cpus[cpu] = true;
+ }
+ }
+ } catch (...) {
+            ythrow TWithBackTrace<yexception>() << "Exception occurred while parsing cpu list '" << cpuList << "': " << CurrentExceptionMessage();
+ }
+ }
+
+ // Returns size of underlying vector
+ TCpuId Size() const {
+ return Cpus.size();
+ }
+
+ // Returns number of set bits in mask
+ TCpuId CpuCount() const {
+ TCpuId result = 0;
+ for (bool value : Cpus) {
+ result += value;
+ }
+ return result;
+ }
+
+ bool IsEmpty() const {
+ for (bool value : Cpus) {
+ if (value) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ bool IsSet(TCpuId cpu) const {
+ return cpu < Cpus.size() && Cpus[cpu];
+ }
+
+ void Set(TCpuId cpu) {
+ if (cpu >= Cpus.size()) {
+ Cpus.resize(cpu + 1, false);
+ }
+ Cpus[cpu] = true;
+ }
+
+ void Reset(TCpuId cpu) {
+ if (cpu < Cpus.size()) {
+ Cpus[cpu] = false;
+ }
+ }
+
+ void RemoveTrailingZeros() {
+ while (!Cpus.empty() && !Cpus.back()) {
+ Cpus.pop_back();
+ }
+ }
+
+ explicit operator bool() const {
+ return !IsEmpty();
+ }
+
+ TCpuMask operator &(const TCpuMask& rhs) const {
+ TCpuMask result;
+ TCpuId size = Max(Size(), rhs.Size());
+ result.Cpus.reserve(size);
+ for (TCpuId cpu = 0; cpu < size; cpu++) {
+ result.Cpus.emplace_back(IsSet(cpu) && rhs.IsSet(cpu));
+ }
+ return result;
+ }
+
+ TCpuMask operator |(const TCpuMask& rhs) const {
+ TCpuMask result;
+ TCpuId size = Max(Size(), rhs.Size());
+ result.Cpus.reserve(size);
+ for (TCpuId cpu = 0; cpu < size; cpu++) {
+ result.Cpus.emplace_back(IsSet(cpu) || rhs.IsSet(cpu));
+ }
+ return result;
+ }
+
+ TCpuMask operator -(const TCpuMask& rhs) const {
+ TCpuMask result;
+ result.Cpus.reserve(Size());
+ for (TCpuId cpu = 0; cpu < Size(); cpu++) {
+ result.Cpus.emplace_back(IsSet(cpu) && !rhs.IsSet(cpu));
+ }
+ return result;
+ }
+};
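
A short sketch of the parsing and set operations above (illustrative only):

    TCpuMask a("0,5,7,9-11");       // {0, 5, 7, 9, 10, 11}
    TCpuMask b(TCpuId(5));          // single-cpu mask {5}
    TCpuMask common = a & b;        // {5}
    TCpuMask rest = a - b;          // {0, 7, 9, 10, 11}
    bool set = common.IsSet(5);     // true
    TCpuId count = rest.CpuCount(); // 5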
diff --git a/library/cpp/actors/util/datetime.h b/library/cpp/actors/util/datetime.h
new file mode 100644
index 0000000000..cbec5965d6
--- /dev/null
+++ b/library/cpp/actors/util/datetime.h
@@ -0,0 +1,82 @@
+#pragma once
+
+#include <util/system/defaults.h>
+#include <util/system/hp_timer.h>
+#include <util/system/platform.h>
+
+#if defined(_win_)
+#include <intrin.h>
+#pragma intrinsic(__rdtsc)
+#endif // _win_
+
+#if defined(_darwin_) && !defined(_x86_)
+#include <mach/mach_time.h>
+#endif
+
+// GetCycleCount() from util/system/datetime.h uses rdtscp, which is more accurate than rdtsc,
+// but rdtscp partially serializes execution (it waits for prior instructions to complete),
+// so it can be noticeably slower
+Y_FORCE_INLINE ui64 GetCycleCountFast() {
+#if defined(_MSC_VER)
+ // Generates the rdtsc instruction, which returns the processor time stamp.
+ // The processor time stamp records the number of clock cycles since the last reset.
+ return __rdtsc();
+#elif defined(__clang__) && !defined(_arm64_)
+ return __builtin_readcyclecounter();
+#elif defined(_x86_64_)
+ unsigned hi, lo;
+ __asm__ __volatile__("rdtsc"
+ : "=a"(lo), "=d"(hi));
+ return ((unsigned long long)lo) | (((unsigned long long)hi) << 32);
+#elif defined(_i386_)
+ ui64 x;
+ __asm__ volatile("rdtsc\n\t"
+ : "=A"(x));
+ return x;
+#elif defined(_darwin_)
+ return mach_absolute_time();
+#elif defined(_arm32_)
+ return MicroSeconds();
+#elif defined(_arm64_)
+ ui64 x;
+
+ __asm__ __volatile__("isb; mrs %0, cntvct_el0"
+ : "=r"(x));
+
+ return x;
+#else
+#error "unsupported arch"
+#endif
+}
+
+// NHPTimer::GetTime fast analog
+Y_FORCE_INLINE void GetTimeFast(NHPTimer::STime* pTime) noexcept {
+ *pTime = GetCycleCountFast();
+}
+
+namespace NActors {
+ inline double Ts2Ns(ui64 ts) {
+ return NHPTimer::GetSeconds(ts) * 1e9;
+ }
+
+ inline double Ts2Us(ui64 ts) {
+ return NHPTimer::GetSeconds(ts) * 1e6;
+ }
+
+ inline double Ts2Ms(ui64 ts) {
+ return NHPTimer::GetSeconds(ts) * 1e3;
+ }
+
+ inline ui64 Us2Ts(double us) {
+ return ui64(NHPTimer::GetClockRate() * us / 1e6);
+ }
+
+ struct TTimeTracker {
+ ui64 Ts;
+ TTimeTracker(): Ts(GetCycleCountFast()) {}
+ ui64 Elapsed() {
+ ui64 ts = GetCycleCountFast();
+ std::swap(Ts, ts);
+ return Ts - ts;
+ }
+ };
+}
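
A minimal timing sketch using the helpers above (DoWork is a hypothetical workload):

    using namespace NActors;

    TTimeTracker timer;             // remembers the current cycle count
    DoWork();                       // hypothetical workload
    ui64 cycles = timer.Elapsed();  // cycles since construction (or the previous call)
    double us = Ts2Us(cycles);      // convert the cycle delta to microseconds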
diff --git a/library/cpp/actors/util/defs.h b/library/cpp/actors/util/defs.h
new file mode 100644
index 0000000000..5c3b57665b
--- /dev/null
+++ b/library/cpp/actors/util/defs.h
@@ -0,0 +1,16 @@
+#pragma once
+
+// unique tag to fix pragma once gcc glueing: ./library/actors/util/defs.h
+
+#include <util/system/defaults.h>
+#include <util/generic/bt_exception.h>
+#include <util/generic/noncopyable.h>
+#include <util/generic/ptr.h>
+#include <util/generic/string.h>
+#include <util/generic/yexception.h>
+#include <util/system/atomic.h>
+#include <util/system/align.h>
+#include <util/generic/vector.h>
+#include <util/datetime/base.h>
+#include <util/generic/ylimits.h>
+#include "intrinsics.h"
diff --git a/library/cpp/actors/util/funnel_queue.h b/library/cpp/actors/util/funnel_queue.h
new file mode 100644
index 0000000000..0e21e2617c
--- /dev/null
+++ b/library/cpp/actors/util/funnel_queue.h
@@ -0,0 +1,240 @@
+#pragma once
+
+#include <util/system/atomic.h>
+#include <util/generic/noncopyable.h>
+
+template <typename ElementType>
+class TFunnelQueue: private TNonCopyable {
+public:
+ TFunnelQueue() noexcept
+ : Front(nullptr)
+ , Back(nullptr)
+ {
+ }
+
+ virtual ~TFunnelQueue() noexcept {
+ for (auto entry = Front; entry; entry = DeleteEntry(entry))
+ continue;
+ }
+
+    /// Pushes an element. Can be called from many threads. Returns true if the element is the first one in the queue.
+ bool
+ Push(ElementType&& element) noexcept {
+ TEntry* const next = NewEntry(static_cast<ElementType&&>(element));
+ TEntry* const prev = AtomicSwap(&Back, next);
+ AtomicSet(prev ? prev->Next : Front, next);
+ return !prev;
+ }
+
+    /// Pops the front element. Must be called from a single thread only. Returns true if more elements remain.
+ bool
+ Pop() noexcept {
+ if (TEntry* const top = AtomicGet(Front)) {
+ const auto last = AtomicCas(&Back, nullptr, top);
+ if (last) // This is last element in queue. Queue is empty now.
+ AtomicCas(&Front, nullptr, top);
+ else // This element is not last.
+ for (;;) {
+ if (const auto next = AtomicGet(top->Next)) {
+ AtomicSet(Front, next);
+ break;
+ }
+                    // Next is still null: spin until the producer publishes it.
+ }
+
+ DeleteEntry(top);
+ return !last;
+ }
+
+ return false;
+ }
+
+    /// Peeks at the front element. Must be called from a single thread only.
+ ElementType&
+ Top() const noexcept {
+ return AtomicGet(Front)->Data;
+ }
+
+ bool
+ IsEmpty() const noexcept {
+ return !AtomicGet(Front);
+ }
+
+protected:
+ class TEntry: private TNonCopyable {
+ friend class TFunnelQueue;
+
+ private:
+ explicit TEntry(ElementType&& element) noexcept
+ : Data(static_cast<ElementType&&>(element))
+ , Next(nullptr)
+ {
+ }
+
+ ~TEntry() noexcept {
+ }
+
+ public:
+ ElementType Data;
+ TEntry* volatile Next;
+ };
+
+ TEntry* volatile Front;
+ TEntry* volatile Back;
+
+ virtual TEntry* NewEntry(ElementType&& element) noexcept {
+ return new TEntry(static_cast<ElementType&&>(element));
+ }
+
+ virtual TEntry* DeleteEntry(TEntry* entry) noexcept {
+ const auto next = entry->Next;
+ delete entry;
+ return next;
+ }
+
+protected:
+ struct TEntryIter {
+ TEntry* ptr;
+
+ ElementType& operator*() {
+ return ptr->Data;
+ }
+
+ ElementType* operator->() {
+ return &ptr->Data;
+ }
+
+ TEntryIter& operator++() {
+ ptr = AtomicGet(ptr->Next);
+ return *this;
+ }
+
+ bool operator!=(const TEntryIter& other) const {
+ return ptr != other.ptr;
+ }
+
+ bool operator==(const TEntryIter& other) const {
+ return ptr == other.ptr;
+ }
+ };
+
+ struct TConstEntryIter {
+ const TEntry* ptr;
+
+ const ElementType& operator*() {
+ return ptr->Data;
+ }
+
+ const ElementType* operator->() {
+ return &ptr->Data;
+ }
+
+        TConstEntryIter& operator++() {
+ ptr = AtomicGet(ptr->Next);
+ return *this;
+ }
+
+ bool operator!=(const TConstEntryIter& other) const {
+ return ptr != other.ptr;
+ }
+
+ bool operator==(const TConstEntryIter& other) const {
+ return ptr == other.ptr;
+ }
+ };
+
+public:
+ using const_iterator = TConstEntryIter;
+ using iterator = TEntryIter;
+
+ iterator begin() {
+ return {AtomicGet(Front)};
+ }
+ const_iterator cbegin() {
+ return {AtomicGet(Front)};
+ }
+ const_iterator begin() const {
+ return {AtomicGet(Front)};
+ }
+
+ iterator end() {
+ return {nullptr};
+ }
+ const_iterator cend() {
+ return {nullptr};
+ }
+ const_iterator end() const {
+ return {nullptr};
+ }
+};
+
+template <typename ElementType>
+class TPooledFunnelQueue: public TFunnelQueue<ElementType> {
+public:
+ TPooledFunnelQueue() noexcept
+ : Stack(nullptr)
+ {
+ }
+
+ virtual ~TPooledFunnelQueue() noexcept override {
+ for (auto entry = TBase::Front; entry; entry = TBase::DeleteEntry(entry))
+ continue;
+ for (auto entry = Stack; entry; entry = TBase::DeleteEntry(entry))
+ continue;
+ TBase::Back = TBase::Front = Stack = nullptr;
+ }
+
+private:
+ typedef TFunnelQueue<ElementType> TBase;
+
+ typename TBase::TEntry* volatile Stack;
+
+protected:
+ virtual typename TBase::TEntry* NewEntry(ElementType&& element) noexcept override {
+ while (const auto top = AtomicGet(Stack))
+ if (AtomicCas(&Stack, top->Next, top)) {
+ top->Data = static_cast<ElementType&&>(element);
+ AtomicSet(top->Next, nullptr);
+ return top;
+ }
+
+ return TBase::NewEntry(static_cast<ElementType&&>(element));
+ }
+
+ virtual typename TBase::TEntry* DeleteEntry(typename TBase::TEntry* entry) noexcept override {
+ entry->Data = ElementType();
+ const auto next = entry->Next;
+ do
+ AtomicSet(entry->Next, AtomicGet(Stack));
+ while (!AtomicCas(&Stack, entry, entry->Next));
+ return next;
+ }
+};
+
+template <typename ElementType, template <typename T> class TQueueType = TFunnelQueue>
+class TCountedFunnelQueue: public TQueueType<ElementType> {
+public:
+ TCountedFunnelQueue() noexcept
+ : Count(0)
+ {
+ }
+
+ TAtomicBase GetSize() const noexcept {
+ return AtomicGet(Count);
+ }
+
+private:
+ typedef TQueueType<ElementType> TBase;
+
+ virtual typename TBase::TEntry* NewEntry(ElementType&& element) noexcept override {
+ AtomicAdd(Count, 1);
+ return TBase::NewEntry(static_cast<ElementType&&>(element));
+ }
+
+ virtual typename TBase::TEntry* DeleteEntry(typename TBase::TEntry* entry) noexcept override {
+ AtomicSub(Count, 1);
+ return TBase::DeleteEntry(entry);
+ }
+
+ TAtomic Count;
+};
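
Usage sketch: many producers call Push(), a single consumer drains the queue (illustrative only):

    TCountedFunnelQueue<int> queue;
    queue.Push(42);                 // any thread; returns true if the queue was empty
    int total = 0;
    while (!queue.IsEmpty()) {      // single consumer thread only
        total += queue.Top();       // peek at the front element
        queue.Pop();                // returns true if more elements remain
    }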
diff --git a/library/cpp/actors/util/futex.h b/library/cpp/actors/util/futex.h
new file mode 100644
index 0000000000..c193f8d128
--- /dev/null
+++ b/library/cpp/actors/util/futex.h
@@ -0,0 +1,13 @@
+#pragma once
+
+#ifdef _linux_
+
+#include <linux/futex.h>
+#include <unistd.h>
+#include <sys/syscall.h>
+
+static long SysFutex(void* addr1, int op, int val1, struct timespec* timeout, void* addr2, int val3) {
+ return syscall(SYS_futex, addr1, op, val1, timeout, addr2, val3);
+}
+
+#endif
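
A sketch of a one-shot flag built on SysFutex (illustrative, Linux only; FUTEX_WAIT and FUTEX_WAKE come from <linux/futex.h>):

    #ifdef _linux_
    static int Flag = 0;

    void WaitForFlag() {
        while (__atomic_load_n(&Flag, __ATOMIC_ACQUIRE) == 0) {
            // sleeps only if Flag is still 0 when the kernel checks it
            SysFutex(&Flag, FUTEX_WAIT, 0, nullptr, nullptr, 0);
        }
    }

    void SetFlagAndWake() {
        __atomic_store_n(&Flag, 1, __ATOMIC_RELEASE);
        SysFutex(&Flag, FUTEX_WAKE, 1, nullptr, nullptr, 0);
    }
    #endif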
diff --git a/library/cpp/actors/util/intrinsics.h b/library/cpp/actors/util/intrinsics.h
new file mode 100644
index 0000000000..df07e36896
--- /dev/null
+++ b/library/cpp/actors/util/intrinsics.h
@@ -0,0 +1,97 @@
+#pragma once
+
+#include <util/system/defaults.h>
+#include <util/system/atomic.h>
+#include <util/system/spinlock.h>
+
+#include <library/cpp/sse/sse.h> // The header chooses appropriate SSE support
+
+static_assert(sizeof(TAtomic) == 8, "expect sizeof(TAtomic) == 8");
+
+// We need explicit 32-bit operations to keep cache-line friendly packs,
+// so we define some atomics here in addition to the Arcadia ones.
+#ifdef _win_
+#pragma intrinsic(_InterlockedCompareExchange)
+#pragma intrinsic(_InterlockedExchangeAdd)
+#pragma intrinsic(_InterlockedIncrement)
+#pragma intrinsic(_InterlockedDecrement)
+#endif
+
+inline bool AtomicUi32Cas(volatile ui32* a, ui32 exchange, ui32 compare) {
+#ifdef _win_
+ return _InterlockedCompareExchange((volatile long*)a, exchange, compare) == (long)compare;
+#else
+ ui32 expected = compare;
+ return __atomic_compare_exchange_n(a, &expected, exchange, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST);
+#endif
+}
+
+inline ui32 AtomicUi32Add(volatile ui32* a, ui32 add) {
+#ifdef _win_
+ return _InterlockedExchangeAdd((volatile long*)a, add) + add;
+#else
+ return __atomic_add_fetch(a, add, __ATOMIC_SEQ_CST);
+#endif
+}
+
+inline ui32 AtomicUi32Sub(volatile ui32* a, ui32 sub) {
+#ifdef _win_
+ return _InterlockedExchangeAdd((volatile long*)a, -(long)sub) - sub;
+#else
+ return __atomic_sub_fetch(a, sub, __ATOMIC_SEQ_CST);
+#endif
+}
+
+inline ui32 AtomicUi32Increment(volatile ui32* a) {
+#ifdef _win_
+ return _InterlockedIncrement((volatile long*)a);
+#else
+ return __atomic_add_fetch(a, 1, __ATOMIC_SEQ_CST);
+#endif
+}
+
+inline ui32 AtomicUi32Decrement(volatile ui32* a) {
+#ifdef _win_
+ return _InterlockedDecrement((volatile long*)a);
+#else
+ return __atomic_sub_fetch(a, 1, __ATOMIC_SEQ_CST);
+#endif
+}
+
+template <typename T>
+inline void AtomicStore(volatile T* a, T x) {
+ static_assert(std::is_integral<T>::value || std::is_pointer<T>::value, "expect std::is_integral<T>::value || std::is_pointer<T>::value");
+#ifdef _win_
+ *a = x;
+#else
+ __atomic_store_n(a, x, __ATOMIC_RELEASE);
+#endif
+}
+
+template <typename T>
+inline void RelaxedStore(volatile T* a, T x) {
+ static_assert(std::is_integral<T>::value || std::is_pointer<T>::value, "expect std::is_integral<T>::value || std::is_pointer<T>::value");
+#ifdef _win_
+ *a = x;
+#else
+ __atomic_store_n(a, x, __ATOMIC_RELAXED);
+#endif
+}
+
+template <typename T>
+inline T AtomicLoad(volatile T* a) {
+#ifdef _win_
+ return *a;
+#else
+ return __atomic_load_n(a, __ATOMIC_ACQUIRE);
+#endif
+}
+
+template <typename T>
+inline T RelaxedLoad(volatile T* a) {
+#ifdef _win_
+ return *a;
+#else
+ return __atomic_load_n(a, __ATOMIC_RELAXED);
+#endif
+}
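
Sketch of a typical CAS retry loop over these primitives (TryIncrementBelow is a hypothetical bounded counter):

    inline bool TryIncrementBelow(volatile ui32* counter, ui32 limit) {
        for (;;) {
            const ui32 current = AtomicLoad(counter);
            if (current >= limit)
                return false;                          // already saturated
            if (AtomicUi32Cas(counter, current + 1, current))
                return true;                           // we won the race
            // another thread modified *counter in between; retry
        }
    }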
diff --git a/library/cpp/actors/util/local_process_key.h b/library/cpp/actors/util/local_process_key.h
new file mode 100644
index 0000000000..172f08fc73
--- /dev/null
+++ b/library/cpp/actors/util/local_process_key.h
@@ -0,0 +1,132 @@
+#pragma once
+
+#include <util/string/builder.h>
+#include <util/generic/strbuf.h>
+#include <util/generic/vector.h>
+#include <util/generic/hash.h>
+#include <util/generic/singleton.h>
+#include <util/generic/serialized_enum.h>
+
+template <typename T>
+class TLocalProcessKeyState {
+
+template <typename U, const char* Name>
+friend class TLocalProcessKey;
+template <typename U, typename EnumT>
+friend class TEnumProcessKey;
+
+public:
+ static TLocalProcessKeyState& GetInstance() {
+ return *Singleton<TLocalProcessKeyState<T>>();
+ }
+
+ size_t GetCount() const {
+ return StartIndex + Names.size();
+ }
+
+ TStringBuf GetNameByIndex(size_t index) const {
+ if (index < StartIndex) {
+ return StaticNames[index];
+ } else {
+ index -= StartIndex;
+ Y_ENSURE(index < Names.size());
+ return Names[index];
+ }
+ }
+
+ size_t GetIndexByName(TStringBuf name) const {
+ auto it = Map.find(name);
+ Y_ENSURE(it != Map.end());
+ return it->second;
+ }
+
+private:
+ size_t Register(TStringBuf name) {
+ auto x = Map.emplace(name, Names.size()+StartIndex);
+ if (x.second) {
+ Names.emplace_back(name);
+ }
+
+ return x.first->second;
+ }
+
+ size_t Register(TStringBuf name, ui32 index) {
+ Y_VERIFY(index < StartIndex);
+ auto x = Map.emplace(name, index);
+ Y_VERIFY(x.second || x.first->second == index);
+ StaticNames[index] = name;
+ return x.first->second;
+ }
+
+private:
+ static constexpr ui32 StartIndex = 2000;
+
+ TVector<TString> FillStaticNames() {
+ TVector<TString> staticNames;
+ staticNames.reserve(StartIndex);
+ for (ui32 i = 0; i < StartIndex; i++) {
+ staticNames.push_back(TStringBuilder() << "Activity_" << i);
+ }
+ return staticNames;
+ }
+
+ TVector<TString> StaticNames = FillStaticNames();
+ TVector<TString> Names;
+ THashMap<TString, size_t> Map;
+};
+
+template <typename T, const char* Name>
+class TLocalProcessKey {
+public:
+ static TStringBuf GetName() {
+ return Name;
+ }
+
+ static size_t GetIndex() {
+ return Index;
+ }
+
+private:
+ inline static size_t Index = TLocalProcessKeyState<T>::GetInstance().Register(Name);
+};
+
+template <typename T, typename EnumT>
+class TEnumProcessKey {
+public:
+ static TStringBuf GetName(EnumT key) {
+ return TLocalProcessKeyState<T>::GetInstance().GetNameByIndex(GetIndex(key));
+ }
+
+ static size_t GetIndex(EnumT key) {
+ ui32 index = static_cast<ui32>(key);
+ if (index < TLocalProcessKeyState<T>::StartIndex) {
+ return index;
+ }
+ Y_VERIFY(index < Enum2Index.size());
+ return Enum2Index[index];
+ }
+
+private:
+ inline static TVector<size_t> RegisterAll() {
+ static_assert(std::is_enum<EnumT>::value, "Enum is required");
+
+ TVector<size_t> enum2Index;
+ auto names = GetEnumNames<EnumT>();
+ ui32 maxId = 0;
+ for (const auto& [k, v] : names) {
+ maxId = Max(maxId, static_cast<ui32>(k));
+ }
+ enum2Index.resize(maxId+1);
+ for (ui32 i = 0; i <= maxId && i < TLocalProcessKeyState<T>::StartIndex; i++) {
+ enum2Index[i] = i;
+ }
+
+ for (const auto& [k, v] : names) {
+ ui32 enumId = static_cast<ui32>(k);
+ enum2Index[enumId] = TLocalProcessKeyState<T>::GetInstance().Register(v, enumId);
+ }
+ return enum2Index;
+ }
+
+ inline static TVector<size_t> Enum2Index = RegisterAll();
+};
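
Registration sketch (the tag and enum are hypothetical; GetEnumNames<> assumes enum serialization is enabled for the enum, e.g. via GENERATE_ENUM_SERIALIZATION in ya.make):

    struct TMyActivityTag {};

    enum class EMyActivity : ui32 {
        BOOTSTRAP = 0,
        HANDLE_REQUEST = 1,
    };

    // indices below StartIndex are returned as-is; the names are registered at static init
    size_t index = TEnumProcessKey<TMyActivityTag, EMyActivity>::GetIndex(EMyActivity::BOOTSTRAP);   // 0
    TStringBuf name = TEnumProcessKey<TMyActivityTag, EMyActivity>::GetName(EMyActivity::BOOTSTRAP); // "BOOTSTRAP"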
diff --git a/library/cpp/actors/util/named_tuple.h b/library/cpp/actors/util/named_tuple.h
new file mode 100644
index 0000000000..67f185bba8
--- /dev/null
+++ b/library/cpp/actors/util/named_tuple.h
@@ -0,0 +1,30 @@
+#pragma once
+
+#include "defs.h"
+
+template <typename TDerived>
+struct TNamedTupleBase {
+ friend bool operator==(const TDerived& x, const TDerived& y) {
+ return x.ConvertToTuple() == y.ConvertToTuple();
+ }
+
+ friend bool operator!=(const TDerived& x, const TDerived& y) {
+ return x.ConvertToTuple() != y.ConvertToTuple();
+ }
+
+ friend bool operator<(const TDerived& x, const TDerived& y) {
+ return x.ConvertToTuple() < y.ConvertToTuple();
+ }
+
+ friend bool operator<=(const TDerived& x, const TDerived& y) {
+ return x.ConvertToTuple() <= y.ConvertToTuple();
+ }
+
+ friend bool operator>(const TDerived& x, const TDerived& y) {
+ return x.ConvertToTuple() > y.ConvertToTuple();
+ }
+
+ friend bool operator>=(const TDerived& x, const TDerived& y) {
+ return x.ConvertToTuple() >= y.ConvertToTuple();
+ }
+};
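
Sketch: a struct gains all six comparison operators by deriving from TNamedTupleBase and exposing ConvertToTuple() (TEndpoint is hypothetical):

    struct TEndpoint : TNamedTupleBase<TEndpoint> {
        TString Host;
        ui16 Port = 0;

        auto ConvertToTuple() const {
            return std::tie(Host, Port); // compares (Host, Port) lexicographically
        }
    };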
diff --git a/library/cpp/actors/util/queue_chunk.h b/library/cpp/actors/util/queue_chunk.h
new file mode 100644
index 0000000000..8a4e02d8cb
--- /dev/null
+++ b/library/cpp/actors/util/queue_chunk.h
@@ -0,0 +1,29 @@
+#pragma once
+
+#include "defs.h"
+
+template <typename T, ui32 TSize, typename TDerived>
+struct TQueueChunkDerived {
+ static const ui32 EntriesCount = (TSize - sizeof(TQueueChunkDerived*)) / sizeof(T);
+ static_assert(EntriesCount > 0, "expect EntriesCount > 0");
+
+ volatile T Entries[EntriesCount];
+ TDerived* volatile Next;
+
+ TQueueChunkDerived() {
+ memset(this, 0, sizeof(TQueueChunkDerived));
+ }
+};
+
+template <typename T, ui32 TSize>
+struct TQueueChunk {
+ static const ui32 EntriesCount = (TSize - sizeof(TQueueChunk*)) / sizeof(T);
+ static_assert(EntriesCount > 0, "expect EntriesCount > 0");
+
+ volatile T Entries[EntriesCount];
+ TQueueChunk* volatile Next;
+
+ TQueueChunk() {
+ memset(this, 0, sizeof(TQueueChunk));
+ }
+};
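
TSize is the target chunk footprint in bytes, including the Next pointer; for example (a sketch assuming 8-byte pointers):

    using TPage = TQueueChunk<void*, 4096>;  // roughly one 4 KiB page per chunk
    static_assert(TPage::EntriesCount == (4096 - 8) / 8, "511 slots plus the Next pointer");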
diff --git a/library/cpp/actors/util/queue_oneone_inplace.h b/library/cpp/actors/util/queue_oneone_inplace.h
new file mode 100644
index 0000000000..d7ec8bb21c
--- /dev/null
+++ b/library/cpp/actors/util/queue_oneone_inplace.h
@@ -0,0 +1,118 @@
+#pragma once
+
+#include "defs.h"
+#include "queue_chunk.h"
+
+template <typename T, ui32 TSize, typename TChunk = TQueueChunk<T, TSize>>
+class TOneOneQueueInplace : TNonCopyable {
+    static_assert(std::is_integral<T>::value || std::is_pointer<T>::value, "expect std::is_integral<T>::value || std::is_pointer<T>::value");
+
+ TChunk* ReadFrom;
+ ui32 ReadPosition;
+ ui32 WritePosition;
+ TChunk* WriteTo;
+
+ friend class TReadIterator;
+
+public:
+ class TReadIterator {
+ TChunk* ReadFrom;
+ ui32 ReadPosition;
+
+ public:
+ TReadIterator(TChunk* readFrom, ui32 readPosition)
+ : ReadFrom(readFrom)
+ , ReadPosition(readPosition)
+ {
+ }
+
+ inline T Next() {
+ TChunk* head = ReadFrom;
+ if (ReadPosition != TChunk::EntriesCount) {
+ return AtomicLoad(&head->Entries[ReadPosition++]);
+ } else if (TChunk* next = AtomicLoad(&head->Next)) {
+ ReadFrom = next;
+ ReadPosition = 0;
+ return Next();
+ }
+ return T{};
+ }
+ };
+
+ TOneOneQueueInplace()
+ : ReadFrom(new TChunk())
+ , ReadPosition(0)
+ , WritePosition(0)
+ , WriteTo(ReadFrom)
+ {
+ }
+
+ ~TOneOneQueueInplace() {
+ Y_VERIFY_DEBUG(Head() == 0);
+ delete ReadFrom;
+ }
+
+ struct TPtrCleanDestructor {
+ static inline void Destroy(TOneOneQueueInplace<T, TSize>* x) noexcept {
+ while (T head = x->Pop())
+ delete head;
+ delete x;
+ }
+ };
+
+ struct TCleanDestructor {
+ static inline void Destroy(TOneOneQueueInplace<T, TSize>* x) noexcept {
+ while (x->Pop() != nullptr)
+ continue;
+ delete x;
+ }
+ };
+
+ struct TPtrCleanInplaceMallocDestructor {
+ template <typename TPtrVal>
+ static inline void Destroy(TOneOneQueueInplace<TPtrVal*, TSize>* x) noexcept {
+ while (TPtrVal* head = x->Pop()) {
+ head->~TPtrVal();
+ free(head);
+ }
+ delete x;
+ }
+ };
+
+ void Push(T x) noexcept {
+ if (WritePosition != TChunk::EntriesCount) {
+ AtomicStore(&WriteTo->Entries[WritePosition], x);
+ ++WritePosition;
+ } else {
+ TChunk* next = new TChunk();
+ next->Entries[0] = x;
+ AtomicStore(&WriteTo->Next, next);
+ WriteTo = next;
+ WritePosition = 1;
+ }
+ }
+
+ T Head() {
+ TChunk* head = ReadFrom;
+ if (ReadPosition != TChunk::EntriesCount) {
+ return AtomicLoad(&head->Entries[ReadPosition]);
+ } else if (TChunk* next = AtomicLoad(&head->Next)) {
+ ReadFrom = next;
+ delete head;
+ ReadPosition = 0;
+ return Head();
+ }
+ return T{};
+ }
+
+ T Pop() {
+ T ret = Head();
+ if (ret)
+ ++ReadPosition;
+ return ret;
+ }
+
+ TReadIterator Iterator() {
+ return TReadIterator(ReadFrom, ReadPosition);
+ }
+};
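
SPSC usage sketch: one producer pushes pointers, one consumer pops them; TPtrCleanDestructor deletes whatever is left in the queue on destruction (illustrative only):

    using TIntQueue = TOneOneQueueInplace<int*, 4096>;
    THolder<TIntQueue, TIntQueue::TPtrCleanDestructor> queue(new TIntQueue());

    queue->Push(new int(42));              // producer thread
    while (int* value = queue->Pop()) {    // consumer thread
        delete value;
    }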
diff --git a/library/cpp/actors/util/recentwnd.h b/library/cpp/actors/util/recentwnd.h
new file mode 100644
index 0000000000..ba1ede6f29
--- /dev/null
+++ b/library/cpp/actors/util/recentwnd.h
@@ -0,0 +1,67 @@
+#pragma once
+
+#include <util/generic/deque.h>
+
+template <typename TElem,
+ template <typename, typename...> class TContainer = TDeque>
+class TRecentWnd {
+public:
+ TRecentWnd(ui32 wndSize)
+ : MaxWndSize_(wndSize)
+ {
+ }
+
+ void Push(const TElem& elem) {
+ if (Window_.size() == MaxWndSize_)
+ Window_.erase(Window_.begin());
+ Window_.emplace_back(elem);
+ }
+
+ void Push(TElem&& elem) {
+ if (Window_.size() == MaxWndSize_)
+ Window_.erase(Window_.begin());
+ Window_.emplace_back(std::move(elem));
+ }
+
+ TElem& Last() {
+ return Window_.back();
+ }
+ const TElem& Last() const {
+ return Window_.back();
+ }
+ bool Full() const {
+ return Window_.size() == MaxWndSize_;
+ }
+ ui64 Size() const {
+ return Window_.size();
+ }
+
+ using const_iterator = typename TContainer<TElem>::const_iterator;
+
+ const_iterator begin() {
+ return Window_.begin();
+ }
+ const_iterator end() {
+ return Window_.end();
+ }
+
+ void Reset(ui32 wndSize = 0) {
+ Window_.clear();
+ if (wndSize != 0) {
+ MaxWndSize_ = wndSize;
+ }
+ }
+
+ void ResetWnd(ui32 wndSize) {
+ Y_VERIFY(wndSize != 0);
+ MaxWndSize_ = wndSize;
+ if (Window_.size() > MaxWndSize_) {
+ Window_.erase(Window_.begin(),
+ Window_.begin() + Window_.size() - MaxWndSize_);
+ }
+ }
+
+private:
+ TContainer<TElem> Window_;
+ ui32 MaxWndSize_;
+};
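
Sketch: keep a sliding window of the most recent samples and average them once it fills up:

    TRecentWnd<double> latencies(128);   // window of the last 128 samples
    latencies.Push(0.42);
    // ... more Push() calls; the oldest sample is evicted once the window is full ...
    if (latencies.Full()) {
        double sum = 0;
        for (double v : latencies)
            sum += v;
        double avg = sum / latencies.Size();
    }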
diff --git a/library/cpp/actors/util/rope.h b/library/cpp/actors/util/rope.h
new file mode 100644
index 0000000000..f5595efbaa
--- /dev/null
+++ b/library/cpp/actors/util/rope.h
@@ -0,0 +1,1161 @@
+#pragma once
+
+#include <util/generic/ptr.h>
+#include <util/generic/string.h>
+#include <util/generic/hash_set.h>
+#include <util/stream/str.h>
+#include <util/system/sanitizers.h>
+#include <util/system/valgrind.h>
+
+// exactly one of them must be included
+#include "rope_cont_list.h"
+//#include "rope_cont_deque.h"
+
+struct IRopeChunkBackend : TThrRefBase {
+ using TData = std::tuple<const char*, size_t>;
+ virtual ~IRopeChunkBackend() = default;
+ virtual TData GetData() const = 0;
+ virtual size_t GetCapacity() const = 0;
+ using TPtr = TIntrusivePtr<IRopeChunkBackend>;
+};
+
+class TRopeAlignedBuffer : public IRopeChunkBackend {
+ static constexpr size_t Alignment = 16;
+ static constexpr size_t MallocAlignment = sizeof(size_t);
+
+ ui32 Size;
+ const ui32 Capacity;
+ const ui32 Offset;
+ alignas(Alignment) char Data[];
+
+ TRopeAlignedBuffer(size_t size)
+ : Size(size)
+ , Capacity(size)
+ , Offset((Alignment - reinterpret_cast<uintptr_t>(Data)) & (Alignment - 1))
+ {
+ Y_VERIFY(Offset <= Alignment - MallocAlignment);
+ }
+
+public:
+ static TIntrusivePtr<TRopeAlignedBuffer> Allocate(size_t size) {
+ return new(malloc(sizeof(TRopeAlignedBuffer) + size + Alignment - MallocAlignment)) TRopeAlignedBuffer(size);
+ }
+
+ void *operator new(size_t) {
+ Y_FAIL();
+ }
+
+ void *operator new(size_t, void *ptr) {
+ return ptr;
+ }
+
+ void operator delete(void *ptr) {
+ free(ptr);
+ }
+
+ void operator delete(void* p, void* ptr) {
+ Y_UNUSED(p);
+ Y_UNUSED(ptr);
+ }
+
+ TData GetData() const override {
+ return {Data + Offset, Size};
+ }
+
+ size_t GetCapacity() const override {
+ return Capacity;
+ }
+
+ char *GetBuffer() {
+ return Data + Offset;
+ }
+
+ void AdjustSize(size_t size) {
+ Y_VERIFY(size <= Capacity);
+ Size = size;
+ }
+};
+
+namespace NRopeDetails {
+
+ template<bool IsConst, typename TRope, typename TList>
+ struct TIteratorTraits;
+
+ template<typename TRope, typename TList>
+ struct TIteratorTraits<true, TRope, TList> {
+ using TRopePtr = const TRope*;
+ using TListIterator = typename TList::const_iterator;
+ };
+
+ template<typename TRope, typename TList>
+ struct TIteratorTraits<false, TRope, TList> {
+ using TRopePtr = TRope*;
+ using TListIterator = typename TList::iterator;
+ };
+
+} // NRopeDetails
+
+class TRopeArena;
+
+template<typename T>
+struct always_false : std::false_type {};
+
+class TRope {
+ friend class TRopeArena;
+
+ struct TChunk
+ {
+ class TBackend {
+ enum class EType : uintptr_t {
+ STRING,
+ ROPE_CHUNK_BACKEND,
+ };
+
+ uintptr_t Owner = 0; // lower bits contain type of the owner
+
+ public:
+ TBackend() = delete;
+
+ TBackend(const TBackend& other)
+ : Owner(Clone(other.Owner))
+ {}
+
+ TBackend(TBackend&& other)
+ : Owner(std::exchange(other.Owner, 0))
+ {}
+
+ TBackend(TString s)
+ : Owner(Construct<TString>(EType::STRING, std::move(s)))
+ {}
+
+ TBackend(IRopeChunkBackend::TPtr backend)
+ : Owner(Construct<IRopeChunkBackend::TPtr>(EType::ROPE_CHUNK_BACKEND, std::move(backend)))
+ {}
+
+ ~TBackend() {
+ if (Owner) {
+ Destroy(Owner);
+ }
+ }
+
+ TBackend& operator =(const TBackend& other) {
+ if (Owner) {
+ Destroy(Owner);
+ }
+ Owner = Clone(other.Owner);
+ return *this;
+ }
+
+ TBackend& operator =(TBackend&& other) {
+ if (Owner) {
+ Destroy(Owner);
+ }
+ Owner = std::exchange(other.Owner, 0);
+ return *this;
+ }
+
+ bool operator ==(const TBackend& other) const {
+ return Owner == other.Owner;
+ }
+
+ const void *UniqueId() const {
+ return reinterpret_cast<const void*>(Owner);
+ }
+
+ const IRopeChunkBackend::TData GetData() const {
+ return Visit(Owner, [](EType, auto& value) -> IRopeChunkBackend::TData {
+ using T = std::decay_t<decltype(value)>;
+ if constexpr (std::is_same_v<T, TString>) {
+ return {value.data(), value.size()};
+ } else if constexpr (std::is_same_v<T, IRopeChunkBackend::TPtr>) {
+ return value->GetData();
+ } else {
+ return {};
+ }
+ });
+ }
+
+ size_t GetCapacity() const {
+ return Visit(Owner, [](EType, auto& value) {
+ using T = std::decay_t<decltype(value)>;
+ if constexpr (std::is_same_v<T, TString>) {
+ return value.capacity();
+ } else if constexpr (std::is_same_v<T, IRopeChunkBackend::TPtr>) {
+ return value->GetCapacity();
+ } else {
+ Y_FAIL();
+ }
+ });
+ }
+
+ private:
+ static constexpr uintptr_t TypeMask = (1 << 3) - 1;
+ static constexpr uintptr_t ValueMask = ~TypeMask;
+
+ template<typename T>
+ struct TObjectHolder {
+ struct TWrappedObject : TThrRefBase {
+ T Value;
+ TWrappedObject(T&& value)
+ : Value(std::move(value))
+ {}
+ };
+ TIntrusivePtr<TWrappedObject> Object;
+
+ TObjectHolder(T&& object)
+ : Object(MakeIntrusive<TWrappedObject>(std::move(object)))
+ {}
+ };
+
+ template<typename TObject>
+ static uintptr_t Construct(EType type, TObject object) {
+ if constexpr (sizeof(TObject) <= sizeof(uintptr_t)) {
+ uintptr_t res = 0;
+ new(&res) TObject(std::move(object));
+ Y_VERIFY_DEBUG((res & ValueMask) == res);
+ return res | static_cast<uintptr_t>(type);
+ } else {
+ return Construct<TObjectHolder<TObject>>(type, TObjectHolder<TObject>(std::move(object)));
+ }
+ }
+
+ template<typename TCallback>
+ static std::invoke_result_t<TCallback, EType, TString&> VisitRaw(uintptr_t value, TCallback&& callback) {
+ Y_VERIFY_DEBUG(value);
+ const EType type = static_cast<EType>(value & TypeMask);
+ value &= ValueMask;
+ auto caller = [&](auto& value) { return std::invoke(std::forward<TCallback>(callback), type, value); };
+ auto wrapper = [&](auto& value) {
+ using T = std::decay_t<decltype(value)>;
+ if constexpr (sizeof(T) <= sizeof(uintptr_t)) {
+ return caller(value);
+ } else {
+ return caller(reinterpret_cast<TObjectHolder<T>&>(value));
+ }
+ };
+ switch (type) {
+ case EType::STRING: return wrapper(reinterpret_cast<TString&>(value));
+ case EType::ROPE_CHUNK_BACKEND: return wrapper(reinterpret_cast<IRopeChunkBackend::TPtr&>(value));
+ }
+ Y_FAIL("Unexpected type# %" PRIu64, static_cast<ui64>(type));
+ }
+
+ template<typename TCallback>
+ static std::invoke_result_t<TCallback, EType, TString&> Visit(uintptr_t value, TCallback&& callback) {
+ return VisitRaw(value, [&](EType type, auto& value) {
+ return std::invoke(std::forward<TCallback>(callback), type, Unwrap(value));
+ });
+ }
+
+ template<typename T> static T& Unwrap(T& object) { return object; }
+ template<typename T> static T& Unwrap(TObjectHolder<T>& holder) { return holder.Object->Value; }
+
+ static uintptr_t Clone(uintptr_t value) {
+ return VisitRaw(value, [](EType type, auto& value) { return Construct(type, value); });
+ }
+
+ static void Destroy(uintptr_t value) {
+ VisitRaw(value, [](EType, auto& value) { CallDtor(value); });
+ }
+
+ template<typename T>
+ static void CallDtor(T& value) {
+ value.~T();
+ }
+ };
+
+ TBackend Backend; // who actually holds the data
+ const char *Begin; // data start
+ const char *End; // data end
+
+ static constexpr struct TSlice {} Slice{};
+
+ template<typename T>
+ TChunk(T&& backend, const IRopeChunkBackend::TData& data)
+ : Backend(std::move(backend))
+ , Begin(std::get<0>(data))
+ , End(Begin + std::get<1>(data))
+ {
+ Y_VERIFY_DEBUG(Begin != End);
+ }
+
+ TChunk(TString s)
+ : Backend(std::move(s))
+ {
+ size_t size;
+ std::tie(Begin, size) = Backend.GetData();
+ End = Begin + size;
+ }
+
+ TChunk(IRopeChunkBackend::TPtr backend)
+ : TChunk(backend, backend->GetData())
+ {}
+
+ TChunk(TSlice, const char *data, size_t size, const TChunk& from)
+ : TChunk(from.Backend, {data, size})
+ {}
+
+ TChunk(TSlice, const char *begin, const char *end, const TChunk& from)
+ : TChunk(Slice, begin, end - begin, from)
+ {}
+
+ explicit TChunk(const TChunk& other)
+ : Backend(other.Backend)
+ , Begin(other.Begin)
+ , End(other.End)
+ {}
+
+ TChunk(TChunk&& other)
+ : Backend(std::move(other.Backend))
+ , Begin(other.Begin)
+ , End(other.End)
+ {}
+
+ TChunk& operator =(const TChunk&) = default;
+ TChunk& operator =(TChunk&&) = default;
+
+ size_t GetSize() const {
+ return End - Begin;
+ }
+
+ static void Clear(TChunk& chunk) {
+ chunk.Begin = nullptr;
+ }
+
+ static bool IsInUse(const TChunk& chunk) {
+ return chunk.Begin != nullptr;
+ }
+
+ size_t GetCapacity() const {
+ return Backend.GetCapacity();
+ }
+ };
+
+ using TChunkList = NRopeDetails::TChunkList<TChunk>;
+
+private:
+    // We use a list to store chain items because erase/insert operations must not invalidate existing iterators;
+    // TRope's iterator wraps the underlying container's iterator, so the container has to keep iterators valid
+    // on deletion. Hence, the list.
+ TChunkList Chain;
+ size_t Size = 0;
+
+private:
+ template<bool IsConst>
+ class TIteratorImpl {
+ using TTraits = NRopeDetails::TIteratorTraits<IsConst, TRope, TChunkList>;
+
+ typename TTraits::TRopePtr Rope;
+ typename TTraits::TListIterator Iter;
+ const char *Ptr; // ptr is always nullptr when iterator is positioned at the rope end
+
+#ifndef NDEBUG
+ ui32 ValidityToken;
+#endif
+
+ private:
+ TIteratorImpl(typename TTraits::TRopePtr rope, typename TTraits::TListIterator iter, const char *ptr = nullptr)
+ : Rope(rope)
+ , Iter(iter)
+ , Ptr(ptr)
+#ifndef NDEBUG
+ , ValidityToken(Rope->GetValidityToken())
+#endif
+ {}
+
+ public:
+ TIteratorImpl()
+ : Rope(nullptr)
+ , Ptr(nullptr)
+ {}
+
+ template<bool IsOtherConst>
+ TIteratorImpl(const TIteratorImpl<IsOtherConst>& other)
+ : Rope(other.Rope)
+ , Iter(other.Iter)
+ , Ptr(other.Ptr)
+#ifndef NDEBUG
+ , ValidityToken(other.ValidityToken)
+#endif
+ {}
+
+ void CheckValid() const {
+#ifndef NDEBUG
+ Y_VERIFY(ValidityToken == Rope->GetValidityToken());
+#endif
+ }
+
+ TIteratorImpl& operator +=(size_t amount) {
+ CheckValid();
+
+ while (amount) {
+ Y_VERIFY_DEBUG(Valid());
+ const size_t max = ContiguousSize();
+ const size_t num = std::min(amount, max);
+ amount -= num;
+ Ptr += num;
+ if (Ptr == Iter->End) {
+ AdvanceToNextContiguousBlock();
+ }
+ }
+
+ return *this;
+ }
+
+ TIteratorImpl operator +(size_t amount) const {
+ CheckValid();
+
+ return TIteratorImpl(*this) += amount;
+ }
+
+ TIteratorImpl& operator -=(size_t amount) {
+ CheckValid();
+
+ while (amount) {
+ const size_t num = Ptr ? std::min<size_t>(amount, Ptr - Iter->Begin) : 0;
+ amount -= num;
+ Ptr -= num;
+ if (amount) {
+ Y_VERIFY_DEBUG(Iter != GetChainBegin());
+ --Iter;
+ Ptr = Iter->End;
+ }
+ }
+
+ return *this;
+ }
+
+ TIteratorImpl operator -(size_t amount) const {
+ CheckValid();
+ return TIteratorImpl(*this) -= amount;
+ }
+
+ std::pair<const char*, size_t> operator *() const {
+ return {ContiguousData(), ContiguousSize()};
+ }
+
+ TIteratorImpl& operator ++() {
+ AdvanceToNextContiguousBlock();
+ return *this;
+ }
+
+ TIteratorImpl operator ++(int) const {
+ auto it(*this);
+ it.AdvanceToNextContiguousBlock();
+ return it;
+ }
+
+ ////////////////////////////////////////////////////////////////////////////////////////////////////////////////
+ // Operation with contiguous data
+ ////////////////////////////////////////////////////////////////////////////////////////////////////////////////
+
+ // Get the pointer to the contiguous block of data; valid locations are [Data; Data + Size).
+ const char *ContiguousData() const {
+ CheckValid();
+ return Ptr;
+ }
+
+        // Get the size of the contiguous block.
+ size_t ContiguousSize() const {
+ CheckValid();
+ return Ptr ? Iter->End - Ptr : 0;
+ }
+
+ size_t ChunkOffset() const {
+ return Ptr ? Ptr - Iter->Begin : 0;
+ }
+
+ // Advance to next contiguous block of data.
+ void AdvanceToNextContiguousBlock() {
+ CheckValid();
+ Y_VERIFY_DEBUG(Valid());
+ ++Iter;
+ Ptr = Iter != GetChainEnd() ? Iter->Begin : nullptr;
+ }
+
+        // Extract some data and advance. The size is not checked here, so it must be valid.
+ void ExtractPlainDataAndAdvance(void *buffer, size_t len) {
+ CheckValid();
+
+ while (len) {
+ Y_VERIFY_DEBUG(Ptr);
+
+ // calculate amount of bytes we need to move
+ const size_t max = ContiguousSize();
+ const size_t num = std::min(len, max);
+
+ // copy data to the buffer and advance buffer pointers
+ memcpy(buffer, Ptr, num);
+ buffer = static_cast<char*>(buffer) + num;
+ len -= num;
+
+ // advance iterator itself
+ Ptr += num;
+ if (Ptr == Iter->End) {
+ AdvanceToNextContiguousBlock();
+ }
+ }
+ }
+
+        // Returns true unless the iterator points to the end of the rope.
+ bool Valid() const {
+ CheckValid();
+ return Ptr;
+ }
+
+ template<bool IsOtherConst>
+ bool operator ==(const TIteratorImpl<IsOtherConst>& other) const {
+ Y_VERIFY_DEBUG(Rope == other.Rope);
+ CheckValid();
+ other.CheckValid();
+ return Iter == other.Iter && Ptr == other.Ptr;
+ }
+
+ template<bool IsOtherConst>
+ bool operator !=(const TIteratorImpl<IsOtherConst>& other) const {
+ CheckValid();
+ other.CheckValid();
+ return !(*this == other);
+ }
+
+ private:
+ friend class TRope;
+
+ typename TTraits::TListIterator operator ->() const {
+ CheckValid();
+ return Iter;
+ }
+
+ const TChunk& GetChunk() const {
+ CheckValid();
+ return *Iter;
+ }
+
+ typename TTraits::TListIterator GetChainBegin() const {
+ CheckValid();
+ return Rope->Chain.begin();
+ }
+
+ typename TTraits::TListIterator GetChainEnd() const {
+ CheckValid();
+ return Rope->Chain.end();
+ }
+
+ bool PointsToChunkMiddle() const {
+ CheckValid();
+ return Ptr && Ptr != Iter->Begin;
+ }
+ };
+
+public:
+#ifndef NDEBUG
+ ui32 ValidityToken = 0;
+ ui32 GetValidityToken() const { return ValidityToken; }
+ void InvalidateIterators() { ++ValidityToken; }
+#else
+ void InvalidateIterators() {}
+#endif
+
+public:
+ using TConstIterator = TIteratorImpl<true>;
+ using TIterator = TIteratorImpl<false>;
+
+public:
+ TRope() = default;
+ TRope(const TRope& rope) = default;
+
+ TRope(TRope&& rope)
+ : Chain(std::move(rope.Chain))
+ , Size(std::exchange(rope.Size, 0))
+ {
+ rope.InvalidateIterators();
+ }
+
+ TRope(TString s) {
+ if (s) {
+ Size = s.size();
+ s.reserve(32);
+ Chain.PutToEnd(std::move(s));
+ }
+ }
+
+ TRope(IRopeChunkBackend::TPtr item) {
+ std::tie(std::ignore, Size) = item->GetData();
+ Chain.PutToEnd(std::move(item));
+ }
+
+ TRope(TConstIterator begin, TConstIterator end) {
+ Y_VERIFY_DEBUG(begin.Rope == end.Rope);
+ if (begin.Rope == this) {
+ TRope temp(begin, end);
+ *this = std::move(temp);
+ return;
+ }
+
+ while (begin.Iter != end.Iter) {
+ const size_t size = begin.ContiguousSize();
+ Chain.PutToEnd(TChunk::Slice, begin.ContiguousData(), size, begin.GetChunk());
+ begin.AdvanceToNextContiguousBlock();
+ Size += size;
+ }
+
+ if (begin != end && end.PointsToChunkMiddle()) {
+ Chain.PutToEnd(TChunk::Slice, begin.Ptr, end.Ptr, begin.GetChunk());
+ Size += end.Ptr - begin.Ptr;
+ }
+ }
+
+ ~TRope() {
+ }
+
+    // creates a copy of the rope; chunks whose storage ratio is worse than the threshold are re-copied through the arena allocator
+ static TRope CopySpaceOptimized(TRope&& origin, size_t worstRatioPer1k, TRopeArena& arena);
+
+ TRope& operator=(const TRope& other) {
+ Chain = other.Chain;
+ Size = other.Size;
+ return *this;
+ }
+
+ TRope& operator=(TRope&& other) {
+ Chain = std::move(other.Chain);
+ Size = std::exchange(other.Size, 0);
+ InvalidateIterators();
+ other.InvalidateIterators();
+ return *this;
+ }
+
+ size_t GetSize() const {
+ return Size;
+ }
+
+ bool IsEmpty() const {
+ return !Size;
+ }
+
+ operator bool() const {
+ return Chain;
+ }
+
+ TIterator Begin() {
+ return *this ? TIterator(this, Chain.begin(), Chain.GetFirstChunk().Begin) : End();
+ }
+
+ TIterator End() {
+ return TIterator(this, Chain.end());
+ }
+
+ TIterator Iterator(TChunkList::iterator it) {
+ return TIterator(this, it, it != Chain.end() ? it->Begin : nullptr);
+ }
+
+ TIterator Position(size_t index) {
+ return Begin() + index;
+ }
+
+ TConstIterator Begin() const {
+ return *this ? TConstIterator(this, Chain.begin(), Chain.GetFirstChunk().Begin) : End();
+ }
+
+ TConstIterator End() const {
+ return TConstIterator(this, Chain.end());
+ }
+
+ TConstIterator Position(size_t index) const {
+ return Begin() + index;
+ }
+
+ TConstIterator begin() const { return Begin(); }
+ TConstIterator end() const { return End(); }
+
+ void Erase(TIterator begin, TIterator end) {
+ Cut(begin, end, nullptr);
+ }
+
+ TRope Extract(TIterator begin, TIterator end) {
+ TRope res;
+ Cut(begin, end, &res);
+ return res;
+ }
+
+ void ExtractFront(size_t num, TRope *dest) {
+ Y_VERIFY(Size >= num);
+ if (num == Size && !*dest) {
+ *dest = std::move(*this);
+ return;
+ }
+ Size -= num;
+ dest->Size += num;
+ TChunkList::iterator it, first = Chain.begin();
+ for (it = first; num && num >= it->GetSize(); ++it) {
+ num -= it->GetSize();
+ }
+ if (it != first) {
+ if (dest->Chain) {
+ auto& last = dest->Chain.GetLastChunk();
+ if (last.Backend == first->Backend && last.End == first->Begin) {
+ last.End = first->End;
+ first = Chain.Erase(first); // TODO(alexvru): "it" gets invalidated here on some containers
+ }
+ }
+ dest->Chain.Splice(dest->Chain.end(), Chain, first, it);
+ }
+ if (num) {
+ auto it = Chain.begin();
+ if (dest->Chain) {
+ auto& last = dest->Chain.GetLastChunk();
+ if (last.Backend == first->Backend && last.End == first->Begin) {
+ first->Begin += num;
+ last.End = first->Begin;
+ return;
+ }
+ }
+ dest->Chain.PutToEnd(TChunk::Slice, it->Begin, it->Begin + num, *it);
+ it->Begin += num;
+ }
+ }
+
+ void Insert(TIterator pos, TRope&& rope) {
+ Y_VERIFY_DEBUG(this == pos.Rope);
+ Y_VERIFY_DEBUG(this != &rope);
+
+ if (!rope) {
+ return; // do nothing for empty rope
+ }
+
+ // adjust size
+ Size += std::exchange(rope.Size, 0);
+
+ // check if we have to split the block
+ if (pos.PointsToChunkMiddle()) {
+ pos.Iter = Chain.InsertBefore(pos.Iter, TChunk::Slice, pos->Begin, pos.Ptr, pos.GetChunk());
+ ++pos.Iter;
+ pos->Begin = pos.Ptr;
+ }
+
+ // perform glueing if possible
+ TChunk *ropeLeft = &rope.Chain.GetFirstChunk();
+ TChunk *ropeRight = &rope.Chain.GetLastChunk();
+ bool gluedLeft = false, gluedRight = false;
+ if (pos.Iter != Chain.begin()) { // glue left part whenever possible
+ // obtain iterator to previous chunk
+ auto prev(pos.Iter);
+ --prev;
+ if (prev->End == ropeLeft->Begin && prev->Backend == ropeLeft->Backend) { // it is glueable
+ prev->End = ropeLeft->End;
+ gluedLeft = true;
+ }
+ }
+ if (pos.Iter != Chain.end() && ropeRight->End == pos->Begin && ropeRight->Backend == pos->Backend) {
+ pos->Begin = ropeRight->Begin;
+ gluedRight = true;
+ }
+ if (gluedLeft) {
+ rope.Chain.EraseFront();
+ }
+ if (gluedRight) {
+ if (rope) {
+ rope.Chain.EraseBack();
+ } else { // it looks like double-glueing for the same chunk, we have to drop previous one
+ auto prev(pos.Iter);
+ --prev;
+ pos->Begin = prev->Begin;
+ pos.Iter = Chain.Erase(prev);
+ }
+ }
+ if (rope) { // insert remains
+ Chain.Splice(pos.Iter, rope.Chain, rope.Chain.begin(), rope.Chain.end());
+ }
+ Y_VERIFY_DEBUG(!rope);
+ InvalidateIterators();
+ }
+
+ void EraseFront(size_t len) {
+ Y_VERIFY_DEBUG(Size >= len);
+ Size -= len;
+
+ while (len) {
+ Y_VERIFY_DEBUG(Chain);
+ TChunk& item = Chain.GetFirstChunk();
+ const size_t itemSize = item.GetSize();
+ if (len >= itemSize) {
+ Chain.EraseFront();
+ len -= itemSize;
+ } else {
+ item.Begin += len;
+ break;
+ }
+ }
+
+ InvalidateIterators();
+ }
+
+ void EraseBack(size_t len) {
+ Y_VERIFY_DEBUG(Size >= len);
+ Size -= len;
+
+ while (len) {
+ Y_VERIFY_DEBUG(Chain);
+ TChunk& item = Chain.GetLastChunk();
+ const size_t itemSize = item.GetSize();
+ if (len >= itemSize) {
+ Chain.EraseBack();
+ len -= itemSize;
+ } else {
+ item.End -= len;
+ break;
+ }
+ }
+
+ InvalidateIterators();
+ }
+
+ bool ExtractFrontPlain(void *buffer, size_t len) {
+ // check if we have enough data in the rope
+ if (Size < len) {
+ return false;
+ }
+ Size -= len;
+ while (len) {
+ auto& chunk = Chain.GetFirstChunk();
+ const size_t num = Min(len, chunk.GetSize());
+ memcpy(buffer, chunk.Begin, num);
+ buffer = static_cast<char*>(buffer) + num;
+ len -= num;
+ chunk.Begin += num;
+ if (chunk.Begin == chunk.End) {
+ Chain.Erase(Chain.begin());
+ }
+ }
+ InvalidateIterators();
+ return true;
+ }
+
+ bool FetchFrontPlain(char **ptr, size_t *remain) {
+ const size_t num = Min(*remain, Size);
+ ExtractFrontPlain(*ptr, num);
+ *ptr += num;
+ *remain -= num;
+ return !*remain;
+ }
+
+ static int Compare(const TRope& x, const TRope& y) {
+ TConstIterator xIter = x.Begin(), yIter = y.Begin();
+ while (xIter.Valid() && yIter.Valid()) {
+ const size_t step = std::min(xIter.ContiguousSize(), yIter.ContiguousSize());
+ if (int res = memcmp(xIter.ContiguousData(), yIter.ContiguousData(), step)) {
+ return res;
+ }
+ xIter += step;
+ yIter += step;
+ }
+ return xIter.Valid() - yIter.Valid();
+ }
+
+ // Use this method carefully -- it may significantly reduce performance when misused.
+ TString ConvertToString() const {
+ TString res = TString::Uninitialized(GetSize());
+ Begin().ExtractPlainDataAndAdvance(res.Detach(), res.size());
+ return res;
+ }
+
+ TString DebugString() const {
+ TStringStream s;
+ s << "{Size# " << Size;
+ for (const auto& chunk : Chain) {
+ const char *data;
+ std::tie(data, std::ignore) = chunk.Backend.GetData();
+ s << " [" << chunk.Begin - data << ", " << chunk.End - data << ")@" << chunk.Backend.UniqueId();
+ }
+ s << "}";
+ return s.Str();
+ }
+
+ friend bool operator==(const TRope& x, const TRope& y) { return Compare(x, y) == 0; }
+ friend bool operator!=(const TRope& x, const TRope& y) { return Compare(x, y) != 0; }
+ friend bool operator< (const TRope& x, const TRope& y) { return Compare(x, y) < 0; }
+ friend bool operator<=(const TRope& x, const TRope& y) { return Compare(x, y) <= 0; }
+ friend bool operator> (const TRope& x, const TRope& y) { return Compare(x, y) > 0; }
+ friend bool operator>=(const TRope& x, const TRope& y) { return Compare(x, y) >= 0; }
+
+private:
+ void Cut(TIterator begin, TIterator end, TRope *target) {
+        // ensure all iterators belong to us
+ Y_VERIFY_DEBUG(this == begin.Rope && this == end.Rope);
+
+ // if begin and end are equal, we do nothing -- checking this case allows us to find out that begin does not
+ // point to End(), for example
+ if (begin == end) {
+ return;
+ }
+
+ auto addBlock = [&](const TChunk& from, const char *begin, const char *end) {
+ if (target) {
+ target->Chain.PutToEnd(TChunk::Slice, begin, end, from);
+ target->Size += end - begin;
+ }
+ Size -= end - begin;
+ };
+
+ // consider special case -- when begin and end point to the same block; in this case we have to split up this
+ // block into two parts
+ if (begin.Iter == end.Iter) {
+ addBlock(begin.GetChunk(), begin.Ptr, end.Ptr);
+ const char *firstChunkBegin = begin.PointsToChunkMiddle() ? begin->Begin : nullptr;
+ begin->Begin = end.Ptr; // this affects both begin and end iterator pointed values
+ if (firstChunkBegin) {
+ Chain.InsertBefore(begin.Iter, TChunk::Slice, firstChunkBegin, begin.Ptr, begin.GetChunk());
+ }
+ } else {
+ // check the first iterator -- if it starts not from the begin of the block, we have to adjust end of the
+ // first block to match begin iterator and switch to next block
+ if (begin.PointsToChunkMiddle()) {
+ addBlock(begin.GetChunk(), begin.Ptr, begin->End);
+ begin->End = begin.Ptr;
+ begin.AdvanceToNextContiguousBlock();
+ }
+
+ // now drop full blocks
+ size_t rangeSize = 0;
+ for (auto it = begin.Iter; it != end.Iter; ++it) {
+ Y_VERIFY_DEBUG(it->GetSize());
+ rangeSize += it->GetSize();
+ }
+ if (rangeSize) {
+ if (target) {
+ end.Iter = target->Chain.Splice(target->Chain.end(), Chain, begin.Iter, end.Iter);
+ target->Size += rangeSize;
+ } else {
+ end.Iter = Chain.Erase(begin.Iter, end.Iter);
+ }
+ Size -= rangeSize;
+ }
+
+ // and cut the last block if necessary
+ if (end.PointsToChunkMiddle()) {
+ addBlock(end.GetChunk(), end->Begin, end.Ptr);
+ end->Begin = end.Ptr;
+ }
+ }
+
+ InvalidateIterators();
+ }
+};
+
+class TRopeArena {
+ using TAllocateCallback = std::function<TIntrusivePtr<IRopeChunkBackend>()>;
+
+ TAllocateCallback Allocator;
+ TRope Arena;
+ size_t Size = 0;
+ THashSet<const void*> AccountedBuffers;
+
+public:
+ TRopeArena(TAllocateCallback&& allocator)
+ : Allocator(std::move(allocator))
+ {}
+
+ TRope CreateRope(const void *buffer, size_t len) {
+ TRope res;
+
+ while (len) {
+ if (Arena) {
+ auto iter = Arena.Begin();
+ Y_VERIFY_DEBUG(iter.Valid());
+ char *dest = const_cast<char*>(iter.ContiguousData());
+ const size_t bytesToCopy = std::min(len, iter.ContiguousSize());
+ memcpy(dest, buffer, bytesToCopy);
+ buffer = static_cast<const char*>(buffer) + bytesToCopy;
+ len -= bytesToCopy;
+ res.Insert(res.End(), Arena.Extract(Arena.Begin(), Arena.Position(bytesToCopy)));
+ } else {
+ Arena.Insert(Arena.End(), TRope(Allocator()));
+ }
+ }
+
+ // align arena on 8-byte boundary
+ const size_t align = 8;
+ if (const size_t padding = Arena.GetSize() % align) {
+ Arena.EraseFront(padding);
+ }
+
+ return res;
+ }
+
+ size_t GetSize() const {
+ return Size;
+ }
+
+ void AccountChunk(const TRope::TChunk& chunk) {
+ if (AccountedBuffers.insert(chunk.Backend.UniqueId()).second) {
+ Size += chunk.GetCapacity();
+ }
+ }
+};
+
+struct TRopeUtils {
+ static void Memset(TRope::TConstIterator dst, char c, size_t size) {
+ while (size) {
+ Y_VERIFY_DEBUG(dst.Valid());
+ size_t len = std::min(size, dst.ContiguousSize());
+ memset(const_cast<char*>(dst.ContiguousData()), c, len);
+ dst += len;
+ size -= len;
+ }
+ }
+
+ static void Memcpy(TRope::TConstIterator dst, TRope::TConstIterator src, size_t size) {
+ while (size) {
+ Y_VERIFY_DEBUG(dst.Valid() && src.Valid(),
+ "Invalid iterator in memcpy: dst.Valid() - %" PRIu32 ", src.Valid() - %" PRIu32,
+ (ui32)dst.Valid(), (ui32)src.Valid());
+ size_t len = std::min(size, std::min(dst.ContiguousSize(), src.ContiguousSize()));
+ memcpy(const_cast<char*>(dst.ContiguousData()), src.ContiguousData(), len);
+ dst += len;
+ src += len;
+ size -= len;
+ }
+ }
+
+ static void Memcpy(TRope::TConstIterator dst, const char* src, size_t size) {
+ while (size) {
+ Y_VERIFY_DEBUG(dst.Valid());
+ size_t len = std::min(size, dst.ContiguousSize());
+ memcpy(const_cast<char*>(dst.ContiguousData()), src, len);
+ size -= len;
+ dst += len;
+ src += len;
+ }
+ }
+
+ static void Memcpy(char* dst, TRope::TConstIterator src, size_t size) {
+ while (size) {
+ Y_VERIFY_DEBUG(src.Valid());
+ size_t len = std::min(size, src.ContiguousSize());
+ memcpy(dst, src.ContiguousData(), len);
+ size -= len;
+ dst += len;
+ src += len;
+ }
+ }
+
+    // copies at most sizeBound bytes while src remains valid; returns the number of bytes copied
+ static size_t SafeMemcpy(char* dst, TRope::TIterator src, size_t sizeBound) {
+ size_t origSize = sizeBound;
+ while (sizeBound && src.Valid()) {
+ size_t len = Min(sizeBound, src.ContiguousSize());
+ memcpy(dst, src.ContiguousData(), len);
+ sizeBound -= len;
+ dst += len;
+ src += len;
+ }
+ return origSize - sizeBound;
+ }
+};
+
+template<size_t BLOCK, size_t ALIGN = 16>
+class TRopeSlideView {
+    alignas(ALIGN) char Slide[BLOCK]; // used when the current chunk has fewer than BLOCK contiguous bytes left
+ TRope::TIterator Position; // current position at rope
+ size_t Size;
+    char* Head; // points to the data, either inside the current rope chunk or inside Slide
+
+private:
+ void FillBlock() {
+ size_t chunkSize = Position.ContiguousSize();
+ if (chunkSize >= BLOCK) {
+ Size = chunkSize;
+ Head = const_cast<char*>(Position.ContiguousData());
+ } else {
+ Size = TRopeUtils::SafeMemcpy(Slide, Position, BLOCK);
+ Head = Slide;
+ }
+ }
+
+public:
+ TRopeSlideView(TRope::TIterator position)
+ : Position(position)
+ {
+ FillBlock();
+ }
+
+ TRopeSlideView(TRope &rope)
+ : TRopeSlideView(rope.Begin())
+ {}
+
+ // if the view is staged in Slide, copy its contents back into the rope
+ void FlushBlock() {
+ if (Head == Slide) {
+ TRopeUtils::Memcpy(Position, Head, Size);
+ }
+ }
+
+ TRope::TIterator operator+=(size_t amount) {
+ Position += amount;
+ FillBlock();
+ return Position;
+ }
+
+ TRope::TIterator GetPosition() const {
+ return Position;
+ }
+
+ char* GetHead() const {
+ return Head;
+ }
+
+ ui8* GetUi8Head() const {
+ return reinterpret_cast<ui8*>(Head);
+ }
+
+ size_t ContiguousSize() const {
+ return Size;
+ }
+
+ bool IsOnChunk() const {
+ return Head != Slide;
+ }
+};
+
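+// Access-pattern sketch (illustrative; MutateBlock is a hypothetical callback):
+// walk the rope in windows, mutating data in place where a chunk is large
+// enough and flushing the staging buffer back otherwise:
+//
+//     TRopeSlideView<64> view(rope);
+//     while (view.ContiguousSize()) {
+//         MutateBlock(view.GetHead(), view.ContiguousSize());
+//         view.FlushBlock(); // no-op when the view points into the chunk itself
+//         view += view.ContiguousSize();
+//     }
+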
+inline TRope TRope::CopySpaceOptimized(TRope&& origin, size_t worstRatioPer1k, TRopeArena& arena) {
+ TRope res;
+ for (TChunk& chunk : origin.Chain) {
+ size_t ratio = chunk.GetSize() * 1024 / chunk.GetCapacity();
+ if (ratio < 1024 - worstRatioPer1k) {
+ res.Insert(res.End(), arena.CreateRope(chunk.Begin, chunk.GetSize()));
+ } else {
+ res.Chain.PutToEnd(std::move(chunk));
+ }
+ }
+ res.Size = origin.Size;
+ origin = TRope();
+ for (const TChunk& chunk : res.Chain) {
+ arena.AccountChunk(chunk);
+ }
+ return res;
+}
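+// Worked example for the ratio check above: with worstRatioPer1k = 100 a
+// chunk is recopied into the arena only when it occupies less than
+// (1024 - 100) / 1024 ~= 90% of its capacity; well-filled chunks are moved
+// into the result as-is and merely accounted.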
+
+
+#if defined(WITH_VALGRIND) || defined(_msan_enabled_)
+
+inline void CheckRopeIsDefined(TRope::TConstIterator begin, ui64 size) {
+ while (size) {
+ ui64 contiguousSize = Min(size, begin.ContiguousSize());
+# if defined(WITH_VALGRIND)
+ VALGRIND_CHECK_MEM_IS_DEFINED(begin.ContiguousData(), contiguousSize);
+# endif
+# if defined(_msan_enabled_)
+ NSan::CheckMemIsInitialized(begin.ContiguousData(), contiguousSize);
+# endif
+ size -= contiguousSize;
+ begin += contiguousSize;
+ }
+}
+
+# define CHECK_ROPE_IS_DEFINED(begin, size) CheckRopeIsDefined(begin, size)
+
+#else
+
+# define CHECK_ROPE_IS_DEFINED(begin, size) do {} while (false)
+
+#endif
diff --git a/library/cpp/actors/util/rope_cont_deque.h b/library/cpp/actors/util/rope_cont_deque.h
new file mode 100644
index 0000000000..d1d122c49c
--- /dev/null
+++ b/library/cpp/actors/util/rope_cont_deque.h
@@ -0,0 +1,181 @@
+#pragma once
+
+#include <library/cpp/containers/stack_vector/stack_vec.h>
+#include <deque>
+
+namespace NRopeDetails {
+
+template<typename TChunk>
+class TChunkList {
+ std::deque<TChunk> Chunks;
+
+ static constexpr size_t MaxInplaceItems = 4;
+ using TInplace = TStackVec<TChunk, MaxInplaceItems>;
+ TInplace Inplace;
+
+private:
+ template<typename TChunksIt, typename TInplaceIt, typename TValue>
+ struct TIterator {
+ TChunksIt ChunksIt;
+ TInplaceIt InplaceIt;
+
+ TIterator() = default;
+
+ TIterator(TChunksIt chunksIt, TInplaceIt inplaceIt)
+ : ChunksIt(std::move(chunksIt))
+ , InplaceIt(inplaceIt)
+ {}
+
+ template<typename A, typename B, typename C>
+ TIterator(const TIterator<A, B, C>& other)
+ : ChunksIt(other.ChunksIt)
+ , InplaceIt(other.InplaceIt)
+ {}
+
+ TIterator(const TIterator&) = default;
+ TIterator(TIterator&&) = default;
+ TIterator& operator =(const TIterator&) = default;
+ TIterator& operator =(TIterator&&) = default;
+
+ TValue& operator *() const { return InplaceIt != TInplaceIt() ? *InplaceIt : *ChunksIt; }
+ TValue* operator ->() const { return InplaceIt != TInplaceIt() ? &*InplaceIt : &*ChunksIt; }
+
+ TIterator& operator ++() {
+ if (InplaceIt != TInplaceIt()) {
+ ++InplaceIt;
+ } else {
+ ++ChunksIt;
+ }
+ return *this;
+ }
+
+ TIterator& operator --() {
+ if (InplaceIt != TInplaceIt()) {
+ --InplaceIt;
+ } else {
+ --ChunksIt;
+ }
+ return *this;
+ }
+
+ template<typename A, typename B, typename C>
+ bool operator ==(const TIterator<A, B, C>& other) const {
+ return ChunksIt == other.ChunksIt && InplaceIt == other.InplaceIt;
+ }
+
+ template<typename A, typename B, typename C>
+ bool operator !=(const TIterator<A, B, C>& other) const {
+ return ChunksIt != other.ChunksIt || InplaceIt != other.InplaceIt;
+ }
+ };
+
+public:
+ using iterator = TIterator<typename std::deque<TChunk>::iterator, typename TInplace::iterator, TChunk>;
+ using const_iterator = TIterator<typename std::deque<TChunk>::const_iterator, typename TInplace::const_iterator, const TChunk>;
+
+public:
+ TChunkList() = default;
+ TChunkList(const TChunkList& other) = default;
+ TChunkList(TChunkList&& other) = default;
+ TChunkList& operator=(const TChunkList& other) = default;
+ TChunkList& operator=(TChunkList&& other) = default;
+
+ template<typename... TArgs>
+ void PutToEnd(TArgs&&... args) {
+ InsertBefore(end(), std::forward<TArgs>(args)...);
+ }
+
+ template<typename... TArgs>
+ iterator InsertBefore(iterator pos, TArgs&&... args) {
+ if (!Inplace) {
+ pos.InplaceIt = Inplace.end();
+ }
+ if (Chunks.empty() && Inplace.size() < MaxInplaceItems) {
+ return {{}, Inplace.emplace(pos.InplaceIt, std::forward<TArgs>(args)...)};
+ } else {
+ if (Inplace) {
+ Y_VERIFY_DEBUG(Chunks.empty());
+ for (auto& item : Inplace) {
+ Chunks.push_back(std::move(item));
+ }
+ pos.ChunksIt = pos.InplaceIt - Inplace.begin() + Chunks.begin();
+ Inplace.clear();
+ }
+ return {Chunks.emplace(pos.ChunksIt, std::forward<TArgs>(args)...), {}};
+ }
+ }
+
+ iterator Erase(iterator pos) {
+ if (Inplace) {
+ return {{}, Inplace.erase(pos.InplaceIt)};
+ } else {
+ return {Chunks.erase(pos.ChunksIt), {}};
+ }
+ }
+
+ iterator Erase(iterator first, iterator last) {
+ if (Inplace) {
+ return {{}, Inplace.erase(first.InplaceIt, last.InplaceIt)};
+ } else {
+ return {Chunks.erase(first.ChunksIt, last.ChunksIt), {}};
+ }
+ }
+
+ void EraseFront() {
+ if (Inplace) {
+ Inplace.erase(Inplace.begin());
+ } else {
+ Chunks.pop_front();
+ }
+ }
+
+ void EraseBack() {
+ if (Inplace) {
+ Inplace.pop_back();
+ } else {
+ Chunks.pop_back();
+ }
+ }
+
+ iterator Splice(iterator pos, TChunkList& from, iterator first, iterator last) {
+ if (!Inplace) {
+ pos.InplaceIt = Inplace.end();
+ }
+ size_t n = 0;
+ for (auto it = first; it != last; ++it, ++n) {} // count the items in [first, last)
+ if (Chunks.empty() && Inplace.size() + n <= MaxInplaceItems) {
+ if (first.InplaceIt != typename TInplace::iterator()) {
+ Inplace.insert(pos.InplaceIt, first.InplaceIt, last.InplaceIt);
+ } else {
+ Inplace.insert(pos.InplaceIt, first.ChunksIt, last.ChunksIt);
+ }
+ } else {
+ if (Inplace) {
+ Y_VERIFY_DEBUG(Chunks.empty());
+ for (auto& item : Inplace) {
+ Chunks.push_back(std::move(item));
+ }
+ pos.ChunksIt = pos.InplaceIt - Inplace.begin() + Chunks.begin();
+ Inplace.clear();
+ }
+ if (first.InplaceIt != typename TInplace::iterator()) {
+ Chunks.insert(pos.ChunksIt, first.InplaceIt, last.InplaceIt);
+ } else {
+ Chunks.insert(pos.ChunksIt, first.ChunksIt, last.ChunksIt);
+ }
+ }
+ return from.Erase(first, last);
+ }
+
+ operator bool() const { return !Inplace.empty() || !Chunks.empty(); }
+ TChunk& GetFirstChunk() { return Inplace ? Inplace.front() : Chunks.front(); }
+ const TChunk& GetFirstChunk() const { return Inplace ? Inplace.front() : Chunks.front(); }
+ TChunk& GetLastChunk() { return Inplace ? Inplace.back() : Chunks.back(); }
+ iterator begin() { return {Chunks.begin(), Inplace ? Inplace.begin() : typename TInplace::iterator()}; }
+ const_iterator begin() const { return {Chunks.begin(), Inplace ? Inplace.begin() : typename TInplace::const_iterator()}; }
+ iterator end() { return {Chunks.end(), Inplace ? Inplace.end() : typename TInplace::iterator()}; }
+ const_iterator end() const { return {Chunks.end(), Inplace ? Inplace.end() : typename TInplace::const_iterator()}; }
+};
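+
+// Design note: up to MaxInplaceItems chunks live in the TStackVec, so short
+// chains need no extra heap allocation for the list itself; the first insert
+// beyond that limit migrates all items into the std::deque, and the iterators
+// record which of the two stores they point into.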
+
+} // NRopeDetails
diff --git a/library/cpp/actors/util/rope_cont_list.h b/library/cpp/actors/util/rope_cont_list.h
new file mode 100644
index 0000000000..18c136284e
--- /dev/null
+++ b/library/cpp/actors/util/rope_cont_list.h
@@ -0,0 +1,159 @@
+#pragma once
+
+#include <util/generic/intrlist.h>
+
+namespace NRopeDetails {
+
+template<typename TChunk>
+class TChunkList {
+ struct TItem : TIntrusiveListItem<TItem>, TChunk {
+ // forwarding constructor: perfect-forwards its arguments to the TChunk base
+ template<typename... TArgs> TItem(TArgs&&... args) : TChunk(std::forward<TArgs>(args)...) {}
+ };
+
+ using TList = TIntrusiveList<TItem>;
+ TList List;
+
+ static constexpr size_t NumInplaceItems = 2;
+ char InplaceItems[sizeof(TItem) * NumInplaceItems];
+
+ template<typename... TArgs>
+ TItem *AllocateItem(TArgs&&... args) {
+ for (size_t index = 0; index < NumInplaceItems; ++index) {
+ TItem *chunk = GetInplaceItemPtr(index);
+ if (!TItem::IsInUse(*chunk)) {
+ return new(chunk) TItem(std::forward<TArgs>(args)...);
+ }
+ }
+ return new TItem(std::forward<TArgs>(args)...);
+ }
+
+ void ReleaseItem(TItem *chunk) {
+ if (IsInplaceItem(chunk)) {
+ chunk->~TItem();
+ TItem::Clear(*chunk);
+ } else {
+ delete chunk;
+ }
+ }
+
+ void ReleaseItems(TList& list) {
+ while (list) {
+ ReleaseItem(list.Front());
+ }
+ }
+
+ void Prepare() {
+ for (size_t index = 0; index < NumInplaceItems; ++index) {
+ TItem::Clear(*GetInplaceItemPtr(index));
+ }
+ }
+
+ TItem *GetInplaceItemPtr(size_t index) { return reinterpret_cast<TItem*>(InplaceItems + index * sizeof(TItem)); }
+ bool IsInplaceItem(TItem *chunk) { return chunk >= GetInplaceItemPtr(0) && chunk < GetInplaceItemPtr(NumInplaceItems); }
+
+public:
+ using iterator = typename TList::iterator;
+ using const_iterator = typename TList::const_iterator;
+
+public:
+ TChunkList() {
+ Prepare();
+ }
+
+ ~TChunkList() {
+ ReleaseItems(List);
+#ifndef NDEBUG
+ for (size_t index = 0; index < NumInplaceItems; ++index) {
+ Y_VERIFY(!TItem::IsInUse(*GetInplaceItemPtr(index)));
+ }
+#endif
+ }
+
+ TChunkList(const TChunkList& other) {
+ Prepare();
+ for (const TItem& chunk : other.List) {
+ PutToEnd(TChunk(chunk));
+ }
+ }
+
+ TChunkList(TChunkList&& other) {
+ Prepare();
+ Splice(end(), other, other.begin(), other.end());
+ }
+
+ TChunkList& operator=(const TChunkList& other) {
+ if (this != &other) {
+ ReleaseItems(List);
+ for (const TItem& chunk : other.List) {
+ PutToEnd(TChunk(chunk));
+ }
+ }
+ return *this;
+ }
+
+ TChunkList& operator=(TChunkList&& other) {
+ if (this != &other) {
+ ReleaseItems(List);
+ Splice(end(), other, other.begin(), other.end());
+ }
+ return *this;
+ }
+
+ template<typename... TArgs>
+ void PutToEnd(TArgs&&... args) {
+ InsertBefore(end(), std::forward<TArgs>(args)...);
+ }
+
+ template<typename... TArgs>
+ iterator InsertBefore(iterator pos, TArgs&&... args) {
+ TItem *item = AllocateItem<TArgs...>(std::forward<TArgs>(args)...);
+ item->LinkBefore(pos.Item());
+ return item;
+ }
+
+ iterator Erase(iterator pos) {
+ ReleaseItem(&*pos++);
+ return pos;
+ }
+
+ iterator Erase(iterator first, iterator last) {
+ TList temp;
+ TList::Cut(first, last, temp.end());
+ ReleaseItems(temp);
+ return last;
+ }
+
+ void EraseFront() {
+ ReleaseItem(List.PopFront());
+ }
+
+ void EraseBack() {
+ ReleaseItem(List.PopBack());
+ }
+
+ iterator Splice(iterator pos, TChunkList& from, iterator first, iterator last) {
+ for (auto it = first; it != last; ) {
+ if (from.IsInplaceItem(&*it)) {
+ TList::Cut(first, it, pos);
+ InsertBefore(pos, std::move(*it));
+ it = first = from.Erase(it);
+ } else {
+ ++it;
+ }
+ }
+ TList::Cut(first, last, pos);
+ return last;
+ }
+
+ operator bool() const { return static_cast<bool>(List); }
+ TChunk& GetFirstChunk() { return *List.Front(); }
+ const TChunk& GetFirstChunk() const { return *List.Front(); }
+ TChunk& GetLastChunk() { return *List.Back(); }
+ iterator begin() { return List.begin(); }
+ const_iterator begin() const { return List.begin(); }
+ iterator end() { return List.end(); }
+ const_iterator end() const { return List.end(); }
+};
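+
+// Design note: the first NumInplaceItems chunks are placement-constructed in
+// the InplaceItems buffer, so short chains avoid the heap entirely; Splice()
+// must therefore move inplace items out of `from` one by one instead of just
+// re-linking them, since their storage belongs to the source list.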
+
+} // NRopeDetails
diff --git a/library/cpp/actors/util/rope_ut.cpp b/library/cpp/actors/util/rope_ut.cpp
new file mode 100644
index 0000000000..cabeed9230
--- /dev/null
+++ b/library/cpp/actors/util/rope_ut.cpp
@@ -0,0 +1,231 @@
+#include "rope.h"
+#include <library/cpp/testing/unittest/registar.h>
+#include <util/random/random.h>
+
+class TRopeStringBackend : public IRopeChunkBackend {
+ TString Buffer;
+
+public:
+ TRopeStringBackend(TString buffer)
+ : Buffer(std::move(buffer))
+ {}
+
+ TData GetData() const override {
+ return {Buffer.data(), Buffer.size()};
+ }
+
+ size_t GetCapacity() const override {
+ return Buffer.capacity();
+ }
+};
+
+TRope CreateRope(TString s, size_t sliceSize) {
+ TRope res;
+ for (size_t i = 0; i < s.size(); ) {
+ size_t len = std::min(sliceSize, s.size() - i);
+ if (i % 2) {
+ res.Insert(res.End(), TRope(MakeIntrusive<TRopeStringBackend>(s.substr(i, len))));
+ } else {
+ res.Insert(res.End(), TRope(s.substr(i, len)));
+ }
+ i += len;
+ }
+ return res;
+}
+
+TString RopeToString(const TRope& rope) {
+ TString res;
+ auto iter = rope.Begin();
+ while (iter != rope.End()) {
+ res.append(iter.ContiguousData(), iter.ContiguousSize());
+ iter.AdvanceToNextContiguousBlock();
+ }
+
+ UNIT_ASSERT_VALUES_EQUAL(rope.GetSize(), res.size());
+
+ TString temp = TString::Uninitialized(rope.GetSize());
+ rope.Begin().ExtractPlainDataAndAdvance(temp.Detach(), temp.size());
+ UNIT_ASSERT_VALUES_EQUAL(temp, res);
+
+ return res;
+}
+
+TString Text = "No elements are copied or moved, only the internal pointers of the list nodes are re-pointed.";
+
+Y_UNIT_TEST_SUITE(TRope) {
+
+ Y_UNIT_TEST(Leak) {
+ const size_t begin = 10, end = 20;
+ TRope rope = CreateRope(Text, 10);
+ rope.Erase(rope.Begin() + begin, rope.Begin() + end);
+ }
+
+ Y_UNIT_TEST(BasicRange) {
+ TRope rope = CreateRope(Text, 10);
+ for (size_t begin = 0; begin < Text.size(); ++begin) {
+ for (size_t end = begin; end <= Text.size(); ++end) {
+ TRope::TIterator rBegin = rope.Begin() + begin;
+ TRope::TIterator rEnd = rope.Begin() + end;
+ UNIT_ASSERT_VALUES_EQUAL(RopeToString(TRope(rBegin, rEnd)), Text.substr(begin, end - begin));
+ }
+ }
+ }
+
+ Y_UNIT_TEST(Erase) {
+ for (size_t begin = 0; begin < Text.size(); ++begin) {
+ for (size_t end = begin; end <= Text.size(); ++end) {
+ TRope rope = CreateRope(Text, 10);
+ rope.Erase(rope.Begin() + begin, rope.Begin() + end);
+ TString text = Text;
+ text.erase(text.begin() + begin, text.begin() + end);
+ UNIT_ASSERT_VALUES_EQUAL(RopeToString(rope), text);
+ }
+ }
+ }
+
+ Y_UNIT_TEST(Insert) {
+ TRope rope = CreateRope(Text, 10);
+ for (size_t begin = 0; begin < Text.size(); ++begin) {
+ for (size_t end = begin; end <= Text.size(); ++end) {
+ TRope part = TRope(rope.Begin() + begin, rope.Begin() + end);
+ for (size_t where = 0; where <= Text.size(); ++where) {
+ TRope x(rope);
+ x.Insert(x.Begin() + where, TRope(part));
+ UNIT_ASSERT_VALUES_EQUAL(x.GetSize(), rope.GetSize() + part.GetSize());
+ TString text = Text;
+ text.insert(text.begin() + where, Text.begin() + begin, Text.begin() + end);
+ UNIT_ASSERT_VALUES_EQUAL(RopeToString(x), text);
+ }
+ }
+ }
+ }
+
+ Y_UNIT_TEST(Extract) {
+ for (size_t begin = 0; begin < Text.size(); ++begin) {
+ for (size_t end = begin; end <= Text.size(); ++end) {
+ TRope rope = CreateRope(Text, 10);
+ TRope part = rope.Extract(rope.Begin() + begin, rope.Begin() + end);
+ TString text = Text;
+ text.erase(text.begin() + begin, text.begin() + end);
+ UNIT_ASSERT_VALUES_EQUAL(RopeToString(rope), text);
+ UNIT_ASSERT_VALUES_EQUAL(RopeToString(part), Text.substr(begin, end - begin));
+ }
+ }
+ }
+
+ Y_UNIT_TEST(EraseFront) {
+ for (size_t pos = 0; pos <= Text.size(); ++pos) {
+ TRope rope = CreateRope(Text, 10);
+ rope.EraseFront(pos);
+ UNIT_ASSERT_VALUES_EQUAL(RopeToString(rope), Text.substr(pos));
+ }
+ }
+
+ Y_UNIT_TEST(EraseBack) {
+ for (size_t pos = 0; pos <= Text.size(); ++pos) {
+ TRope rope = CreateRope(Text, 10);
+ rope.EraseBack(pos);
+ UNIT_ASSERT_VALUES_EQUAL(RopeToString(rope), Text.substr(0, Text.size() - pos));
+ }
+ }
+
+ Y_UNIT_TEST(ExtractFront) {
+ for (size_t step = 1; step <= Text.size(); ++step) {
+ TRope rope = CreateRope(Text, 10);
+ TRope out;
+ while (const size_t len = Min(step, rope.GetSize())) {
+ rope.ExtractFront(len, &out);
+ UNIT_ASSERT(rope.GetSize() + out.GetSize() == Text.size());
+ UNIT_ASSERT_VALUES_EQUAL(RopeToString(out), Text.substr(0, out.GetSize()));
+ }
+ }
+ }
+
+ Y_UNIT_TEST(ExtractFrontPlain) {
+ for (size_t step = 1; step <= Text.size(); ++step) {
+ TRope rope = CreateRope(Text, 10);
+ TString buffer = Text;
+ auto it = rope.Begin();
+ size_t remain = rope.GetSize();
+ while (const size_t len = Min(step, remain)) {
+ TString data = TString::Uninitialized(len);
+ it.ExtractPlainDataAndAdvance(data.Detach(), data.size());
+ UNIT_ASSERT_VALUES_EQUAL(data, buffer.substr(0, len));
+ UNIT_ASSERT_VALUES_EQUAL(RopeToString(TRope(it, rope.End())), buffer.substr(len));
+ buffer = buffer.substr(len);
+ remain -= len;
+ }
+ }
+ }
+
+ Y_UNIT_TEST(FetchFrontPlain) {
+ char s[10];
+ char *data = s;
+ size_t remain = sizeof(s);
+ TRope rope = TRope(TString("HELLO"));
+ UNIT_ASSERT(!rope.FetchFrontPlain(&data, &remain));
+ UNIT_ASSERT(!rope);
+ rope.Insert(rope.End(), TRope(TString("WORLD!!!")));
+ UNIT_ASSERT(rope.FetchFrontPlain(&data, &remain));
+ UNIT_ASSERT(!remain);
+ UNIT_ASSERT(rope.GetSize() == 3);
+ UNIT_ASSERT_VALUES_EQUAL(rope.ConvertToString(), "!!!");
+ UNIT_ASSERT(!strncmp(s, "HELLOWORLD", 10));
+ }
+
+ Y_UNIT_TEST(Glueing) {
+ TRope rope = CreateRope(Text, 10);
+ for (size_t begin = 0; begin <= Text.size(); ++begin) {
+ for (size_t end = begin; end <= Text.size(); ++end) {
+ TString repr = rope.DebugString();
+ TRope temp = rope.Extract(rope.Position(begin), rope.Position(end));
+ rope.Insert(rope.Position(begin), std::move(temp));
+ UNIT_ASSERT_VALUES_EQUAL(repr, rope.DebugString());
+ UNIT_ASSERT_VALUES_EQUAL(RopeToString(rope), Text);
+ }
+ }
+ }
+
+ Y_UNIT_TEST(IterWalk) {
+ TRope rope = CreateRope(Text, 10);
+ for (size_t step1 = 0; step1 <= rope.GetSize(); ++step1) {
+ for (size_t step2 = 0; step2 <= step1; ++step2) {
+ TRope::TConstIterator iter = rope.Begin();
+ iter += step1;
+ iter -= step2;
+ UNIT_ASSERT(iter == rope.Position(step1 - step2));
+ }
+ }
+ }
+
+ Y_UNIT_TEST(Compare) {
+ auto check = [](const TString& x, const TString& y) {
+ const TRope xRope = CreateRope(x, 7);
+ const TRope yRope = CreateRope(y, 11);
+ UNIT_ASSERT_VALUES_EQUAL(xRope == yRope, x == y);
+ UNIT_ASSERT_VALUES_EQUAL(xRope != yRope, x != y);
+ UNIT_ASSERT_VALUES_EQUAL(xRope < yRope, x < y);
+ UNIT_ASSERT_VALUES_EQUAL(xRope <= yRope, x <= y);
+ UNIT_ASSERT_VALUES_EQUAL(xRope > yRope, x > y);
+ UNIT_ASSERT_VALUES_EQUAL(xRope >= yRope, x >= y);
+ };
+
+ TVector<TString> pool;
+ for (size_t k = 0; k < 10; ++k) {
+ size_t len = RandomNumber<size_t>(100) + 100;
+ TString s = TString::Uninitialized(len);
+ char *p = s.Detach();
+ for (size_t j = 0; j < len; ++j) {
+ *p++ = RandomNumber<unsigned char>();
+ }
+ pool.push_back(std::move(s));
+ }
+
+ for (const TString& x : pool) {
+ for (const TString& y : pool) {
+ check(x, y);
+ }
+ }
+ }
+
+}
diff --git a/library/cpp/actors/util/should_continue.cpp b/library/cpp/actors/util/should_continue.cpp
new file mode 100644
index 0000000000..258e6a0aff
--- /dev/null
+++ b/library/cpp/actors/util/should_continue.cpp
@@ -0,0 +1,23 @@
+#include "should_continue.h"
+
+void TProgramShouldContinue::ShouldRestart() {
+ AtomicSet(State, Restart);
+}
+
+void TProgramShouldContinue::ShouldStop(int returnCode) {
+ AtomicSet(ReturnCode, returnCode);
+ AtomicSet(State, Stop);
+}
+
+TProgramShouldContinue::EState TProgramShouldContinue::PollState() {
+ return static_cast<EState>(AtomicGet(State));
+}
+
+int TProgramShouldContinue::GetReturnCode() {
+ return static_cast<int>(AtomicGet(ReturnCode));
+}
+
+void TProgramShouldContinue::Reset() {
+ AtomicSet(ReturnCode, 0);
+ AtomicSet(State, Continue);
+}
diff --git a/library/cpp/actors/util/should_continue.h b/library/cpp/actors/util/should_continue.h
new file mode 100644
index 0000000000..76acc40dc4
--- /dev/null
+++ b/library/cpp/actors/util/should_continue.h
@@ -0,0 +1,22 @@
+#pragma once
+#include "defs.h"
+
+class TProgramShouldContinue {
+public:
+ enum EState {
+ Continue,
+ Stop,
+ Restart,
+ };
+
+ void ShouldRestart();
+ void ShouldStop(int returnCode = 0);
+
+ EState PollState();
+ int GetReturnCode();
+
+ void Reset();
+private:
+ TAtomic ReturnCode = 0;
+ TAtomic State = Continue;
+};
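+
+// Usage sketch: the main loop polls the shared state and reacts to it:
+//
+//     while (shouldContinue.PollState() == TProgramShouldContinue::Continue) {
+//         RunOnce(); // hypothetical unit of work
+//     }
+//     return shouldContinue.GetReturnCode();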
diff --git a/library/cpp/actors/util/thread.h b/library/cpp/actors/util/thread.h
new file mode 100644
index 0000000000..d742c8c585
--- /dev/null
+++ b/library/cpp/actors/util/thread.h
@@ -0,0 +1,26 @@
+#pragma once
+
+#include <util/generic/strbuf.h>
+#include <util/stream/str.h>
+#include <util/system/execpath.h>
+#include <util/system/thread.h>
+#include <time.h>
+
+inline void SetCurrentThreadName(const TString& name,
+ const ui32 maxCharsFromProcessName = 8) {
+#if defined(_linux_)
+ // Linux limits thread names to 15 characters plus the terminating NUL
+
+ TStringBuf procName(GetExecPath());
+ procName = procName.RNextTok('/');
+ procName = procName.SubStr(0, maxCharsFromProcessName);
+
+ TStringStream linuxName;
+ linuxName << procName << "." << name;
+ TThread::SetCurrentThreadName(linuxName.Str().data());
+#else
+ Y_UNUSED(maxCharsFromProcessName);
+ TThread::SetCurrentThreadName(name.data());
+#endif
+}
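+
+// Example (illustrative): in a binary at /usr/bin/myserver, calling
+// SetCurrentThreadName("Worker") yields the Linux thread name
+// "myserver.Worker" (8 chars of the process name, then "." and the name).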
diff --git a/library/cpp/actors/util/threadparkpad.cpp b/library/cpp/actors/util/threadparkpad.cpp
new file mode 100644
index 0000000000..74069ff15b
--- /dev/null
+++ b/library/cpp/actors/util/threadparkpad.cpp
@@ -0,0 +1,148 @@
+#include "threadparkpad.h"
+#include <util/system/winint.h>
+
+#ifdef _linux_
+
+#include "futex.h"
+
+namespace NActors {
+ class TThreadParkPad::TImpl {
+ volatile bool Interrupted;
+ int Futex;
+
+ public:
+ TImpl()
+ : Interrupted(false)
+ , Futex(0)
+ {
+ }
+ ~TImpl() {
+ }
+
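+ // Futex protocol: Park() decrements the counter and sleeps while it
+ // reads -1; Unpark() increments it and issues FUTEX_WAKE only when it
+ // observed -1, i.e. when a waiter may actually be parked.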
+ bool Park() noexcept {
+ __atomic_fetch_sub(&Futex, 1, __ATOMIC_SEQ_CST);
+ while (__atomic_load_n(&Futex, __ATOMIC_ACQUIRE) == -1)
+ SysFutex(&Futex, FUTEX_WAIT_PRIVATE, -1, nullptr, nullptr, 0);
+ return IsInterrupted();
+ }
+
+ void Unpark() noexcept {
+ const int old = __atomic_fetch_add(&Futex, 1, __ATOMIC_SEQ_CST);
+ if (old == -1)
+ SysFutex(&Futex, FUTEX_WAKE_PRIVATE, -1, nullptr, nullptr, 0);
+ }
+
+ void Interrupt() noexcept {
+ __atomic_store_n(&Interrupted, true, __ATOMIC_SEQ_CST);
+ Unpark();
+ }
+
+ bool IsInterrupted() const noexcept {
+ return __atomic_load_n(&Interrupted, __ATOMIC_ACQUIRE);
+ }
+ };
+
+#elif defined _win32_
+#include <util/generic/bt_exception.h>
+#include <util/generic/yexception.h>
+
+namespace NActors {
+ class TThreadParkPad::TImpl {
+ TAtomic Interrupted;
+ HANDLE EvHandle;
+
+ public:
+ TImpl()
+ : Interrupted(false)
+ {
+ EvHandle = ::CreateEvent(0, false, false, 0);
+ if (!EvHandle)
+ ythrow TWithBackTrace<yexception>() << "::CreateEvent failed";
+ }
+ ~TImpl() {
+ if (EvHandle)
+ ::CloseHandle(EvHandle);
+ }
+
+ bool Park() noexcept {
+ ::WaitForSingleObject(EvHandle, INFINITE);
+ return AtomicGet(Interrupted);
+ }
+
+ void Unpark() noexcept {
+ ::SetEvent(EvHandle);
+ }
+
+ void Interrupt() noexcept {
+ AtomicSet(Interrupted, true);
+ Unpark();
+ }
+
+ bool IsInterrupted() const noexcept {
+ return AtomicGet(Interrupted);
+ }
+ };
+
+#else
+
+#include <util/system/event.h>
+
+namespace NActors {
+ class TThreadParkPad::TImpl {
+ TAtomic Interrupted;
+ TSystemEvent Ev;
+
+ public:
+ TImpl()
+ : Interrupted(false)
+ , Ev(TSystemEvent::rAuto)
+ {
+ }
+ ~TImpl() {
+ }
+
+ bool Park() noexcept {
+ Ev.Wait();
+ return AtomicGet(Interrupted);
+ }
+
+ void Unpark() noexcept {
+ Ev.Signal();
+ }
+
+ void Interrupt() noexcept {
+ AtomicSet(Interrupted, true);
+ Unpark();
+ }
+
+ bool IsInterrupted() const noexcept {
+ return AtomicGet(Interrupted);
+ }
+ };
+#endif
+
+ TThreadParkPad::TThreadParkPad()
+ : Impl(new TThreadParkPad::TImpl())
+ {
+ }
+
+ TThreadParkPad::~TThreadParkPad() {
+ }
+
+ bool TThreadParkPad::Park() noexcept {
+ return Impl->Park();
+ }
+
+ void TThreadParkPad::Unpark() noexcept {
+ Impl->Unpark();
+ }
+
+ void TThreadParkPad::Interrupt() noexcept {
+ Impl->Interrupt();
+ }
+
+ bool TThreadParkPad::Interrupted() const noexcept {
+ return Impl->IsInterrupted();
+ }
+
+}
diff --git a/library/cpp/actors/util/threadparkpad.h b/library/cpp/actors/util/threadparkpad.h
new file mode 100644
index 0000000000..5b574ccf34
--- /dev/null
+++ b/library/cpp/actors/util/threadparkpad.h
@@ -0,0 +1,21 @@
+#pragma once
+
+#include <util/generic/ptr.h>
+
+namespace NActors {
+ class TThreadParkPad {
+ private:
+ class TImpl;
+ THolder<TImpl> Impl;
+
+ public:
+ TThreadParkPad();
+ ~TThreadParkPad();
+
+ bool Park() noexcept;
+ void Unpark() noexcept;
+ void Interrupt() noexcept;
+ bool Interrupted() const noexcept;
+ };
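+
+ // Typical usage (sketch): a worker parks itself when idle; another thread
+ // calls Unpark() when work arrives, or Interrupt() to make Park() return
+ // true so the worker can leave its loop:
+ //
+ //     if (pad.Park()) // true => Interrupt() was called
+ //         return;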
+
+}
diff --git a/library/cpp/actors/util/ticket_lock.h b/library/cpp/actors/util/ticket_lock.h
new file mode 100644
index 0000000000..3b1fa80393
--- /dev/null
+++ b/library/cpp/actors/util/ticket_lock.h
@@ -0,0 +1,48 @@
+#pragma once
+
+#include "intrinsics.h"
+#include <util/system/guard.h>
+#include <util/system/yassert.h>
+
+class TTicketLock : TNonCopyable {
+ ui32 TicketIn;
+ ui32 TicketOut;
+
+public:
+ TTicketLock()
+ : TicketIn(0)
+ , TicketOut(0)
+ {
+ }
+
+ void Release() noexcept {
+ AtomicUi32Increment(&TicketOut);
+ }
+
+ ui32 Acquire() noexcept {
+ ui32 revolves = 0;
+ const ui32 ticket = AtomicUi32Increment(&TicketIn) - 1;
+ while (ticket != AtomicLoad(&TicketOut)) {
+ Y_VERIFY_DEBUG(ticket >= AtomicLoad(&TicketOut));
+ SpinLockPause();
+ ++revolves;
+ }
+ return revolves;
+ }
+
+ bool TryAcquire() noexcept {
+ const ui32 x = AtomicLoad(&TicketOut);
+ if (x == AtomicLoad(&TicketIn) && AtomicUi32Cas(&TicketIn, x + 1, x))
+ return true;
+ else
+ return false;
+ }
+
+ bool IsLocked() noexcept {
+ const ui32 ticketIn = AtomicLoad(&TicketIn);
+ const ui32 ticketOut = AtomicLoad(&TicketOut);
+ return (ticketIn != ticketOut);
+ }
+
+ typedef ::TGuard<TTicketLock> TGuard;
+};
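+
+// Usage sketch: tickets are handed out in FIFO order, so acquisition is fair
+// across contending threads:
+//
+//     TTicketLock lock;
+//     {
+//         TTicketLock::TGuard guard(lock); // Acquire() on entry, Release() on exit
+//         // ... critical section ...
+//     }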
diff --git a/library/cpp/actors/util/timerfd.h b/library/cpp/actors/util/timerfd.h
new file mode 100644
index 0000000000..3189e2a672
--- /dev/null
+++ b/library/cpp/actors/util/timerfd.h
@@ -0,0 +1,65 @@
+#pragma once
+
+#include "datetime.h"
+
+#include <util/generic/noncopyable.h>
+
+#ifdef _linux_
+
+#include <util/system/yassert.h>
+#include <errno.h>
+#include <sys/timerfd.h>
+
+struct TTimerFd: public TNonCopyable {
+ int Fd;
+
+ TTimerFd() {
+ Fd = timerfd_create(CLOCK_MONOTONIC, 0);
+ Y_VERIFY(Fd != -1, "timerfd_create(CLOCK_MONOTONIC, 0) -> -1; errno:%d: %s", int(errno), strerror(errno));
+ }
+
+ ~TTimerFd() {
+ close(Fd);
+ }
+
+ void Set(ui64 ts) {
+ ui64 now = GetCycleCountFast();
+ Arm(now >= ts ? 1 : NHPTimer::GetSeconds(ts - now) * 1e9);
+ }
+
+ void Reset() {
+ Arm(0); // disarm timer
+ }
+
+ void Wait() {
+ ui64 expirations;
+ ssize_t s = read(Fd, &expirations, sizeof(ui64));
+ Y_UNUSED(s); // deliberately not Y_VERIFY(s == sizeof(ui64)): the expiration count is unused
+ }
+
+ void Wake() {
+ Arm(1);
+ }
+private:
+ void Arm(ui64 ns) {
+ struct itimerspec spec;
+ spec.it_value.tv_sec = ns / 1'000'000'000;
+ spec.it_value.tv_nsec = ns % 1'000'000'000;
+ spec.it_interval.tv_sec = 0;
+ spec.it_interval.tv_nsec = 0;
+ int ret = timerfd_settime(Fd, 0, &spec, nullptr);
+ Y_VERIFY(ret != -1, "timerfd_settime(%d, 0, %" PRIu64 "ns, 0) -> %d; errno:%d: %s", Fd, ns, ret, int(errno), strerror(errno));
+ }
+};
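+
+// Note: Set() takes an absolute deadline in fast-cycle units (GetCycleCountFast
+// from datetime.h) and converts the remaining interval to nanoseconds; a
+// deadline already in the past arms the timer for 1 ns so the next Wait()
+// returns immediately.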
+
+#else
+
+struct TTimerFd: public TNonCopyable {
+ int Fd = 0;
+ void Set(ui64) {}
+ void Reset() {}
+ void Wait() {}
+ void Wake() {}
+};
+
+#endif
diff --git a/library/cpp/actors/util/unordered_cache.h b/library/cpp/actors/util/unordered_cache.h
new file mode 100644
index 0000000000..76f036c0cf
--- /dev/null
+++ b/library/cpp/actors/util/unordered_cache.h
@@ -0,0 +1,201 @@
+#pragma once
+
+#include "defs.h"
+#include "queue_chunk.h"
+
+template <typename T, ui32 Size = 512, ui32 ConcurrencyFactor = 1, typename TChunk = TQueueChunk<T, Size>>
+class TUnorderedCache : TNonCopyable {
+ static_assert(std::is_integral<T>::value || std::is_pointer<T>::value, "expect std::is_integral<T>::value || std::is_pointer<T>::value");
+
+public:
+ static constexpr ui32 Concurrency = ConcurrencyFactor * 4;
+
+private:
+ struct TReadSlot {
+ TChunk* volatile ReadFrom;
+ volatile ui32 ReadPosition;
+ char Padding[64 - sizeof(TChunk*) - sizeof(ui32)]; // 1 slot per cache line
+ };
+
+ struct TWriteSlot {
+ TChunk* volatile WriteTo;
+ volatile ui32 WritePosition;
+ char Padding[64 - sizeof(TChunk*) - sizeof(ui32)]; // 1 slot per cache line
+ };
+
+ static_assert(sizeof(TReadSlot) == 64, "expect sizeof(TReadSlot) == 64");
+ static_assert(sizeof(TWriteSlot) == 64, "expect sizeof(TWriteSlot) == 64");
+
+private:
+ TReadSlot ReadSlots[Concurrency];
+ TWriteSlot WriteSlots[Concurrency];
+
+ static_assert(sizeof(TChunk*) == sizeof(TAtomic), "expect sizeof(TChunk*) == sizeof(TAtomic)");
+
+private:
+ struct TLockedWriter {
+ TWriteSlot* Slot;
+ TChunk* WriteTo;
+
+ TLockedWriter()
+ : Slot(nullptr)
+ , WriteTo(nullptr)
+ { }
+
+ TLockedWriter(TWriteSlot* slot, TChunk* writeTo)
+ : Slot(slot)
+ , WriteTo(writeTo)
+ { }
+
+ ~TLockedWriter() noexcept {
+ Drop();
+ }
+
+ void Drop() {
+ if (Slot) {
+ AtomicStore(&Slot->WriteTo, WriteTo);
+ Slot = nullptr;
+ }
+ }
+
+ TLockedWriter(const TLockedWriter&) = delete;
+ TLockedWriter& operator=(const TLockedWriter&) = delete;
+
+ TLockedWriter(TLockedWriter&& rhs)
+ : Slot(rhs.Slot)
+ , WriteTo(rhs.WriteTo)
+ {
+ rhs.Slot = nullptr;
+ }
+
+ TLockedWriter& operator=(TLockedWriter&& rhs) {
+ if (Y_LIKELY(this != &rhs)) {
+ Drop();
+ Slot = rhs.Slot;
+ WriteTo = rhs.WriteTo;
+ rhs.Slot = nullptr;
+ }
+ return *this;
+ }
+ };
+
+private:
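+ // LockWriter() spins over the write slots, claiming one by swapping its
+ // WriteTo pointer to nullptr; the returned TLockedWriter publishes the
+ // (possibly advanced) chunk pointer back in Drop(), releasing the slot.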
+ TLockedWriter LockWriter(ui64 writerRotation) {
+ ui32 cycle = 0;
+ for (;;) {
+ TWriteSlot* slot = &WriteSlots[writerRotation % Concurrency];
+ if (AtomicLoad(&slot->WriteTo) != nullptr) {
+ if (TChunk* writeTo = AtomicSwap(&slot->WriteTo, nullptr)) {
+ return TLockedWriter(slot, writeTo);
+ }
+ }
+ ++writerRotation;
+
+ // Do a spinlock pause after a full cycle
+ if (++cycle == Concurrency) {
+ SpinLockPause();
+ cycle = 0;
+ }
+ }
+ }
+
+ void WriteOne(TLockedWriter& lock, T x) {
+ Y_VERIFY_DEBUG(x != 0);
+
+ const ui32 pos = AtomicLoad(&lock.Slot->WritePosition);
+ if (pos != TChunk::EntriesCount) {
+ AtomicStore(&lock.Slot->WritePosition, pos + 1);
+ AtomicStore(&lock.WriteTo->Entries[pos], x);
+ } else {
+ TChunk* next = new TChunk();
+ AtomicStore(&next->Entries[0], x);
+ AtomicStore(&lock.Slot->WritePosition, 1u);
+ AtomicStore(&lock.WriteTo->Next, next);
+ lock.WriteTo = next;
+ }
+ }
+
+public:
+ TUnorderedCache() {
+ for (ui32 i = 0; i < Concurrency; ++i) {
+ ReadSlots[i].ReadFrom = new TChunk();
+ ReadSlots[i].ReadPosition = 0;
+
+ WriteSlots[i].WriteTo = ReadSlots[i].ReadFrom;
+ WriteSlots[i].WritePosition = 0;
+ }
+ }
+
+ ~TUnorderedCache() {
+ Y_VERIFY(!Pop(0));
+
+ for (ui64 i = 0; i < Concurrency; ++i) {
+ if (ReadSlots[i].ReadFrom) {
+ delete ReadSlots[i].ReadFrom;
+ ReadSlots[i].ReadFrom = nullptr;
+ }
+ WriteSlots[i].WriteTo = nullptr;
+ }
+ }
+
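+ // Pop() claims a read slot the same way writers claim write slots: swap
+ // ReadFrom to nullptr, inspect the chunk, then store a pointer back to
+ // release the slot. Entries read as zero until the writer's store lands,
+ // so a zero entry simply means "not published yet".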
+ T Pop(ui64 readerRotation) noexcept {
+ ui64 readerIndex = readerRotation;
+ const ui64 endIndex = readerIndex + Concurrency;
+ for (; readerIndex != endIndex; ++readerIndex) {
+ TReadSlot* slot = &ReadSlots[readerIndex % Concurrency];
+ if (AtomicLoad(&slot->ReadFrom) != nullptr) {
+ if (TChunk* readFrom = AtomicSwap(&slot->ReadFrom, nullptr)) {
+ const ui32 pos = AtomicLoad(&slot->ReadPosition);
+ if (pos != TChunk::EntriesCount) {
+ if (T ret = AtomicLoad(&readFrom->Entries[pos])) {
+ AtomicStore(&slot->ReadPosition, pos + 1);
+ AtomicStore(&slot->ReadFrom, readFrom); // release lock with same chunk
+ return ret; // found, return
+ } else {
+ AtomicStore(&slot->ReadFrom, readFrom); // release lock with same chunk
+ }
+ } else if (TChunk* next = AtomicLoad(&readFrom->Next)) {
+ if (T ret = AtomicLoad(&next->Entries[0])) {
+ AtomicStore(&slot->ReadPosition, 1u);
+ AtomicStore(&slot->ReadFrom, next); // release lock with next chunk
+ delete readFrom;
+ return ret;
+ } else {
+ AtomicStore(&slot->ReadPosition, 0u);
+ AtomicStore(&slot->ReadFrom, next); // release lock with new chunk
+ delete readFrom;
+ }
+ } else {
+ // nothing in old chunk and no next chunk, just release lock with old chunk
+ AtomicStore(&slot->ReadFrom, readFrom);
+ }
+ }
+ }
+ }
+
+ return 0; // nothing found after scanning a full cycle of slots
+ }
+
+ void Push(T x, ui64 writerRotation) {
+ TLockedWriter lock = LockWriter(writerRotation);
+ WriteOne(lock, x);
+ }
+
+ void PushBulk(T* x, ui32 xcount, ui64 writerRotation) {
+ for (;;) {
+ // Fill no more than one queue chunk per round
+ const ui32 xround = Min(xcount, (ui32)TChunk::EntriesCount);
+
+ {
+ TLockedWriter lock = LockWriter(writerRotation++);
+ for (T* end = x + xround; x != end; ++x)
+ WriteOne(lock, *x);
+ }
+
+ if (xcount <= TChunk::EntriesCount)
+ break;
+
+ xcount -= TChunk::EntriesCount;
+ }
+ }
+};
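+
+// Usage sketch (mirrors unordered_cache_ut.cpp): values must be non-zero,
+// since Pop() returns 0 to signal "empty"; each caller keeps its own rotation
+// counter to spread contention across the slots:
+//
+//     TUnorderedCache<ui64> cache;
+//     cache.Push(value, writeRotation++);
+//     if (ui64 v = cache.Pop(readRotation++)) { /* got v */ }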
diff --git a/library/cpp/actors/util/unordered_cache_ut.cpp b/library/cpp/actors/util/unordered_cache_ut.cpp
new file mode 100644
index 0000000000..37865f2f91
--- /dev/null
+++ b/library/cpp/actors/util/unordered_cache_ut.cpp
@@ -0,0 +1,138 @@
+#include "unordered_cache.h"
+
+#include <library/cpp/testing/unittest/registar.h>
+#include <util/random/random.h>
+#include <util/system/hp_timer.h>
+#include <util/system/sanitizers.h>
+#include <util/system/thread.h>
+
+Y_UNIT_TEST_SUITE(UnorderedCache) {
+
+ void DoOnePushOnePop(ui64 count) {
+ TUnorderedCache<ui64> queue;
+
+ ui64 readRotation = 0;
+ ui64 writeRotation = 0;
+
+ auto popped = queue.Pop(readRotation++);
+ UNIT_ASSERT_VALUES_EQUAL(popped, 0u);
+
+ for (ui64 i = 0; i < count; ++i) {
+ queue.Push(i + 1, writeRotation++);
+ popped = queue.Pop(readRotation++);
+ UNIT_ASSERT_VALUES_EQUAL(popped, i + 1);
+
+ popped = queue.Pop(readRotation++);
+ UNIT_ASSERT_VALUES_EQUAL(popped, 0u);
+ }
+ }
+
+ Y_UNIT_TEST(OnePushOnePop) {
+ DoOnePushOnePop(1);
+ }
+
+ Y_UNIT_TEST(OnePushOnePop_Repeat1M) {
+ DoOnePushOnePop(1000000);
+ }
+
+ /**
+ * Simplified thread spawning for testing
+ */
+ class TWorkerThread : public ISimpleThread {
+ private:
+ std::function<void()> Func;
+ double Time = 0.0;
+
+ public:
+ TWorkerThread(std::function<void()> func)
+ : Func(std::move(func))
+ { }
+
+ double GetTime() const {
+ return Time;
+ }
+
+ static THolder<TWorkerThread> Spawn(std::function<void()> func) {
+ THolder<TWorkerThread> thread = MakeHolder<TWorkerThread>(std::move(func));
+ thread->Start();
+ return thread;
+ }
+
+ private:
+ void* ThreadProc() noexcept override {
+ THPTimer timer;
+ Func();
+ Time = timer.Passed();
+ return nullptr;
+ }
+ };
+
+ void DoConcurrentPushPop(size_t threads, ui64 perThreadCount) {
+ // Concurrency factor 4 is up to 16 threads
+ TUnorderedCache<ui64, 512, 4> queue;
+
+ auto workerFunc = [&](size_t threadIndex) {
+ ui64 readRotation = 0;
+ ui64 writeRotation = 0;
+ ui64 readsDone = 0;
+ ui64 writesDone = 0;
+ for (;;) {
+ bool canRead = readsDone < writesDone;
+ bool canWrite = writesDone < perThreadCount;
+ if (!canRead && !canWrite) {
+ break;
+ }
+ if (canRead && canWrite) {
+ // Randomly choose between read and write
+ if (RandomNumber<ui64>(2)) {
+ canRead = false;
+ } else {
+ canWrite = false;
+ }
+ }
+ if (canRead) {
+ ui64 popped = queue.Pop(readRotation++);
+ if (popped) {
+ ++readsDone;
+ }
+ }
+ if (canWrite) {
+ queue.Push(1 + writesDone * threads + threadIndex, writeRotation++);
+ ++writesDone;
+ }
+ }
+ };
+
+ TVector<THolder<TWorkerThread>> workers(threads);
+ for (size_t i = 0; i < threads; ++i) {
+ workers[i] = TWorkerThread::Spawn([workerFunc, i]() {
+ workerFunc(i);
+ });
+ }
+
+ double maxTime = 0;
+ for (size_t i = 0; i < threads; ++i) {
+ workers[i]->Join();
+ maxTime = Max(maxTime, workers[i]->GetTime());
+ }
+
+ auto popped = queue.Pop(0);
+ UNIT_ASSERT_VALUES_EQUAL(popped, 0u);
+
+ Cerr << "Concurrent with " << threads << " threads: " << maxTime << " seconds" << Endl;
+ }
+
+ void DoConcurrentPushPop_3times(size_t threads, ui64 perThreadCount) {
+ for (size_t i = 0; i < 3; ++i) {
+ DoConcurrentPushPop(threads, perThreadCount);
+ }
+ }
+
+ static constexpr ui64 PER_THREAD_COUNT = NSan::PlainOrUnderSanitizer(1000000, 100000);
+
+ Y_UNIT_TEST(ConcurrentPushPop_1thread) { DoConcurrentPushPop_3times(1, PER_THREAD_COUNT); }
+ Y_UNIT_TEST(ConcurrentPushPop_2threads) { DoConcurrentPushPop_3times(2, PER_THREAD_COUNT); }
+ Y_UNIT_TEST(ConcurrentPushPop_4threads) { DoConcurrentPushPop_3times(4, PER_THREAD_COUNT); }
+ Y_UNIT_TEST(ConcurrentPushPop_8threads) { DoConcurrentPushPop_3times(8, PER_THREAD_COUNT); }
+ Y_UNIT_TEST(ConcurrentPushPop_16threads) { DoConcurrentPushPop_3times(16, PER_THREAD_COUNT); }
+}
diff --git a/library/cpp/actors/util/ut/ya.make b/library/cpp/actors/util/ut/ya.make
new file mode 100644
index 0000000000..3b08b77984
--- /dev/null
+++ b/library/cpp/actors/util/ut/ya.make
@@ -0,0 +1,18 @@
+UNITTEST_FOR(library/cpp/actors/util)
+
+IF (WITH_VALGRIND)
+ TIMEOUT(600)
+ SIZE(MEDIUM)
+ENDIF()
+
+OWNER(
+ alexvru
+ g:kikimr
+)
+
+SRCS(
+ rope_ut.cpp
+ unordered_cache_ut.cpp
+)
+
+END()
diff --git a/library/cpp/actors/util/ya.make b/library/cpp/actors/util/ya.make
new file mode 100644
index 0000000000..37488c3962
--- /dev/null
+++ b/library/cpp/actors/util/ya.make
@@ -0,0 +1,37 @@
+LIBRARY()
+
+OWNER(
+ ddoarn
+ g:kikimr
+)
+
+SRCS(
+ affinity.cpp
+ affinity.h
+ cpumask.h
+ datetime.h
+ defs.h
+ funnel_queue.h
+ futex.h
+ intrinsics.h
+ local_process_key.h
+ named_tuple.h
+ queue_chunk.h
+ queue_oneone_inplace.h
+ recentwnd.h
+ rope.h
+ should_continue.cpp
+ should_continue.h
+ thread.h
+ threadparkpad.cpp
+ threadparkpad.h
+ ticket_lock.h
+ timerfd.h
+ unordered_cache.h
+)
+
+PEERDIR(
+ util
+)
+
+END()