diff options
| author | nadya73 <[email protected]> | 2023-12-13 19:20:14 +0300 |
|---|---|---|
| committer | nadya73 <[email protected]> | 2023-12-13 22:57:00 +0300 |
| commit | 58ba8bb28c76ab2173c65407ff16e40d52b7e543 (patch) | |
| tree | b2336fa9eb02c1fd21a8070958f249bf029890da | |
| parent | 0bfea175d9013a083f56cfbbfc39c230300baf73 (diff) | |
YT-20425: Optional offset in pull_consumer
| -rw-r--r-- | yt/yt/client/api/delegating_client.cpp | 2 | ||||
| -rw-r--r-- | yt/yt/client/api/delegating_client.h | 2 | ||||
| -rw-r--r-- | yt/yt/client/api/queue_client.h | 2 | ||||
| -rw-r--r-- | yt/yt/client/api/rpc_proxy/client_impl.cpp | 6 | ||||
| -rw-r--r-- | yt/yt/client/api/rpc_proxy/client_impl.h | 2 | ||||
| -rw-r--r-- | yt/yt/client/driver/queue_commands.cpp | 3 | ||||
| -rw-r--r-- | yt/yt/client/driver/queue_commands.h | 2 | ||||
| -rw-r--r-- | yt/yt/client/federated/client.cpp | 4 | ||||
| -rw-r--r-- | yt/yt/client/hedging/hedging.cpp | 2 | ||||
| -rw-r--r-- | yt/yt/client/unittests/mock/client.h | 2 |
10 files changed, 15 insertions, 12 deletions
diff --git a/yt/yt/client/api/delegating_client.cpp b/yt/yt/client/api/delegating_client.cpp index b834977abab..75f40065024 100644 --- a/yt/yt/client/api/delegating_client.cpp +++ b/yt/yt/client/api/delegating_client.cpp @@ -98,7 +98,7 @@ TFuture<NQueueClient::IQueueRowsetPtr> TDelegatingClient::PullQueue( TFuture<NQueueClient::IQueueRowsetPtr> TDelegatingClient::PullConsumer( const NYPath::TRichYPath& consumerPath, const NYPath::TRichYPath& queuePath, - i64 offset, + std::optional<i64> offset, int partitionIndex, const NQueueClient::TQueueRowBatchReadOptions& rowBatchReadOptions, const TPullConsumerOptions& options) diff --git a/yt/yt/client/api/delegating_client.h b/yt/yt/client/api/delegating_client.h index c3125400eb3..07429b103f8 100644 --- a/yt/yt/client/api/delegating_client.h +++ b/yt/yt/client/api/delegating_client.h @@ -72,7 +72,7 @@ public: TFuture<NQueueClient::IQueueRowsetPtr> PullConsumer( const NYPath::TRichYPath& consumerPath, const NYPath::TRichYPath& queuePath, - i64 offset, + std::optional<i64> offset, int partitionIndex, const NQueueClient::TQueueRowBatchReadOptions& rowBatchReadOptions, const TPullConsumerOptions& options = {}) override; diff --git a/yt/yt/client/api/queue_client.h b/yt/yt/client/api/queue_client.h index 3a2d8c70e2c..a19aef2b907 100644 --- a/yt/yt/client/api/queue_client.h +++ b/yt/yt/client/api/queue_client.h @@ -99,7 +99,7 @@ struct IQueueClient virtual TFuture<NQueueClient::IQueueRowsetPtr> PullConsumer( const NYPath::TRichYPath& consumerPath, const NYPath::TRichYPath& queuePath, - i64 offset, + std::optional<i64> offset, int partitionIndex, const NQueueClient::TQueueRowBatchReadOptions& rowBatchReadOptions, const TPullConsumerOptions& options = {}) = 0; diff --git a/yt/yt/client/api/rpc_proxy/client_impl.cpp b/yt/yt/client/api/rpc_proxy/client_impl.cpp index 93a763adaa2..aa76339c5f5 100644 --- a/yt/yt/client/api/rpc_proxy/client_impl.cpp +++ b/yt/yt/client/api/rpc_proxy/client_impl.cpp @@ -745,7 +745,7 @@ TFuture<IQueueRowsetPtr> TClient::PullQueue( TFuture<IQueueRowsetPtr> TClient::PullConsumer( const TRichYPath& consumerPath, const TRichYPath& queuePath, - i64 offset, + std::optional<i64> offset, int partitionIndex, const TQueueRowBatchReadOptions& rowBatchReadOptions, const TPullConsumerOptions& options) @@ -758,7 +758,9 @@ TFuture<IQueueRowsetPtr> TClient::PullConsumer( ToProto(req->mutable_consumer_path(), consumerPath); ToProto(req->mutable_queue_path(), queuePath); - req->set_offset(offset); + if (offset) { + req->set_offset(*offset); + } req->set_partition_index(partitionIndex); ToProto(req->mutable_row_batch_read_options(), rowBatchReadOptions); diff --git a/yt/yt/client/api/rpc_proxy/client_impl.h b/yt/yt/client/api/rpc_proxy/client_impl.h index 9820d770b4c..e6ab691e43c 100644 --- a/yt/yt/client/api/rpc_proxy/client_impl.h +++ b/yt/yt/client/api/rpc_proxy/client_impl.h @@ -140,7 +140,7 @@ public: TFuture<NQueueClient::IQueueRowsetPtr> PullConsumer( const NYPath::TRichYPath& consumerPath, const NYPath::TRichYPath& queuePath, - i64 offset, + std::optional<i64> offset, int partitionIndex, const NQueueClient::TQueueRowBatchReadOptions& rowBatchReadOptions, const TPullConsumerOptions& options = {}) override; diff --git a/yt/yt/client/driver/queue_commands.cpp b/yt/yt/client/driver/queue_commands.cpp index 8aaebcfd522..e2aaf35e7c1 100644 --- a/yt/yt/client/driver/queue_commands.cpp +++ b/yt/yt/client/driver/queue_commands.cpp @@ -98,7 +98,8 @@ void TListQueueConsumerRegistrationsCommand::DoExecute(ICommandContextPtr contex void TPullQueueCommand::Register(TRegistrar registrar) { registrar.Parameter("queue_path", &TThis::QueuePath); - registrar.Parameter("offset", &TThis::Offset); + registrar.Parameter("offset", &TThis::Offset) + .Optional(); registrar.Parameter("partition_index", &TThis::PartitionIndex); registrar.ParameterWithUniversalAccessor<i64>( diff --git a/yt/yt/client/driver/queue_commands.h b/yt/yt/client/driver/queue_commands.h index 3e5c3a1318e..db78db9143a 100644 --- a/yt/yt/client/driver/queue_commands.h +++ b/yt/yt/client/driver/queue_commands.h @@ -91,7 +91,7 @@ public: private: NYPath::TRichYPath ConsumerPath; NYPath::TRichYPath QueuePath; - i64 Offset; + std::optional<i64> Offset; int PartitionIndex; NQueueClient::TQueueRowBatchReadOptions RowBatchReadOptions; diff --git a/yt/yt/client/federated/client.cpp b/yt/yt/client/federated/client.cpp index 8cfcddd3c21..98efe5b9a70 100644 --- a/yt/yt/client/federated/client.cpp +++ b/yt/yt/client/federated/client.cpp @@ -258,7 +258,7 @@ public: TFuture<NQueueClient::IQueueRowsetPtr> PullConsumer( const NYPath::TRichYPath&, const NYPath::TRichYPath&, - i64, + std::optional<i64>, int, const NQueueClient::TQueueRowBatchReadOptions&, const TPullConsumerOptions&) override; @@ -638,7 +638,7 @@ CLIENT_METHOD_IMPL(std::vector<TUnversionedLookupRowsResult>, MultiLookup, (cons CLIENT_METHOD_IMPL(TVersionedLookupRowsResult, VersionedLookupRows, (const NYPath::TYPath&, NTableClient::TNameTablePtr, const TSharedRange<NTableClient::TUnversionedRow>&, const TVersionedLookupRowsOptions&)); CLIENT_METHOD_IMPL(TPullRowsResult, PullRows, (const NYPath::TYPath&, const TPullRowsOptions&)); CLIENT_METHOD_IMPL(NQueueClient::IQueueRowsetPtr, PullQueue, (const NYPath::TRichYPath&, i64, int, const NQueueClient::TQueueRowBatchReadOptions&, const TPullQueueOptions&)); -CLIENT_METHOD_IMPL(NQueueClient::IQueueRowsetPtr, PullConsumer, (const NYPath::TRichYPath&, const NYPath::TRichYPath&, i64, int, const NQueueClient::TQueueRowBatchReadOptions&, const TPullConsumerOptions&)); +CLIENT_METHOD_IMPL(NQueueClient::IQueueRowsetPtr, PullConsumer, (const NYPath::TRichYPath&, const NYPath::TRichYPath&, std::optional<i64>, int, const NQueueClient::TQueueRowBatchReadOptions&, const TPullConsumerOptions&)); CLIENT_METHOD_IMPL(NYson::TYsonString, ExplainQuery, (const TString&, const TExplainQueryOptions&)); CLIENT_METHOD_IMPL(NYson::TYsonString, GetNode, (const NYPath::TYPath&, const TGetNodeOptions&)); CLIENT_METHOD_IMPL(NYson::TYsonString, ListNode, (const NYPath::TYPath&, const TListNodeOptions&)); diff --git a/yt/yt/client/hedging/hedging.cpp b/yt/yt/client/hedging/hedging.cpp index b0ca0d55e31..f27c17cc78f 100644 --- a/yt/yt/client/hedging/hedging.cpp +++ b/yt/yt/client/hedging/hedging.cpp @@ -78,7 +78,7 @@ public: RETRYABLE_METHOD(TFuture<TVersionedLookupRowsResult>, VersionedLookupRows, (const NYPath::TYPath&, NTableClient::TNameTablePtr, const TSharedRange<NTableClient::TUnversionedRow>&, const TVersionedLookupRowsOptions&)); RETRYABLE_METHOD(TFuture<TSelectRowsResult>, SelectRows, (const TString&, const TSelectRowsOptions&)); RETRYABLE_METHOD(TFuture<NQueueClient::IQueueRowsetPtr>, PullQueue, (const NYPath::TRichYPath&, i64, int, const NQueueClient::TQueueRowBatchReadOptions&, const TPullQueueOptions&)); - RETRYABLE_METHOD(TFuture<NQueueClient::IQueueRowsetPtr>, PullConsumer, (const NYPath::TRichYPath&, const NYPath::TRichYPath&, i64, int, const NQueueClient::TQueueRowBatchReadOptions&, const TPullConsumerOptions&)); + RETRYABLE_METHOD(TFuture<NQueueClient::IQueueRowsetPtr>, PullConsumer, (const NYPath::TRichYPath&, const NYPath::TRichYPath&, std::optional<i64>, int, const NQueueClient::TQueueRowBatchReadOptions&, const TPullConsumerOptions&)); RETRYABLE_METHOD(TFuture<void>, RegisterQueueConsumer, (const NYPath::TRichYPath&, const NYPath::TRichYPath&, bool, const TRegisterQueueConsumerOptions&)); RETRYABLE_METHOD(TFuture<void>, UnregisterQueueConsumer, (const NYPath::TRichYPath&, const NYPath::TRichYPath&, const TUnregisterQueueConsumerOptions&)); RETRYABLE_METHOD(TFuture<std::vector<TListQueueConsumerRegistrationsResult>>, ListQueueConsumerRegistrations, (const std::optional<NYPath::TRichYPath>&, const std::optional<NYPath::TRichYPath>&, const TListQueueConsumerRegistrationsOptions&)); diff --git a/yt/yt/client/unittests/mock/client.h b/yt/yt/client/unittests/mock/client.h index 3cabe9d56b0..49dbe4c7417 100644 --- a/yt/yt/client/unittests/mock/client.h +++ b/yt/yt/client/unittests/mock/client.h @@ -65,7 +65,7 @@ public: MOCK_METHOD(TFuture<NQueueClient::IQueueRowsetPtr>, PullConsumer, ( const NYPath::TRichYPath& consumerPath, const NYPath::TRichYPath& queuePath, - i64 offset, + std::optional<i64> offset, int partitionIndex, const NQueueClient::TQueueRowBatchReadOptions& rowBatchReadOptions, const TPullConsumerOptions& options), (override)); |
