diff options
author | Grigory Reznikov <gritukan@gmail.com> | 2023-12-05 07:40:45 +0300 |
---|---|---|
committer | robot-piglet <robot-piglet@yandex-team.com> | 2023-12-05 09:48:20 +0300 |
commit | 7bf3df81755926d2fce972d4ac113e4c0886d008 (patch) | |
tree | e73e8b7e921a8193830d8b1436c2bc73eadbe890 /library/cpp | |
parent | e92c23fea0e3ebe3613fd2e0708e09abd35aa5fb (diff) | |
download | ydb-7bf3df81755926d2fce972d4ac113e4c0886d008.tar.gz |
YT-18655 Support prerequisite transactions in dynamic tables write
We are building a service that uses dynamic tables as a metadata storage. Cypress is used for a leader election and prerequisite transactions are used to prevent races, however prerequisite transactions are not supported for dynamic table operations. This PR fixes that.
I hereby agree to the terms of the CLA available at: https://yandex.ru/legal/cla/?lang=en
---
Pull Request resolved: https://github.com/ytsaurus/ytsaurus/pull/162
Diffstat (limited to 'library/cpp')
4 files changed, 223 insertions, 0 deletions
diff --git a/library/cpp/yt/small_containers/compact_queue-inl.h b/library/cpp/yt/small_containers/compact_queue-inl.h new file mode 100644 index 0000000000..6e085ab260 --- /dev/null +++ b/library/cpp/yt/small_containers/compact_queue-inl.h @@ -0,0 +1,71 @@ +#include "compact_queue.h" + +namespace NYT { + +//////////////////////////////////////////////////////////////////////////////// + +template <class T, size_t N> +void TCompactQueue<T, N>::Push(T value) +{ + if (Size_ == Queue_.size()) { + auto oldSize = Queue_.size(); + Queue_.resize(2 * oldSize); + + if (FrontIndex_ + Size_ > oldSize) { + std::move( + Queue_.begin(), + Queue_.begin() + FrontIndex_, + Queue_.begin() + Size_); + } + } + + auto index = FrontIndex_ + Size_; + if (index >= Queue_.size()) { + index -= Queue_.size(); + } + Queue_[index] = std::move(value); + ++Size_; +} + +template <class T, size_t N> +T TCompactQueue<T, N>::Pop() +{ + YT_VERIFY(!Empty()); + + auto value = std::move(Queue_[FrontIndex_]); + ++FrontIndex_; + if (FrontIndex_ >= Queue_.size()) { + FrontIndex_ -= Queue_.size(); + } + --Size_; + + return value; +} + +template <class T, size_t N> +const T& TCompactQueue<T, N>::Front() const +{ + return Queue_[FrontIndex_]; +} + +template <class T, size_t N> +size_t TCompactQueue<T, N>::Size() const +{ + return Size_; +} + +template <class T, size_t N> +size_t TCompactQueue<T, N>::Capacity() const +{ + return Queue_.capacity(); +} + +template <class T, size_t N> +bool TCompactQueue<T, N>::Empty() const +{ + return Size_ == 0; +} + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace NYT diff --git a/library/cpp/yt/small_containers/compact_queue.h b/library/cpp/yt/small_containers/compact_queue.h new file mode 100644 index 0000000000..1852d29706 --- /dev/null +++ b/library/cpp/yt/small_containers/compact_queue.h @@ -0,0 +1,37 @@ +#pragma once + +#include "compact_vector.h" + +namespace NYT { + +//////////////////////////////////////////////////////////////////////////////// + +//! A queue optimized for storing elements inline +//! and with little memory overhead. See TCompactVector. +template <class T, size_t N> +class TCompactQueue +{ +public: + void Push(T value); + T Pop(); + + const T& Front() const; + + size_t Size() const; + size_t Capacity() const; + + bool Empty() const; + +private: + TCompactVector<T, N> Queue_ = TCompactVector<T, N>(N); + size_t FrontIndex_ = 0; + size_t Size_ = 0; +}; + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace NYT + +#define COMPACT_QUEUE_INL_H_ +#include "compact_queue-inl.h" +#undef COMPACT_QUEUE_INL_H_ diff --git a/library/cpp/yt/small_containers/unittests/compact_queue_ut.cpp b/library/cpp/yt/small_containers/unittests/compact_queue_ut.cpp new file mode 100644 index 0000000000..acea1c5c71 --- /dev/null +++ b/library/cpp/yt/small_containers/unittests/compact_queue_ut.cpp @@ -0,0 +1,114 @@ +#include <library/cpp/yt/small_containers/compact_queue.h> + +#include <library/cpp/testing/gtest/gtest.h> + +#include <queue> +#include <random> + +namespace NYT { +namespace { + +//////////////////////////////////////////////////////////////////////////////// + +TEST(TCompactQueueTest, Simple) +{ + TCompactQueue<int, 4> queue; + queue.Push(1); + queue.Push(2); + queue.Push(3); + + for (int i = 1; i <= 10; i++) { + EXPECT_EQ(queue.Size(), 3u); + EXPECT_EQ(queue.Front(), i); + EXPECT_EQ(queue.Pop(), i); + queue.Push(i + 3); + } + + for (int i = 11; i <= 13; i++) { + EXPECT_EQ(queue.Front(), i); + queue.Pop(); + } + + EXPECT_TRUE(queue.Empty()); +} + +TEST(TCompactQueueTest, Reallocation1) +{ + TCompactQueue<int, 2> queue; + queue.Push(1); + queue.Push(2); + queue.Push(3); + + for (int i = 1; i <= 10; i++) { + EXPECT_EQ(queue.Size(), 3u); + EXPECT_EQ(queue.Front(), i); + EXPECT_EQ(queue.Pop(), i); + queue.Push(i + 3); + } + + for (int i = 11; i <= 13; i++) { + EXPECT_EQ(queue.Front(), i); + queue.Pop(); + } + + EXPECT_TRUE(queue.Empty()); +} + +TEST(TCompactQueueTest, Reallocation2) +{ + TCompactQueue<int, 3> queue; + queue.Push(1); + queue.Push(2); + queue.Push(3); + EXPECT_EQ(queue.Pop(), 1); + queue.Push(4); + queue.Push(5); + + EXPECT_EQ(queue.Size(), 4u); + + for (int i = 2; i <= 10; i++) { + EXPECT_EQ(queue.Size(), 4u); + EXPECT_EQ(queue.Front(), i); + EXPECT_EQ(queue.Pop(), i); + queue.Push(i + 4); + } + + for (int i = 11; i <= 14; i++) { + EXPECT_EQ(queue.Front(), i); + queue.Pop(); + } + + EXPECT_TRUE(queue.Empty()); +} + +TEST(TCompactQueueTest, Stress) +{ + std::mt19937_64 rng(42); + + for (int iteration = 0; iteration < 1000; ++iteration) { + TCompactQueue<int, 4> queue1; + std::queue<int> queue2; + + for (int step = 0; step < 10'000; ++step) { + EXPECT_EQ(queue1.Size(), queue2.size()); + EXPECT_EQ(queue1.Empty(), queue2.empty()); + if (!queue1.Empty()) { + EXPECT_EQ(queue1.Front(), queue2.front()); + } + + if (queue2.empty() || rng() % 2 == 0) { + int value = rng() % 1'000'000; + queue1.Push(value); + queue2.push(value); + } else { + queue1.Pop(); + queue2.pop(); + } + } + } +} + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace +} // namespace NYT diff --git a/library/cpp/yt/small_containers/unittests/ya.make b/library/cpp/yt/small_containers/unittests/ya.make index bf3deade90..b09ef44f8d 100644 --- a/library/cpp/yt/small_containers/unittests/ya.make +++ b/library/cpp/yt/small_containers/unittests/ya.make @@ -7,6 +7,7 @@ SRCS( compact_heap_ut.cpp compact_set_ut.cpp compact_vector_ut.cpp + compact_queue_ut.cpp ) PEERDIR( |