diff options
author | nadya73 <nadya73@yandex-team.com> | 2024-06-04 10:32:18 +0300 |
---|---|---|
committer | nadya73 <nadya73@yandex-team.com> | 2024-06-04 10:54:30 +0300 |
commit | b10fa005adbac280c4ec81857a1d0a000e15af9c (patch) | |
tree | c7e387e386c45018152cdd46b7fad5a558616e3e | |
parent | 2f30887a9d7c920d55d57d65e4111495f0d57f42 (diff) | |
download | ydb-b10fa005adbac280c4ec81857a1d0a000e15af9c.tar.gz |
YT-21855: Rename PullConsumer -> PullQueueConsumer
Rename PullConsumer -> PullQueueConsumer
1ec6e386d6a3570532f130b99857a4bc2f20210b
-rw-r--r-- | yt/yt/client/api/delegating_client.h | 4 | ||||
-rw-r--r-- | yt/yt/client/api/internal_client.h | 2 | ||||
-rw-r--r-- | yt/yt/client/api/queue_client.h | 6 | ||||
-rw-r--r-- | yt/yt/client/api/rpc_proxy/api_service_proxy.h | 3 | ||||
-rw-r--r-- | yt/yt/client/api/rpc_proxy/client_impl.cpp | 7 | ||||
-rw-r--r-- | yt/yt/client/api/rpc_proxy/client_impl.h | 4 | ||||
-rw-r--r-- | yt/yt/client/driver/driver.cpp | 3 | ||||
-rw-r--r-- | yt/yt/client/driver/queue_commands.cpp | 6 | ||||
-rw-r--r-- | yt/yt/client/driver/queue_commands.h | 6 | ||||
-rw-r--r-- | yt/yt/client/federated/client.cpp | 6 | ||||
-rw-r--r-- | yt/yt/client/hedging/hedging.cpp | 2 | ||||
-rw-r--r-- | yt/yt/client/queue_client/config.cpp | 7 | ||||
-rw-r--r-- | yt/yt/client/queue_client/config.h | 2 | ||||
-rw-r--r-- | yt/yt/client/queue_client/partition_reader.cpp | 12 | ||||
-rw-r--r-- | yt/yt/client/unittests/mock/client.h | 4 | ||||
-rw-r--r-- | yt/yt_proto/yt/client/api/rpc_proxy/proto/api_service.proto | 4 |
16 files changed, 41 insertions, 37 deletions
diff --git a/yt/yt/client/api/delegating_client.h b/yt/yt/client/api/delegating_client.h index f490a050e5e..60f55b1c61f 100644 --- a/yt/yt/client/api/delegating_client.h +++ b/yt/yt/client/api/delegating_client.h @@ -96,13 +96,13 @@ public: const TPullQueueOptions& options), (queuePath, offset, partitionIndex, rowBatchReadOptions, options)) - DELEGATE_METHOD(TFuture<NQueueClient::IQueueRowsetPtr>, PullConsumer, ( + DELEGATE_METHOD(TFuture<NQueueClient::IQueueRowsetPtr>, PullQueueConsumer, ( const NYPath::TRichYPath& consumerPath, const NYPath::TRichYPath& queuePath, std::optional<i64> offset, int partitionIndex, const NQueueClient::TQueueRowBatchReadOptions& rowBatchReadOptions, - const TPullConsumerOptions& options), + const TPullQueueConsumerOptions& options), (consumerPath, queuePath, offset, partitionIndex, rowBatchReadOptions, options)) DELEGATE_METHOD(TFuture<void>, RegisterQueueConsumer, ( diff --git a/yt/yt/client/api/internal_client.h b/yt/yt/client/api/internal_client.h index 61a2f300ab1..cf2ed5a30ce 100644 --- a/yt/yt/client/api/internal_client.h +++ b/yt/yt/client/api/internal_client.h @@ -137,7 +137,7 @@ struct IInternalClient const TUnlockHunkStoreOptions& options = {}) = 0; //! Same as NApi::IClient::PullQueue, but without authentication. - //! This is used inside methods like NApi::IClient::PullConsumer, which perform their own authentication + //! This is used inside methods like NApi::IClient::PullQueueConsumer, which perform their own authentication //! and allow reading from a queue without having read permissions for the underlying dynamic table. virtual TFuture<NQueueClient::IQueueRowsetPtr> PullQueueUnauthenticated( const NYPath::TRichYPath& queuePath, diff --git a/yt/yt/client/api/queue_client.h b/yt/yt/client/api/queue_client.h index f878e5eacf8..8425ecd8163 100644 --- a/yt/yt/client/api/queue_client.h +++ b/yt/yt/client/api/queue_client.h @@ -47,7 +47,7 @@ struct TPullQueueOptions bool UseNativeTabletNodeApi = true; }; -struct TPullConsumerOptions +struct TPullQueueConsumerOptions : public TPullQueueOptions { }; @@ -117,13 +117,13 @@ struct IQueueClient //! Same as PullQueue, but requires user to have read-access to the consumer and the consumer being registered for the given queue. //! There is no guarantee that `rowBatchReadOptions.MaxRowCount` rows will be returned even if they are in the queue. - virtual TFuture<NQueueClient::IQueueRowsetPtr> PullConsumer( + virtual TFuture<NQueueClient::IQueueRowsetPtr> PullQueueConsumer( const NYPath::TRichYPath& consumerPath, const NYPath::TRichYPath& queuePath, std::optional<i64> offset, int partitionIndex, const NQueueClient::TQueueRowBatchReadOptions& rowBatchReadOptions, - const TPullConsumerOptions& options = {}) = 0; + const TPullQueueConsumerOptions& options = {}) = 0; virtual TFuture<void> RegisterQueueConsumer( const NYPath::TRichYPath& queuePath, diff --git a/yt/yt/client/api/rpc_proxy/api_service_proxy.h b/yt/yt/client/api/rpc_proxy/api_service_proxy.h index 5ea29e05e85..3ad68629058 100644 --- a/yt/yt/client/api/rpc_proxy/api_service_proxy.h +++ b/yt/yt/client/api/rpc_proxy/api_service_proxy.h @@ -87,7 +87,8 @@ public: // Queues DEFINE_RPC_PROXY_METHOD(NRpcProxy::NProto, AdvanceConsumer); DEFINE_RPC_PROXY_METHOD(NRpcProxy::NProto, PullQueue); - DEFINE_RPC_PROXY_METHOD(NRpcProxy::NProto, PullConsumer); + DEFINE_RPC_PROXY_METHOD_GENERIC(PullConsumer, NRpcProxy::NProto::TReqPullQueueConsumer, NRpcProxy::NProto::TRspPullQueueConsumer); + DEFINE_RPC_PROXY_METHOD(NRpcProxy::NProto, PullQueueConsumer); DEFINE_RPC_PROXY_METHOD(NRpcProxy::NProto, RegisterQueueConsumer); DEFINE_RPC_PROXY_METHOD(NRpcProxy::NProto, UnregisterQueueConsumer); DEFINE_RPC_PROXY_METHOD(NRpcProxy::NProto, ListQueueConsumerRegistrations); diff --git a/yt/yt/client/api/rpc_proxy/client_impl.cpp b/yt/yt/client/api/rpc_proxy/client_impl.cpp index d41847cbe07..1fd5d360aa8 100644 --- a/yt/yt/client/api/rpc_proxy/client_impl.cpp +++ b/yt/yt/client/api/rpc_proxy/client_impl.cpp @@ -768,16 +768,17 @@ TFuture<IQueueRowsetPtr> TClient::PullQueue( })); } -TFuture<IQueueRowsetPtr> TClient::PullConsumer( +TFuture<IQueueRowsetPtr> TClient::PullQueueConsumer( const TRichYPath& consumerPath, const TRichYPath& queuePath, std::optional<i64> offset, int partitionIndex, const TQueueRowBatchReadOptions& rowBatchReadOptions, - const TPullConsumerOptions& options) + const TPullQueueConsumerOptions& options) { auto proxy = CreateApiServiceProxy(); + // Use PullConsumer (not PullQueueConsumer) for backward compatibility. auto req = proxy.PullConsumer(); req->SetResponseHeavy(true); SetTimeoutOptions(*req, options); @@ -792,7 +793,7 @@ TFuture<IQueueRowsetPtr> TClient::PullConsumer( req->set_replica_consistency(static_cast<NProto::EReplicaConsistency>(options.ReplicaConsistency)); - return req->Invoke().Apply(BIND([] (const TApiServiceProxy::TRspPullConsumerPtr& rsp) -> IQueueRowsetPtr { + return req->Invoke().Apply(BIND([] (const TApiServiceProxy::TRspPullQueueConsumerPtr& rsp) -> IQueueRowsetPtr { auto rowset = DeserializeRowset<TUnversionedRow>( rsp->rowset_descriptor(), MergeRefsToRef<TRpcProxyClientBufferTag>(rsp->Attachments())); diff --git a/yt/yt/client/api/rpc_proxy/client_impl.h b/yt/yt/client/api/rpc_proxy/client_impl.h index d6bd791e6d6..f72882d1e40 100644 --- a/yt/yt/client/api/rpc_proxy/client_impl.h +++ b/yt/yt/client/api/rpc_proxy/client_impl.h @@ -137,13 +137,13 @@ public: const NQueueClient::TQueueRowBatchReadOptions& rowBatchReadOptions, const TPullQueueOptions& options = {}) override; - TFuture<NQueueClient::IQueueRowsetPtr> PullConsumer( + TFuture<NQueueClient::IQueueRowsetPtr> PullQueueConsumer( const NYPath::TRichYPath& consumerPath, const NYPath::TRichYPath& queuePath, std::optional<i64> offset, int partitionIndex, const NQueueClient::TQueueRowBatchReadOptions& rowBatchReadOptions, - const TPullConsumerOptions& options = {}) override; + const TPullQueueConsumerOptions& options = {}) override; TFuture<void> RegisterQueueConsumer( const NYPath::TRichYPath& queuePath, diff --git a/yt/yt/client/driver/driver.cpp b/yt/yt/client/driver/driver.cpp index db647489ef3..7b0ee3cf281 100644 --- a/yt/yt/client/driver/driver.cpp +++ b/yt/yt/client/driver/driver.cpp @@ -349,7 +349,8 @@ public: REGISTER (TUnregisterQueueConsumerCommand, "unregister_queue_consumer", Null, Structured, true, false, ApiVersion4); REGISTER (TListQueueConsumerRegistrationsCommand, "list_queue_consumer_registrations", Null, Structured, false, false, ApiVersion4); REGISTER (TPullQueueCommand, "pull_queue", Null, Tabular, false, true , ApiVersion4); - REGISTER (TPullConsumerCommand, "pull_consumer", Null, Tabular, false, true , ApiVersion4); + REGISTER (TPullQueueConsumerCommand, "pull_consumer", Null, Tabular, false, true , ApiVersion4); + REGISTER (TPullQueueConsumerCommand, "pull_queue_consumer", Null, Tabular, false, true , ApiVersion4); REGISTER (TAdvanceConsumerCommand, "advance_consumer", Null, Structured, true, false, ApiVersion4); REGISTER (TCreateQueueProducerSessionCommand, "create_queue_producer_session", Null, Structured, true, false, ApiVersion4); REGISTER (TRemoveQueueProducerSessionCommand, "remove_queue_producer_session", Null, Structured, true, false, ApiVersion4); diff --git a/yt/yt/client/driver/queue_commands.cpp b/yt/yt/client/driver/queue_commands.cpp index 2068cce9d67..0fbe979af33 100644 --- a/yt/yt/client/driver/queue_commands.cpp +++ b/yt/yt/client/driver/queue_commands.cpp @@ -155,7 +155,7 @@ void TPullQueueCommand::DoExecute(ICommandContextPtr context) //////////////////////////////////////////////////////////////////////////////// -void TPullConsumerCommand::Register(TRegistrar registrar) +void TPullQueueConsumerCommand::Register(TRegistrar registrar) { registrar.Parameter("consumer_path", &TThis::ConsumerPath); @@ -194,11 +194,11 @@ void TPullConsumerCommand::Register(TRegistrar registrar) .Optional(/*init*/ false); } -void TPullConsumerCommand::DoExecute(ICommandContextPtr context) +void TPullQueueConsumerCommand::DoExecute(ICommandContextPtr context) { auto client = context->GetClient(); - auto result = WaitFor(client->PullConsumer( + auto result = WaitFor(client->PullQueueConsumer( ConsumerPath, QueuePath, Offset, diff --git a/yt/yt/client/driver/queue_commands.h b/yt/yt/client/driver/queue_commands.h index 53179852079..fc5bec8cb71 100644 --- a/yt/yt/client/driver/queue_commands.h +++ b/yt/yt/client/driver/queue_commands.h @@ -80,11 +80,11 @@ private: //////////////////////////////////////////////////////////////////////////////// -class TPullConsumerCommand - : public TTypedCommand<NApi::TPullConsumerOptions> +class TPullQueueConsumerCommand + : public TTypedCommand<NApi::TPullQueueConsumerOptions> { public: - REGISTER_YSON_STRUCT_LITE(TPullConsumerCommand); + REGISTER_YSON_STRUCT_LITE(TPullQueueConsumerCommand); static void Register(TRegistrar registrar); diff --git a/yt/yt/client/federated/client.cpp b/yt/yt/client/federated/client.cpp index efbd61c0c56..bba000160d4 100644 --- a/yt/yt/client/federated/client.cpp +++ b/yt/yt/client/federated/client.cpp @@ -268,13 +268,13 @@ public: int, const NQueueClient::TQueueRowBatchReadOptions&, const TPullQueueOptions&) override; - TFuture<NQueueClient::IQueueRowsetPtr> PullConsumer( + TFuture<NQueueClient::IQueueRowsetPtr> PullQueueConsumer( const NYPath::TRichYPath&, const NYPath::TRichYPath&, std::optional<i64>, int, const NQueueClient::TQueueRowBatchReadOptions&, - const TPullConsumerOptions&) override; + const TPullQueueConsumerOptions&) override; TFuture<ITransactionPtr> StartTransaction( NTransactionClient::ETransactionType type, @@ -666,7 +666,7 @@ CLIENT_METHOD_IMPL(std::vector<TUnversionedLookupRowsResult>, MultiLookupRows, ( CLIENT_METHOD_IMPL(TVersionedLookupRowsResult, VersionedLookupRows, (const NYPath::TYPath&, NTableClient::TNameTablePtr, const TSharedRange<NTableClient::TUnversionedRow>&, const TVersionedLookupRowsOptions&)); CLIENT_METHOD_IMPL(TPullRowsResult, PullRows, (const NYPath::TYPath&, const TPullRowsOptions&)); CLIENT_METHOD_IMPL(NQueueClient::IQueueRowsetPtr, PullQueue, (const NYPath::TRichYPath&, i64, int, const NQueueClient::TQueueRowBatchReadOptions&, const TPullQueueOptions&)); -CLIENT_METHOD_IMPL(NQueueClient::IQueueRowsetPtr, PullConsumer, (const NYPath::TRichYPath&, const NYPath::TRichYPath&, std::optional<i64>, int, const NQueueClient::TQueueRowBatchReadOptions&, const TPullConsumerOptions&)); +CLIENT_METHOD_IMPL(NQueueClient::IQueueRowsetPtr, PullQueueConsumer, (const NYPath::TRichYPath&, const NYPath::TRichYPath&, std::optional<i64>, int, const NQueueClient::TQueueRowBatchReadOptions&, const TPullQueueConsumerOptions&)); CLIENT_METHOD_IMPL(NYson::TYsonString, ExplainQuery, (const TString&, const TExplainQueryOptions&)); CLIENT_METHOD_IMPL(NYson::TYsonString, GetNode, (const NYPath::TYPath&, const TGetNodeOptions&)); CLIENT_METHOD_IMPL(NYson::TYsonString, ListNode, (const NYPath::TYPath&, const TListNodeOptions&)); diff --git a/yt/yt/client/hedging/hedging.cpp b/yt/yt/client/hedging/hedging.cpp index d5f6bd946aa..57d4b0c03e4 100644 --- a/yt/yt/client/hedging/hedging.cpp +++ b/yt/yt/client/hedging/hedging.cpp @@ -79,7 +79,7 @@ public: RETRYABLE_METHOD(TFuture<TVersionedLookupRowsResult>, VersionedLookupRows, (const TYPath&, NTableClient::TNameTablePtr, const TSharedRange<NTableClient::TUnversionedRow>&, const TVersionedLookupRowsOptions&)); RETRYABLE_METHOD(TFuture<TSelectRowsResult>, SelectRows, (const TString&, const TSelectRowsOptions&)); RETRYABLE_METHOD(TFuture<NQueueClient::IQueueRowsetPtr>, PullQueue, (const TRichYPath&, i64, int, const NQueueClient::TQueueRowBatchReadOptions&, const TPullQueueOptions&)); - RETRYABLE_METHOD(TFuture<NQueueClient::IQueueRowsetPtr>, PullConsumer, (const TRichYPath&, const TRichYPath&, std::optional<i64>, int, const NQueueClient::TQueueRowBatchReadOptions&, const TPullConsumerOptions&)); + RETRYABLE_METHOD(TFuture<NQueueClient::IQueueRowsetPtr>, PullQueueConsumer, (const TRichYPath&, const TRichYPath&, std::optional<i64>, int, const NQueueClient::TQueueRowBatchReadOptions&, const TPullQueueConsumerOptions&)); RETRYABLE_METHOD(TFuture<void>, RegisterQueueConsumer, (const TRichYPath&, const TRichYPath&, bool, const TRegisterQueueConsumerOptions&)); RETRYABLE_METHOD(TFuture<void>, UnregisterQueueConsumer, (const TRichYPath&, const TRichYPath&, const TUnregisterQueueConsumerOptions&)); RETRYABLE_METHOD(TFuture<std::vector<TListQueueConsumerRegistrationsResult>>, ListQueueConsumerRegistrations, (const std::optional<TRichYPath>&, const std::optional<TRichYPath>&, const TListQueueConsumerRegistrationsOptions&)); diff --git a/yt/yt/client/queue_client/config.cpp b/yt/yt/client/queue_client/config.cpp index 806135c58d8..cba7329eba0 100644 --- a/yt/yt/client/queue_client/config.cpp +++ b/yt/yt/client/queue_client/config.cpp @@ -15,12 +15,13 @@ void TPartitionReaderConfig::Register(TRegistrar registrar) registrar.Parameter("use_native_tablet_node_api", &TThis::UseNativeTabletNodeApi) .Default(false); - registrar.Parameter("use_pull_consumer", &TThis::UsePullConsumer) + registrar.Parameter("use_pull_queue_consumer", &TThis::UsePullQueueConsumer) + .Alias("use_pull_consumer") .Default(false); registrar.Postprocessor([] (TThis* config) { - if (config->UsePullConsumer && !config->UseNativeTabletNodeApi) { - THROW_ERROR_EXCEPTION("PullConsumer can only be used with the native tablet node api for pulling rows"); + if (config->UsePullQueueConsumer && !config->UseNativeTabletNodeApi) { + THROW_ERROR_EXCEPTION("PullQueueConsumer can only be used with the native tablet node api for pulling rows"); } }); } diff --git a/yt/yt/client/queue_client/config.h b/yt/yt/client/queue_client/config.h index 2f83573d37a..de862c1e6bd 100644 --- a/yt/yt/client/queue_client/config.h +++ b/yt/yt/client/queue_client/config.h @@ -23,7 +23,7 @@ public: std::optional<i64> DataWeightPerRowHint; bool UseNativeTabletNodeApi; - bool UsePullConsumer; + bool UsePullQueueConsumer; REGISTER_YSON_STRUCT(TPartitionReaderConfig); diff --git a/yt/yt/client/queue_client/partition_reader.cpp b/yt/yt/client/queue_client/partition_reader.cpp index d02774cd1fa..a7c4b3a5635 100644 --- a/yt/yt/client/queue_client/partition_reader.cpp +++ b/yt/yt/client/queue_client/partition_reader.cpp @@ -273,7 +273,7 @@ public: , RowBatchReadOptions_({Config_->MaxRowCount, Config_->MaxDataWeight, Config_->DataWeightPerRowHint}) , Logger(QueueClientLogger().WithTag("Consumer: %v, Queue: %v, Partition: %v", ConsumerPath_, QueuePath_, PartitionIndex_)) { - PullConsumerOptions_.UseNativeTabletNodeApi = Config_->UseNativeTabletNodeApi; + PullQueueConsumerOptions_.UseNativeTabletNodeApi = Config_->UseNativeTabletNodeApi; } TFuture<void> Open() override @@ -301,7 +301,7 @@ private: const TQueueRowBatchReadOptions RowBatchReadOptions_; const NLogging::TLogger Logger; - TPullConsumerOptions PullConsumerOptions_; + TPullQueueConsumerOptions PullQueueConsumerOptions_; ISubConsumerClientPtr ConsumerClient_; bool Opened_ = false; @@ -384,20 +384,20 @@ private: RowBatchReadOptions_.MaxRowCount, RowBatchReadOptions_.MaxDataWeight, RowBatchReadOptions_.DataWeightPerRowHint); - auto asyncRowset = (Config_->UsePullConsumer - ? Client_->PullConsumer( + auto asyncRowset = (Config_->UsePullQueueConsumer + ? Client_->PullQueueConsumer( ConsumerPath_, QueuePath_, currentOffset, PartitionIndex_, RowBatchReadOptions_, - PullConsumerOptions_) + PullQueueConsumerOptions_) : Client_->PullQueue( QueuePath_, currentOffset, PartitionIndex_, RowBatchReadOptions_, - PullConsumerOptions_)); + PullQueueConsumerOptions_)); auto rowset = WaitFor(asyncRowset) .ValueOrThrow(); diff --git a/yt/yt/client/unittests/mock/client.h b/yt/yt/client/unittests/mock/client.h index c283bafe3d5..29b813cfcfa 100644 --- a/yt/yt/client/unittests/mock/client.h +++ b/yt/yt/client/unittests/mock/client.h @@ -74,13 +74,13 @@ public: const TPullQueueOptions& options), (override)); - MOCK_METHOD(TFuture<NQueueClient::IQueueRowsetPtr>, PullConsumer, ( + MOCK_METHOD(TFuture<NQueueClient::IQueueRowsetPtr>, PullQueueConsumer, ( const NYPath::TRichYPath& consumerPath, const NYPath::TRichYPath& queuePath, std::optional<i64> offset, int partitionIndex, const NQueueClient::TQueueRowBatchReadOptions& rowBatchReadOptions, - const TPullConsumerOptions& options), + const TPullQueueConsumerOptions& options), (override)); MOCK_METHOD(TFuture<void>, RegisterQueueConsumer, ( diff --git a/yt/yt_proto/yt/client/api/rpc_proxy/proto/api_service.proto b/yt/yt_proto/yt/client/api/rpc_proxy/proto/api_service.proto index 9eae7aeecf1..95ee6147daf 100644 --- a/yt/yt_proto/yt/client/api/rpc_proxy/proto/api_service.proto +++ b/yt/yt_proto/yt/client/api/rpc_proxy/proto/api_service.proto @@ -682,7 +682,7 @@ message TRspPullQueue optional int64 start_offset = 2; } -message TReqPullConsumer +message TReqPullQueueConsumer { optional string consumer_path = 1; optional string queue_path = 2; @@ -693,7 +693,7 @@ message TReqPullConsumer optional EReplicaConsistency replica_consistency = 6; } -message TRspPullConsumer +message TRspPullQueueConsumer { optional TRowsetDescriptor rowset_descriptor = 1; optional int64 start_offset = 2; |