diff options
author | nadya73 <nadya73@yandex-team.com> | 2025-01-26 23:42:22 +0300 |
---|---|---|
committer | nadya73 <nadya73@yandex-team.com> | 2025-01-26 23:58:47 +0300 |
commit | 19ca351db376d4e785f68758db99eaa6ce92084c (patch) | |
tree | ccd9cc980606d06002160e43ea91465efa14b5e6 | |
parent | 7def09dc9ce7bf9366218aeaa9ec17540092f612 (diff) | |
download | ydb-19ca351db376d4e785f68758db99eaa6ce92084c.tar.gz |
YT-21330: Add require_sync_replica parameter in push_queue_producer method
* Changelog entry
Type: feature
Component: proxy
Add `require_sync_replica` parameter in `push_queue_producer` handler
commit_hash:0d9e31193e3c021824910aec8d105e90f06d6319
-rw-r--r-- | yt/yt/client/api/queue_transaction.h | 4 | ||||
-rw-r--r-- | yt/yt/client/api/rpc_proxy/transaction_impl.cpp | 1 | ||||
-rw-r--r-- | yt/yt/client/driver/queue_commands.cpp | 7 | ||||
-rw-r--r-- | yt/yt_proto/yt/client/api/rpc_proxy/proto/api_service.proto | 2 |
4 files changed, 13 insertions, 1 deletions
diff --git a/yt/yt/client/api/queue_transaction.h b/yt/yt/client/api/queue_transaction.h index 4644fcc8e1..1487cb0ca3 100644 --- a/yt/yt/client/api/queue_transaction.h +++ b/yt/yt/client/api/queue_transaction.h @@ -30,6 +30,10 @@ struct TPushQueueProducerOptions * what data should be written after fail of the user process. */ NYTree::INodePtr UserMeta; + + //! If this happens to be a push into a replicated table queue, + //! controls if at least one sync replica is required. + bool RequireSyncReplica = true; }; struct TPushQueueProducerResult diff --git a/yt/yt/client/api/rpc_proxy/transaction_impl.cpp b/yt/yt/client/api/rpc_proxy/transaction_impl.cpp index b3cff92078..9cc13718fa 100644 --- a/yt/yt/client/api/rpc_proxy/transaction_impl.cpp +++ b/yt/yt/client/api/rpc_proxy/transaction_impl.cpp @@ -552,6 +552,7 @@ TFuture<TPushQueueProducerResult> TTransaction::PushQueueProducer( if (options.SequenceNumber) { req->set_sequence_number(options.SequenceNumber->Underlying()); } + req->set_require_sync_replica(options.RequireSyncReplica); if (NTracing::IsCurrentTraceContextRecorded()) { req->TracingTags().emplace_back("yt.producer_path", ToString(producerPath)); diff --git a/yt/yt/client/driver/queue_commands.cpp b/yt/yt/client/driver/queue_commands.cpp index 8f8040b193..fea2f15d9e 100644 --- a/yt/yt/client/driver/queue_commands.cpp +++ b/yt/yt/client/driver/queue_commands.cpp @@ -389,7 +389,12 @@ void TPushQueueProducerCommand::Register(TRegistrar registrar) registrar.Parameter("queue_path", &TThis::QueuePath); registrar.Parameter("session_id", &TThis::SessionId); registrar.Parameter("epoch", &TThis::Epoch); - + registrar.ParameterWithUniversalAccessor<bool>( + "require_sync_replica", + [] (TThis* command) -> auto& { + return command->Options.RequireSyncReplica; + }) + .Optional(/*init*/ false); } void TPushQueueProducerCommand::DoExecute(ICommandContextPtr context) diff --git a/yt/yt_proto/yt/client/api/rpc_proxy/proto/api_service.proto b/yt/yt_proto/yt/client/api/rpc_proxy/proto/api_service.proto index 7cfd7994d0..bfdd234fb2 100644 --- a/yt/yt_proto/yt/client/api/rpc_proxy/proto/api_service.proto +++ b/yt/yt_proto/yt/client/api/rpc_proxy/proto/api_service.proto @@ -685,6 +685,8 @@ message TReqPushQueueProducer optional bytes user_meta = 6; // YSON optional int64 sequence_number = 7; + optional bool require_sync_replica = 8; + required TRowsetDescriptor rowset_descriptor = 200; } |