aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authornadya73 <nadya73@yandex-team.com>2024-06-17 13:21:36 +0300
committernadya73 <nadya73@yandex-team.com>2024-06-17 13:35:21 +0300
commit95cc874865c4e7d67caf3dc49d48fc6884ac84bc (patch)
treeabb30f268d75c39e6484afa1cee748fc1d7bb46a
parent99e82a86110035ec748d05ee2dc5f2ef8cd854df (diff)
downloadydb-95cc874865c4e7d67caf3dc49d48fc6884ac84bc.tar.gz
[queues] YT-21930: Rename AdvanceConsumer -> AdvanceQueueConsumer
Rename AdvanceConsumer -> AdvanceQueueConsumer d56d22b75c7563533fa41260f799b2a5f1dc04b7
-rw-r--r--yt/yt/client/api/delegating_transaction.cpp4
-rw-r--r--yt/yt/client/api/delegating_transaction.h4
-rw-r--r--yt/yt/client/api/queue_transaction.h6
-rw-r--r--yt/yt/client/api/rpc_proxy/api_service_proxy.h4
-rw-r--r--yt/yt/client/api/rpc_proxy/client_impl.cpp2
-rw-r--r--yt/yt/client/api/rpc_proxy/transaction_impl.cpp5
-rw-r--r--yt/yt/client/api/rpc_proxy/transaction_impl.h6
-rw-r--r--yt/yt/client/driver/driver.cpp4
-rw-r--r--yt/yt/client/driver/queue_commands.cpp6
-rw-r--r--yt/yt/client/driver/queue_commands.h8
-rw-r--r--yt/yt/client/federated/client.cpp8
-rw-r--r--yt/yt/client/unittests/mock/transaction.h6
-rw-r--r--yt/yt_proto/yt/client/api/rpc_proxy/proto/api_service.proto4
13 files changed, 36 insertions, 31 deletions
diff --git a/yt/yt/client/api/delegating_transaction.cpp b/yt/yt/client/api/delegating_transaction.cpp
index c80fa09146..9b04f3451f 100644
--- a/yt/yt/client/api/delegating_transaction.cpp
+++ b/yt/yt/client/api/delegating_transaction.cpp
@@ -290,13 +290,13 @@ DELEGATE_METHOD(void, AdvanceConsumer, (
i64 newOffset),
(consumerPath, queuePath, partitionIndex, oldOffset, newOffset))
-DELEGATE_METHOD(TFuture<void>, AdvanceConsumer, (
+DELEGATE_METHOD(TFuture<void>, AdvanceQueueConsumer, (
const NYT::NYPath::TRichYPath& consumer,
const NYT::NYPath::TRichYPath& queue,
int partitionIndex,
std::optional<i64> oldOffset,
i64 newOffset,
- const NYT::NApi::TAdvanceConsumerOptions& options),
+ const NYT::NApi::TAdvanceQueueConsumerOptions& options),
(consumer, queue, partitionIndex, oldOffset, newOffset, options))
DELEGATE_METHOD(TFuture<TPushQueueProducerResult>, PushQueueProducer, (
diff --git a/yt/yt/client/api/delegating_transaction.h b/yt/yt/client/api/delegating_transaction.h
index 6377bab05c..2cf1f780e5 100644
--- a/yt/yt/client/api/delegating_transaction.h
+++ b/yt/yt/client/api/delegating_transaction.h
@@ -235,13 +235,13 @@ public:
int partitionIndex,
std::optional<i64> oldOffset,
i64 newOffset) override;
- TFuture<void> AdvanceConsumer(
+ TFuture<void> AdvanceQueueConsumer(
const NYT::NYPath::TRichYPath& consumer,
const NYT::NYPath::TRichYPath& queue,
int partitionIndex,
std::optional<i64> oldOffset,
i64 newOffset,
- const NYT::NApi::TAdvanceConsumerOptions& options) override;
+ const NYT::NApi::TAdvanceQueueConsumerOptions& options) override;
TFuture<TPushQueueProducerResult> PushQueueProducer(
const NYPath::TRichYPath& producerPath,
diff --git a/yt/yt/client/api/queue_transaction.h b/yt/yt/client/api/queue_transaction.h
index cfaa07c49d..ebc492db06 100644
--- a/yt/yt/client/api/queue_transaction.h
+++ b/yt/yt/client/api/queue_transaction.h
@@ -8,7 +8,7 @@ namespace NYT::NApi {
////////////////////////////////////////////////////////////////////////////////
-struct TAdvanceConsumerOptions
+struct TAdvanceQueueConsumerOptions
: public TTimeoutOptions
{ };
@@ -70,13 +70,13 @@ struct IQueueTransaction
* If #oldOffset is specified, the current offset is read inside this transaction and compared with #oldOffset.
* If they are equal, the new offset is written, otherwise an exception is thrown.
*/
- virtual TFuture<void> AdvanceConsumer(
+ virtual TFuture<void> AdvanceQueueConsumer(
const NYPath::TRichYPath& consumerPath,
const NYPath::TRichYPath& queuePath,
int partitionIndex,
std::optional<i64> oldOffset,
i64 newOffset,
- const TAdvanceConsumerOptions& options) = 0;
+ const TAdvanceQueueConsumerOptions& options) = 0;
//! Write rows in the queue with checking their sequence number.
/*!
diff --git a/yt/yt/client/api/rpc_proxy/api_service_proxy.h b/yt/yt/client/api/rpc_proxy/api_service_proxy.h
index c40e855363..91fe45fc3d 100644
--- a/yt/yt/client/api/rpc_proxy/api_service_proxy.h
+++ b/yt/yt/client/api/rpc_proxy/api_service_proxy.h
@@ -85,7 +85,9 @@ public:
DEFINE_RPC_PROXY_METHOD(NRpcProxy::NProto, AlterReplicationCard);
// Queues
- DEFINE_RPC_PROXY_METHOD(NRpcProxy::NProto, AdvanceConsumer);
+ // COMPAT(nadya73): For compatability with old versions of clients.
+ DEFINE_RPC_PROXY_METHOD_GENERIC(AdvanceConsumer, NRpcProxy::NProto::TReqAdvanceQueueConsumer, NRpcProxy::NProto::TRspAdvanceQueueConsumer);
+ DEFINE_RPC_PROXY_METHOD(NRpcProxy::NProto, AdvanceQueueConsumer);
DEFINE_RPC_PROXY_METHOD(NRpcProxy::NProto, PullQueue);
// COMPAT(nadya73): For compatability with old versions of clients.
DEFINE_RPC_PROXY_METHOD_GENERIC(PullConsumer, NRpcProxy::NProto::TReqPullQueueConsumer, NRpcProxy::NProto::TRspPullQueueConsumer);
diff --git a/yt/yt/client/api/rpc_proxy/client_impl.cpp b/yt/yt/client/api/rpc_proxy/client_impl.cpp
index 61d4404260..bc6d64843f 100644
--- a/yt/yt/client/api/rpc_proxy/client_impl.cpp
+++ b/yt/yt/client/api/rpc_proxy/client_impl.cpp
@@ -778,7 +778,7 @@ TFuture<IQueueRowsetPtr> TClient::PullQueueConsumer(
{
auto proxy = CreateApiServiceProxy();
- // Use PullConsumer (not PullQueueConsumer) for backward compatibility.
+ // COMPAT(nadya73): Use PullConsumer (not PullQueueConsumer) for compatibility with old clusters.
auto req = proxy.PullConsumer();
req->SetResponseHeavy(true);
SetTimeoutOptions(*req, options);
diff --git a/yt/yt/client/api/rpc_proxy/transaction_impl.cpp b/yt/yt/client/api/rpc_proxy/transaction_impl.cpp
index 7dfb4d20bc..a952b7ff97 100644
--- a/yt/yt/client/api/rpc_proxy/transaction_impl.cpp
+++ b/yt/yt/client/api/rpc_proxy/transaction_impl.cpp
@@ -493,18 +493,19 @@ void TTransaction::ModifyRows(
}
}
-TFuture<void> TTransaction::AdvanceConsumer(
+TFuture<void> TTransaction::AdvanceQueueConsumer(
const NYPath::TRichYPath& consumerPath,
const NYPath::TRichYPath& queuePath,
int partitionIndex,
std::optional<i64> oldOffset,
i64 newOffset,
- const TAdvanceConsumerOptions& options)
+ const TAdvanceQueueConsumerOptions& options)
{
ValidateTabletTransactionId(GetId());
THROW_ERROR_EXCEPTION_IF(newOffset < 0, "Queue consumer offset %v cannot be negative", newOffset);
+ // COMPAT(nadya73): Use AdvaceConsumer (not AdvanceQueueConsumer) for compatibility with old clusters.
auto req = Proxy_.AdvanceConsumer();
SetTimeoutOptions(*req, options);
diff --git a/yt/yt/client/api/rpc_proxy/transaction_impl.h b/yt/yt/client/api/rpc_proxy/transaction_impl.h
index 34b93ba90d..8b907fec2d 100644
--- a/yt/yt/client/api/rpc_proxy/transaction_impl.h
+++ b/yt/yt/client/api/rpc_proxy/transaction_impl.h
@@ -77,14 +77,14 @@ public:
TSharedRange<NApi::TRowModification> modifications,
const NApi::TModifyRowsOptions& options) override;
- using TQueueTransactionMixin::AdvanceConsumer;
- TFuture<void> AdvanceConsumer(
+ using TQueueTransactionMixin::AdvanceQueueConsumer;
+ TFuture<void> AdvanceQueueConsumer(
const NYPath::TRichYPath& consumerPath,
const NYPath::TRichYPath& queuePath,
int partitionIndex,
std::optional<i64> oldOffset,
i64 newOffset,
- const TAdvanceConsumerOptions& options) override;
+ const TAdvanceQueueConsumerOptions& options) override;
TFuture<TPushQueueProducerResult> PushQueueProducer(
const NYPath::TRichYPath& producerPath,
diff --git a/yt/yt/client/driver/driver.cpp b/yt/yt/client/driver/driver.cpp
index 4853e6b65e..bba7ffbabd 100644
--- a/yt/yt/client/driver/driver.cpp
+++ b/yt/yt/client/driver/driver.cpp
@@ -353,7 +353,9 @@ public:
// COMPAT(nadya73): for compatibility with old versions of clients.
REGISTER (TPullQueueConsumerCommand, "pull_consumer", Null, Tabular, false, true , ApiVersion4);
REGISTER (TPullQueueConsumerCommand, "pull_queue_consumer", Null, Tabular, false, true , ApiVersion4);
- REGISTER (TAdvanceConsumerCommand, "advance_consumer", Null, Structured, true, false, ApiVersion4);
+ // COMPAT(nadya73): for compatibility with old versions of clients.
+ REGISTER (TAdvanceQueueConsumerCommand, "advance_consumer", Null, Structured, true, false, ApiVersion4);
+ REGISTER (TAdvanceQueueConsumerCommand, "advance_queue_consumer", Null, Structured, true, false, ApiVersion4);
REGISTER (TCreateQueueProducerSessionCommand, "create_queue_producer_session", Null, Structured, true, false, ApiVersion4);
REGISTER (TRemoveQueueProducerSessionCommand, "remove_queue_producer_session", Null, Structured, true, false, ApiVersion4);
REGISTER (TPushQueueProducerCommand, "push_queue_producer", Null, Structured, true, false, ApiVersion4);
diff --git a/yt/yt/client/driver/queue_commands.cpp b/yt/yt/client/driver/queue_commands.cpp
index d1e4bb3d81..f6db0ff134 100644
--- a/yt/yt/client/driver/queue_commands.cpp
+++ b/yt/yt/client/driver/queue_commands.cpp
@@ -244,7 +244,7 @@ void TPullQueueConsumerCommand::DoExecute(ICommandContextPtr context)
////////////////////////////////////////////////////////////////////////////////
-void TAdvanceConsumerCommand::Register(TRegistrar registrar)
+void TAdvanceQueueConsumerCommand::Register(TRegistrar registrar)
{
registrar.Parameter("consumer_path", &TThis::ConsumerPath);
registrar.Parameter("queue_path", &TThis::QueuePath);
@@ -256,14 +256,14 @@ void TAdvanceConsumerCommand::Register(TRegistrar registrar)
.Optional();
}
-void TAdvanceConsumerCommand::DoExecute(ICommandContextPtr context)
+void TAdvanceQueueConsumerCommand::DoExecute(ICommandContextPtr context)
{
auto transaction = GetTransaction(context);
if (ClientSide.value_or(false)) {
transaction->AdvanceConsumer(ConsumerPath, QueuePath, PartitionIndex, OldOffset, NewOffset);
} else {
- WaitFor(transaction->AdvanceConsumer(ConsumerPath, QueuePath, PartitionIndex, OldOffset, NewOffset, /*options*/ {}))
+ WaitFor(transaction->AdvanceQueueConsumer(ConsumerPath, QueuePath, PartitionIndex, OldOffset, NewOffset, /*options*/ {}))
.ThrowOnError();
}
diff --git a/yt/yt/client/driver/queue_commands.h b/yt/yt/client/driver/queue_commands.h
index 85ea7637af..249ef9bddb 100644
--- a/yt/yt/client/driver/queue_commands.h
+++ b/yt/yt/client/driver/queue_commands.h
@@ -100,15 +100,15 @@ private:
////////////////////////////////////////////////////////////////////////////////
-struct TAdvanceConsumerOptions
+struct TAdvanceQueueConsumerOptions
: public TTabletWriteOptions
{ };
-class TAdvanceConsumerCommand
- : public TTypedCommand<TAdvanceConsumerOptions>
+class TAdvanceQueueConsumerCommand
+ : public TTypedCommand<TAdvanceQueueConsumerOptions>
{
public:
- REGISTER_YSON_STRUCT_LITE(TAdvanceConsumerCommand);
+ REGISTER_YSON_STRUCT_LITE(TAdvanceQueueConsumerCommand);
static void Register(TRegistrar registrar);
diff --git a/yt/yt/client/federated/client.cpp b/yt/yt/client/federated/client.cpp
index 4f60ac5e0b..b5f79cee42 100644
--- a/yt/yt/client/federated/client.cpp
+++ b/yt/yt/client/federated/client.cpp
@@ -81,14 +81,14 @@ public:
TSharedRange<TRowModification> modifications,
const TModifyRowsOptions& options) override;
- using TQueueTransactionMixin::AdvanceConsumer;
- TFuture<void> AdvanceConsumer(
+ using TQueueTransactionMixin::AdvanceQueueConsumer;
+ TFuture<void> AdvanceQueueConsumer(
const NYPath::TRichYPath& consumerPath,
const NYPath::TRichYPath& queuePath,
int partitionIndex,
std::optional<i64> oldOffset,
i64 newOffset,
- const TAdvanceConsumerOptions& options) override;
+ const TAdvanceQueueConsumerOptions& options) override;
TFuture<TPushQueueProducerResult> PushQueueProducer(
const NYPath::TRichYPath& producerPath,
@@ -516,7 +516,7 @@ TRANSACTION_METHOD_IMPL(void, Abort, (const TTransactionAbortOptions&));
TRANSACTION_METHOD_IMPL(TVersionedLookupRowsResult, VersionedLookupRows, (const NYPath::TYPath&, NTableClient::TNameTablePtr, const TSharedRange<NTableClient::TUnversionedRow>&, const TVersionedLookupRowsOptions&));
TRANSACTION_METHOD_IMPL(std::vector<TUnversionedLookupRowsResult>, MultiLookupRows, (const std::vector<TMultiLookupSubrequest>&, const TMultiLookupOptions&));
TRANSACTION_METHOD_IMPL(TPullRowsResult, PullRows, (const NYPath::TYPath&, const TPullRowsOptions&));
-TRANSACTION_METHOD_IMPL(void, AdvanceConsumer, (const NYPath::TRichYPath&, const NYPath::TRichYPath&, int, std::optional<i64>, i64, const TAdvanceConsumerOptions&));
+TRANSACTION_METHOD_IMPL(void, AdvanceQueueConsumer, (const NYPath::TRichYPath&, const NYPath::TRichYPath&, int, std::optional<i64>, i64, const TAdvanceQueueConsumerOptions&));
TRANSACTION_METHOD_IMPL(TPushQueueProducerResult, PushQueueProducer, (const NYPath::TRichYPath&, const NYPath::TRichYPath&, const TString&, i64, NTableClient::TNameTablePtr, TSharedRange<NTableClient::TUnversionedRow>, const TPushQueueProducerOptions&));
TRANSACTION_METHOD_IMPL(NYson::TYsonString, ExplainQuery, (const TString&, const TExplainQueryOptions&));
TRANSACTION_METHOD_IMPL(NYson::TYsonString, GetNode, (const NYPath::TYPath&, const TGetNodeOptions&));
diff --git a/yt/yt/client/unittests/mock/transaction.h b/yt/yt/client/unittests/mock/transaction.h
index 60ad83bef3..bc0191bf63 100644
--- a/yt/yt/client/unittests/mock/transaction.h
+++ b/yt/yt/client/unittests/mock/transaction.h
@@ -199,14 +199,14 @@ public:
MOCK_METHOD(void, SubscribeAborted, (const TAbortedHandler& callback), (override));
MOCK_METHOD(void, UnsubscribeAborted, (const TAbortedHandler& callback), (override));
- using TQueueTransactionMixin::AdvanceConsumer;
- MOCK_METHOD(TFuture<void>, AdvanceConsumer, (
+ using TQueueTransactionMixin::AdvanceQueueConsumer;
+ MOCK_METHOD(TFuture<void>, AdvanceQueueConsumer, (
const NYPath::TRichYPath& consumerPath,
const NYPath::TRichYPath& queuePath,
int partitionIndex,
std::optional<i64> oldOffset,
i64 newOffset,
- const TAdvanceConsumerOptions& options), (override));
+ const TAdvanceQueueConsumerOptions& options), (override));
MOCK_METHOD(void, ModifyRows, (
const NYPath::TYPath& path,
diff --git a/yt/yt_proto/yt/client/api/rpc_proxy/proto/api_service.proto b/yt/yt_proto/yt/client/api/rpc_proxy/proto/api_service.proto
index 24995c479b..0c3a43469b 100644
--- a/yt/yt_proto/yt/client/api/rpc_proxy/proto/api_service.proto
+++ b/yt/yt_proto/yt/client/api/rpc_proxy/proto/api_service.proto
@@ -644,7 +644,7 @@ message TRspSelectRows
////////////////////////////////////////////////////////////////////////////////
-message TReqAdvanceConsumer
+message TReqAdvanceQueueConsumer
{
optional NYT.NProto.TGuid transaction_id = 1;
optional string consumer_path = 2;
@@ -654,7 +654,7 @@ message TReqAdvanceConsumer
optional int64 new_offset = 6;
}
-message TRspAdvanceConsumer
+message TRspAdvanceQueueConsumer
{
}