summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authornadya73 <[email protected]>2023-12-13 19:20:14 +0300
committernadya73 <[email protected]>2023-12-13 22:57:00 +0300
commit58ba8bb28c76ab2173c65407ff16e40d52b7e543 (patch)
treeb2336fa9eb02c1fd21a8070958f249bf029890da
parent0bfea175d9013a083f56cfbbfc39c230300baf73 (diff)
YT-20425: Optional offset in pull_consumer
-rw-r--r--yt/yt/client/api/delegating_client.cpp2
-rw-r--r--yt/yt/client/api/delegating_client.h2
-rw-r--r--yt/yt/client/api/queue_client.h2
-rw-r--r--yt/yt/client/api/rpc_proxy/client_impl.cpp6
-rw-r--r--yt/yt/client/api/rpc_proxy/client_impl.h2
-rw-r--r--yt/yt/client/driver/queue_commands.cpp3
-rw-r--r--yt/yt/client/driver/queue_commands.h2
-rw-r--r--yt/yt/client/federated/client.cpp4
-rw-r--r--yt/yt/client/hedging/hedging.cpp2
-rw-r--r--yt/yt/client/unittests/mock/client.h2
10 files changed, 15 insertions, 12 deletions
diff --git a/yt/yt/client/api/delegating_client.cpp b/yt/yt/client/api/delegating_client.cpp
index b834977abab..75f40065024 100644
--- a/yt/yt/client/api/delegating_client.cpp
+++ b/yt/yt/client/api/delegating_client.cpp
@@ -98,7 +98,7 @@ TFuture<NQueueClient::IQueueRowsetPtr> TDelegatingClient::PullQueue(
TFuture<NQueueClient::IQueueRowsetPtr> TDelegatingClient::PullConsumer(
const NYPath::TRichYPath& consumerPath,
const NYPath::TRichYPath& queuePath,
- i64 offset,
+ std::optional<i64> offset,
int partitionIndex,
const NQueueClient::TQueueRowBatchReadOptions& rowBatchReadOptions,
const TPullConsumerOptions& options)
diff --git a/yt/yt/client/api/delegating_client.h b/yt/yt/client/api/delegating_client.h
index c3125400eb3..07429b103f8 100644
--- a/yt/yt/client/api/delegating_client.h
+++ b/yt/yt/client/api/delegating_client.h
@@ -72,7 +72,7 @@ public:
TFuture<NQueueClient::IQueueRowsetPtr> PullConsumer(
const NYPath::TRichYPath& consumerPath,
const NYPath::TRichYPath& queuePath,
- i64 offset,
+ std::optional<i64> offset,
int partitionIndex,
const NQueueClient::TQueueRowBatchReadOptions& rowBatchReadOptions,
const TPullConsumerOptions& options = {}) override;
diff --git a/yt/yt/client/api/queue_client.h b/yt/yt/client/api/queue_client.h
index 3a2d8c70e2c..a19aef2b907 100644
--- a/yt/yt/client/api/queue_client.h
+++ b/yt/yt/client/api/queue_client.h
@@ -99,7 +99,7 @@ struct IQueueClient
virtual TFuture<NQueueClient::IQueueRowsetPtr> PullConsumer(
const NYPath::TRichYPath& consumerPath,
const NYPath::TRichYPath& queuePath,
- i64 offset,
+ std::optional<i64> offset,
int partitionIndex,
const NQueueClient::TQueueRowBatchReadOptions& rowBatchReadOptions,
const TPullConsumerOptions& options = {}) = 0;
diff --git a/yt/yt/client/api/rpc_proxy/client_impl.cpp b/yt/yt/client/api/rpc_proxy/client_impl.cpp
index 93a763adaa2..aa76339c5f5 100644
--- a/yt/yt/client/api/rpc_proxy/client_impl.cpp
+++ b/yt/yt/client/api/rpc_proxy/client_impl.cpp
@@ -745,7 +745,7 @@ TFuture<IQueueRowsetPtr> TClient::PullQueue(
TFuture<IQueueRowsetPtr> TClient::PullConsumer(
const TRichYPath& consumerPath,
const TRichYPath& queuePath,
- i64 offset,
+ std::optional<i64> offset,
int partitionIndex,
const TQueueRowBatchReadOptions& rowBatchReadOptions,
const TPullConsumerOptions& options)
@@ -758,7 +758,9 @@ TFuture<IQueueRowsetPtr> TClient::PullConsumer(
ToProto(req->mutable_consumer_path(), consumerPath);
ToProto(req->mutable_queue_path(), queuePath);
- req->set_offset(offset);
+ if (offset) {
+ req->set_offset(*offset);
+ }
req->set_partition_index(partitionIndex);
ToProto(req->mutable_row_batch_read_options(), rowBatchReadOptions);
diff --git a/yt/yt/client/api/rpc_proxy/client_impl.h b/yt/yt/client/api/rpc_proxy/client_impl.h
index 9820d770b4c..e6ab691e43c 100644
--- a/yt/yt/client/api/rpc_proxy/client_impl.h
+++ b/yt/yt/client/api/rpc_proxy/client_impl.h
@@ -140,7 +140,7 @@ public:
TFuture<NQueueClient::IQueueRowsetPtr> PullConsumer(
const NYPath::TRichYPath& consumerPath,
const NYPath::TRichYPath& queuePath,
- i64 offset,
+ std::optional<i64> offset,
int partitionIndex,
const NQueueClient::TQueueRowBatchReadOptions& rowBatchReadOptions,
const TPullConsumerOptions& options = {}) override;
diff --git a/yt/yt/client/driver/queue_commands.cpp b/yt/yt/client/driver/queue_commands.cpp
index 8aaebcfd522..e2aaf35e7c1 100644
--- a/yt/yt/client/driver/queue_commands.cpp
+++ b/yt/yt/client/driver/queue_commands.cpp
@@ -98,7 +98,8 @@ void TListQueueConsumerRegistrationsCommand::DoExecute(ICommandContextPtr contex
void TPullQueueCommand::Register(TRegistrar registrar)
{
registrar.Parameter("queue_path", &TThis::QueuePath);
- registrar.Parameter("offset", &TThis::Offset);
+ registrar.Parameter("offset", &TThis::Offset)
+ .Optional();
registrar.Parameter("partition_index", &TThis::PartitionIndex);
registrar.ParameterWithUniversalAccessor<i64>(
diff --git a/yt/yt/client/driver/queue_commands.h b/yt/yt/client/driver/queue_commands.h
index 3e5c3a1318e..db78db9143a 100644
--- a/yt/yt/client/driver/queue_commands.h
+++ b/yt/yt/client/driver/queue_commands.h
@@ -91,7 +91,7 @@ public:
private:
NYPath::TRichYPath ConsumerPath;
NYPath::TRichYPath QueuePath;
- i64 Offset;
+ std::optional<i64> Offset;
int PartitionIndex;
NQueueClient::TQueueRowBatchReadOptions RowBatchReadOptions;
diff --git a/yt/yt/client/federated/client.cpp b/yt/yt/client/federated/client.cpp
index 8cfcddd3c21..98efe5b9a70 100644
--- a/yt/yt/client/federated/client.cpp
+++ b/yt/yt/client/federated/client.cpp
@@ -258,7 +258,7 @@ public:
TFuture<NQueueClient::IQueueRowsetPtr> PullConsumer(
const NYPath::TRichYPath&,
const NYPath::TRichYPath&,
- i64,
+ std::optional<i64>,
int,
const NQueueClient::TQueueRowBatchReadOptions&,
const TPullConsumerOptions&) override;
@@ -638,7 +638,7 @@ CLIENT_METHOD_IMPL(std::vector<TUnversionedLookupRowsResult>, MultiLookup, (cons
CLIENT_METHOD_IMPL(TVersionedLookupRowsResult, VersionedLookupRows, (const NYPath::TYPath&, NTableClient::TNameTablePtr, const TSharedRange<NTableClient::TUnversionedRow>&, const TVersionedLookupRowsOptions&));
CLIENT_METHOD_IMPL(TPullRowsResult, PullRows, (const NYPath::TYPath&, const TPullRowsOptions&));
CLIENT_METHOD_IMPL(NQueueClient::IQueueRowsetPtr, PullQueue, (const NYPath::TRichYPath&, i64, int, const NQueueClient::TQueueRowBatchReadOptions&, const TPullQueueOptions&));
-CLIENT_METHOD_IMPL(NQueueClient::IQueueRowsetPtr, PullConsumer, (const NYPath::TRichYPath&, const NYPath::TRichYPath&, i64, int, const NQueueClient::TQueueRowBatchReadOptions&, const TPullConsumerOptions&));
+CLIENT_METHOD_IMPL(NQueueClient::IQueueRowsetPtr, PullConsumer, (const NYPath::TRichYPath&, const NYPath::TRichYPath&, std::optional<i64>, int, const NQueueClient::TQueueRowBatchReadOptions&, const TPullConsumerOptions&));
CLIENT_METHOD_IMPL(NYson::TYsonString, ExplainQuery, (const TString&, const TExplainQueryOptions&));
CLIENT_METHOD_IMPL(NYson::TYsonString, GetNode, (const NYPath::TYPath&, const TGetNodeOptions&));
CLIENT_METHOD_IMPL(NYson::TYsonString, ListNode, (const NYPath::TYPath&, const TListNodeOptions&));
diff --git a/yt/yt/client/hedging/hedging.cpp b/yt/yt/client/hedging/hedging.cpp
index b0ca0d55e31..f27c17cc78f 100644
--- a/yt/yt/client/hedging/hedging.cpp
+++ b/yt/yt/client/hedging/hedging.cpp
@@ -78,7 +78,7 @@ public:
RETRYABLE_METHOD(TFuture<TVersionedLookupRowsResult>, VersionedLookupRows, (const NYPath::TYPath&, NTableClient::TNameTablePtr, const TSharedRange<NTableClient::TUnversionedRow>&, const TVersionedLookupRowsOptions&));
RETRYABLE_METHOD(TFuture<TSelectRowsResult>, SelectRows, (const TString&, const TSelectRowsOptions&));
RETRYABLE_METHOD(TFuture<NQueueClient::IQueueRowsetPtr>, PullQueue, (const NYPath::TRichYPath&, i64, int, const NQueueClient::TQueueRowBatchReadOptions&, const TPullQueueOptions&));
- RETRYABLE_METHOD(TFuture<NQueueClient::IQueueRowsetPtr>, PullConsumer, (const NYPath::TRichYPath&, const NYPath::TRichYPath&, i64, int, const NQueueClient::TQueueRowBatchReadOptions&, const TPullConsumerOptions&));
+ RETRYABLE_METHOD(TFuture<NQueueClient::IQueueRowsetPtr>, PullConsumer, (const NYPath::TRichYPath&, const NYPath::TRichYPath&, std::optional<i64>, int, const NQueueClient::TQueueRowBatchReadOptions&, const TPullConsumerOptions&));
RETRYABLE_METHOD(TFuture<void>, RegisterQueueConsumer, (const NYPath::TRichYPath&, const NYPath::TRichYPath&, bool, const TRegisterQueueConsumerOptions&));
RETRYABLE_METHOD(TFuture<void>, UnregisterQueueConsumer, (const NYPath::TRichYPath&, const NYPath::TRichYPath&, const TUnregisterQueueConsumerOptions&));
RETRYABLE_METHOD(TFuture<std::vector<TListQueueConsumerRegistrationsResult>>, ListQueueConsumerRegistrations, (const std::optional<NYPath::TRichYPath>&, const std::optional<NYPath::TRichYPath>&, const TListQueueConsumerRegistrationsOptions&));
diff --git a/yt/yt/client/unittests/mock/client.h b/yt/yt/client/unittests/mock/client.h
index 3cabe9d56b0..49dbe4c7417 100644
--- a/yt/yt/client/unittests/mock/client.h
+++ b/yt/yt/client/unittests/mock/client.h
@@ -65,7 +65,7 @@ public:
MOCK_METHOD(TFuture<NQueueClient::IQueueRowsetPtr>, PullConsumer, (
const NYPath::TRichYPath& consumerPath,
const NYPath::TRichYPath& queuePath,
- i64 offset,
+ std::optional<i64> offset,
int partitionIndex,
const NQueueClient::TQueueRowBatchReadOptions& rowBatchReadOptions,
const TPullConsumerOptions& options), (override));