diff options
author | ponasenko-rs <ponasenko-rs@yandex-team.com> | 2025-02-11 01:45:57 +0300 |
---|---|---|
committer | ponasenko-rs <ponasenko-rs@yandex-team.com> | 2025-02-11 02:00:13 +0300 |
commit | 88d7603a66bde5ae72e9b2ebe8c085f773859bf2 (patch) | |
tree | 2fddfeda3e32968da538d517ca4294dd56e0d123 | |
parent | 17cc83379606b885d021e15a7cf5941210eba0b5 (diff) | |
download | ydb-88d7603a66bde5ae72e9b2ebe8c085f773859bf2.tar.gz |
YT-22975: Per-row sequencer
Create per-tablet notion of a serialization type. This option can be changed only on unmounted dynamic table.
New per-row serialization type could be enabled only on sorted dynamic tables and could be useful only for shared write locks.
If per-row serialization type is enabled rows are serialized using per-row set of prepared and heap of serializing transactions on sorted store manager level.
Old serialization only allows transaction to be serialized whole using per-cell set of prepared and heap of serializing transactions which means that non-conflicting prepared transactions are interfering during serialization.
Most of changes are disabled as long as serialization type kept default (coarse).
But wire commands are now kept in memory in a parsed form and save to snapshot has another bound using timestamps.
* Changelog entry
Type: feature
Component: dynamic-tables
Add option to decrease serialization time by serializing transaction within each lock group in each row separately.
commit_hash:2c6bada00254a893c32b50e5b20d4d4bbbdd962b
-rw-r--r-- | yt/yt/client/table_client/public.h | 5 | ||||
-rw-r--r-- | yt/yt/client/tablet_client/table_mount_cache.h | 2 | ||||
-rw-r--r-- | yt/yt/core/misc/persistent_queue-inl.h | 55 | ||||
-rw-r--r-- | yt/yt/core/misc/persistent_queue.h | 41 |
4 files changed, 98 insertions, 5 deletions
diff --git a/yt/yt/client/table_client/public.h b/yt/yt/client/table_client/public.h index d15824f283..1c15f1a8b2 100644 --- a/yt/yt/client/table_client/public.h +++ b/yt/yt/client/table_client/public.h @@ -165,6 +165,11 @@ DEFINE_ENUM_WITH_UNDERLYING_TYPE(EOptimizeFor, int, ((Scan) (1)) ); +DEFINE_ENUM_WITH_UNDERLYING_TYPE(ETabletTransactionSerializationType, i8, + ((Coarse) (0)) + ((PerRow) (1)) +); + YT_DEFINE_ERROR_ENUM( ((SortOrderViolation) (301)) ((InvalidDoubleValue) (302)) diff --git a/yt/yt/client/tablet_client/table_mount_cache.h b/yt/yt/client/tablet_client/table_mount_cache.h index d8c1d64948..44869b2728 100644 --- a/yt/yt/client/tablet_client/table_mount_cache.h +++ b/yt/yt/client/tablet_client/table_mount_cache.h @@ -133,6 +133,8 @@ struct TTableMountInfo final bool EnableDetailedProfiling = false; + NTableClient::ETabletTransactionSerializationType SerializationType = NTableClient::ETabletTransactionSerializationType::Coarse; + bool IsSorted() const; bool IsOrdered() const; bool IsReplicated() const; diff --git a/yt/yt/core/misc/persistent_queue-inl.h b/yt/yt/core/misc/persistent_queue-inl.h index d94ee4948c..829d13d172 100644 --- a/yt/yt/core/misc/persistent_queue-inl.h +++ b/yt/yt/core/misc/persistent_queue-inl.h @@ -44,6 +44,12 @@ const T& TPersistentQueueIterator<T, ChunkSize>::operator*() const } template <class T, size_t ChunkSize> +const T* TPersistentQueueIterator<T, ChunkSize>::operator->() const +{ + return &CurrentChunk_->Elements[CurrentIndex_]; +} + +template <class T, size_t ChunkSize> TPersistentQueueIterator<T, ChunkSize>::TPersistentQueueIterator( TChunkPtr chunk, size_t index) @@ -177,6 +183,55 @@ void TPersistentQueue<T, ChunkSize>::Load(C& context) } } +template <class T, size_t ChunkSize> +void TIndexedPersistentQueue<T, ChunkSize>::Enqueue(T value) +{ + YT_VERIFY(!Frozen_); + return TBase::Enqueue(std::move(value)); +} + +template <class T, size_t ChunkSize> +T TIndexedPersistentQueue<T, ChunkSize>::Dequeue() +{ + YT_VERIFY(!Frozen_); + return TBase::Dequeue(); +} + +template <class T, size_t ChunkSize> +void TIndexedPersistentQueue<T, ChunkSize>::Clear() +{ + if (Frozen_) { + Shift_ = 0; + Chunks_.clear(); + + Frozen_ = false; + } + + return TBase::Clear(); +} + +template <class T, size_t ChunkSize> +void TIndexedPersistentQueue<T, ChunkSize>::Freeze() +{ + auto& tail = TBase::Tail_; + Shift_ = tail.CurrentIndex_; + for (auto chunk = tail.CurrentChunk_; chunk != nullptr; chunk = chunk->Next) { + Chunks_.push_back(&*chunk); + } + + Frozen_ = true; +} + +template <class T, size_t ChunkSize> +const T& TIndexedPersistentQueue<T, ChunkSize>::operator[](int index) const +{ + YT_VERIFY(Frozen_); + auto shiftedIndex = index + Shift_; + auto chunkIndex = shiftedIndex / ChunkSize; + auto indexInChunk = shiftedIndex % ChunkSize; + return Chunks_[chunkIndex]->Elements[indexInChunk]; +} + //////////////////////////////////////////////////////////////////////////////// } // namespace NYT diff --git a/yt/yt/core/misc/persistent_queue.h b/yt/yt/core/misc/persistent_queue.h index fb78fc37b5..d5d4458ae4 100644 --- a/yt/yt/core/misc/persistent_queue.h +++ b/yt/yt/core/misc/persistent_queue.h @@ -19,6 +19,9 @@ class TPersistentQueueIterator; template <class T, size_t ChunkSize> class TPersistentQueue; +template <class T, size_t ChunkSize> +class TIndexedPersistentQueue; + //////////////////////////////////////////////////////////////////////////////// // Implementation. @@ -39,7 +42,8 @@ public: TPersistentQueueIterator& operator++(); // prefix TPersistentQueueIterator operator++(int); // postfix - const T& operator * () const; + const T& operator*() const; + const T* operator->() const; bool operator==(const TPersistentQueueIterator& other) const = default; @@ -49,6 +53,7 @@ private: friend class TPersistentQueueBase<T, ChunkSize>; friend class TPersistentQueue<T, ChunkSize>; + friend class TIndexedPersistentQueue<T, ChunkSize>; TPersistentQueueIterator(TChunkPtr chunk, size_t index); @@ -81,7 +86,6 @@ protected: size_t Size_ = 0; TIterator Head_; TIterator Tail_; - }; template <class T, size_t ChunkSize> @@ -108,9 +112,9 @@ class TPersistentQueue : public TPersistentQueueBase<T, ChunkSize> { public: - void Enqueue(T value); - T Dequeue(); - void Clear(); + virtual void Enqueue(T value); + virtual T Dequeue(); + virtual void Clear(); using TSnapshot = TPersistentQueueSnapshot<T, ChunkSize>; TSnapshot MakeSnapshot() const; @@ -121,7 +125,34 @@ public: private: using TChunk = TPersistentQueueChunk<T, ChunkSize>; using TChunkPtr = TIntrusivePtr<TChunk>; +}; + +// Almost zero-cost until used extension over TPersistentQueue. +// Provides random-access to data. +// TPersistentQueue is implemented via linked list. +// Random access is implemented via random-access index (std::vector). +// To avoid rebuilding index on every Enqueue/Deque index is built only on Freeze. +// Random access until Freeze is forbidden. +template <class T, size_t ChunkSize> +class TIndexedPersistentQueue + : public TPersistentQueue<T, ChunkSize> +{ + using TBase = TPersistentQueue<T, ChunkSize>; + +public: + void Enqueue(T value) override; + T Dequeue() override; + void Clear() override; + + void Freeze(); + const T& operator[](int index) const; + +private: + using TChunk = TPersistentQueueChunk<T, ChunkSize>; + bool Frozen_ = false; + int Shift_ = 0; + std::vector<TChunk*> Chunks_; }; //////////////////////////////////////////////////////////////////////////////// |