diff options
author | ildar-khisam <ikhis@ydb.tech> | 2023-02-15 10:17:25 +0300 |
---|---|---|
committer | ildar-khisam <ikhis@ydb.tech> | 2023-02-15 10:17:25 +0300 |
commit | 8d5870f5f3b8a67afd44c3856560d521e6255bd0 (patch) | |
tree | 6b089c18a0b956b38b61db74fc23649be3f58a0c | |
parent | 3a5bd6c0e387a0b040704b7ced45acf59e2c78a0 (diff) | |
download | ydb-8d5870f5f3b8a67afd44c3856560d521e6255bd0.tar.gz |
commit offset method implementation
commit offset method implementation
4 files changed, 53 insertions, 10 deletions
diff --git a/ydb/public/sdk/cpp/client/ydb_topic/impl/topic.cpp b/ydb/public/sdk/cpp/client/ydb_topic/impl/topic.cpp index eddcade51c..f5d1846bc5 100644 --- a/ydb/public/sdk/cpp/client/ydb_topic/impl/topic.cpp +++ b/ydb/public/sdk/cpp/client/ydb_topic/impl/topic.cpp @@ -319,7 +319,8 @@ std::shared_ptr<IReadSession> TTopicClient::CreateReadSession(const TReadSession return Impl_->CreateReadSession(settings); } -std::shared_ptr<ISimpleBlockingWriteSession> TTopicClient::CreateSimpleBlockingWriteSession(const TWriteSessionSettings& settings) { +std::shared_ptr<ISimpleBlockingWriteSession> TTopicClient::CreateSimpleBlockingWriteSession( + const TWriteSessionSettings& settings) { return Impl_->CreateSimpleWriteSession(settings); } @@ -327,4 +328,9 @@ std::shared_ptr<IWriteSession> TTopicClient::CreateWriteSession(const TWriteSess return Impl_->CreateWriteSession(settings); } +TAsyncStatus TTopicClient::CommitOffset(const TString& path, ui64 partitionId, const TString& consumerName, ui64 offset, + const TCommitOffsetSettings& settings) { + return Impl_->CommitOffset(path, partitionId, consumerName, offset, settings); +} + } // namespace NYdb::NTopic diff --git a/ydb/public/sdk/cpp/client/ydb_topic/impl/topic_impl.h b/ydb/public/sdk/cpp/client/ydb_topic/impl/topic_impl.h index 4bd98de0ff..d893dc4fe2 100644 --- a/ydb/public/sdk/cpp/client/ydb_topic/impl/topic_impl.h +++ b/ydb/public/sdk/cpp/client/ydb_topic/impl/topic_impl.h @@ -232,6 +232,20 @@ public: return promise.GetFuture(); } + TAsyncStatus CommitOffset(const TString& path, ui64 partitionId, const TString& consumerName, ui64 offset, + const TCommitOffsetSettings& settings) { + Ydb::Topic::CommitOffsetRequest request = MakeOperationRequest<Ydb::Topic::CommitOffsetRequest>(settings); + request.set_path(path); + request.set_partition_id(partitionId); + request.set_consumer(consumerName); + request.set_offset(offset); + + return RunSimple<Ydb::Topic::V1::TopicService, Ydb::Topic::CommitOffsetRequest, Ydb::Topic::CommitOffsetResponse>( + std::move(request), + &Ydb::Topic::V1::TopicService::Stub::AsyncCommitOffset, + TRpcRequestSettings::Make(settings)); + } + // Runtime API. std::shared_ptr<IReadSession> CreateReadSession(const TReadSessionSettings& settings); std::shared_ptr<ISimpleBlockingWriteSession> CreateSimpleWriteSession(const TWriteSessionSettings& settings); diff --git a/ydb/public/sdk/cpp/client/ydb_topic/topic.h b/ydb/public/sdk/cpp/client/ydb_topic/topic.h index fce6adf49a..f601a8102d 100644 --- a/ydb/public/sdk/cpp/client/ydb_topic/topic.h +++ b/ydb/public/sdk/cpp/client/ydb_topic/topic.h @@ -256,11 +256,9 @@ private: struct TAlterConsumerSettings; struct TAlterTopicSettings; -struct TCreateTopicSettings; -typedef TAlterAttributesBuilderImpl<TAlterConsumerSettings> TAlterConsumerAttributesBuilder; - -typedef TAlterAttributesBuilderImpl<TAlterTopicSettings> TAlterTopicAttributesBuilder; +using TAlterConsumerAttributesBuilder = TAlterAttributesBuilderImpl<TAlterConsumerSettings>; +using TAlterTopicAttributesBuilder = TAlterAttributesBuilderImpl<TAlterTopicSettings>; template<class TSettings> struct TConsumerSettings { @@ -482,14 +480,21 @@ struct TDropTopicSettings : public TOperationRequestSettings<TDropTopicSettings> // Settings for describe resource request. struct TDescribeTopicSettings : public TOperationRequestSettings<TDescribeTopicSettings> { + using TSelf = TDescribeTopicSettings; + FLUENT_SETTING_DEFAULT(bool, IncludeStats, false); }; // Settings for describe resource request. struct TDescribeConsumerSettings : public TOperationRequestSettings<TDescribeConsumerSettings> { + using TSelf = TDescribeConsumerSettings; + FLUENT_SETTING_DEFAULT(bool, IncludeStats, false); }; +// Settings for commit offset request. +struct TCommitOffsetSettings : public TOperationRequestSettings<TCommitOffsetSettings> {}; + //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// //! Session metainformation. @@ -1527,16 +1532,17 @@ public: TAsyncStatus CreateTopic(const TString& path, const TCreateTopicSettings& settings = {}); // Update a topic. - TAsyncStatus AlterTopic(const TString& path, const TAlterTopicSettings& = {}); + TAsyncStatus AlterTopic(const TString& path, const TAlterTopicSettings& settings = {}); // Delete a topic. - TAsyncStatus DropTopic(const TString& path, const TDropTopicSettings& = {}); + TAsyncStatus DropTopic(const TString& path, const TDropTopicSettings& settings = {}); // Describe settings of topic. - TAsyncDescribeTopicResult DescribeTopic(const TString& path, const TDescribeTopicSettings& = {}); + TAsyncDescribeTopicResult DescribeTopic(const TString& path, const TDescribeTopicSettings& settings = {}); // Describe settings of topic's consumer. - TAsyncDescribeConsumerResult DescribeConsumer(const TString& path, const TString& consumer, const TDescribeConsumerSettings& = {}); + TAsyncDescribeConsumerResult DescribeConsumer(const TString& path, const TString& consumer, + const TDescribeConsumerSettings& settings = {}); //! Create read session. std::shared_ptr<IReadSession> CreateReadSession(const TReadSessionSettings& settings); @@ -1545,6 +1551,10 @@ public: std::shared_ptr<ISimpleBlockingWriteSession> CreateSimpleBlockingWriteSession(const TWriteSessionSettings& settings); std::shared_ptr<IWriteSession> CreateWriteSession(const TWriteSessionSettings& settings); + // Commit offset + TAsyncStatus CommitOffset(const TString& path, ui64 partitionId, const TString& consumerName, ui64 offset, + const TCommitOffsetSettings& settings = {}); + private: std::shared_ptr<TImpl> Impl_; }; diff --git a/ydb/public/sdk/cpp/client/ydb_topic/ut/basic_usage_ut.cpp b/ydb/public/sdk/cpp/client/ydb_topic/ut/basic_usage_ut.cpp index 68c631a58b..286fd722d7 100644 --- a/ydb/public/sdk/cpp/client/ydb_topic/ut/basic_usage_ut.cpp +++ b/ydb/public/sdk/cpp/client/ydb_topic/ut/basic_usage_ut.cpp @@ -62,7 +62,7 @@ Y_UNIT_TEST_SUITE(BasicUsage) { // Create read session. NYdb::NTopic::TReadSessionSettings readSettings; readSettings - .ConsumerName(setup->GetTestClient()) + .ConsumerName("shared/user") .MaxMemoryUsageBytes(1_MB) .AppendTopics(setup->GetTestTopic()); @@ -93,6 +93,19 @@ Y_UNIT_TEST_SUITE(BasicUsage) { f.GetValueSync(); ReadSession->Close(TDuration::MilliSeconds(10)); AtomicSet(check, 0); + + auto status = topicClient.CommitOffset(setup->GetTestTopic(), 0, "shared/user", 50); + UNIT_ASSERT(status.GetValueSync().IsSuccess()); + + auto describeConsumerSettings = TDescribeConsumerSettings().IncludeStats(true); + auto result = topicClient.DescribeConsumer("/Root/PQ/rt3.dc1--topic1", "shared/user", describeConsumerSettings).GetValueSync(); + UNIT_ASSERT(result.IsSuccess()); + + auto description = result.GetConsumerDescription(); + UNIT_ASSERT(description.GetPartitions().size() == 1); + auto stats = description.GetPartitions().front().GetPartitionConsumerStats(); + UNIT_ASSERT(stats.Defined()); + UNIT_ASSERT(stats->GetCommittedOffset() == 50); } |