aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authornadya73 <nadya73@yandex-team.com>2024-06-04 10:32:18 +0300
committernadya73 <nadya73@yandex-team.com>2024-06-04 10:54:30 +0300
commitb10fa005adbac280c4ec81857a1d0a000e15af9c (patch)
treec7e387e386c45018152cdd46b7fad5a558616e3e
parent2f30887a9d7c920d55d57d65e4111495f0d57f42 (diff)
downloadydb-b10fa005adbac280c4ec81857a1d0a000e15af9c.tar.gz
YT-21855: Rename PullConsumer -> PullQueueConsumer
Rename PullConsumer -> PullQueueConsumer 1ec6e386d6a3570532f130b99857a4bc2f20210b
-rw-r--r--yt/yt/client/api/delegating_client.h4
-rw-r--r--yt/yt/client/api/internal_client.h2
-rw-r--r--yt/yt/client/api/queue_client.h6
-rw-r--r--yt/yt/client/api/rpc_proxy/api_service_proxy.h3
-rw-r--r--yt/yt/client/api/rpc_proxy/client_impl.cpp7
-rw-r--r--yt/yt/client/api/rpc_proxy/client_impl.h4
-rw-r--r--yt/yt/client/driver/driver.cpp3
-rw-r--r--yt/yt/client/driver/queue_commands.cpp6
-rw-r--r--yt/yt/client/driver/queue_commands.h6
-rw-r--r--yt/yt/client/federated/client.cpp6
-rw-r--r--yt/yt/client/hedging/hedging.cpp2
-rw-r--r--yt/yt/client/queue_client/config.cpp7
-rw-r--r--yt/yt/client/queue_client/config.h2
-rw-r--r--yt/yt/client/queue_client/partition_reader.cpp12
-rw-r--r--yt/yt/client/unittests/mock/client.h4
-rw-r--r--yt/yt_proto/yt/client/api/rpc_proxy/proto/api_service.proto4
16 files changed, 41 insertions, 37 deletions
diff --git a/yt/yt/client/api/delegating_client.h b/yt/yt/client/api/delegating_client.h
index f490a050e5e..60f55b1c61f 100644
--- a/yt/yt/client/api/delegating_client.h
+++ b/yt/yt/client/api/delegating_client.h
@@ -96,13 +96,13 @@ public:
const TPullQueueOptions& options),
(queuePath, offset, partitionIndex, rowBatchReadOptions, options))
- DELEGATE_METHOD(TFuture<NQueueClient::IQueueRowsetPtr>, PullConsumer, (
+ DELEGATE_METHOD(TFuture<NQueueClient::IQueueRowsetPtr>, PullQueueConsumer, (
const NYPath::TRichYPath& consumerPath,
const NYPath::TRichYPath& queuePath,
std::optional<i64> offset,
int partitionIndex,
const NQueueClient::TQueueRowBatchReadOptions& rowBatchReadOptions,
- const TPullConsumerOptions& options),
+ const TPullQueueConsumerOptions& options),
(consumerPath, queuePath, offset, partitionIndex, rowBatchReadOptions, options))
DELEGATE_METHOD(TFuture<void>, RegisterQueueConsumer, (
diff --git a/yt/yt/client/api/internal_client.h b/yt/yt/client/api/internal_client.h
index 61a2f300ab1..cf2ed5a30ce 100644
--- a/yt/yt/client/api/internal_client.h
+++ b/yt/yt/client/api/internal_client.h
@@ -137,7 +137,7 @@ struct IInternalClient
const TUnlockHunkStoreOptions& options = {}) = 0;
//! Same as NApi::IClient::PullQueue, but without authentication.
- //! This is used inside methods like NApi::IClient::PullConsumer, which perform their own authentication
+ //! This is used inside methods like NApi::IClient::PullQueueConsumer, which perform their own authentication
//! and allow reading from a queue without having read permissions for the underlying dynamic table.
virtual TFuture<NQueueClient::IQueueRowsetPtr> PullQueueUnauthenticated(
const NYPath::TRichYPath& queuePath,
diff --git a/yt/yt/client/api/queue_client.h b/yt/yt/client/api/queue_client.h
index f878e5eacf8..8425ecd8163 100644
--- a/yt/yt/client/api/queue_client.h
+++ b/yt/yt/client/api/queue_client.h
@@ -47,7 +47,7 @@ struct TPullQueueOptions
bool UseNativeTabletNodeApi = true;
};
-struct TPullConsumerOptions
+struct TPullQueueConsumerOptions
: public TPullQueueOptions
{ };
@@ -117,13 +117,13 @@ struct IQueueClient
//! Same as PullQueue, but requires user to have read-access to the consumer and the consumer being registered for the given queue.
//! There is no guarantee that `rowBatchReadOptions.MaxRowCount` rows will be returned even if they are in the queue.
- virtual TFuture<NQueueClient::IQueueRowsetPtr> PullConsumer(
+ virtual TFuture<NQueueClient::IQueueRowsetPtr> PullQueueConsumer(
const NYPath::TRichYPath& consumerPath,
const NYPath::TRichYPath& queuePath,
std::optional<i64> offset,
int partitionIndex,
const NQueueClient::TQueueRowBatchReadOptions& rowBatchReadOptions,
- const TPullConsumerOptions& options = {}) = 0;
+ const TPullQueueConsumerOptions& options = {}) = 0;
virtual TFuture<void> RegisterQueueConsumer(
const NYPath::TRichYPath& queuePath,
diff --git a/yt/yt/client/api/rpc_proxy/api_service_proxy.h b/yt/yt/client/api/rpc_proxy/api_service_proxy.h
index 5ea29e05e85..3ad68629058 100644
--- a/yt/yt/client/api/rpc_proxy/api_service_proxy.h
+++ b/yt/yt/client/api/rpc_proxy/api_service_proxy.h
@@ -87,7 +87,8 @@ public:
// Queues
DEFINE_RPC_PROXY_METHOD(NRpcProxy::NProto, AdvanceConsumer);
DEFINE_RPC_PROXY_METHOD(NRpcProxy::NProto, PullQueue);
- DEFINE_RPC_PROXY_METHOD(NRpcProxy::NProto, PullConsumer);
+ DEFINE_RPC_PROXY_METHOD_GENERIC(PullConsumer, NRpcProxy::NProto::TReqPullQueueConsumer, NRpcProxy::NProto::TRspPullQueueConsumer);
+ DEFINE_RPC_PROXY_METHOD(NRpcProxy::NProto, PullQueueConsumer);
DEFINE_RPC_PROXY_METHOD(NRpcProxy::NProto, RegisterQueueConsumer);
DEFINE_RPC_PROXY_METHOD(NRpcProxy::NProto, UnregisterQueueConsumer);
DEFINE_RPC_PROXY_METHOD(NRpcProxy::NProto, ListQueueConsumerRegistrations);
diff --git a/yt/yt/client/api/rpc_proxy/client_impl.cpp b/yt/yt/client/api/rpc_proxy/client_impl.cpp
index d41847cbe07..1fd5d360aa8 100644
--- a/yt/yt/client/api/rpc_proxy/client_impl.cpp
+++ b/yt/yt/client/api/rpc_proxy/client_impl.cpp
@@ -768,16 +768,17 @@ TFuture<IQueueRowsetPtr> TClient::PullQueue(
}));
}
-TFuture<IQueueRowsetPtr> TClient::PullConsumer(
+TFuture<IQueueRowsetPtr> TClient::PullQueueConsumer(
const TRichYPath& consumerPath,
const TRichYPath& queuePath,
std::optional<i64> offset,
int partitionIndex,
const TQueueRowBatchReadOptions& rowBatchReadOptions,
- const TPullConsumerOptions& options)
+ const TPullQueueConsumerOptions& options)
{
auto proxy = CreateApiServiceProxy();
+ // Use PullConsumer (not PullQueueConsumer) for backward compatibility.
auto req = proxy.PullConsumer();
req->SetResponseHeavy(true);
SetTimeoutOptions(*req, options);
@@ -792,7 +793,7 @@ TFuture<IQueueRowsetPtr> TClient::PullConsumer(
req->set_replica_consistency(static_cast<NProto::EReplicaConsistency>(options.ReplicaConsistency));
- return req->Invoke().Apply(BIND([] (const TApiServiceProxy::TRspPullConsumerPtr& rsp) -> IQueueRowsetPtr {
+ return req->Invoke().Apply(BIND([] (const TApiServiceProxy::TRspPullQueueConsumerPtr& rsp) -> IQueueRowsetPtr {
auto rowset = DeserializeRowset<TUnversionedRow>(
rsp->rowset_descriptor(),
MergeRefsToRef<TRpcProxyClientBufferTag>(rsp->Attachments()));
diff --git a/yt/yt/client/api/rpc_proxy/client_impl.h b/yt/yt/client/api/rpc_proxy/client_impl.h
index d6bd791e6d6..f72882d1e40 100644
--- a/yt/yt/client/api/rpc_proxy/client_impl.h
+++ b/yt/yt/client/api/rpc_proxy/client_impl.h
@@ -137,13 +137,13 @@ public:
const NQueueClient::TQueueRowBatchReadOptions& rowBatchReadOptions,
const TPullQueueOptions& options = {}) override;
- TFuture<NQueueClient::IQueueRowsetPtr> PullConsumer(
+ TFuture<NQueueClient::IQueueRowsetPtr> PullQueueConsumer(
const NYPath::TRichYPath& consumerPath,
const NYPath::TRichYPath& queuePath,
std::optional<i64> offset,
int partitionIndex,
const NQueueClient::TQueueRowBatchReadOptions& rowBatchReadOptions,
- const TPullConsumerOptions& options = {}) override;
+ const TPullQueueConsumerOptions& options = {}) override;
TFuture<void> RegisterQueueConsumer(
const NYPath::TRichYPath& queuePath,
diff --git a/yt/yt/client/driver/driver.cpp b/yt/yt/client/driver/driver.cpp
index db647489ef3..7b0ee3cf281 100644
--- a/yt/yt/client/driver/driver.cpp
+++ b/yt/yt/client/driver/driver.cpp
@@ -349,7 +349,8 @@ public:
REGISTER (TUnregisterQueueConsumerCommand, "unregister_queue_consumer", Null, Structured, true, false, ApiVersion4);
REGISTER (TListQueueConsumerRegistrationsCommand, "list_queue_consumer_registrations", Null, Structured, false, false, ApiVersion4);
REGISTER (TPullQueueCommand, "pull_queue", Null, Tabular, false, true , ApiVersion4);
- REGISTER (TPullConsumerCommand, "pull_consumer", Null, Tabular, false, true , ApiVersion4);
+ REGISTER (TPullQueueConsumerCommand, "pull_consumer", Null, Tabular, false, true , ApiVersion4);
+ REGISTER (TPullQueueConsumerCommand, "pull_queue_consumer", Null, Tabular, false, true , ApiVersion4);
REGISTER (TAdvanceConsumerCommand, "advance_consumer", Null, Structured, true, false, ApiVersion4);
REGISTER (TCreateQueueProducerSessionCommand, "create_queue_producer_session", Null, Structured, true, false, ApiVersion4);
REGISTER (TRemoveQueueProducerSessionCommand, "remove_queue_producer_session", Null, Structured, true, false, ApiVersion4);
diff --git a/yt/yt/client/driver/queue_commands.cpp b/yt/yt/client/driver/queue_commands.cpp
index 2068cce9d67..0fbe979af33 100644
--- a/yt/yt/client/driver/queue_commands.cpp
+++ b/yt/yt/client/driver/queue_commands.cpp
@@ -155,7 +155,7 @@ void TPullQueueCommand::DoExecute(ICommandContextPtr context)
////////////////////////////////////////////////////////////////////////////////
-void TPullConsumerCommand::Register(TRegistrar registrar)
+void TPullQueueConsumerCommand::Register(TRegistrar registrar)
{
registrar.Parameter("consumer_path", &TThis::ConsumerPath);
@@ -194,11 +194,11 @@ void TPullConsumerCommand::Register(TRegistrar registrar)
.Optional(/*init*/ false);
}
-void TPullConsumerCommand::DoExecute(ICommandContextPtr context)
+void TPullQueueConsumerCommand::DoExecute(ICommandContextPtr context)
{
auto client = context->GetClient();
- auto result = WaitFor(client->PullConsumer(
+ auto result = WaitFor(client->PullQueueConsumer(
ConsumerPath,
QueuePath,
Offset,
diff --git a/yt/yt/client/driver/queue_commands.h b/yt/yt/client/driver/queue_commands.h
index 53179852079..fc5bec8cb71 100644
--- a/yt/yt/client/driver/queue_commands.h
+++ b/yt/yt/client/driver/queue_commands.h
@@ -80,11 +80,11 @@ private:
////////////////////////////////////////////////////////////////////////////////
-class TPullConsumerCommand
- : public TTypedCommand<NApi::TPullConsumerOptions>
+class TPullQueueConsumerCommand
+ : public TTypedCommand<NApi::TPullQueueConsumerOptions>
{
public:
- REGISTER_YSON_STRUCT_LITE(TPullConsumerCommand);
+ REGISTER_YSON_STRUCT_LITE(TPullQueueConsumerCommand);
static void Register(TRegistrar registrar);
diff --git a/yt/yt/client/federated/client.cpp b/yt/yt/client/federated/client.cpp
index efbd61c0c56..bba000160d4 100644
--- a/yt/yt/client/federated/client.cpp
+++ b/yt/yt/client/federated/client.cpp
@@ -268,13 +268,13 @@ public:
int,
const NQueueClient::TQueueRowBatchReadOptions&,
const TPullQueueOptions&) override;
- TFuture<NQueueClient::IQueueRowsetPtr> PullConsumer(
+ TFuture<NQueueClient::IQueueRowsetPtr> PullQueueConsumer(
const NYPath::TRichYPath&,
const NYPath::TRichYPath&,
std::optional<i64>,
int,
const NQueueClient::TQueueRowBatchReadOptions&,
- const TPullConsumerOptions&) override;
+ const TPullQueueConsumerOptions&) override;
TFuture<ITransactionPtr> StartTransaction(
NTransactionClient::ETransactionType type,
@@ -666,7 +666,7 @@ CLIENT_METHOD_IMPL(std::vector<TUnversionedLookupRowsResult>, MultiLookupRows, (
CLIENT_METHOD_IMPL(TVersionedLookupRowsResult, VersionedLookupRows, (const NYPath::TYPath&, NTableClient::TNameTablePtr, const TSharedRange<NTableClient::TUnversionedRow>&, const TVersionedLookupRowsOptions&));
CLIENT_METHOD_IMPL(TPullRowsResult, PullRows, (const NYPath::TYPath&, const TPullRowsOptions&));
CLIENT_METHOD_IMPL(NQueueClient::IQueueRowsetPtr, PullQueue, (const NYPath::TRichYPath&, i64, int, const NQueueClient::TQueueRowBatchReadOptions&, const TPullQueueOptions&));
-CLIENT_METHOD_IMPL(NQueueClient::IQueueRowsetPtr, PullConsumer, (const NYPath::TRichYPath&, const NYPath::TRichYPath&, std::optional<i64>, int, const NQueueClient::TQueueRowBatchReadOptions&, const TPullConsumerOptions&));
+CLIENT_METHOD_IMPL(NQueueClient::IQueueRowsetPtr, PullQueueConsumer, (const NYPath::TRichYPath&, const NYPath::TRichYPath&, std::optional<i64>, int, const NQueueClient::TQueueRowBatchReadOptions&, const TPullQueueConsumerOptions&));
CLIENT_METHOD_IMPL(NYson::TYsonString, ExplainQuery, (const TString&, const TExplainQueryOptions&));
CLIENT_METHOD_IMPL(NYson::TYsonString, GetNode, (const NYPath::TYPath&, const TGetNodeOptions&));
CLIENT_METHOD_IMPL(NYson::TYsonString, ListNode, (const NYPath::TYPath&, const TListNodeOptions&));
diff --git a/yt/yt/client/hedging/hedging.cpp b/yt/yt/client/hedging/hedging.cpp
index d5f6bd946aa..57d4b0c03e4 100644
--- a/yt/yt/client/hedging/hedging.cpp
+++ b/yt/yt/client/hedging/hedging.cpp
@@ -79,7 +79,7 @@ public:
RETRYABLE_METHOD(TFuture<TVersionedLookupRowsResult>, VersionedLookupRows, (const TYPath&, NTableClient::TNameTablePtr, const TSharedRange<NTableClient::TUnversionedRow>&, const TVersionedLookupRowsOptions&));
RETRYABLE_METHOD(TFuture<TSelectRowsResult>, SelectRows, (const TString&, const TSelectRowsOptions&));
RETRYABLE_METHOD(TFuture<NQueueClient::IQueueRowsetPtr>, PullQueue, (const TRichYPath&, i64, int, const NQueueClient::TQueueRowBatchReadOptions&, const TPullQueueOptions&));
- RETRYABLE_METHOD(TFuture<NQueueClient::IQueueRowsetPtr>, PullConsumer, (const TRichYPath&, const TRichYPath&, std::optional<i64>, int, const NQueueClient::TQueueRowBatchReadOptions&, const TPullConsumerOptions&));
+ RETRYABLE_METHOD(TFuture<NQueueClient::IQueueRowsetPtr>, PullQueueConsumer, (const TRichYPath&, const TRichYPath&, std::optional<i64>, int, const NQueueClient::TQueueRowBatchReadOptions&, const TPullQueueConsumerOptions&));
RETRYABLE_METHOD(TFuture<void>, RegisterQueueConsumer, (const TRichYPath&, const TRichYPath&, bool, const TRegisterQueueConsumerOptions&));
RETRYABLE_METHOD(TFuture<void>, UnregisterQueueConsumer, (const TRichYPath&, const TRichYPath&, const TUnregisterQueueConsumerOptions&));
RETRYABLE_METHOD(TFuture<std::vector<TListQueueConsumerRegistrationsResult>>, ListQueueConsumerRegistrations, (const std::optional<TRichYPath>&, const std::optional<TRichYPath>&, const TListQueueConsumerRegistrationsOptions&));
diff --git a/yt/yt/client/queue_client/config.cpp b/yt/yt/client/queue_client/config.cpp
index 806135c58d8..cba7329eba0 100644
--- a/yt/yt/client/queue_client/config.cpp
+++ b/yt/yt/client/queue_client/config.cpp
@@ -15,12 +15,13 @@ void TPartitionReaderConfig::Register(TRegistrar registrar)
registrar.Parameter("use_native_tablet_node_api", &TThis::UseNativeTabletNodeApi)
.Default(false);
- registrar.Parameter("use_pull_consumer", &TThis::UsePullConsumer)
+ registrar.Parameter("use_pull_queue_consumer", &TThis::UsePullQueueConsumer)
+ .Alias("use_pull_consumer")
.Default(false);
registrar.Postprocessor([] (TThis* config) {
- if (config->UsePullConsumer && !config->UseNativeTabletNodeApi) {
- THROW_ERROR_EXCEPTION("PullConsumer can only be used with the native tablet node api for pulling rows");
+ if (config->UsePullQueueConsumer && !config->UseNativeTabletNodeApi) {
+ THROW_ERROR_EXCEPTION("PullQueueConsumer can only be used with the native tablet node api for pulling rows");
}
});
}
diff --git a/yt/yt/client/queue_client/config.h b/yt/yt/client/queue_client/config.h
index 2f83573d37a..de862c1e6bd 100644
--- a/yt/yt/client/queue_client/config.h
+++ b/yt/yt/client/queue_client/config.h
@@ -23,7 +23,7 @@ public:
std::optional<i64> DataWeightPerRowHint;
bool UseNativeTabletNodeApi;
- bool UsePullConsumer;
+ bool UsePullQueueConsumer;
REGISTER_YSON_STRUCT(TPartitionReaderConfig);
diff --git a/yt/yt/client/queue_client/partition_reader.cpp b/yt/yt/client/queue_client/partition_reader.cpp
index d02774cd1fa..a7c4b3a5635 100644
--- a/yt/yt/client/queue_client/partition_reader.cpp
+++ b/yt/yt/client/queue_client/partition_reader.cpp
@@ -273,7 +273,7 @@ public:
, RowBatchReadOptions_({Config_->MaxRowCount, Config_->MaxDataWeight, Config_->DataWeightPerRowHint})
, Logger(QueueClientLogger().WithTag("Consumer: %v, Queue: %v, Partition: %v", ConsumerPath_, QueuePath_, PartitionIndex_))
{
- PullConsumerOptions_.UseNativeTabletNodeApi = Config_->UseNativeTabletNodeApi;
+ PullQueueConsumerOptions_.UseNativeTabletNodeApi = Config_->UseNativeTabletNodeApi;
}
TFuture<void> Open() override
@@ -301,7 +301,7 @@ private:
const TQueueRowBatchReadOptions RowBatchReadOptions_;
const NLogging::TLogger Logger;
- TPullConsumerOptions PullConsumerOptions_;
+ TPullQueueConsumerOptions PullQueueConsumerOptions_;
ISubConsumerClientPtr ConsumerClient_;
bool Opened_ = false;
@@ -384,20 +384,20 @@ private:
RowBatchReadOptions_.MaxRowCount,
RowBatchReadOptions_.MaxDataWeight,
RowBatchReadOptions_.DataWeightPerRowHint);
- auto asyncRowset = (Config_->UsePullConsumer
- ? Client_->PullConsumer(
+ auto asyncRowset = (Config_->UsePullQueueConsumer
+ ? Client_->PullQueueConsumer(
ConsumerPath_,
QueuePath_,
currentOffset,
PartitionIndex_,
RowBatchReadOptions_,
- PullConsumerOptions_)
+ PullQueueConsumerOptions_)
: Client_->PullQueue(
QueuePath_,
currentOffset,
PartitionIndex_,
RowBatchReadOptions_,
- PullConsumerOptions_));
+ PullQueueConsumerOptions_));
auto rowset = WaitFor(asyncRowset)
.ValueOrThrow();
diff --git a/yt/yt/client/unittests/mock/client.h b/yt/yt/client/unittests/mock/client.h
index c283bafe3d5..29b813cfcfa 100644
--- a/yt/yt/client/unittests/mock/client.h
+++ b/yt/yt/client/unittests/mock/client.h
@@ -74,13 +74,13 @@ public:
const TPullQueueOptions& options),
(override));
- MOCK_METHOD(TFuture<NQueueClient::IQueueRowsetPtr>, PullConsumer, (
+ MOCK_METHOD(TFuture<NQueueClient::IQueueRowsetPtr>, PullQueueConsumer, (
const NYPath::TRichYPath& consumerPath,
const NYPath::TRichYPath& queuePath,
std::optional<i64> offset,
int partitionIndex,
const NQueueClient::TQueueRowBatchReadOptions& rowBatchReadOptions,
- const TPullConsumerOptions& options),
+ const TPullQueueConsumerOptions& options),
(override));
MOCK_METHOD(TFuture<void>, RegisterQueueConsumer, (
diff --git a/yt/yt_proto/yt/client/api/rpc_proxy/proto/api_service.proto b/yt/yt_proto/yt/client/api/rpc_proxy/proto/api_service.proto
index 9eae7aeecf1..95ee6147daf 100644
--- a/yt/yt_proto/yt/client/api/rpc_proxy/proto/api_service.proto
+++ b/yt/yt_proto/yt/client/api/rpc_proxy/proto/api_service.proto
@@ -682,7 +682,7 @@ message TRspPullQueue
optional int64 start_offset = 2;
}
-message TReqPullConsumer
+message TReqPullQueueConsumer
{
optional string consumer_path = 1;
optional string queue_path = 2;
@@ -693,7 +693,7 @@ message TReqPullConsumer
optional EReplicaConsistency replica_consistency = 6;
}
-message TRspPullConsumer
+message TRspPullQueueConsumer
{
optional TRowsetDescriptor rowset_descriptor = 1;
optional int64 start_offset = 2;