diff options
author | nadya73 <nadya73@yandex-team.com> | 2024-06-17 13:21:36 +0300 |
---|---|---|
committer | nadya73 <nadya73@yandex-team.com> | 2024-06-17 13:35:21 +0300 |
commit | 95cc874865c4e7d67caf3dc49d48fc6884ac84bc (patch) | |
tree | abb30f268d75c39e6484afa1cee748fc1d7bb46a | |
parent | 99e82a86110035ec748d05ee2dc5f2ef8cd854df (diff) | |
download | ydb-95cc874865c4e7d67caf3dc49d48fc6884ac84bc.tar.gz |
[queues] YT-21930: Rename AdvanceConsumer -> AdvanceQueueConsumer
Rename AdvanceConsumer -> AdvanceQueueConsumer
d56d22b75c7563533fa41260f799b2a5f1dc04b7
-rw-r--r-- | yt/yt/client/api/delegating_transaction.cpp | 4 | ||||
-rw-r--r-- | yt/yt/client/api/delegating_transaction.h | 4 | ||||
-rw-r--r-- | yt/yt/client/api/queue_transaction.h | 6 | ||||
-rw-r--r-- | yt/yt/client/api/rpc_proxy/api_service_proxy.h | 4 | ||||
-rw-r--r-- | yt/yt/client/api/rpc_proxy/client_impl.cpp | 2 | ||||
-rw-r--r-- | yt/yt/client/api/rpc_proxy/transaction_impl.cpp | 5 | ||||
-rw-r--r-- | yt/yt/client/api/rpc_proxy/transaction_impl.h | 6 | ||||
-rw-r--r-- | yt/yt/client/driver/driver.cpp | 4 | ||||
-rw-r--r-- | yt/yt/client/driver/queue_commands.cpp | 6 | ||||
-rw-r--r-- | yt/yt/client/driver/queue_commands.h | 8 | ||||
-rw-r--r-- | yt/yt/client/federated/client.cpp | 8 | ||||
-rw-r--r-- | yt/yt/client/unittests/mock/transaction.h | 6 | ||||
-rw-r--r-- | yt/yt_proto/yt/client/api/rpc_proxy/proto/api_service.proto | 4 |
13 files changed, 36 insertions, 31 deletions
diff --git a/yt/yt/client/api/delegating_transaction.cpp b/yt/yt/client/api/delegating_transaction.cpp index c80fa09146..9b04f3451f 100644 --- a/yt/yt/client/api/delegating_transaction.cpp +++ b/yt/yt/client/api/delegating_transaction.cpp @@ -290,13 +290,13 @@ DELEGATE_METHOD(void, AdvanceConsumer, ( i64 newOffset), (consumerPath, queuePath, partitionIndex, oldOffset, newOffset)) -DELEGATE_METHOD(TFuture<void>, AdvanceConsumer, ( +DELEGATE_METHOD(TFuture<void>, AdvanceQueueConsumer, ( const NYT::NYPath::TRichYPath& consumer, const NYT::NYPath::TRichYPath& queue, int partitionIndex, std::optional<i64> oldOffset, i64 newOffset, - const NYT::NApi::TAdvanceConsumerOptions& options), + const NYT::NApi::TAdvanceQueueConsumerOptions& options), (consumer, queue, partitionIndex, oldOffset, newOffset, options)) DELEGATE_METHOD(TFuture<TPushQueueProducerResult>, PushQueueProducer, ( diff --git a/yt/yt/client/api/delegating_transaction.h b/yt/yt/client/api/delegating_transaction.h index 6377bab05c..2cf1f780e5 100644 --- a/yt/yt/client/api/delegating_transaction.h +++ b/yt/yt/client/api/delegating_transaction.h @@ -235,13 +235,13 @@ public: int partitionIndex, std::optional<i64> oldOffset, i64 newOffset) override; - TFuture<void> AdvanceConsumer( + TFuture<void> AdvanceQueueConsumer( const NYT::NYPath::TRichYPath& consumer, const NYT::NYPath::TRichYPath& queue, int partitionIndex, std::optional<i64> oldOffset, i64 newOffset, - const NYT::NApi::TAdvanceConsumerOptions& options) override; + const NYT::NApi::TAdvanceQueueConsumerOptions& options) override; TFuture<TPushQueueProducerResult> PushQueueProducer( const NYPath::TRichYPath& producerPath, diff --git a/yt/yt/client/api/queue_transaction.h b/yt/yt/client/api/queue_transaction.h index cfaa07c49d..ebc492db06 100644 --- a/yt/yt/client/api/queue_transaction.h +++ b/yt/yt/client/api/queue_transaction.h @@ -8,7 +8,7 @@ namespace NYT::NApi { //////////////////////////////////////////////////////////////////////////////// -struct TAdvanceConsumerOptions +struct TAdvanceQueueConsumerOptions : public TTimeoutOptions { }; @@ -70,13 +70,13 @@ struct IQueueTransaction * If #oldOffset is specified, the current offset is read inside this transaction and compared with #oldOffset. * If they are equal, the new offset is written, otherwise an exception is thrown. */ - virtual TFuture<void> AdvanceConsumer( + virtual TFuture<void> AdvanceQueueConsumer( const NYPath::TRichYPath& consumerPath, const NYPath::TRichYPath& queuePath, int partitionIndex, std::optional<i64> oldOffset, i64 newOffset, - const TAdvanceConsumerOptions& options) = 0; + const TAdvanceQueueConsumerOptions& options) = 0; //! Write rows in the queue with checking their sequence number. /*! diff --git a/yt/yt/client/api/rpc_proxy/api_service_proxy.h b/yt/yt/client/api/rpc_proxy/api_service_proxy.h index c40e855363..91fe45fc3d 100644 --- a/yt/yt/client/api/rpc_proxy/api_service_proxy.h +++ b/yt/yt/client/api/rpc_proxy/api_service_proxy.h @@ -85,7 +85,9 @@ public: DEFINE_RPC_PROXY_METHOD(NRpcProxy::NProto, AlterReplicationCard); // Queues - DEFINE_RPC_PROXY_METHOD(NRpcProxy::NProto, AdvanceConsumer); + // COMPAT(nadya73): For compatability with old versions of clients. + DEFINE_RPC_PROXY_METHOD_GENERIC(AdvanceConsumer, NRpcProxy::NProto::TReqAdvanceQueueConsumer, NRpcProxy::NProto::TRspAdvanceQueueConsumer); + DEFINE_RPC_PROXY_METHOD(NRpcProxy::NProto, AdvanceQueueConsumer); DEFINE_RPC_PROXY_METHOD(NRpcProxy::NProto, PullQueue); // COMPAT(nadya73): For compatability with old versions of clients. DEFINE_RPC_PROXY_METHOD_GENERIC(PullConsumer, NRpcProxy::NProto::TReqPullQueueConsumer, NRpcProxy::NProto::TRspPullQueueConsumer); diff --git a/yt/yt/client/api/rpc_proxy/client_impl.cpp b/yt/yt/client/api/rpc_proxy/client_impl.cpp index 61d4404260..bc6d64843f 100644 --- a/yt/yt/client/api/rpc_proxy/client_impl.cpp +++ b/yt/yt/client/api/rpc_proxy/client_impl.cpp @@ -778,7 +778,7 @@ TFuture<IQueueRowsetPtr> TClient::PullQueueConsumer( { auto proxy = CreateApiServiceProxy(); - // Use PullConsumer (not PullQueueConsumer) for backward compatibility. + // COMPAT(nadya73): Use PullConsumer (not PullQueueConsumer) for compatibility with old clusters. auto req = proxy.PullConsumer(); req->SetResponseHeavy(true); SetTimeoutOptions(*req, options); diff --git a/yt/yt/client/api/rpc_proxy/transaction_impl.cpp b/yt/yt/client/api/rpc_proxy/transaction_impl.cpp index 7dfb4d20bc..a952b7ff97 100644 --- a/yt/yt/client/api/rpc_proxy/transaction_impl.cpp +++ b/yt/yt/client/api/rpc_proxy/transaction_impl.cpp @@ -493,18 +493,19 @@ void TTransaction::ModifyRows( } } -TFuture<void> TTransaction::AdvanceConsumer( +TFuture<void> TTransaction::AdvanceQueueConsumer( const NYPath::TRichYPath& consumerPath, const NYPath::TRichYPath& queuePath, int partitionIndex, std::optional<i64> oldOffset, i64 newOffset, - const TAdvanceConsumerOptions& options) + const TAdvanceQueueConsumerOptions& options) { ValidateTabletTransactionId(GetId()); THROW_ERROR_EXCEPTION_IF(newOffset < 0, "Queue consumer offset %v cannot be negative", newOffset); + // COMPAT(nadya73): Use AdvaceConsumer (not AdvanceQueueConsumer) for compatibility with old clusters. auto req = Proxy_.AdvanceConsumer(); SetTimeoutOptions(*req, options); diff --git a/yt/yt/client/api/rpc_proxy/transaction_impl.h b/yt/yt/client/api/rpc_proxy/transaction_impl.h index 34b93ba90d..8b907fec2d 100644 --- a/yt/yt/client/api/rpc_proxy/transaction_impl.h +++ b/yt/yt/client/api/rpc_proxy/transaction_impl.h @@ -77,14 +77,14 @@ public: TSharedRange<NApi::TRowModification> modifications, const NApi::TModifyRowsOptions& options) override; - using TQueueTransactionMixin::AdvanceConsumer; - TFuture<void> AdvanceConsumer( + using TQueueTransactionMixin::AdvanceQueueConsumer; + TFuture<void> AdvanceQueueConsumer( const NYPath::TRichYPath& consumerPath, const NYPath::TRichYPath& queuePath, int partitionIndex, std::optional<i64> oldOffset, i64 newOffset, - const TAdvanceConsumerOptions& options) override; + const TAdvanceQueueConsumerOptions& options) override; TFuture<TPushQueueProducerResult> PushQueueProducer( const NYPath::TRichYPath& producerPath, diff --git a/yt/yt/client/driver/driver.cpp b/yt/yt/client/driver/driver.cpp index 4853e6b65e..bba7ffbabd 100644 --- a/yt/yt/client/driver/driver.cpp +++ b/yt/yt/client/driver/driver.cpp @@ -353,7 +353,9 @@ public: // COMPAT(nadya73): for compatibility with old versions of clients. REGISTER (TPullQueueConsumerCommand, "pull_consumer", Null, Tabular, false, true , ApiVersion4); REGISTER (TPullQueueConsumerCommand, "pull_queue_consumer", Null, Tabular, false, true , ApiVersion4); - REGISTER (TAdvanceConsumerCommand, "advance_consumer", Null, Structured, true, false, ApiVersion4); + // COMPAT(nadya73): for compatibility with old versions of clients. + REGISTER (TAdvanceQueueConsumerCommand, "advance_consumer", Null, Structured, true, false, ApiVersion4); + REGISTER (TAdvanceQueueConsumerCommand, "advance_queue_consumer", Null, Structured, true, false, ApiVersion4); REGISTER (TCreateQueueProducerSessionCommand, "create_queue_producer_session", Null, Structured, true, false, ApiVersion4); REGISTER (TRemoveQueueProducerSessionCommand, "remove_queue_producer_session", Null, Structured, true, false, ApiVersion4); REGISTER (TPushQueueProducerCommand, "push_queue_producer", Null, Structured, true, false, ApiVersion4); diff --git a/yt/yt/client/driver/queue_commands.cpp b/yt/yt/client/driver/queue_commands.cpp index d1e4bb3d81..f6db0ff134 100644 --- a/yt/yt/client/driver/queue_commands.cpp +++ b/yt/yt/client/driver/queue_commands.cpp @@ -244,7 +244,7 @@ void TPullQueueConsumerCommand::DoExecute(ICommandContextPtr context) //////////////////////////////////////////////////////////////////////////////// -void TAdvanceConsumerCommand::Register(TRegistrar registrar) +void TAdvanceQueueConsumerCommand::Register(TRegistrar registrar) { registrar.Parameter("consumer_path", &TThis::ConsumerPath); registrar.Parameter("queue_path", &TThis::QueuePath); @@ -256,14 +256,14 @@ void TAdvanceConsumerCommand::Register(TRegistrar registrar) .Optional(); } -void TAdvanceConsumerCommand::DoExecute(ICommandContextPtr context) +void TAdvanceQueueConsumerCommand::DoExecute(ICommandContextPtr context) { auto transaction = GetTransaction(context); if (ClientSide.value_or(false)) { transaction->AdvanceConsumer(ConsumerPath, QueuePath, PartitionIndex, OldOffset, NewOffset); } else { - WaitFor(transaction->AdvanceConsumer(ConsumerPath, QueuePath, PartitionIndex, OldOffset, NewOffset, /*options*/ {})) + WaitFor(transaction->AdvanceQueueConsumer(ConsumerPath, QueuePath, PartitionIndex, OldOffset, NewOffset, /*options*/ {})) .ThrowOnError(); } diff --git a/yt/yt/client/driver/queue_commands.h b/yt/yt/client/driver/queue_commands.h index 85ea7637af..249ef9bddb 100644 --- a/yt/yt/client/driver/queue_commands.h +++ b/yt/yt/client/driver/queue_commands.h @@ -100,15 +100,15 @@ private: //////////////////////////////////////////////////////////////////////////////// -struct TAdvanceConsumerOptions +struct TAdvanceQueueConsumerOptions : public TTabletWriteOptions { }; -class TAdvanceConsumerCommand - : public TTypedCommand<TAdvanceConsumerOptions> +class TAdvanceQueueConsumerCommand + : public TTypedCommand<TAdvanceQueueConsumerOptions> { public: - REGISTER_YSON_STRUCT_LITE(TAdvanceConsumerCommand); + REGISTER_YSON_STRUCT_LITE(TAdvanceQueueConsumerCommand); static void Register(TRegistrar registrar); diff --git a/yt/yt/client/federated/client.cpp b/yt/yt/client/federated/client.cpp index 4f60ac5e0b..b5f79cee42 100644 --- a/yt/yt/client/federated/client.cpp +++ b/yt/yt/client/federated/client.cpp @@ -81,14 +81,14 @@ public: TSharedRange<TRowModification> modifications, const TModifyRowsOptions& options) override; - using TQueueTransactionMixin::AdvanceConsumer; - TFuture<void> AdvanceConsumer( + using TQueueTransactionMixin::AdvanceQueueConsumer; + TFuture<void> AdvanceQueueConsumer( const NYPath::TRichYPath& consumerPath, const NYPath::TRichYPath& queuePath, int partitionIndex, std::optional<i64> oldOffset, i64 newOffset, - const TAdvanceConsumerOptions& options) override; + const TAdvanceQueueConsumerOptions& options) override; TFuture<TPushQueueProducerResult> PushQueueProducer( const NYPath::TRichYPath& producerPath, @@ -516,7 +516,7 @@ TRANSACTION_METHOD_IMPL(void, Abort, (const TTransactionAbortOptions&)); TRANSACTION_METHOD_IMPL(TVersionedLookupRowsResult, VersionedLookupRows, (const NYPath::TYPath&, NTableClient::TNameTablePtr, const TSharedRange<NTableClient::TUnversionedRow>&, const TVersionedLookupRowsOptions&)); TRANSACTION_METHOD_IMPL(std::vector<TUnversionedLookupRowsResult>, MultiLookupRows, (const std::vector<TMultiLookupSubrequest>&, const TMultiLookupOptions&)); TRANSACTION_METHOD_IMPL(TPullRowsResult, PullRows, (const NYPath::TYPath&, const TPullRowsOptions&)); -TRANSACTION_METHOD_IMPL(void, AdvanceConsumer, (const NYPath::TRichYPath&, const NYPath::TRichYPath&, int, std::optional<i64>, i64, const TAdvanceConsumerOptions&)); +TRANSACTION_METHOD_IMPL(void, AdvanceQueueConsumer, (const NYPath::TRichYPath&, const NYPath::TRichYPath&, int, std::optional<i64>, i64, const TAdvanceQueueConsumerOptions&)); TRANSACTION_METHOD_IMPL(TPushQueueProducerResult, PushQueueProducer, (const NYPath::TRichYPath&, const NYPath::TRichYPath&, const TString&, i64, NTableClient::TNameTablePtr, TSharedRange<NTableClient::TUnversionedRow>, const TPushQueueProducerOptions&)); TRANSACTION_METHOD_IMPL(NYson::TYsonString, ExplainQuery, (const TString&, const TExplainQueryOptions&)); TRANSACTION_METHOD_IMPL(NYson::TYsonString, GetNode, (const NYPath::TYPath&, const TGetNodeOptions&)); diff --git a/yt/yt/client/unittests/mock/transaction.h b/yt/yt/client/unittests/mock/transaction.h index 60ad83bef3..bc0191bf63 100644 --- a/yt/yt/client/unittests/mock/transaction.h +++ b/yt/yt/client/unittests/mock/transaction.h @@ -199,14 +199,14 @@ public: MOCK_METHOD(void, SubscribeAborted, (const TAbortedHandler& callback), (override)); MOCK_METHOD(void, UnsubscribeAborted, (const TAbortedHandler& callback), (override)); - using TQueueTransactionMixin::AdvanceConsumer; - MOCK_METHOD(TFuture<void>, AdvanceConsumer, ( + using TQueueTransactionMixin::AdvanceQueueConsumer; + MOCK_METHOD(TFuture<void>, AdvanceQueueConsumer, ( const NYPath::TRichYPath& consumerPath, const NYPath::TRichYPath& queuePath, int partitionIndex, std::optional<i64> oldOffset, i64 newOffset, - const TAdvanceConsumerOptions& options), (override)); + const TAdvanceQueueConsumerOptions& options), (override)); MOCK_METHOD(void, ModifyRows, ( const NYPath::TYPath& path, diff --git a/yt/yt_proto/yt/client/api/rpc_proxy/proto/api_service.proto b/yt/yt_proto/yt/client/api/rpc_proxy/proto/api_service.proto index 24995c479b..0c3a43469b 100644 --- a/yt/yt_proto/yt/client/api/rpc_proxy/proto/api_service.proto +++ b/yt/yt_proto/yt/client/api/rpc_proxy/proto/api_service.proto @@ -644,7 +644,7 @@ message TRspSelectRows //////////////////////////////////////////////////////////////////////////////// -message TReqAdvanceConsumer +message TReqAdvanceQueueConsumer { optional NYT.NProto.TGuid transaction_id = 1; optional string consumer_path = 2; @@ -654,7 +654,7 @@ message TReqAdvanceConsumer optional int64 new_offset = 6; } -message TRspAdvanceConsumer +message TRspAdvanceQueueConsumer { } |