aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authoralexnick88 <alexnick@ydb.tech>2024-12-06 12:21:20 +0300
committerGitHub <noreply@github.com>2024-12-06 09:21:20 +0000
commit7b9edeca21a1eaf386c7c3493ecda9891f2cda24 (patch)
treec3c3e128d63cecbe4314a5b4261ce03bf90148be
parent2fa847a0dce97157be1e91ab2c676f6e6fb76fdd (diff)
downloadydb-7b9edeca21a1eaf386c7c3493ecda9891f2cda24.tar.gz
Add more fields in describe consumer in cpp sdk (#12347)
-rw-r--r--ydb/public/sdk/cpp/client/ydb_topic/impl/topic.cpp15
-rw-r--r--ydb/public/sdk/cpp/client/ydb_topic/include/control_plane.h6
-rw-r--r--ydb/public/sdk/cpp/client/ydb_topic/ut/describe_topic_ut.cpp17
3 files changed, 37 insertions, 1 deletions
diff --git a/ydb/public/sdk/cpp/client/ydb_topic/impl/topic.cpp b/ydb/public/sdk/cpp/client/ydb_topic/impl/topic.cpp
index 4eec57ed050..240cf338262 100644
--- a/ydb/public/sdk/cpp/client/ydb_topic/impl/topic.cpp
+++ b/ydb/public/sdk/cpp/client/ydb_topic/impl/topic.cpp
@@ -351,6 +351,9 @@ TPartitionConsumerStats::TPartitionConsumerStats(const Ydb::Topic::DescribeConsu
, LastReadOffset_(partitionStats.last_read_offset())
, ReaderName_(partitionStats.reader_name())
, ReadSessionId_(partitionStats.read_session_id())
+ , LastReadTime_(TInstant::Seconds(partitionStats.last_read_time().seconds()))
+ , MaxReadTimeLag_(TDuration::Seconds(partitionStats.max_read_time_lag().seconds()))
+ , MaxWriteTimeLag_(TDuration::Seconds(partitionStats.max_write_time_lag().seconds()))
{}
ui64 TPartitionConsumerStats::GetCommittedOffset() const {
@@ -369,6 +372,18 @@ TString TPartitionConsumerStats::GetReadSessionId() const {
return ReadSessionId_;
}
+const TInstant& TPartitionConsumerStats::GetLastReadTime() const {
+ return LastReadTime_;
+}
+
+const TDuration& TPartitionConsumerStats::GetMaxReadTimeLag() const {
+ return MaxReadTimeLag_;
+}
+
+const TDuration& TPartitionConsumerStats::GetMaxWriteTimeLag() const {
+ return MaxWriteTimeLag_;
+}
+
TPartitionLocation::TPartitionLocation(const Ydb::Topic::PartitionLocation& partitionLocation)
: NodeId_(partitionLocation.node_id())
, Generation_(partitionLocation.generation())
diff --git a/ydb/public/sdk/cpp/client/ydb_topic/include/control_plane.h b/ydb/public/sdk/cpp/client/ydb_topic/include/control_plane.h
index 4f6adad457f..6044340845c 100644
--- a/ydb/public/sdk/cpp/client/ydb_topic/include/control_plane.h
+++ b/ydb/public/sdk/cpp/client/ydb_topic/include/control_plane.h
@@ -108,12 +108,18 @@ public:
ui64 GetLastReadOffset() const;
TString GetReaderName() const;
TString GetReadSessionId() const;
+ const TInstant& GetLastReadTime() const;
+ const TDuration& GetMaxReadTimeLag() const;
+ const TDuration& GetMaxWriteTimeLag() const;
private:
ui64 CommittedOffset_;
i64 LastReadOffset_;
TString ReaderName_;
TString ReadSessionId_;
+ TInstant LastReadTime_;
+ TDuration MaxReadTimeLag_;
+ TDuration MaxWriteTimeLag_;
};
// Topic partition location
diff --git a/ydb/public/sdk/cpp/client/ydb_topic/ut/describe_topic_ut.cpp b/ydb/public/sdk/cpp/client/ydb_topic/ut/describe_topic_ut.cpp
index 1abc34c2f1a..5532e6d9a10 100644
--- a/ydb/public/sdk/cpp/client/ydb_topic/ut/describe_topic_ut.cpp
+++ b/ydb/public/sdk/cpp/client/ydb_topic/ut/describe_topic_ut.cpp
@@ -146,6 +146,7 @@ namespace NYdb::NTopic::NTests {
UNIT_ASSERT_GT(consumerStats->GetCommittedOffset(), 0);
UNIT_ASSERT_GE(consumerStats->GetReadSessionId(), 0);
UNIT_ASSERT_VALUES_EQUAL(consumerStats->GetReaderName(), "");
+ UNIT_ASSERT_GE(consumerStats->GetMaxWriteTimeLag(), TDuration::Seconds(100));
} else {
UNIT_ASSERT_VALUES_EQUAL(stats->GetStartOffset(), 0);
UNIT_ASSERT_VALUES_EQUAL(consumerStats->GetLastReadOffset(), 0);
@@ -284,7 +285,7 @@ namespace NYdb::NTopic::NTests {
std::string message(32_MB, 'x');
for(size_t i = 0; i < messagesCount; ++i) {
- UNIT_ASSERT(writeSession->Write(message));
+ UNIT_ASSERT(writeSession->Write(message, {}, TInstant::Now() - TDuration::Seconds(100)));
}
writeSession->Close();
}
@@ -326,7 +327,21 @@ namespace NYdb::NTopic::NTests {
}
}
+ // Additional write
+ {
+ auto writeSettings = TWriteSessionSettings().Path(TEST_TOPIC).MessageGroupId(TEST_MESSAGE_GROUP_ID).Codec(ECodec::RAW);
+ auto writeSession = client.CreateSimpleBlockingWriteSession(writeSettings);
+ std::string message(32, 'x');
+
+ for(size_t i = 0; i < messagesCount; ++i) {
+ UNIT_ASSERT(writeSession->Write(message));
+ }
+ writeSession->Close();
+ }
+ Sleep(TDuration::Seconds(3));
+
// Get non-empty description
+
DescribeTopic(setup, client, true, true, false, false);
DescribeConsumer(setup, client, true, true, false, false);
DescribePartition(setup, client, true, true, false, false);