aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorazevaykin <azevaykin@yandex-team.com>2023-07-25 19:31:28 +0300
committerroot <root@qavm-2ed34686.qemu>2023-07-25 19:31:28 +0300
commitdd49b96fb21fbea107931e3ca4f1d9b292241969 (patch)
treeb02fefe5eb8aa412cfdda30a6563846dd372f4c5
parent6780bb36eaf2c3d3fd68bc20ddc754969d90c5b8 (diff)
downloadydb-dd49b96fb21fbea107931e3ca4f1d9b292241969.tar.gz
Describe topic, consumer, partition in SDK
-rw-r--r--ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/ut_utils/sdk_test_setup.h12
-rw-r--r--ydb/public/sdk/cpp/client/ydb_topic/impl/topic.cpp51
-rw-r--r--ydb/public/sdk/cpp/client/ydb_topic/impl/topic_impl.h44
-rw-r--r--ydb/public/sdk/cpp/client/ydb_topic/topic.h74
-rw-r--r--ydb/public/sdk/cpp/client/ydb_topic/ut/describe_topic_ut.cpp313
-rw-r--r--ydb/services/persqueue_v1/ut/describes_ut/describe_topic_ut.cpp10
6 files changed, 438 insertions, 66 deletions
diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/ut_utils/sdk_test_setup.h b/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/ut_utils/sdk_test_setup.h
index e64aeee9525..2f75531305c 100644
--- a/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/ut_utils/sdk_test_setup.h
+++ b/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/ut_utils/sdk_test_setup.h
@@ -199,6 +199,7 @@ public:
Server.AnnoyingClient->KickNodeInHive(Server.CleverServer->GetRuntime(), i);
}
}
+
void AllowTablets() {
for (ui32 i = 0; i < Server.CleverServer->StaticNodes() + Server.CleverServer->DynamicNodes(); i++) {
Server.AnnoyingClient->MarkNodeInHive(Server.CleverServer->GetRuntime(), i, true);
@@ -214,5 +215,16 @@ public:
UNIT_ASSERT_C(describeResult->Record.GetPathDescription().HasPersQueueGroup(), describeResult->Record);
Server.AnnoyingClient->KillTablet(*Server.CleverServer, describeResult->Record.GetPathDescription().GetPersQueueGroup().GetBalancerTabletID());
}
+
+ void KillTopicTablets(const TString& topicName) {
+ auto pqGroup = Server.AnnoyingClient->Ls(TString("/Root/PQ/" + topicName))->Record.GetPathDescription().GetPersQueueGroup();
+
+ THashSet<ui64> restartedTablets;
+ for (const auto& p : pqGroup.GetPartitions())
+ if (restartedTablets.insert(p.GetTabletId()).second)
+ Server.AnnoyingClient->KillTablet(*Server.CleverServer, p.GetTabletId());
+
+ Server.CleverServer->GetRuntime()->DispatchEvents();
+ }
};
}
diff --git a/ydb/public/sdk/cpp/client/ydb_topic/impl/topic.cpp b/ydb/public/sdk/cpp/client/ydb_topic/impl/topic.cpp
index 35869b47aa4..a820288af02 100644
--- a/ydb/public/sdk/cpp/client/ydb_topic/impl/topic.cpp
+++ b/ydb/public/sdk/cpp/client/ydb_topic/impl/topic.cpp
@@ -37,6 +37,16 @@ const TConsumerDescription& TDescribeConsumerResult::GetConsumerDescription() co
return ConsumerDescription_;
}
+TDescribePartitionResult::TDescribePartitionResult(TStatus&& status, Ydb::Topic::DescribePartitionResult&& result)
+ : TStatus(std::move(status))
+ , PartitionDescription_(std::move(result))
+{
+}
+
+const TPartitionDescription& TDescribePartitionResult::GetPartitionDescription() const {
+ return PartitionDescription_;
+}
+
TTopicDescription::TTopicDescription(Ydb::Topic::DescribeTopicResult&& result)
: Proto_(std::move(result))
, PartitioningSettings_(Proto_.partitioning_settings())
@@ -74,6 +84,11 @@ TConsumerDescription::TConsumerDescription(Ydb::Topic::DescribeConsumerResult&&
}
}
+TPartitionDescription::TPartitionDescription(Ydb::Topic::DescribePartitionResult&& result)
+ : Proto_(std::move(result))
+ , Partition_(Proto_.partition())
+{
+}
TConsumer::TConsumer(const Ydb::Topic::Consumer& consumer)
: ConsumerName_(consumer.name())
@@ -124,6 +139,10 @@ const TVector<TPartitionInfo>& TConsumerDescription::GetPartitions() const {
return Partitions_;
}
+const TPartitionInfo& TPartitionDescription::GetPartition() const {
+ return Partition_;
+}
+
const TConsumer& TConsumerDescription::GetConsumer() const {
return Consumer_;
}
@@ -173,6 +192,10 @@ const Ydb::Topic::DescribeConsumerResult& TConsumerDescription::GetProto() const
return Proto_;
}
+const Ydb::Topic::DescribePartitionResult& TPartitionDescription::GetProto() const {
+ return Proto_;
+}
+
const TString& TTopicDescription::GetOwner() const {
return Owner_;
}
@@ -305,6 +328,19 @@ TString TPartitionConsumerStats::GetReadSessionId() const {
return ReadSessionId_;
}
+TPartitionLocation::TPartitionLocation(const Ydb::Topic::PartitionLocation& partitionLocation)
+ : NodeId_(partitionLocation.node_id())
+ , Generation_(partitionLocation.generation())
+{
+}
+
+i32 TPartitionLocation::GetNodeId() const {
+ return NodeId_;
+}
+
+i64 TPartitionLocation::GetGeneration() const {
+ return Generation_;
+}
TPartitionInfo::TPartitionInfo(const Ydb::Topic::DescribeTopicResult::PartitionInfo& partitionInfo)
: PartitionId_(partitionInfo.partition_id())
@@ -321,6 +357,10 @@ TPartitionInfo::TPartitionInfo(const Ydb::Topic::DescribeTopicResult::PartitionI
if (partitionInfo.has_partition_stats()) {
PartitionStats_ = TPartitionStats{partitionInfo.partition_stats()};
}
+
+ if (partitionInfo.has_partition_location()) {
+ PartitionLocation_ = TPartitionLocation{partitionInfo.partition_location()};
+ }
}
TPartitionInfo::TPartitionInfo(const Ydb::Topic::DescribeConsumerResult::PartitionInfo& partitionInfo)
@@ -339,6 +379,9 @@ TPartitionInfo::TPartitionInfo(const Ydb::Topic::DescribeConsumerResult::Partiti
PartitionStats_ = TPartitionStats{partitionInfo.partition_stats()};
PartitionConsumerStats_ = TPartitionConsumerStats{partitionInfo.partition_consumer_stats()};
}
+ if (partitionInfo.has_partition_location()) {
+ PartitionLocation_ = TPartitionLocation{partitionInfo.partition_location()};
+ }
}
const TMaybe<TPartitionStats>& TPartitionInfo::GetPartitionStats() const {
@@ -349,6 +392,10 @@ const TMaybe<TPartitionConsumerStats>& TPartitionInfo::GetPartitionConsumerStats
return PartitionConsumerStats_;
}
+const TMaybe<TPartitionLocation>& TPartitionInfo::GetPartitionLocation() const {
+ return PartitionLocation_;
+}
+
bool TPartitionInfo::GetActive() const {
return Active_;
}
@@ -379,6 +426,10 @@ TAsyncDescribeConsumerResult TTopicClient::DescribeConsumer(const TString& path,
return Impl_->DescribeConsumer(path, consumer, settings);
}
+TAsyncDescribePartitionResult TTopicClient::DescribePartition(const TString& path, i64 partitionId, const TDescribePartitionSettings& settings) {
+ return Impl_->DescribePartition(path, partitionId, settings);
+}
+
IRetryPolicy::TPtr IRetryPolicy::GetDefaultPolicy() {
static IRetryPolicy::TPtr policy = GetExponentialBackoffPolicy();
return policy;
diff --git a/ydb/public/sdk/cpp/client/ydb_topic/impl/topic_impl.h b/ydb/public/sdk/cpp/client/ydb_topic/impl/topic_impl.h
index 5c42758df99..49bf5719762 100644
--- a/ydb/public/sdk/cpp/client/ydb_topic/impl/topic_impl.h
+++ b/ydb/public/sdk/cpp/client/ydb_topic/impl/topic_impl.h
@@ -194,6 +194,10 @@ public:
request.set_include_stats(true);
}
+ if (settings.IncludeLocation_) {
+ request.set_include_location(true);
+ }
+
auto promise = NThreading::NewPromise<TDescribeTopicResult>();
auto extractor = [promise]
@@ -227,6 +231,10 @@ public:
request.set_include_stats(true);
}
+ if (settings.IncludeLocation_) {
+ request.set_include_location(true);
+ }
+
auto promise = NThreading::NewPromise<TDescribeConsumerResult>();
auto extractor = [promise]
@@ -251,6 +259,42 @@ public:
return promise.GetFuture();
}
+ TAsyncDescribePartitionResult DescribePartition(const TString& path, i64 partitionId, const TDescribePartitionSettings& settings) {
+ auto request = MakeOperationRequest<Ydb::Topic::DescribePartitionRequest>(settings);
+ request.set_path(path);
+ request.set_partition_id(partitionId);
+
+ if (settings.IncludeStats_) {
+ request.set_include_stats(true);
+ }
+
+ if (settings.IncludeLocation_) {
+ request.set_include_location(true);
+ }
+
+ auto promise = NThreading::NewPromise<TDescribePartitionResult>();
+
+ auto extractor = [promise](google::protobuf::Any* any, TPlainStatus status) mutable {
+ Ydb::Topic::DescribePartitionResult result;
+ if (any) {
+ any->UnpackTo(&result);
+ }
+
+ TDescribePartitionResult val(TStatus(std::move(status)), std::move(result));
+ promise.SetValue(std::move(val));
+ };
+
+ Connections_->RunDeferred<Ydb::Topic::V1::TopicService, Ydb::Topic::DescribePartitionRequest, Ydb::Topic::DescribePartitionResponse>(
+ std::move(request),
+ extractor,
+ &Ydb::Topic::V1::TopicService::Stub::AsyncDescribePartition,
+ DbDriverState_,
+ INITIAL_DEFERRED_CALL_DELAY,
+ TRpcRequestSettings::Make(settings));
+
+ return promise.GetFuture();
+ }
+
TAsyncStatus CommitOffset(const TString& path, ui64 partitionId, const TString& consumerName, ui64 offset,
const TCommitOffsetSettings& settings) {
Ydb::Topic::CommitOffsetRequest request = MakeOperationRequest<Ydb::Topic::CommitOffsetRequest>(settings);
diff --git a/ydb/public/sdk/cpp/client/ydb_topic/topic.h b/ydb/public/sdk/cpp/client/ydb_topic/topic.h
index cc49bcf1267..962bebac174 100644
--- a/ydb/public/sdk/cpp/client/ydb_topic/topic.h
+++ b/ydb/public/sdk/cpp/client/ydb_topic/topic.h
@@ -128,6 +128,21 @@ private:
TString ReadSessionId_;
};
+// Topic partition location
+class TPartitionLocation {
+public:
+ TPartitionLocation(const Ydb::Topic::PartitionLocation& partitionLocation);
+ i32 GetNodeId() const;
+ i64 GetGeneration() const;
+
+private:
+ // Node identificator.
+ i32 NodeId_ = 1;
+
+ // Partition generation.
+ i64 Generation_ = 2;
+};
+
class TPartitionInfo {
public:
TPartitionInfo(const Ydb::Topic::DescribeTopicResult::PartitionInfo& partitionInfo);
@@ -140,6 +155,7 @@ public:
const TMaybe<TPartitionStats>& GetPartitionStats() const;
const TMaybe<TPartitionConsumerStats>& GetPartitionConsumerStats() const;
+ const TMaybe<TPartitionLocation>& GetPartitionLocation() const;
private:
ui64 PartitionId_;
@@ -148,9 +164,9 @@ private:
TVector<ui64> ParentPartitionIds_;
TMaybe<TPartitionStats> PartitionStats_;
TMaybe<TPartitionConsumerStats> PartitionConsumerStats_;
+ TMaybe<TPartitionLocation> PartitionLocation_;
};
-
class TPartitioningSettings {
public:
TPartitioningSettings() : MinActivePartitions_(0), PartitionCountLimit_(0){}
@@ -245,8 +261,21 @@ private:
TConsumer Consumer_;
};
+class TPartitionDescription {
+ friend class NYdb::TProtoAccessor;
+
+public:
+ TPartitionDescription(Ydb::Topic::DescribePartitionResult&& desc);
+
+ const TPartitionInfo& GetPartition() const;
+private:
+ const Ydb::Topic::DescribePartitionResult& GetProto() const;
-// Result for describe resource request.
+ const Ydb::Topic::DescribePartitionResult Proto_;
+ TPartitionInfo Partition_;
+};
+
+// Result for describe topic request.
struct TDescribeTopicResult : public TStatus {
friend class NYdb::TProtoAccessor;
@@ -259,7 +288,7 @@ private:
TTopicDescription TopicDescription_;
};
-// Result for describe resource request.
+// Result for describe consumer request.
struct TDescribeConsumerResult : public TStatus {
friend class NYdb::TProtoAccessor;
@@ -272,9 +301,21 @@ private:
TConsumerDescription ConsumerDescription_;
};
+// Result for describe partition request.
+struct TDescribePartitionResult: public TStatus {
+ friend class NYdb::TProtoAccessor;
+
+ TDescribePartitionResult(TStatus&& status, Ydb::Topic::DescribePartitionResult&& result);
+
+ const TPartitionDescription& GetPartitionDescription() const;
+
+private:
+ TPartitionDescription PartitionDescription_;
+};
using TAsyncDescribeTopicResult = NThreading::TFuture<TDescribeTopicResult>;
using TAsyncDescribeConsumerResult = NThreading::TFuture<TDescribeConsumerResult>;
+using TAsyncDescribePartitionResult = NThreading::TFuture<TDescribePartitionResult>;
template <class TSettings>
class TAlterAttributesBuilderImpl {
@@ -529,18 +570,31 @@ struct TDropTopicSettings : public TOperationRequestSettings<TDropTopicSettings>
using TOperationRequestSettings<TDropTopicSettings>::TOperationRequestSettings;
};
-// Settings for describe resource request.
+// Settings for describe topic request.
struct TDescribeTopicSettings : public TOperationRequestSettings<TDescribeTopicSettings> {
using TSelf = TDescribeTopicSettings;
FLUENT_SETTING_DEFAULT(bool, IncludeStats, false);
+
+ FLUENT_SETTING_DEFAULT(bool, IncludeLocation, false);
};
-// Settings for describe resource request.
+// Settings for describe consumer request.
struct TDescribeConsumerSettings : public TOperationRequestSettings<TDescribeConsumerSettings> {
using TSelf = TDescribeConsumerSettings;
FLUENT_SETTING_DEFAULT(bool, IncludeStats, false);
+
+ FLUENT_SETTING_DEFAULT(bool, IncludeLocation, false);
+};
+
+// Settings for describe partition request.
+struct TDescribePartitionSettings: public TOperationRequestSettings<TDescribePartitionSettings> {
+ using TSelf = TDescribePartitionSettings;
+
+ FLUENT_SETTING_DEFAULT(bool, IncludeStats, false);
+
+ FLUENT_SETTING_DEFAULT(bool, IncludeLocation, false);
};
// Settings for commit offset request.
@@ -1675,12 +1729,14 @@ public:
// Delete a topic.
TAsyncStatus DropTopic(const TString& path, const TDropTopicSettings& settings = {});
- // Describe settings of topic.
+ // Describe a topic.
TAsyncDescribeTopicResult DescribeTopic(const TString& path, const TDescribeTopicSettings& settings = {});
- // Describe settings of topic's consumer.
- TAsyncDescribeConsumerResult DescribeConsumer(const TString& path, const TString& consumer,
- const TDescribeConsumerSettings& settings = {});
+ // Describe a topic consumer.
+ TAsyncDescribeConsumerResult DescribeConsumer(const TString& path, const TString& consumer, const TDescribeConsumerSettings& settings = {});
+
+ // Describe a topic partition
+ TAsyncDescribePartitionResult DescribePartition(const TString& path, i64 partitionId, const TDescribePartitionSettings& settings = {});
//! Create read session.
std::shared_ptr<IReadSession> CreateReadSession(const TReadSessionSettings& settings);
diff --git a/ydb/public/sdk/cpp/client/ydb_topic/ut/describe_topic_ut.cpp b/ydb/public/sdk/cpp/client/ydb_topic/ut/describe_topic_ut.cpp
index 7add9826037..a89ed6f1710 100644
--- a/ydb/public/sdk/cpp/client/ydb_topic/ut/describe_topic_ut.cpp
+++ b/ydb/public/sdk/cpp/client/ydb_topic/ut/describe_topic_ut.cpp
@@ -14,102 +14,313 @@
namespace NYdb::NTopic::NTests {
- Y_UNIT_TEST_SUITE(DescribeTopic) {
- Y_UNIT_TEST(Basic) {
- auto setup = std::make_shared<NPersQueue::NTests::TPersQueueYdbSdkTestSetup>(TEST_CASE_NAME);
- TTopicClient client(setup->GetDriver());
+ Y_UNIT_TEST_SUITE(Describe) {
+
+ void DescribeTopic(NPersQueue::NTests::TPersQueueYdbSdkTestSetup& setup, TTopicClient& client, bool requireStats, bool requireNonEmptyStats, bool requireLocation, bool killTablets)
+ {
+ TDescribeTopicSettings settings;
+ settings.IncludeStats(requireStats);
+ settings.IncludeLocation(requireLocation);
- // DescribeTopic
{
- auto result = client.DescribeTopic(setup->GetTestTopicPath()).GetValueSync();
+ auto result = client.DescribeTopic(setup.GetTestTopicPath(), settings).GetValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), NYdb::EStatus::SUCCESS, result.GetIssues().ToString());
const auto& description = result.GetTopicDescription();
- auto& partitions = description.GetPartitions();
+ const auto& partitions = description.GetPartitions();
UNIT_ASSERT_VALUES_EQUAL(partitions.size(), 1);
- auto& partition = partitions[0];
+ const auto& partition = partitions[0];
UNIT_ASSERT(partition.GetActive());
UNIT_ASSERT_VALUES_EQUAL(partition.GetPartitionId(), 0);
+
+ if (requireStats)
+ {
+ const auto& stats = description.GetTopicStats();
+
+ if (requireNonEmptyStats)
+ {
+ UNIT_ASSERT_GT(stats.GetStoreSizeBytes(), 0);
+ UNIT_ASSERT_GT(stats.GetBytesWrittenPerMinute(), 0);
+ UNIT_ASSERT_GT(stats.GetBytesWrittenPerHour(), 0);
+ UNIT_ASSERT_GT(stats.GetBytesWrittenPerDay(), 0);
+ UNIT_ASSERT_GT(stats.GetMaxWriteTimeLag(), TDuration::Zero());
+ UNIT_ASSERT_GT(stats.GetMinLastWriteTime(), TInstant::Zero());
+ } else {
+ UNIT_ASSERT_VALUES_EQUAL(stats.GetStoreSizeBytes(), 0);
+ }
+ }
+
+ if (requireLocation)
+ {
+ UNIT_ASSERT(partition.GetPartitionLocation());
+ const auto& partitionLocation = *partition.GetPartitionLocation();
+ UNIT_ASSERT_GT(partitionLocation.GetNodeId(), 0);
+ UNIT_ASSERT_GE(partitionLocation.GetGeneration(), 0); // greater-or-equal 0
+ }
}
- // DescribeConsumer
+ if (killTablets)
{
- auto result = client.DescribeConsumer(setup->GetTestTopicPath(), setup->GetTestConsumer()).GetValueSync();
+ setup.KillTopicTablets(setup.GetTestTopicPath());
+
+ auto result = client.DescribeTopic(setup.GetTestTopicPath(), settings).GetValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), NYdb::EStatus::SUCCESS, result.GetIssues().ToString());
- auto& description = result.GetConsumerDescription();
+ const auto& description = result.GetTopicDescription();
- auto& partitions = description.GetPartitions();
+ const auto& partitions = description.GetPartitions();
UNIT_ASSERT_VALUES_EQUAL(partitions.size(), 1);
- auto& partition = partitions[0];
+ const auto& partition = partitions[0];
UNIT_ASSERT(partition.GetActive());
UNIT_ASSERT_VALUES_EQUAL(partition.GetPartitionId(), 0);
+
+ if (requireStats)
+ {
+ const auto& stats = description.GetTopicStats();
+
+ if (requireNonEmptyStats)
+ {
+ UNIT_ASSERT_GT(stats.GetStoreSizeBytes(), 0);
+ UNIT_ASSERT_GT(stats.GetBytesWrittenPerMinute(), 0);
+ UNIT_ASSERT_GT(stats.GetBytesWrittenPerHour(), 0);
+ UNIT_ASSERT_GT(stats.GetBytesWrittenPerDay(), 0);
+ UNIT_ASSERT_GT(stats.GetMaxWriteTimeLag(), TDuration::Zero());
+ UNIT_ASSERT_GT(stats.GetMinLastWriteTime(), TInstant::Zero());
+ } else {
+ UNIT_ASSERT_VALUES_EQUAL(stats.GetStoreSizeBytes(), 0);
+ }
+ }
+
+ if (requireLocation)
+ {
+ UNIT_ASSERT(partition.GetPartitionLocation());
+ const auto& partitionLocation = *partition.GetPartitionLocation();
+ UNIT_ASSERT_GT(partitionLocation.GetNodeId(), 0);
+ UNIT_ASSERT_GT(partitionLocation.GetGeneration(), 0); // greater-then 0 after tablet restart
+ }
}
}
- Y_UNIT_TEST(Statistics) {
- auto setup = std::make_shared<NPersQueue::NTests::TPersQueueYdbSdkTestSetup>(TEST_CASE_NAME);
- TTopicClient client(setup->GetDriver());
+ void DescribeConsumer(NPersQueue::NTests::TPersQueueYdbSdkTestSetup& setup, TTopicClient& client, bool requireStats, bool requireNonEmptyStats, bool requireLocation, bool killTablets)
+ {
+ TDescribeConsumerSettings settings;
+ settings.IncludeStats(requireStats);
+ settings.IncludeLocation(requireLocation);
- TDescribeTopicSettings settings;
- settings.IncludeStats(true);
+ {
+ auto result = client.DescribeConsumer(setup.GetTestTopicPath(), ::NPersQueue::SDKTestSetup::GetTestConsumer(), settings).GetValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), NYdb::EStatus::SUCCESS, result.GetIssues().ToString());
+
+ const auto& description = result.GetConsumerDescription();
+
+ const auto& partitions = description.GetPartitions();
+ UNIT_ASSERT_VALUES_EQUAL(partitions.size(), 1);
+
+ const auto& partition = partitions[0];
+ UNIT_ASSERT(partition.GetActive());
+ UNIT_ASSERT_VALUES_EQUAL(partition.GetPartitionId(), 0);
+
+ if (requireStats)
+ {
+ const auto& stats = partition.GetPartitionStats();
+ const auto& consumerStats = partition.GetPartitionConsumerStats();
+ UNIT_ASSERT(stats);
+ UNIT_ASSERT(consumerStats);
+
+ if (requireNonEmptyStats)
+ {
+ UNIT_ASSERT_GE(stats->GetStartOffset(), 0);
+ UNIT_ASSERT_GE(stats->GetEndOffset(), 0);
+ UNIT_ASSERT_GT(stats->GetStoreSizeBytes(), 0);
+ UNIT_ASSERT_GT(stats->GetLastWriteTime(), TInstant::Zero());
+ UNIT_ASSERT_GT(stats->GetMaxWriteTimeLag(), TDuration::Zero());
+ UNIT_ASSERT_GT(stats->GetBytesWrittenPerMinute(), 0);
+ UNIT_ASSERT_GT(stats->GetBytesWrittenPerHour(), 0);
+ UNIT_ASSERT_GT(stats->GetBytesWrittenPerDay(), 0);
+
+ UNIT_ASSERT_GT(consumerStats->GetLastReadOffset(), 0);
+ UNIT_ASSERT_GT(consumerStats->GetCommittedOffset(), 0);
+ UNIT_ASSERT_GE(consumerStats->GetReadSessionId(), 0);
+ UNIT_ASSERT_VALUES_EQUAL(consumerStats->GetReaderName(), "");
+ } else {
+ UNIT_ASSERT_VALUES_EQUAL(stats->GetStartOffset(), 0);
+ UNIT_ASSERT_VALUES_EQUAL(consumerStats->GetLastReadOffset(), 0);
+ }
+ }
+
+ if (requireLocation)
+ {
+ UNIT_ASSERT(partition.GetPartitionLocation());
+ const auto& partitionLocation = *partition.GetPartitionLocation();
+ UNIT_ASSERT_GT(partitionLocation.GetNodeId(), 0);
+ UNIT_ASSERT_GE(partitionLocation.GetGeneration(), 0); // greater-or-equal 0
+ }
+ }
+
+ if (killTablets)
+ {
+ setup.KillTopicTablets(setup.GetTestTopicPath());
+
+ auto result = client.DescribeConsumer(setup.GetTestTopicPath(), ::NPersQueue::SDKTestSetup::GetTestConsumer(), settings).GetValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), NYdb::EStatus::SUCCESS, result.GetIssues().ToString());
+
+ const auto& description = result.GetConsumerDescription();
+
+ const auto& partitions = description.GetPartitions();
+ UNIT_ASSERT_VALUES_EQUAL(partitions.size(), 1);
+
+ const auto& partition = partitions[0];
+ UNIT_ASSERT(partition.GetActive());
+ UNIT_ASSERT_VALUES_EQUAL(partition.GetPartitionId(), 0);
- // Get empty topic description
+ if (requireLocation)
+ {
+ UNIT_ASSERT(partition.GetPartitionLocation());
+ const auto& partitionLocation = *partition.GetPartitionLocation();
+ UNIT_ASSERT_GT(partitionLocation.GetNodeId(), 0);
+ UNIT_ASSERT_GT(partitionLocation.GetGeneration(), 0); // greater-then 0 after tablet restart
+ }
+ }
+ }
+
+ void DescribePartition(NPersQueue::NTests::TPersQueueYdbSdkTestSetup& setup, TTopicClient& client, bool requireStats, bool requireNonEmptyStats, bool requireLocation, bool killTablets)
+ {
+ TDescribePartitionSettings settings;
+ settings.IncludeStats(requireStats);
+ settings.IncludeLocation(requireLocation);
+
+ i64 testPartitionId = 0;
+
+ {
+ auto result = client.DescribePartition(setup.GetTestTopicPath(), testPartitionId, settings).GetValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), NYdb::EStatus::SUCCESS, result.GetIssues().ToString());
+
+ const auto& description = result.GetPartitionDescription();
+
+ const auto& partition = description.GetPartition();
+ UNIT_ASSERT(partition.GetActive());
+ UNIT_ASSERT_VALUES_EQUAL(partition.GetPartitionId(), testPartitionId);
+
+ if (requireStats)
+ {
+ const auto& stats = partition.GetPartitionStats();
+ UNIT_ASSERT(stats);
+
+ if (requireNonEmptyStats)
+ {
+ UNIT_ASSERT_GE(stats->GetStartOffset(), 0);
+ UNIT_ASSERT_GE(stats->GetEndOffset(), 0);
+ UNIT_ASSERT_GT(stats->GetStoreSizeBytes(), 0);
+ UNIT_ASSERT_GT(stats->GetLastWriteTime(), TInstant::Zero());
+ UNIT_ASSERT_GT(stats->GetMaxWriteTimeLag(), TDuration::Zero());
+ UNIT_ASSERT_GT(stats->GetBytesWrittenPerMinute(), 0);
+ UNIT_ASSERT_GT(stats->GetBytesWrittenPerHour(), 0);
+ UNIT_ASSERT_GT(stats->GetBytesWrittenPerDay(), 0);
+ } else {
+ UNIT_ASSERT_VALUES_EQUAL(stats->GetStoreSizeBytes(), 0);
+ }
+ }
+
+ if (requireLocation)
+ {
+ UNIT_ASSERT(partition.GetPartitionLocation());
+ const auto& partitionLocation = *partition.GetPartitionLocation();
+ UNIT_ASSERT_GT(partitionLocation.GetNodeId(), 0);
+ UNIT_ASSERT_GE(partitionLocation.GetGeneration(), 0); // greater-or-equal 0
+ }
+ }
+
+ if (killTablets)
{
- auto result = client.DescribeTopic(setup->GetTestTopicPath(), settings).GetValueSync();
+ setup.KillTopicTablets(setup.GetTestTopicPath());
+
+ auto result = client.DescribePartition(setup.GetTestTopicPath(), testPartitionId, settings).GetValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), NYdb::EStatus::SUCCESS, result.GetIssues().ToString());
- auto& description = result.GetTopicDescription();
- UNIT_ASSERT_VALUES_EQUAL(description.GetPartitions().size(), 1);
+ const auto& description = result.GetPartitionDescription();
+
+ const auto& partition = description.GetPartition();
+ UNIT_ASSERT(partition.GetActive());
+ UNIT_ASSERT_VALUES_EQUAL(partition.GetPartitionId(), testPartitionId);
- auto& stats = description.GetTopicStats();
- UNIT_ASSERT_VALUES_EQUAL(stats.GetStoreSizeBytes(), 0);
+ if (requireLocation)
+ {
+ UNIT_ASSERT(partition.GetPartitionLocation());
+ const auto& partitionLocation = *partition.GetPartitionLocation();
+ UNIT_ASSERT_GT(partitionLocation.GetNodeId(), 0);
+ UNIT_ASSERT_GT(partitionLocation.GetGeneration(), 0); // greater-then 0 after tablet restart
+ }
}
+ }
+
+ Y_UNIT_TEST(Basic) {
+ NPersQueue::NTests::TPersQueueYdbSdkTestSetup setup(TEST_CASE_NAME);
+ TTopicClient client(setup.GetDriver());
+
+ DescribeTopic(setup, client, false, false, false, false);
+ DescribeConsumer(setup, client, false, false, false, false);
+ DescribePartition(setup, client, false, false, false, false);
+ }
+
+ Y_UNIT_TEST(Statistics) {
+ NPersQueue::NTests::TPersQueueYdbSdkTestSetup setup(TEST_CASE_NAME);
+ TTopicClient client(setup.GetDriver());
+
+ // Get empty description
+ DescribeTopic(setup, client, true, false, false, false);
+ DescribeConsumer(setup, client, true, false, false, false);
+ DescribePartition(setup, client, true, false, false, false);
// Write a message
{
- auto writeSettings = TWriteSessionSettings().Path(setup->GetTestTopic()).MessageGroupId(::NPersQueue::SDKTestSetup::GetTestMessageGroupId());
+ auto writeSettings = TWriteSessionSettings().Path(setup.GetTestTopicPath()).MessageGroupId(::NPersQueue::SDKTestSetup::GetTestMessageGroupId());
auto writeSession = client.CreateSimpleBlockingWriteSession(writeSettings);
std::string message(10_KB, 'x');
UNIT_ASSERT(writeSession->Write(message));
writeSession->Close();
}
- // Get non-empty topic description
+ // Read a message
{
- auto result = client.DescribeTopic(setup->GetTestTopicPath(), settings).GetValueSync();
- UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), NYdb::EStatus::SUCCESS, result.GetIssues().ToString());
+ auto readSettings = TReadSessionSettings()
+ .ConsumerName(setup.GetTestConsumer())
+ .AppendTopics(setup.GetTestTopicPath());
+ auto readSession = client.CreateReadSession(readSettings);
- auto& description = result.GetTopicDescription();
+ auto event = readSession->GetEvent(true);
+ UNIT_ASSERT(event.Defined());
- auto& stats = description.GetTopicStats();
- UNIT_ASSERT_GT(stats.GetStoreSizeBytes(), 0);
- UNIT_ASSERT_GT(stats.GetBytesWrittenPerMinute(), 0);
- UNIT_ASSERT_GT(stats.GetBytesWrittenPerHour(), 0);
- UNIT_ASSERT_GT(stats.GetBytesWrittenPerDay(), 0);
- UNIT_ASSERT_GT(stats.GetMaxWriteTimeLag(), TDuration::Zero());
- UNIT_ASSERT_GT(stats.GetMinLastWriteTime(), TInstant::Zero());
+ auto& startPartitionSession = std::get<TReadSessionEvent::TStartPartitionSessionEvent>(*event);
+ startPartitionSession.Confirm();
- auto& partitions = description.GetPartitions();
- UNIT_ASSERT_VALUES_EQUAL(partitions.size(), 1);
+ event = readSession->GetEvent(true);
+ UNIT_ASSERT(event.Defined());
- auto& partition = partitions[0];
- UNIT_ASSERT(partition.GetActive());
- UNIT_ASSERT_VALUES_EQUAL(partition.GetPartitionId(), 0);
-
- auto& partitionStats = *partition.GetPartitionStats();
- UNIT_ASSERT_VALUES_EQUAL(partitionStats.GetStartOffset(), 0);
- UNIT_ASSERT_GT(partitionStats.GetEndOffset(), 0);
- UNIT_ASSERT_GT(partitionStats.GetStoreSizeBytes(), 0);
- UNIT_ASSERT_GT(partitionStats.GetBytesWrittenPerMinute(), 0);
- UNIT_ASSERT_GT(partitionStats.GetBytesWrittenPerHour(), 0);
- UNIT_ASSERT_GT(partitionStats.GetBytesWrittenPerDay(), 0);
- UNIT_ASSERT_GT(partitionStats.GetMaxWriteTimeLag(), TDuration::Zero());
- UNIT_ASSERT_GT(partitionStats.GetLastWriteTime(), TInstant::Zero());
+ auto& dataReceived = std::get<TReadSessionEvent::TDataReceivedEvent>(*event);
+ dataReceived.Commit();
}
+
+ // Get non-empty description
+ DescribeTopic(setup, client, true, true, false, false);
+ DescribeConsumer(setup, client, true, true, false, false);
+ DescribePartition(setup, client, true, true, false, false);
+ }
+
+ Y_UNIT_TEST(Location) {
+ NPersQueue::NTests::TPersQueueYdbSdkTestSetup setup(TEST_CASE_NAME);
+ TTopicClient client(setup.GetDriver());
+
+ DescribeTopic(setup, client, false, false, true, false);
+ DescribeConsumer(setup, client, false, false, true, false);
+ DescribePartition(setup, client, false, false, true, false);
+
+ // Describe with KillTablets
+ DescribeTopic(setup, client, false, false, true, true);
+ DescribeConsumer(setup, client, false, false, true, true);
+ DescribePartition(setup, client, false, false, true, true);
}
}
}
diff --git a/ydb/services/persqueue_v1/ut/describes_ut/describe_topic_ut.cpp b/ydb/services/persqueue_v1/ut/describes_ut/describe_topic_ut.cpp
index 382aee6829f..66a332f89ee 100644
--- a/ydb/services/persqueue_v1/ut/describes_ut/describe_topic_ut.cpp
+++ b/ydb/services/persqueue_v1/ut/describes_ut/describe_topic_ut.cpp
@@ -178,13 +178,11 @@ Y_UNIT_TEST_SUITE(TTopicApiDescribes) {
auto pqGroup = server.AnnoyingClient->Ls(TString("/Root/PQ/" + topicName))->Record.GetPathDescription()
.GetPersQueueGroup();
- THashSet<ui64> tablets;
- for (const auto& p : pqGroup.GetPartitions()) {
- auto res = tablets.insert(p.GetTabletId());
- if (res.second) {
+ THashSet<ui64> restartedTablets;
+ for (const auto& p : pqGroup.GetPartitions())
+ if (restartedTablets.insert(p.GetTabletId()).second)
server.AnnoyingClient->KillTablet(*server.CleverServer, p.GetTabletId());
- }
- }
+
server.CleverServer->GetRuntime()->DispatchEvents();
}