aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorponasenko-rs <ponasenko-rs@yandex-team.com>2025-02-11 01:45:57 +0300
committerponasenko-rs <ponasenko-rs@yandex-team.com>2025-02-11 02:00:13 +0300
commit88d7603a66bde5ae72e9b2ebe8c085f773859bf2 (patch)
tree2fddfeda3e32968da538d517ca4294dd56e0d123
parent17cc83379606b885d021e15a7cf5941210eba0b5 (diff)
downloadydb-88d7603a66bde5ae72e9b2ebe8c085f773859bf2.tar.gz
YT-22975: Per-row sequencer
Create a per-tablet notion of a serialization type. This option can be changed only on an unmounted dynamic table. The new per-row serialization type can be enabled only on sorted dynamic tables and is useful only for shared write locks. If the per-row serialization type is enabled, rows are serialized using a per-row set of prepared transactions and a heap of serializing transactions at the sorted store manager level. The old serialization only allows a transaction to be serialized as a whole, using a per-cell set of prepared transactions and a heap of serializing transactions, which means that non-conflicting prepared transactions interfere with each other during serialization. Most of the changes are disabled as long as the serialization type is kept at the default (coarse). However, wire commands are now kept in memory in a parsed form, and saving to a snapshot has another bound that uses timestamps. * Changelog entry Type: feature Component: dynamic-tables Add an option to decrease serialization time by serializing a transaction within each lock group in each row separately. commit_hash:2c6bada00254a893c32b50e5b20d4d4bbbdd962b
-rw-r--r--yt/yt/client/table_client/public.h5
-rw-r--r--yt/yt/client/tablet_client/table_mount_cache.h2
-rw-r--r--yt/yt/core/misc/persistent_queue-inl.h55
-rw-r--r--yt/yt/core/misc/persistent_queue.h41
4 files changed, 98 insertions, 5 deletions
diff --git a/yt/yt/client/table_client/public.h b/yt/yt/client/table_client/public.h
index d15824f283..1c15f1a8b2 100644
--- a/yt/yt/client/table_client/public.h
+++ b/yt/yt/client/table_client/public.h
@@ -165,6 +165,11 @@ DEFINE_ENUM_WITH_UNDERLYING_TYPE(EOptimizeFor, int,
((Scan) (1))
);
+// Selects how transactions are serialized for a tablet.
+// Coarse is the default; per the change description, it serializes a transaction
+// as a whole. PerRow serializes rows separately and, per the same description,
+// can be enabled only on sorted dynamic tables.
+DEFINE_ENUM_WITH_UNDERLYING_TYPE(ETabletTransactionSerializationType, i8,
+ ((Coarse) (0))
+ ((PerRow) (1))
+);
+
YT_DEFINE_ERROR_ENUM(
((SortOrderViolation) (301))
((InvalidDoubleValue) (302))
diff --git a/yt/yt/client/tablet_client/table_mount_cache.h b/yt/yt/client/tablet_client/table_mount_cache.h
index d8c1d64948..44869b2728 100644
--- a/yt/yt/client/tablet_client/table_mount_cache.h
+++ b/yt/yt/client/tablet_client/table_mount_cache.h
@@ -133,6 +133,8 @@ struct TTableMountInfo final
bool EnableDetailedProfiling = false;
+ NTableClient::ETabletTransactionSerializationType SerializationType = NTableClient::ETabletTransactionSerializationType::Coarse;
+
bool IsSorted() const;
bool IsOrdered() const;
bool IsReplicated() const;
diff --git a/yt/yt/core/misc/persistent_queue-inl.h b/yt/yt/core/misc/persistent_queue-inl.h
index d94ee4948c..829d13d172 100644
--- a/yt/yt/core/misc/persistent_queue-inl.h
+++ b/yt/yt/core/misc/persistent_queue-inl.h
@@ -44,6 +44,12 @@ const T& TPersistentQueueIterator<T, ChunkSize>::operator*() const
}
template <class T, size_t ChunkSize>
+const T* TPersistentQueueIterator<T, ChunkSize>::operator->() const
+{
+ return &CurrentChunk_->Elements[CurrentIndex_];
+}
+
+template <class T, size_t ChunkSize>
TPersistentQueueIterator<T, ChunkSize>::TPersistentQueueIterator(
TChunkPtr chunk,
size_t index)
@@ -177,6 +183,55 @@ void TPersistentQueue<T, ChunkSize>::Load(C& context)
}
}
+template <class T, size_t ChunkSize>
+void TIndexedPersistentQueue<T, ChunkSize>::Enqueue(T value)
+{
+    // The index built by Freeze() is not maintained incrementally,
+    // so the queue must not be modified while it is frozen.
+    YT_VERIFY(!Frozen_);
+    TPersistentQueue<T, ChunkSize>::Enqueue(std::move(value));
+}
+
+template <class T, size_t ChunkSize>
+T TIndexedPersistentQueue<T, ChunkSize>::Dequeue()
+{
+    // Dequeueing is forbidden while the queue is frozen; otherwise this simply
+    // delegates to the underlying persistent queue.
+    YT_VERIFY(!Frozen_);
+    return TPersistentQueue<T, ChunkSize>::Dequeue();
+}
+
+template <class T, size_t ChunkSize>
+void TIndexedPersistentQueue<T, ChunkSize>::Clear()
+{
+    // Clearing a frozen queue drops the random-access index and unfreezes it,
+    // so that Enqueue/Dequeue become legal again.
+    if (Frozen_) {
+        Frozen_ = false;
+        Chunks_.clear();
+        Shift_ = 0;
+    }
+
+    TBase::Clear();
+}
+
+template <class T, size_t ChunkSize>
+void TIndexedPersistentQueue<T, ChunkSize>::Freeze()
+{
+    // Builds the random-access index over the chunk list and marks the queue frozen.
+    // The index is rebuilt from scratch so that calling Freeze() on an already
+    // frozen queue does not append the same chunk pointers a second time.
+    Chunks_.clear();
+
+    auto& tail = TBase::Tail_;
+    // Position of the tail iterator inside its chunk; operator[] adds it to every index.
+    Shift_ = static_cast<int>(tail.CurrentIndex_);
+    // Walk the chunk list starting from the tail chunk and remember each chunk.
+    for (auto chunk = tail.CurrentChunk_; chunk != nullptr; chunk = chunk->Next) {
+        Chunks_.push_back(&*chunk);
+    }
+
+    Frozen_ = true;
+}
+
+template <class T, size_t ChunkSize>
+const T& TIndexedPersistentQueue<T, ChunkSize>::operator[](int index) const
+{
+    // Random access is only available after Freeze() has built the chunk index.
+    YT_VERIFY(Frozen_);
+    // A negative index would silently wrap around once mixed with the unsigned
+    // ChunkSize below, so reject it explicitly.
+    YT_VERIFY(index >= 0);
+
+    // Account for the offset of the tail element inside the first indexed chunk.
+    auto shiftedIndex = static_cast<size_t>(index) + static_cast<size_t>(Shift_);
+    auto chunkIndex = shiftedIndex / ChunkSize;
+    auto indexInChunk = shiftedIndex % ChunkSize;
+
+    // Coarse bound: guarantees that the chunk pointer lookup itself is in range.
+    YT_VERIFY(chunkIndex < Chunks_.size());
+    return Chunks_[chunkIndex]->Elements[indexInChunk];
+}
+
////////////////////////////////////////////////////////////////////////////////
} // namespace NYT
diff --git a/yt/yt/core/misc/persistent_queue.h b/yt/yt/core/misc/persistent_queue.h
index fb78fc37b5..d5d4458ae4 100644
--- a/yt/yt/core/misc/persistent_queue.h
+++ b/yt/yt/core/misc/persistent_queue.h
@@ -19,6 +19,9 @@ class TPersistentQueueIterator;
template <class T, size_t ChunkSize>
class TPersistentQueue;
+template <class T, size_t ChunkSize>
+class TIndexedPersistentQueue;
+
////////////////////////////////////////////////////////////////////////////////
// Implementation.
@@ -39,7 +42,8 @@ public:
TPersistentQueueIterator& operator++(); // prefix
TPersistentQueueIterator operator++(int); // postfix
- const T& operator * () const;
+ const T& operator*() const;
+ const T* operator->() const;
bool operator==(const TPersistentQueueIterator& other) const = default;
@@ -49,6 +53,7 @@ private:
friend class TPersistentQueueBase<T, ChunkSize>;
friend class TPersistentQueue<T, ChunkSize>;
+ friend class TIndexedPersistentQueue<T, ChunkSize>;
TPersistentQueueIterator(TChunkPtr chunk, size_t index);
@@ -81,7 +86,6 @@ protected:
size_t Size_ = 0;
TIterator Head_;
TIterator Tail_;
-
};
template <class T, size_t ChunkSize>
@@ -108,9 +112,9 @@ class TPersistentQueue
: public TPersistentQueueBase<T, ChunkSize>
{
public:
- void Enqueue(T value);
- T Dequeue();
- void Clear();
+ virtual void Enqueue(T value);
+ virtual T Dequeue();
+ virtual void Clear();
using TSnapshot = TPersistentQueueSnapshot<T, ChunkSize>;
TSnapshot MakeSnapshot() const;
@@ -121,7 +125,34 @@ public:
private:
using TChunk = TPersistentQueueChunk<T, ChunkSize>;
using TChunkPtr = TIntrusivePtr<TChunk>;
+};
+
+// An extension of TPersistentQueue that is almost zero-cost until used.
+// Provides random access to the queued data.
+// TPersistentQueue is implemented via a linked list of chunks.
+// Random access is implemented via a random-access index (std::vector) of chunk pointers.
+// To avoid rebuilding the index on every Enqueue/Dequeue, the index is built only on Freeze.
+// Random access before Freeze is forbidden; Enqueue/Dequeue after Freeze are forbidden until Clear.
+template <class T, size_t ChunkSize>
+class TIndexedPersistentQueue
+ : public TPersistentQueue<T, ChunkSize>
+{
+ using TBase = TPersistentQueue<T, ChunkSize>;
+
+public:
+ // Mutators; Enqueue and Dequeue verify that the queue is not frozen,
+ // Clear additionally drops the index and unfreezes the queue.
+ void Enqueue(T value) override;
+ T Dequeue() override;
+ void Clear() override;
+
+ // Builds the random-access index and marks the queue as frozen.
+ void Freeze();
+ // Returns the element at |index|; verifies that the queue is frozen.
+ const T& operator[](int index) const;
+
+private:
+ using TChunk = TPersistentQueueChunk<T, ChunkSize>;
+ // True between Freeze() and the next Clear().
+ bool Frozen_ = false;
+ // Tail iterator's in-chunk position captured by Freeze(); added to every index.
+ int Shift_ = 0;
+ // Non-owning pointers to the chunks, in list order starting from the tail chunk.
+ std::vector<TChunk*> Chunks_;
 };
////////////////////////////////////////////////////////////////////////////////