diff options
author | Grigory Reznikov <gritukan@gmail.com> | 2023-12-05 07:40:45 +0300 |
---|---|---|
committer | robot-piglet <robot-piglet@yandex-team.com> | 2023-12-05 09:48:20 +0300 |
commit | 7bf3df81755926d2fce972d4ac113e4c0886d008 (patch) | |
tree | e73e8b7e921a8193830d8b1436c2bc73eadbe890 | |
parent | e92c23fea0e3ebe3613fd2e0708e09abd35aa5fb (diff) | |
download | ydb-7bf3df81755926d2fce972d4ac113e4c0886d008.tar.gz |
YT-18655 Support prerequisite transactions in dynamic tables write
We are building a service that uses dynamic tables as a metadata storage. Cypress is used for a leader election and prerequisite transactions are used to prevent races, however prerequisite transactions are not supported for dynamic table operations. This PR fixes that.
I hereby agree to the terms of the CLA available at: https://yandex.ru/legal/cla/?lang=en
---
Pull Request resolved: https://github.com/ytsaurus/ytsaurus/pull/162
-rw-r--r-- | library/cpp/yt/small_containers/compact_queue-inl.h | 71 | ||||
-rw-r--r-- | library/cpp/yt/small_containers/compact_queue.h | 37 | ||||
-rw-r--r-- | library/cpp/yt/small_containers/unittests/compact_queue_ut.cpp | 114 | ||||
-rw-r--r-- | library/cpp/yt/small_containers/unittests/ya.make | 1 | ||||
-rw-r--r-- | yt/yt/client/api/internal_client.h | 46 | ||||
-rw-r--r-- | yt/yt/client/driver/driver.cpp | 4 | ||||
-rw-r--r-- | yt/yt/client/driver/internal_commands.cpp | 81 | ||||
-rw-r--r-- | yt/yt/client/driver/internal_commands.h | 72 | ||||
-rw-r--r-- | yt/yt/client/transaction_client/public.h | 1 |
9 files changed, 427 insertions, 0 deletions
diff --git a/library/cpp/yt/small_containers/compact_queue-inl.h b/library/cpp/yt/small_containers/compact_queue-inl.h new file mode 100644 index 0000000000..6e085ab260 --- /dev/null +++ b/library/cpp/yt/small_containers/compact_queue-inl.h @@ -0,0 +1,71 @@ +#include "compact_queue.h" + +namespace NYT { + +//////////////////////////////////////////////////////////////////////////////// + +template <class T, size_t N> +void TCompactQueue<T, N>::Push(T value) +{ + if (Size_ == Queue_.size()) { + auto oldSize = Queue_.size(); + Queue_.resize(2 * oldSize); + + if (FrontIndex_ + Size_ > oldSize) { + std::move( + Queue_.begin(), + Queue_.begin() + FrontIndex_, + Queue_.begin() + Size_); + } + } + + auto index = FrontIndex_ + Size_; + if (index >= Queue_.size()) { + index -= Queue_.size(); + } + Queue_[index] = std::move(value); + ++Size_; +} + +template <class T, size_t N> +T TCompactQueue<T, N>::Pop() +{ + YT_VERIFY(!Empty()); + + auto value = std::move(Queue_[FrontIndex_]); + ++FrontIndex_; + if (FrontIndex_ >= Queue_.size()) { + FrontIndex_ -= Queue_.size(); + } + --Size_; + + return value; +} + +template <class T, size_t N> +const T& TCompactQueue<T, N>::Front() const +{ + return Queue_[FrontIndex_]; +} + +template <class T, size_t N> +size_t TCompactQueue<T, N>::Size() const +{ + return Size_; +} + +template <class T, size_t N> +size_t TCompactQueue<T, N>::Capacity() const +{ + return Queue_.capacity(); +} + +template <class T, size_t N> +bool TCompactQueue<T, N>::Empty() const +{ + return Size_ == 0; +} + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace NYT diff --git a/library/cpp/yt/small_containers/compact_queue.h b/library/cpp/yt/small_containers/compact_queue.h new file mode 100644 index 0000000000..1852d29706 --- /dev/null +++ b/library/cpp/yt/small_containers/compact_queue.h @@ -0,0 +1,37 @@ +#pragma once + +#include "compact_vector.h" + +namespace NYT { + +//////////////////////////////////////////////////////////////////////////////// + +//! A queue optimized for storing elements inline +//! and with little memory overhead. See TCompactVector. +template <class T, size_t N> +class TCompactQueue +{ +public: + void Push(T value); + T Pop(); + + const T& Front() const; + + size_t Size() const; + size_t Capacity() const; + + bool Empty() const; + +private: + TCompactVector<T, N> Queue_ = TCompactVector<T, N>(N); + size_t FrontIndex_ = 0; + size_t Size_ = 0; +}; + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace NYT + +#define COMPACT_QUEUE_INL_H_ +#include "compact_queue-inl.h" +#undef COMPACT_QUEUE_INL_H_ diff --git a/library/cpp/yt/small_containers/unittests/compact_queue_ut.cpp b/library/cpp/yt/small_containers/unittests/compact_queue_ut.cpp new file mode 100644 index 0000000000..acea1c5c71 --- /dev/null +++ b/library/cpp/yt/small_containers/unittests/compact_queue_ut.cpp @@ -0,0 +1,114 @@ +#include <library/cpp/yt/small_containers/compact_queue.h> + +#include <library/cpp/testing/gtest/gtest.h> + +#include <queue> +#include <random> + +namespace NYT { +namespace { + +//////////////////////////////////////////////////////////////////////////////// + +TEST(TCompactQueueTest, Simple) +{ + TCompactQueue<int, 4> queue; + queue.Push(1); + queue.Push(2); + queue.Push(3); + + for (int i = 1; i <= 10; i++) { + EXPECT_EQ(queue.Size(), 3u); + EXPECT_EQ(queue.Front(), i); + EXPECT_EQ(queue.Pop(), i); + queue.Push(i + 3); + } + + for (int i = 11; i <= 13; i++) { + EXPECT_EQ(queue.Front(), i); + queue.Pop(); + } + + EXPECT_TRUE(queue.Empty()); +} + +TEST(TCompactQueueTest, Reallocation1) +{ + TCompactQueue<int, 2> queue; + queue.Push(1); + queue.Push(2); + queue.Push(3); + + for (int i = 1; i <= 10; i++) { + EXPECT_EQ(queue.Size(), 3u); + EXPECT_EQ(queue.Front(), i); + EXPECT_EQ(queue.Pop(), i); + queue.Push(i + 3); + } + + for (int i = 11; i <= 13; i++) { + EXPECT_EQ(queue.Front(), i); + queue.Pop(); + } + + EXPECT_TRUE(queue.Empty()); +} + +TEST(TCompactQueueTest, Reallocation2) +{ + TCompactQueue<int, 3> queue; + queue.Push(1); + queue.Push(2); + queue.Push(3); + EXPECT_EQ(queue.Pop(), 1); + queue.Push(4); + queue.Push(5); + + EXPECT_EQ(queue.Size(), 4u); + + for (int i = 2; i <= 10; i++) { + EXPECT_EQ(queue.Size(), 4u); + EXPECT_EQ(queue.Front(), i); + EXPECT_EQ(queue.Pop(), i); + queue.Push(i + 4); + } + + for (int i = 11; i <= 14; i++) { + EXPECT_EQ(queue.Front(), i); + queue.Pop(); + } + + EXPECT_TRUE(queue.Empty()); +} + +TEST(TCompactQueueTest, Stress) +{ + std::mt19937_64 rng(42); + + for (int iteration = 0; iteration < 1000; ++iteration) { + TCompactQueue<int, 4> queue1; + std::queue<int> queue2; + + for (int step = 0; step < 10'000; ++step) { + EXPECT_EQ(queue1.Size(), queue2.size()); + EXPECT_EQ(queue1.Empty(), queue2.empty()); + if (!queue1.Empty()) { + EXPECT_EQ(queue1.Front(), queue2.front()); + } + + if (queue2.empty() || rng() % 2 == 0) { + int value = rng() % 1'000'000; + queue1.Push(value); + queue2.push(value); + } else { + queue1.Pop(); + queue2.pop(); + } + } + } +} + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace +} // namespace NYT diff --git a/library/cpp/yt/small_containers/unittests/ya.make b/library/cpp/yt/small_containers/unittests/ya.make index bf3deade90..b09ef44f8d 100644 --- a/library/cpp/yt/small_containers/unittests/ya.make +++ b/library/cpp/yt/small_containers/unittests/ya.make @@ -7,6 +7,7 @@ SRCS( compact_heap_ut.cpp compact_set_ut.cpp compact_vector_ut.cpp + compact_queue_ut.cpp ) PEERDIR( diff --git a/yt/yt/client/api/internal_client.h b/yt/yt/client/api/internal_client.h index 0c569021b9..01fa29340b 100644 --- a/yt/yt/client/api/internal_client.h +++ b/yt/yt/client/api/internal_client.h @@ -69,6 +69,30 @@ struct TUnlockHunkStoreOptions //////////////////////////////////////////////////////////////////////////////// +struct TIssueLeaseOptions + : public TTimeoutOptions +{ }; + +//////////////////////////////////////////////////////////////////////////////// + +struct TRevokeLeaseOptions + : public TTimeoutOptions +{ }; + +//////////////////////////////////////////////////////////////////////////////// + +struct TReferenceLeaseOptions + : public TTimeoutOptions +{ }; + +//////////////////////////////////////////////////////////////////////////////// + +struct TUnreferenceLeaseOptions + : public TTimeoutOptions +{ }; + +//////////////////////////////////////////////////////////////////////////////// + struct TGetOrderedTabletSafeTrimRowCountOptions : public TTimeoutOptions { }; @@ -127,6 +151,28 @@ struct IInternalClient virtual TFuture<std::vector<TErrorOr<i64>>> GetOrderedTabletSafeTrimRowCount( const std::vector<TGetOrderedTabletSafeTrimRowCountRequest>& requests, const TGetOrderedTabletSafeTrimRowCountOptions& options = {}) = 0; + + virtual TFuture<void> IssueLease( + NHydra::TCellId cellId, + NObjectClient::TObjectId leaseId, + const TIssueLeaseOptions& options = {}) = 0; + virtual TFuture<void> RevokeLease( + NHydra::TCellId cellId, + NObjectClient::TObjectId leaseId, + bool force, + const TRevokeLeaseOptions& options = {}) = 0; + + virtual TFuture<void> ReferenceLease( + NHydra::TCellId cellId, + NObjectClient::TObjectId leaseId, + bool persistent, + bool force, + const TReferenceLeaseOptions& options = {}) = 0; + virtual TFuture<void> UnreferenceLease( + NHydra::TCellId cellId, + NObjectClient::TObjectId leaseId, + bool persistent, + const TUnreferenceLeaseOptions& options = {}) = 0; }; DEFINE_REFCOUNTED_TYPE(IInternalClient) diff --git a/yt/yt/client/driver/driver.cpp b/yt/yt/client/driver/driver.cpp index f3e6288516..2c0f03fbcd 100644 --- a/yt/yt/client/driver/driver.cpp +++ b/yt/yt/client/driver/driver.cpp @@ -366,6 +366,10 @@ public: REGISTER_ALL(TLockHunkStoreCommand, "lock_hunk_store", Null, Structured, false, true ); REGISTER_ALL(TUnlockHunkStoreCommand, "unlock_hunk_store", Null, Structured, false, true ); REGISTER_ALL(TGetConnectionConfigCommand, "get_connection_config", Null, Structured, false, false); + REGISTER_ALL(TIssueLeaseCommand, "issue_lease", Null, Structured, false, false); + REGISTER_ALL(TRevokeLeaseCommand, "revoke_lease", Null, Structured, false, false); + REGISTER_ALL(TReferenceLeaseCommand, "reference_lease", Null, Structured, false, false); + REGISTER_ALL(TUnreferenceLeaseCommand, "unreference_lease", Null, Structured, false, false); } #undef REGISTER diff --git a/yt/yt/client/driver/internal_commands.cpp b/yt/yt/client/driver/internal_commands.cpp index 89d506eef3..889737f91a 100644 --- a/yt/yt/client/driver/internal_commands.cpp +++ b/yt/yt/client/driver/internal_commands.cpp @@ -141,4 +141,85 @@ void TGetConnectionConfigCommand::DoExecute(ICommandContextPtr context) //////////////////////////////////////////////////////////////////////////////// +void TIssueLeaseCommand::Register(TRegistrar registrar) +{ + registrar.Parameter("cell_id", &TThis::CellId); + registrar.Parameter("lease_id", &TThis::LeaseId); +} + +void TIssueLeaseCommand::DoExecute(ICommandContextPtr context) +{ + auto internalClient = context->GetInternalClientOrThrow(); + + WaitFor(internalClient->IssueLease(CellId, LeaseId)) + .ThrowOnError(); + + ProduceEmptyOutput(context); +} + +//////////////////////////////////////////////////////////////////////////////// + +void TRevokeLeaseCommand::Register(TRegistrar registrar) +{ + registrar.Parameter("cell_id", &TThis::CellId); + registrar.Parameter("lease_id", &TThis::LeaseId); + registrar.Parameter("force", &TThis::Force) + .Default(false); +} + +void TRevokeLeaseCommand::DoExecute(ICommandContextPtr context) +{ + auto internalClient = context->GetInternalClientOrThrow(); + + WaitFor(internalClient->RevokeLease(CellId, LeaseId, Force)) + .ThrowOnError(); + + ProduceEmptyOutput(context); +} + +//////////////////////////////////////////////////////////////////////////////// + +void TReferenceLeaseCommand::Register(TRegistrar registrar) +{ + registrar.Parameter("cell_id", &TThis::CellId); + registrar.Parameter("lease_id", &TThis::LeaseId); + registrar.Parameter("persistent", &TThis::Persistent); + registrar.Parameter("force", &TThis::Force); +} + +void TReferenceLeaseCommand::DoExecute(ICommandContextPtr context) +{ + auto internalClient = context->GetInternalClientOrThrow(); + + WaitFor(internalClient->ReferenceLease( + CellId, + LeaseId, + Persistent, + Force)) + .ThrowOnError(); + + ProduceEmptyOutput(context); +} + +//////////////////////////////////////////////////////////////////////////////// + +void TUnreferenceLeaseCommand::Register(TRegistrar registrar) +{ + registrar.Parameter("cell_id", &TThis::CellId); + registrar.Parameter("lease_id", &TThis::LeaseId); + registrar.Parameter("persistent", &TThis::Persistent); +} + +void TUnreferenceLeaseCommand::DoExecute(ICommandContextPtr context) +{ + auto internalClient = context->GetInternalClientOrThrow(); + + WaitFor(internalClient->UnreferenceLease(CellId, LeaseId, Persistent)) + .ThrowOnError(); + + ProduceEmptyOutput(context); +} + +//////////////////////////////////////////////////////////////////////////////// + } // namespace NYT::NDriver diff --git a/yt/yt/client/driver/internal_commands.h b/yt/yt/client/driver/internal_commands.h index 671e507364..4b1a567074 100644 --- a/yt/yt/client/driver/internal_commands.h +++ b/yt/yt/client/driver/internal_commands.h @@ -103,4 +103,76 @@ private: //////////////////////////////////////////////////////////////////////////////// +class TIssueLeaseCommand + : public TTypedCommand<NApi::TIssueLeaseOptions> +{ +public: + REGISTER_YSON_STRUCT_LITE(TIssueLeaseCommand); + + static void Register(TRegistrar registrar); + +private: + NHydra::TCellId CellId; + NObjectClient::TObjectId LeaseId; + + void DoExecute(ICommandContextPtr context) override; +}; + +//////////////////////////////////////////////////////////////////////////////// + +class TRevokeLeaseCommand + : public TTypedCommand<NApi::TRevokeLeaseOptions> +{ +public: + REGISTER_YSON_STRUCT_LITE(TRevokeLeaseCommand); + + static void Register(TRegistrar registrar); + +private: + NHydra::TCellId CellId; + NObjectClient::TObjectId LeaseId; + bool Force; + + void DoExecute(ICommandContextPtr context) override; +}; + +//////////////////////////////////////////////////////////////////////////////// + +class TReferenceLeaseCommand + : public TTypedCommand<NApi::TReferenceLeaseOptions> +{ +public: + REGISTER_YSON_STRUCT_LITE(TReferenceLeaseCommand); + + static void Register(TRegistrar registrar); + +private: + NHydra::TCellId CellId; + NObjectClient::TObjectId LeaseId; + bool Persistent; + bool Force; + + void DoExecute(ICommandContextPtr context) override; +}; + +//////////////////////////////////////////////////////////////////////////////// + +class TUnreferenceLeaseCommand + : public TTypedCommand<NApi::TUnreferenceLeaseOptions> +{ +public: + REGISTER_YSON_STRUCT_LITE(TUnreferenceLeaseCommand); + + static void Register(TRegistrar registrar); + +private: + NHydra::TCellId CellId; + NObjectClient::TObjectId LeaseId; + bool Persistent; + + void DoExecute(ICommandContextPtr context) override; +}; + +//////////////////////////////////////////////////////////////////////////////// + } // namespace NYT::NDriver diff --git a/yt/yt/client/transaction_client/public.h b/yt/yt/client/transaction_client/public.h index ac38af4dd6..6920bbd19c 100644 --- a/yt/yt/client/transaction_client/public.h +++ b/yt/yt/client/transaction_client/public.h @@ -45,6 +45,7 @@ YT_DEFINE_ERROR_ENUM( ((ForeignParentTransaction) (11010)) ((ForeignPrerequisiteTransaction) (11011)) ((IncompletePrepareSignature) (11012)) + ((TransactionSuccessorHasLeases) (11013)) ); //////////////////////////////////////////////////////////////////////////////// |