aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorildar-khisam <ikhis@ydb.tech>2023-02-15 10:17:25 +0300
committerildar-khisam <ikhis@ydb.tech>2023-02-15 10:17:25 +0300
commit8d5870f5f3b8a67afd44c3856560d521e6255bd0 (patch)
tree6b089c18a0b956b38b61db74fc23649be3f58a0c
parent3a5bd6c0e387a0b040704b7ced45acf59e2c78a0 (diff)
downloadydb-8d5870f5f3b8a67afd44c3856560d521e6255bd0.tar.gz
commit offset method implementation
commit offset method implementation
-rw-r--r--ydb/public/sdk/cpp/client/ydb_topic/impl/topic.cpp8
-rw-r--r--ydb/public/sdk/cpp/client/ydb_topic/impl/topic_impl.h14
-rw-r--r--ydb/public/sdk/cpp/client/ydb_topic/topic.h26
-rw-r--r--ydb/public/sdk/cpp/client/ydb_topic/ut/basic_usage_ut.cpp15
4 files changed, 53 insertions, 10 deletions
diff --git a/ydb/public/sdk/cpp/client/ydb_topic/impl/topic.cpp b/ydb/public/sdk/cpp/client/ydb_topic/impl/topic.cpp
index eddcade51c..f5d1846bc5 100644
--- a/ydb/public/sdk/cpp/client/ydb_topic/impl/topic.cpp
+++ b/ydb/public/sdk/cpp/client/ydb_topic/impl/topic.cpp
@@ -319,7 +319,8 @@ std::shared_ptr<IReadSession> TTopicClient::CreateReadSession(const TReadSession
return Impl_->CreateReadSession(settings);
}
-std::shared_ptr<ISimpleBlockingWriteSession> TTopicClient::CreateSimpleBlockingWriteSession(const TWriteSessionSettings& settings) {
+std::shared_ptr<ISimpleBlockingWriteSession> TTopicClient::CreateSimpleBlockingWriteSession(
+ const TWriteSessionSettings& settings) {
return Impl_->CreateSimpleWriteSession(settings);
}
@@ -327,4 +328,9 @@ std::shared_ptr<IWriteSession> TTopicClient::CreateWriteSession(const TWriteSess
return Impl_->CreateWriteSession(settings);
}
+TAsyncStatus TTopicClient::CommitOffset(const TString& path, ui64 partitionId, const TString& consumerName, ui64 offset,
+ const TCommitOffsetSettings& settings) {
+ return Impl_->CommitOffset(path, partitionId, consumerName, offset, settings);
+}
+
} // namespace NYdb::NTopic
diff --git a/ydb/public/sdk/cpp/client/ydb_topic/impl/topic_impl.h b/ydb/public/sdk/cpp/client/ydb_topic/impl/topic_impl.h
index 4bd98de0ff..d893dc4fe2 100644
--- a/ydb/public/sdk/cpp/client/ydb_topic/impl/topic_impl.h
+++ b/ydb/public/sdk/cpp/client/ydb_topic/impl/topic_impl.h
@@ -232,6 +232,20 @@ public:
return promise.GetFuture();
}
+ TAsyncStatus CommitOffset(const TString& path, ui64 partitionId, const TString& consumerName, ui64 offset,
+ const TCommitOffsetSettings& settings) {
+ Ydb::Topic::CommitOffsetRequest request = MakeOperationRequest<Ydb::Topic::CommitOffsetRequest>(settings);
+ request.set_path(path);
+ request.set_partition_id(partitionId);
+ request.set_consumer(consumerName);
+ request.set_offset(offset);
+
+ return RunSimple<Ydb::Topic::V1::TopicService, Ydb::Topic::CommitOffsetRequest, Ydb::Topic::CommitOffsetResponse>(
+ std::move(request),
+ &Ydb::Topic::V1::TopicService::Stub::AsyncCommitOffset,
+ TRpcRequestSettings::Make(settings));
+ }
+
// Runtime API.
std::shared_ptr<IReadSession> CreateReadSession(const TReadSessionSettings& settings);
std::shared_ptr<ISimpleBlockingWriteSession> CreateSimpleWriteSession(const TWriteSessionSettings& settings);
diff --git a/ydb/public/sdk/cpp/client/ydb_topic/topic.h b/ydb/public/sdk/cpp/client/ydb_topic/topic.h
index fce6adf49a..f601a8102d 100644
--- a/ydb/public/sdk/cpp/client/ydb_topic/topic.h
+++ b/ydb/public/sdk/cpp/client/ydb_topic/topic.h
@@ -256,11 +256,9 @@ private:
struct TAlterConsumerSettings;
struct TAlterTopicSettings;
-struct TCreateTopicSettings;
-typedef TAlterAttributesBuilderImpl<TAlterConsumerSettings> TAlterConsumerAttributesBuilder;
-
-typedef TAlterAttributesBuilderImpl<TAlterTopicSettings> TAlterTopicAttributesBuilder;
+using TAlterConsumerAttributesBuilder = TAlterAttributesBuilderImpl<TAlterConsumerSettings>;
+using TAlterTopicAttributesBuilder = TAlterAttributesBuilderImpl<TAlterTopicSettings>;
template<class TSettings>
struct TConsumerSettings {
@@ -482,14 +480,21 @@ struct TDropTopicSettings : public TOperationRequestSettings<TDropTopicSettings>
// Settings for describe resource request.
struct TDescribeTopicSettings : public TOperationRequestSettings<TDescribeTopicSettings> {
+ using TSelf = TDescribeTopicSettings;
+
FLUENT_SETTING_DEFAULT(bool, IncludeStats, false);
};
// Settings for describe resource request.
struct TDescribeConsumerSettings : public TOperationRequestSettings<TDescribeConsumerSettings> {
+ using TSelf = TDescribeConsumerSettings;
+
FLUENT_SETTING_DEFAULT(bool, IncludeStats, false);
};
+// Settings for commit offset request.
+struct TCommitOffsetSettings : public TOperationRequestSettings<TCommitOffsetSettings> {};
+
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
//! Session metainformation.
@@ -1527,16 +1532,17 @@ public:
TAsyncStatus CreateTopic(const TString& path, const TCreateTopicSettings& settings = {});
// Update a topic.
- TAsyncStatus AlterTopic(const TString& path, const TAlterTopicSettings& = {});
+ TAsyncStatus AlterTopic(const TString& path, const TAlterTopicSettings& settings = {});
// Delete a topic.
- TAsyncStatus DropTopic(const TString& path, const TDropTopicSettings& = {});
+ TAsyncStatus DropTopic(const TString& path, const TDropTopicSettings& settings = {});
// Describe settings of topic.
- TAsyncDescribeTopicResult DescribeTopic(const TString& path, const TDescribeTopicSettings& = {});
+ TAsyncDescribeTopicResult DescribeTopic(const TString& path, const TDescribeTopicSettings& settings = {});
// Describe settings of topic's consumer.
- TAsyncDescribeConsumerResult DescribeConsumer(const TString& path, const TString& consumer, const TDescribeConsumerSettings& = {});
+ TAsyncDescribeConsumerResult DescribeConsumer(const TString& path, const TString& consumer,
+ const TDescribeConsumerSettings& settings = {});
//! Create read session.
std::shared_ptr<IReadSession> CreateReadSession(const TReadSessionSettings& settings);
@@ -1545,6 +1551,10 @@ public:
std::shared_ptr<ISimpleBlockingWriteSession> CreateSimpleBlockingWriteSession(const TWriteSessionSettings& settings);
std::shared_ptr<IWriteSession> CreateWriteSession(const TWriteSessionSettings& settings);
+ // Commit offset
+ TAsyncStatus CommitOffset(const TString& path, ui64 partitionId, const TString& consumerName, ui64 offset,
+ const TCommitOffsetSettings& settings = {});
+
private:
std::shared_ptr<TImpl> Impl_;
};
diff --git a/ydb/public/sdk/cpp/client/ydb_topic/ut/basic_usage_ut.cpp b/ydb/public/sdk/cpp/client/ydb_topic/ut/basic_usage_ut.cpp
index 68c631a58b..286fd722d7 100644
--- a/ydb/public/sdk/cpp/client/ydb_topic/ut/basic_usage_ut.cpp
+++ b/ydb/public/sdk/cpp/client/ydb_topic/ut/basic_usage_ut.cpp
@@ -62,7 +62,7 @@ Y_UNIT_TEST_SUITE(BasicUsage) {
// Create read session.
NYdb::NTopic::TReadSessionSettings readSettings;
readSettings
- .ConsumerName(setup->GetTestClient())
+ .ConsumerName("shared/user")
.MaxMemoryUsageBytes(1_MB)
.AppendTopics(setup->GetTestTopic());
@@ -93,6 +93,19 @@ Y_UNIT_TEST_SUITE(BasicUsage) {
f.GetValueSync();
ReadSession->Close(TDuration::MilliSeconds(10));
AtomicSet(check, 0);
+
+ auto status = topicClient.CommitOffset(setup->GetTestTopic(), 0, "shared/user", 50);
+ UNIT_ASSERT(status.GetValueSync().IsSuccess());
+
+ auto describeConsumerSettings = TDescribeConsumerSettings().IncludeStats(true);
+ auto result = topicClient.DescribeConsumer("/Root/PQ/rt3.dc1--topic1", "shared/user", describeConsumerSettings).GetValueSync();
+ UNIT_ASSERT(result.IsSuccess());
+
+ auto description = result.GetConsumerDescription();
+ UNIT_ASSERT(description.GetPartitions().size() == 1);
+ auto stats = description.GetPartitions().front().GetPartitionConsumerStats();
+ UNIT_ASSERT(stats.Defined());
+ UNIT_ASSERT(stats->GetCommittedOffset() == 50);
}