aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authornadya73 <nadya73@yandex-team.com>2025-01-26 23:42:22 +0300
committernadya73 <nadya73@yandex-team.com>2025-01-26 23:58:47 +0300
commit19ca351db376d4e785f68758db99eaa6ce92084c (patch)
treeccd9cc980606d06002160e43ea91465efa14b5e6
parent7def09dc9ce7bf9366218aeaa9ec17540092f612 (diff)
downloadydb-19ca351db376d4e785f68758db99eaa6ce92084c.tar.gz
YT-21330: Add require_sync_replica parameter in push_queue_producer method
* Changelog entry Type: feature Component: proxy Add `require_sync_replica` parameter in `push_queue_producer` handler commit_hash:0d9e31193e3c021824910aec8d105e90f06d6319
-rw-r--r--yt/yt/client/api/queue_transaction.h4
-rw-r--r--yt/yt/client/api/rpc_proxy/transaction_impl.cpp1
-rw-r--r--yt/yt/client/driver/queue_commands.cpp7
-rw-r--r--yt/yt_proto/yt/client/api/rpc_proxy/proto/api_service.proto2
4 files changed, 13 insertions, 1 deletions
diff --git a/yt/yt/client/api/queue_transaction.h b/yt/yt/client/api/queue_transaction.h
index 4644fcc8e1..1487cb0ca3 100644
--- a/yt/yt/client/api/queue_transaction.h
+++ b/yt/yt/client/api/queue_transaction.h
@@ -30,6 +30,10 @@ struct TPushQueueProducerOptions
* what data should be written after fail of the user process.
*/
NYTree::INodePtr UserMeta;
+
+ //! If this happens to be a push into a replicated table queue,
+ //! controls if at least one sync replica is required.
+ bool RequireSyncReplica = true;
};
struct TPushQueueProducerResult
diff --git a/yt/yt/client/api/rpc_proxy/transaction_impl.cpp b/yt/yt/client/api/rpc_proxy/transaction_impl.cpp
index b3cff92078..9cc13718fa 100644
--- a/yt/yt/client/api/rpc_proxy/transaction_impl.cpp
+++ b/yt/yt/client/api/rpc_proxy/transaction_impl.cpp
@@ -552,6 +552,7 @@ TFuture<TPushQueueProducerResult> TTransaction::PushQueueProducer(
if (options.SequenceNumber) {
req->set_sequence_number(options.SequenceNumber->Underlying());
}
+ req->set_require_sync_replica(options.RequireSyncReplica);
if (NTracing::IsCurrentTraceContextRecorded()) {
req->TracingTags().emplace_back("yt.producer_path", ToString(producerPath));
diff --git a/yt/yt/client/driver/queue_commands.cpp b/yt/yt/client/driver/queue_commands.cpp
index 8f8040b193..fea2f15d9e 100644
--- a/yt/yt/client/driver/queue_commands.cpp
+++ b/yt/yt/client/driver/queue_commands.cpp
@@ -389,7 +389,12 @@ void TPushQueueProducerCommand::Register(TRegistrar registrar)
registrar.Parameter("queue_path", &TThis::QueuePath);
registrar.Parameter("session_id", &TThis::SessionId);
registrar.Parameter("epoch", &TThis::Epoch);
-
+ registrar.ParameterWithUniversalAccessor<bool>(
+ "require_sync_replica",
+ [] (TThis* command) -> auto& {
+ return command->Options.RequireSyncReplica;
+ })
+ .Optional(/*init*/ false);
}
void TPushQueueProducerCommand::DoExecute(ICommandContextPtr context)
diff --git a/yt/yt_proto/yt/client/api/rpc_proxy/proto/api_service.proto b/yt/yt_proto/yt/client/api/rpc_proxy/proto/api_service.proto
index 7cfd7994d0..bfdd234fb2 100644
--- a/yt/yt_proto/yt/client/api/rpc_proxy/proto/api_service.proto
+++ b/yt/yt_proto/yt/client/api/rpc_proxy/proto/api_service.proto
@@ -685,6 +685,8 @@ message TReqPushQueueProducer
optional bytes user_meta = 6; // YSON
optional int64 sequence_number = 7;
+ optional bool require_sync_replica = 8;
+
required TRowsetDescriptor rowset_descriptor = 200;
}