aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorGrigory Reznikov <gritukan@gmail.com>2023-12-05 07:40:45 +0300
committerrobot-piglet <robot-piglet@yandex-team.com>2023-12-05 09:48:20 +0300
commit7bf3df81755926d2fce972d4ac113e4c0886d008 (patch)
treee73e8b7e921a8193830d8b1436c2bc73eadbe890
parente92c23fea0e3ebe3613fd2e0708e09abd35aa5fb (diff)
downloadydb-7bf3df81755926d2fce972d4ac113e4c0886d008.tar.gz
YT-18655 Support prerequisite transactions in dynamic tables write
We are building a service that uses dynamic tables as a metadata storage. Cypress is used for a leader election and prerequisite transactions are used to prevent races, however prerequisite transactions are not supported for dynamic table operations. This PR fixes that. I hereby agree to the terms of the CLA available at: https://yandex.ru/legal/cla/?lang=en --- Pull Request resolved: https://github.com/ytsaurus/ytsaurus/pull/162
-rw-r--r--library/cpp/yt/small_containers/compact_queue-inl.h71
-rw-r--r--library/cpp/yt/small_containers/compact_queue.h37
-rw-r--r--library/cpp/yt/small_containers/unittests/compact_queue_ut.cpp114
-rw-r--r--library/cpp/yt/small_containers/unittests/ya.make1
-rw-r--r--yt/yt/client/api/internal_client.h46
-rw-r--r--yt/yt/client/driver/driver.cpp4
-rw-r--r--yt/yt/client/driver/internal_commands.cpp81
-rw-r--r--yt/yt/client/driver/internal_commands.h72
-rw-r--r--yt/yt/client/transaction_client/public.h1
9 files changed, 427 insertions, 0 deletions
diff --git a/library/cpp/yt/small_containers/compact_queue-inl.h b/library/cpp/yt/small_containers/compact_queue-inl.h
new file mode 100644
index 0000000000..6e085ab260
--- /dev/null
+++ b/library/cpp/yt/small_containers/compact_queue-inl.h
@@ -0,0 +1,71 @@
+#include "compact_queue.h"
+
+namespace NYT {
+
+////////////////////////////////////////////////////////////////////////////////
+
+template <class T, size_t N>
+void TCompactQueue<T, N>::Push(T value)
+{
+ if (Size_ == Queue_.size()) {
+ auto oldSize = Queue_.size();
+ Queue_.resize(2 * oldSize);
+
+ if (FrontIndex_ + Size_ > oldSize) {
+ std::move(
+ Queue_.begin(),
+ Queue_.begin() + FrontIndex_,
+ Queue_.begin() + Size_);
+ }
+ }
+
+ auto index = FrontIndex_ + Size_;
+ if (index >= Queue_.size()) {
+ index -= Queue_.size();
+ }
+ Queue_[index] = std::move(value);
+ ++Size_;
+}
+
+template <class T, size_t N>
+T TCompactQueue<T, N>::Pop()
+{
+ YT_VERIFY(!Empty());
+
+ auto value = std::move(Queue_[FrontIndex_]);
+ ++FrontIndex_;
+ if (FrontIndex_ >= Queue_.size()) {
+ FrontIndex_ -= Queue_.size();
+ }
+ --Size_;
+
+ return value;
+}
+
+template <class T, size_t N>
+const T& TCompactQueue<T, N>::Front() const
+{
+ return Queue_[FrontIndex_];
+}
+
+template <class T, size_t N>
+size_t TCompactQueue<T, N>::Size() const
+{
+ return Size_;
+}
+
+template <class T, size_t N>
+size_t TCompactQueue<T, N>::Capacity() const
+{
+ return Queue_.capacity();
+}
+
+template <class T, size_t N>
+bool TCompactQueue<T, N>::Empty() const
+{
+ return Size_ == 0;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+} // namespace NYT
diff --git a/library/cpp/yt/small_containers/compact_queue.h b/library/cpp/yt/small_containers/compact_queue.h
new file mode 100644
index 0000000000..1852d29706
--- /dev/null
+++ b/library/cpp/yt/small_containers/compact_queue.h
@@ -0,0 +1,37 @@
+#pragma once
+
+#include "compact_vector.h"
+
+namespace NYT {
+
+////////////////////////////////////////////////////////////////////////////////
+
+//! A queue optimized for storing elements inline
+//! and with little memory overhead. See TCompactVector.
+template <class T, size_t N>
+class TCompactQueue
+{
+public:
+ void Push(T value);
+ T Pop();
+
+ const T& Front() const;
+
+ size_t Size() const;
+ size_t Capacity() const;
+
+ bool Empty() const;
+
+private:
+ TCompactVector<T, N> Queue_ = TCompactVector<T, N>(N);
+ size_t FrontIndex_ = 0;
+ size_t Size_ = 0;
+};
+
+////////////////////////////////////////////////////////////////////////////////
+
+} // namespace NYT
+
+#define COMPACT_QUEUE_INL_H_
+#include "compact_queue-inl.h"
+#undef COMPACT_QUEUE_INL_H_
diff --git a/library/cpp/yt/small_containers/unittests/compact_queue_ut.cpp b/library/cpp/yt/small_containers/unittests/compact_queue_ut.cpp
new file mode 100644
index 0000000000..acea1c5c71
--- /dev/null
+++ b/library/cpp/yt/small_containers/unittests/compact_queue_ut.cpp
@@ -0,0 +1,114 @@
+#include <library/cpp/yt/small_containers/compact_queue.h>
+
+#include <library/cpp/testing/gtest/gtest.h>
+
+#include <queue>
+#include <random>
+
+namespace NYT {
+namespace {
+
+////////////////////////////////////////////////////////////////////////////////
+
+TEST(TCompactQueueTest, Simple)
+{
+ TCompactQueue<int, 4> queue;
+ queue.Push(1);
+ queue.Push(2);
+ queue.Push(3);
+
+ for (int i = 1; i <= 10; i++) {
+ EXPECT_EQ(queue.Size(), 3u);
+ EXPECT_EQ(queue.Front(), i);
+ EXPECT_EQ(queue.Pop(), i);
+ queue.Push(i + 3);
+ }
+
+ for (int i = 11; i <= 13; i++) {
+ EXPECT_EQ(queue.Front(), i);
+ queue.Pop();
+ }
+
+ EXPECT_TRUE(queue.Empty());
+}
+
+TEST(TCompactQueueTest, Reallocation1)
+{
+ TCompactQueue<int, 2> queue;
+ queue.Push(1);
+ queue.Push(2);
+ queue.Push(3);
+
+ for (int i = 1; i <= 10; i++) {
+ EXPECT_EQ(queue.Size(), 3u);
+ EXPECT_EQ(queue.Front(), i);
+ EXPECT_EQ(queue.Pop(), i);
+ queue.Push(i + 3);
+ }
+
+ for (int i = 11; i <= 13; i++) {
+ EXPECT_EQ(queue.Front(), i);
+ queue.Pop();
+ }
+
+ EXPECT_TRUE(queue.Empty());
+}
+
+TEST(TCompactQueueTest, Reallocation2)
+{
+ TCompactQueue<int, 3> queue;
+ queue.Push(1);
+ queue.Push(2);
+ queue.Push(3);
+ EXPECT_EQ(queue.Pop(), 1);
+ queue.Push(4);
+ queue.Push(5);
+
+ EXPECT_EQ(queue.Size(), 4u);
+
+ for (int i = 2; i <= 10; i++) {
+ EXPECT_EQ(queue.Size(), 4u);
+ EXPECT_EQ(queue.Front(), i);
+ EXPECT_EQ(queue.Pop(), i);
+ queue.Push(i + 4);
+ }
+
+ for (int i = 11; i <= 14; i++) {
+ EXPECT_EQ(queue.Front(), i);
+ queue.Pop();
+ }
+
+ EXPECT_TRUE(queue.Empty());
+}
+
+TEST(TCompactQueueTest, Stress)
+{
+ std::mt19937_64 rng(42);
+
+ for (int iteration = 0; iteration < 1000; ++iteration) {
+ TCompactQueue<int, 4> queue1;
+ std::queue<int> queue2;
+
+ for (int step = 0; step < 10'000; ++step) {
+ EXPECT_EQ(queue1.Size(), queue2.size());
+ EXPECT_EQ(queue1.Empty(), queue2.empty());
+ if (!queue1.Empty()) {
+ EXPECT_EQ(queue1.Front(), queue2.front());
+ }
+
+ if (queue2.empty() || rng() % 2 == 0) {
+ int value = rng() % 1'000'000;
+ queue1.Push(value);
+ queue2.push(value);
+ } else {
+ queue1.Pop();
+ queue2.pop();
+ }
+ }
+ }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+} // namespace
+} // namespace NYT
diff --git a/library/cpp/yt/small_containers/unittests/ya.make b/library/cpp/yt/small_containers/unittests/ya.make
index bf3deade90..b09ef44f8d 100644
--- a/library/cpp/yt/small_containers/unittests/ya.make
+++ b/library/cpp/yt/small_containers/unittests/ya.make
@@ -7,6 +7,7 @@ SRCS(
compact_heap_ut.cpp
compact_set_ut.cpp
compact_vector_ut.cpp
+ compact_queue_ut.cpp
)
PEERDIR(
diff --git a/yt/yt/client/api/internal_client.h b/yt/yt/client/api/internal_client.h
index 0c569021b9..01fa29340b 100644
--- a/yt/yt/client/api/internal_client.h
+++ b/yt/yt/client/api/internal_client.h
@@ -69,6 +69,30 @@ struct TUnlockHunkStoreOptions
////////////////////////////////////////////////////////////////////////////////
+struct TIssueLeaseOptions
+ : public TTimeoutOptions
+{ };
+
+////////////////////////////////////////////////////////////////////////////////
+
+struct TRevokeLeaseOptions
+ : public TTimeoutOptions
+{ };
+
+////////////////////////////////////////////////////////////////////////////////
+
+struct TReferenceLeaseOptions
+ : public TTimeoutOptions
+{ };
+
+////////////////////////////////////////////////////////////////////////////////
+
+struct TUnreferenceLeaseOptions
+ : public TTimeoutOptions
+{ };
+
+////////////////////////////////////////////////////////////////////////////////
+
struct TGetOrderedTabletSafeTrimRowCountOptions
: public TTimeoutOptions
{ };
@@ -127,6 +151,28 @@ struct IInternalClient
virtual TFuture<std::vector<TErrorOr<i64>>> GetOrderedTabletSafeTrimRowCount(
const std::vector<TGetOrderedTabletSafeTrimRowCountRequest>& requests,
const TGetOrderedTabletSafeTrimRowCountOptions& options = {}) = 0;
+
+ virtual TFuture<void> IssueLease(
+ NHydra::TCellId cellId,
+ NObjectClient::TObjectId leaseId,
+ const TIssueLeaseOptions& options = {}) = 0;
+ virtual TFuture<void> RevokeLease(
+ NHydra::TCellId cellId,
+ NObjectClient::TObjectId leaseId,
+ bool force,
+ const TRevokeLeaseOptions& options = {}) = 0;
+
+ virtual TFuture<void> ReferenceLease(
+ NHydra::TCellId cellId,
+ NObjectClient::TObjectId leaseId,
+ bool persistent,
+ bool force,
+ const TReferenceLeaseOptions& options = {}) = 0;
+ virtual TFuture<void> UnreferenceLease(
+ NHydra::TCellId cellId,
+ NObjectClient::TObjectId leaseId,
+ bool persistent,
+ const TUnreferenceLeaseOptions& options = {}) = 0;
};
DEFINE_REFCOUNTED_TYPE(IInternalClient)
diff --git a/yt/yt/client/driver/driver.cpp b/yt/yt/client/driver/driver.cpp
index f3e6288516..2c0f03fbcd 100644
--- a/yt/yt/client/driver/driver.cpp
+++ b/yt/yt/client/driver/driver.cpp
@@ -366,6 +366,10 @@ public:
REGISTER_ALL(TLockHunkStoreCommand, "lock_hunk_store", Null, Structured, false, true );
REGISTER_ALL(TUnlockHunkStoreCommand, "unlock_hunk_store", Null, Structured, false, true );
REGISTER_ALL(TGetConnectionConfigCommand, "get_connection_config", Null, Structured, false, false);
+ REGISTER_ALL(TIssueLeaseCommand, "issue_lease", Null, Structured, false, false);
+ REGISTER_ALL(TRevokeLeaseCommand, "revoke_lease", Null, Structured, false, false);
+ REGISTER_ALL(TReferenceLeaseCommand, "reference_lease", Null, Structured, false, false);
+ REGISTER_ALL(TUnreferenceLeaseCommand, "unreference_lease", Null, Structured, false, false);
}
#undef REGISTER
diff --git a/yt/yt/client/driver/internal_commands.cpp b/yt/yt/client/driver/internal_commands.cpp
index 89d506eef3..889737f91a 100644
--- a/yt/yt/client/driver/internal_commands.cpp
+++ b/yt/yt/client/driver/internal_commands.cpp
@@ -141,4 +141,85 @@ void TGetConnectionConfigCommand::DoExecute(ICommandContextPtr context)
////////////////////////////////////////////////////////////////////////////////
+void TIssueLeaseCommand::Register(TRegistrar registrar)
+{
+ registrar.Parameter("cell_id", &TThis::CellId);
+ registrar.Parameter("lease_id", &TThis::LeaseId);
+}
+
+void TIssueLeaseCommand::DoExecute(ICommandContextPtr context)
+{
+ auto internalClient = context->GetInternalClientOrThrow();
+
+ WaitFor(internalClient->IssueLease(CellId, LeaseId))
+ .ThrowOnError();
+
+ ProduceEmptyOutput(context);
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+void TRevokeLeaseCommand::Register(TRegistrar registrar)
+{
+ registrar.Parameter("cell_id", &TThis::CellId);
+ registrar.Parameter("lease_id", &TThis::LeaseId);
+ registrar.Parameter("force", &TThis::Force)
+ .Default(false);
+}
+
+void TRevokeLeaseCommand::DoExecute(ICommandContextPtr context)
+{
+ auto internalClient = context->GetInternalClientOrThrow();
+
+ WaitFor(internalClient->RevokeLease(CellId, LeaseId, Force))
+ .ThrowOnError();
+
+ ProduceEmptyOutput(context);
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+void TReferenceLeaseCommand::Register(TRegistrar registrar)
+{
+ registrar.Parameter("cell_id", &TThis::CellId);
+ registrar.Parameter("lease_id", &TThis::LeaseId);
+ registrar.Parameter("persistent", &TThis::Persistent);
+ registrar.Parameter("force", &TThis::Force);
+}
+
+void TReferenceLeaseCommand::DoExecute(ICommandContextPtr context)
+{
+ auto internalClient = context->GetInternalClientOrThrow();
+
+ WaitFor(internalClient->ReferenceLease(
+ CellId,
+ LeaseId,
+ Persistent,
+ Force))
+ .ThrowOnError();
+
+ ProduceEmptyOutput(context);
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+void TUnreferenceLeaseCommand::Register(TRegistrar registrar)
+{
+ registrar.Parameter("cell_id", &TThis::CellId);
+ registrar.Parameter("lease_id", &TThis::LeaseId);
+ registrar.Parameter("persistent", &TThis::Persistent);
+}
+
+void TUnreferenceLeaseCommand::DoExecute(ICommandContextPtr context)
+{
+ auto internalClient = context->GetInternalClientOrThrow();
+
+ WaitFor(internalClient->UnreferenceLease(CellId, LeaseId, Persistent))
+ .ThrowOnError();
+
+ ProduceEmptyOutput(context);
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
} // namespace NYT::NDriver
diff --git a/yt/yt/client/driver/internal_commands.h b/yt/yt/client/driver/internal_commands.h
index 671e507364..4b1a567074 100644
--- a/yt/yt/client/driver/internal_commands.h
+++ b/yt/yt/client/driver/internal_commands.h
@@ -103,4 +103,76 @@ private:
////////////////////////////////////////////////////////////////////////////////
+class TIssueLeaseCommand
+ : public TTypedCommand<NApi::TIssueLeaseOptions>
+{
+public:
+ REGISTER_YSON_STRUCT_LITE(TIssueLeaseCommand);
+
+ static void Register(TRegistrar registrar);
+
+private:
+ NHydra::TCellId CellId;
+ NObjectClient::TObjectId LeaseId;
+
+ void DoExecute(ICommandContextPtr context) override;
+};
+
+////////////////////////////////////////////////////////////////////////////////
+
+class TRevokeLeaseCommand
+ : public TTypedCommand<NApi::TRevokeLeaseOptions>
+{
+public:
+ REGISTER_YSON_STRUCT_LITE(TRevokeLeaseCommand);
+
+ static void Register(TRegistrar registrar);
+
+private:
+ NHydra::TCellId CellId;
+ NObjectClient::TObjectId LeaseId;
+ bool Force;
+
+ void DoExecute(ICommandContextPtr context) override;
+};
+
+////////////////////////////////////////////////////////////////////////////////
+
+class TReferenceLeaseCommand
+ : public TTypedCommand<NApi::TReferenceLeaseOptions>
+{
+public:
+ REGISTER_YSON_STRUCT_LITE(TReferenceLeaseCommand);
+
+ static void Register(TRegistrar registrar);
+
+private:
+ NHydra::TCellId CellId;
+ NObjectClient::TObjectId LeaseId;
+ bool Persistent;
+ bool Force;
+
+ void DoExecute(ICommandContextPtr context) override;
+};
+
+////////////////////////////////////////////////////////////////////////////////
+
+class TUnreferenceLeaseCommand
+ : public TTypedCommand<NApi::TUnreferenceLeaseOptions>
+{
+public:
+ REGISTER_YSON_STRUCT_LITE(TUnreferenceLeaseCommand);
+
+ static void Register(TRegistrar registrar);
+
+private:
+ NHydra::TCellId CellId;
+ NObjectClient::TObjectId LeaseId;
+ bool Persistent;
+
+ void DoExecute(ICommandContextPtr context) override;
+};
+
+////////////////////////////////////////////////////////////////////////////////
+
} // namespace NYT::NDriver
diff --git a/yt/yt/client/transaction_client/public.h b/yt/yt/client/transaction_client/public.h
index ac38af4dd6..6920bbd19c 100644
--- a/yt/yt/client/transaction_client/public.h
+++ b/yt/yt/client/transaction_client/public.h
@@ -45,6 +45,7 @@ YT_DEFINE_ERROR_ENUM(
((ForeignParentTransaction) (11010))
((ForeignPrerequisiteTransaction) (11011))
((IncompletePrepareSignature) (11012))
+ ((TransactionSuccessorHasLeases) (11013))
);
////////////////////////////////////////////////////////////////////////////////