diff options
author | azevaykin <azevaykin@yandex-team.com> | 2023-07-25 19:31:28 +0300 |
---|---|---|
committer | root <root@qavm-2ed34686.qemu> | 2023-07-25 19:31:28 +0300 |
commit | dd49b96fb21fbea107931e3ca4f1d9b292241969 (patch) | |
tree | b02fefe5eb8aa412cfdda30a6563846dd372f4c5 | |
parent | 6780bb36eaf2c3d3fd68bc20ddc754969d90c5b8 (diff) | |
download | ydb-dd49b96fb21fbea107931e3ca4f1d9b292241969.tar.gz |
Describe topic, consumer, partition in SDK
6 files changed, 438 insertions, 66 deletions
diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/ut_utils/sdk_test_setup.h b/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/ut_utils/sdk_test_setup.h index e64aeee9525..2f75531305c 100644 --- a/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/ut_utils/sdk_test_setup.h +++ b/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/ut_utils/sdk_test_setup.h @@ -199,6 +199,7 @@ public: Server.AnnoyingClient->KickNodeInHive(Server.CleverServer->GetRuntime(), i); } } + void AllowTablets() { for (ui32 i = 0; i < Server.CleverServer->StaticNodes() + Server.CleverServer->DynamicNodes(); i++) { Server.AnnoyingClient->MarkNodeInHive(Server.CleverServer->GetRuntime(), i, true); @@ -214,5 +215,16 @@ public: UNIT_ASSERT_C(describeResult->Record.GetPathDescription().HasPersQueueGroup(), describeResult->Record); Server.AnnoyingClient->KillTablet(*Server.CleverServer, describeResult->Record.GetPathDescription().GetPersQueueGroup().GetBalancerTabletID()); } + + void KillTopicTablets(const TString& topicName) { + auto pqGroup = Server.AnnoyingClient->Ls(TString("/Root/PQ/" + topicName))->Record.GetPathDescription().GetPersQueueGroup(); + + THashSet<ui64> restartedTablets; + for (const auto& p : pqGroup.GetPartitions()) + if (restartedTablets.insert(p.GetTabletId()).second) + Server.AnnoyingClient->KillTablet(*Server.CleverServer, p.GetTabletId()); + + Server.CleverServer->GetRuntime()->DispatchEvents(); + } }; } diff --git a/ydb/public/sdk/cpp/client/ydb_topic/impl/topic.cpp b/ydb/public/sdk/cpp/client/ydb_topic/impl/topic.cpp index 35869b47aa4..a820288af02 100644 --- a/ydb/public/sdk/cpp/client/ydb_topic/impl/topic.cpp +++ b/ydb/public/sdk/cpp/client/ydb_topic/impl/topic.cpp @@ -37,6 +37,16 @@ const TConsumerDescription& TDescribeConsumerResult::GetConsumerDescription() co return ConsumerDescription_; } +TDescribePartitionResult::TDescribePartitionResult(TStatus&& status, Ydb::Topic::DescribePartitionResult&& result) + : TStatus(std::move(status)) + , PartitionDescription_(std::move(result)) +{ +} + +const TPartitionDescription& TDescribePartitionResult::GetPartitionDescription() const { + return PartitionDescription_; +} + TTopicDescription::TTopicDescription(Ydb::Topic::DescribeTopicResult&& result) : Proto_(std::move(result)) , PartitioningSettings_(Proto_.partitioning_settings()) @@ -74,6 +84,11 @@ TConsumerDescription::TConsumerDescription(Ydb::Topic::DescribeConsumerResult&& } } +TPartitionDescription::TPartitionDescription(Ydb::Topic::DescribePartitionResult&& result) + : Proto_(std::move(result)) + , Partition_(Proto_.partition()) +{ +} TConsumer::TConsumer(const Ydb::Topic::Consumer& consumer) : ConsumerName_(consumer.name()) @@ -124,6 +139,10 @@ const TVector<TPartitionInfo>& TConsumerDescription::GetPartitions() const { return Partitions_; } +const TPartitionInfo& TPartitionDescription::GetPartition() const { + return Partition_; +} + const TConsumer& TConsumerDescription::GetConsumer() const { return Consumer_; } @@ -173,6 +192,10 @@ const Ydb::Topic::DescribeConsumerResult& TConsumerDescription::GetProto() const return Proto_; } +const Ydb::Topic::DescribePartitionResult& TPartitionDescription::GetProto() const { + return Proto_; +} + const TString& TTopicDescription::GetOwner() const { return Owner_; } @@ -305,6 +328,19 @@ TString TPartitionConsumerStats::GetReadSessionId() const { return ReadSessionId_; } +TPartitionLocation::TPartitionLocation(const Ydb::Topic::PartitionLocation& partitionLocation) + : NodeId_(partitionLocation.node_id()) + , Generation_(partitionLocation.generation()) +{ +} + +i32 TPartitionLocation::GetNodeId() const { + return NodeId_; +} + +i64 TPartitionLocation::GetGeneration() const { + return Generation_; +} TPartitionInfo::TPartitionInfo(const Ydb::Topic::DescribeTopicResult::PartitionInfo& partitionInfo) : PartitionId_(partitionInfo.partition_id()) @@ -321,6 +357,10 @@ TPartitionInfo::TPartitionInfo(const Ydb::Topic::DescribeTopicResult::PartitionI if (partitionInfo.has_partition_stats()) { PartitionStats_ = TPartitionStats{partitionInfo.partition_stats()}; } + + if (partitionInfo.has_partition_location()) { + PartitionLocation_ = TPartitionLocation{partitionInfo.partition_location()}; + } } TPartitionInfo::TPartitionInfo(const Ydb::Topic::DescribeConsumerResult::PartitionInfo& partitionInfo) @@ -339,6 +379,9 @@ TPartitionInfo::TPartitionInfo(const Ydb::Topic::DescribeConsumerResult::Partiti PartitionStats_ = TPartitionStats{partitionInfo.partition_stats()}; PartitionConsumerStats_ = TPartitionConsumerStats{partitionInfo.partition_consumer_stats()}; } + if (partitionInfo.has_partition_location()) { + PartitionLocation_ = TPartitionLocation{partitionInfo.partition_location()}; + } } const TMaybe<TPartitionStats>& TPartitionInfo::GetPartitionStats() const { @@ -349,6 +392,10 @@ const TMaybe<TPartitionConsumerStats>& TPartitionInfo::GetPartitionConsumerStats return PartitionConsumerStats_; } +const TMaybe<TPartitionLocation>& TPartitionInfo::GetPartitionLocation() const { + return PartitionLocation_; +} + bool TPartitionInfo::GetActive() const { return Active_; } @@ -379,6 +426,10 @@ TAsyncDescribeConsumerResult TTopicClient::DescribeConsumer(const TString& path, return Impl_->DescribeConsumer(path, consumer, settings); } +TAsyncDescribePartitionResult TTopicClient::DescribePartition(const TString& path, i64 partitionId, const TDescribePartitionSettings& settings) { + return Impl_->DescribePartition(path, partitionId, settings); +} + IRetryPolicy::TPtr IRetryPolicy::GetDefaultPolicy() { static IRetryPolicy::TPtr policy = GetExponentialBackoffPolicy(); return policy; diff --git a/ydb/public/sdk/cpp/client/ydb_topic/impl/topic_impl.h b/ydb/public/sdk/cpp/client/ydb_topic/impl/topic_impl.h index 5c42758df99..49bf5719762 100644 --- a/ydb/public/sdk/cpp/client/ydb_topic/impl/topic_impl.h +++ b/ydb/public/sdk/cpp/client/ydb_topic/impl/topic_impl.h @@ -194,6 +194,10 @@ public: request.set_include_stats(true); } + if (settings.IncludeLocation_) { + request.set_include_location(true); + } + auto promise = NThreading::NewPromise<TDescribeTopicResult>(); auto extractor = [promise] @@ -227,6 +231,10 @@ public: request.set_include_stats(true); } + if (settings.IncludeLocation_) { + request.set_include_location(true); + } + auto promise = NThreading::NewPromise<TDescribeConsumerResult>(); auto extractor = [promise] @@ -251,6 +259,42 @@ public: return promise.GetFuture(); } + TAsyncDescribePartitionResult DescribePartition(const TString& path, i64 partitionId, const TDescribePartitionSettings& settings) { + auto request = MakeOperationRequest<Ydb::Topic::DescribePartitionRequest>(settings); + request.set_path(path); + request.set_partition_id(partitionId); + + if (settings.IncludeStats_) { + request.set_include_stats(true); + } + + if (settings.IncludeLocation_) { + request.set_include_location(true); + } + + auto promise = NThreading::NewPromise<TDescribePartitionResult>(); + + auto extractor = [promise](google::protobuf::Any* any, TPlainStatus status) mutable { + Ydb::Topic::DescribePartitionResult result; + if (any) { + any->UnpackTo(&result); + } + + TDescribePartitionResult val(TStatus(std::move(status)), std::move(result)); + promise.SetValue(std::move(val)); + }; + + Connections_->RunDeferred<Ydb::Topic::V1::TopicService, Ydb::Topic::DescribePartitionRequest, Ydb::Topic::DescribePartitionResponse>( + std::move(request), + extractor, + &Ydb::Topic::V1::TopicService::Stub::AsyncDescribePartition, + DbDriverState_, + INITIAL_DEFERRED_CALL_DELAY, + TRpcRequestSettings::Make(settings)); + + return promise.GetFuture(); + } + TAsyncStatus CommitOffset(const TString& path, ui64 partitionId, const TString& consumerName, ui64 offset, const TCommitOffsetSettings& settings) { Ydb::Topic::CommitOffsetRequest request = MakeOperationRequest<Ydb::Topic::CommitOffsetRequest>(settings); diff --git a/ydb/public/sdk/cpp/client/ydb_topic/topic.h b/ydb/public/sdk/cpp/client/ydb_topic/topic.h index cc49bcf1267..962bebac174 100644 --- a/ydb/public/sdk/cpp/client/ydb_topic/topic.h +++ b/ydb/public/sdk/cpp/client/ydb_topic/topic.h @@ -128,6 +128,21 @@ private: TString ReadSessionId_; }; +// Topic partition location +class TPartitionLocation { +public: + TPartitionLocation(const Ydb::Topic::PartitionLocation& partitionLocation); + i32 GetNodeId() const; + i64 GetGeneration() const; + +private: + // Node identificator. + i32 NodeId_ = 1; + + // Partition generation. + i64 Generation_ = 2; +}; + class TPartitionInfo { public: TPartitionInfo(const Ydb::Topic::DescribeTopicResult::PartitionInfo& partitionInfo); @@ -140,6 +155,7 @@ public: const TMaybe<TPartitionStats>& GetPartitionStats() const; const TMaybe<TPartitionConsumerStats>& GetPartitionConsumerStats() const; + const TMaybe<TPartitionLocation>& GetPartitionLocation() const; private: ui64 PartitionId_; @@ -148,9 +164,9 @@ private: TVector<ui64> ParentPartitionIds_; TMaybe<TPartitionStats> PartitionStats_; TMaybe<TPartitionConsumerStats> PartitionConsumerStats_; + TMaybe<TPartitionLocation> PartitionLocation_; }; - class TPartitioningSettings { public: TPartitioningSettings() : MinActivePartitions_(0), PartitionCountLimit_(0){} @@ -245,8 +261,21 @@ private: TConsumer Consumer_; }; +class TPartitionDescription { + friend class NYdb::TProtoAccessor; + +public: + TPartitionDescription(Ydb::Topic::DescribePartitionResult&& desc); + + const TPartitionInfo& GetPartition() const; +private: + const Ydb::Topic::DescribePartitionResult& GetProto() const; -// Result for describe resource request. + const Ydb::Topic::DescribePartitionResult Proto_; + TPartitionInfo Partition_; +}; + +// Result for describe topic request. struct TDescribeTopicResult : public TStatus { friend class NYdb::TProtoAccessor; @@ -259,7 +288,7 @@ private: TTopicDescription TopicDescription_; }; -// Result for describe resource request. +// Result for describe consumer request. struct TDescribeConsumerResult : public TStatus { friend class NYdb::TProtoAccessor; @@ -272,9 +301,21 @@ private: TConsumerDescription ConsumerDescription_; }; +// Result for describe partition request. +struct TDescribePartitionResult: public TStatus { + friend class NYdb::TProtoAccessor; + + TDescribePartitionResult(TStatus&& status, Ydb::Topic::DescribePartitionResult&& result); + + const TPartitionDescription& GetPartitionDescription() const; + +private: + TPartitionDescription PartitionDescription_; +}; using TAsyncDescribeTopicResult = NThreading::TFuture<TDescribeTopicResult>; using TAsyncDescribeConsumerResult = NThreading::TFuture<TDescribeConsumerResult>; +using TAsyncDescribePartitionResult = NThreading::TFuture<TDescribePartitionResult>; template <class TSettings> class TAlterAttributesBuilderImpl { @@ -529,18 +570,31 @@ struct TDropTopicSettings : public TOperationRequestSettings<TDropTopicSettings> using TOperationRequestSettings<TDropTopicSettings>::TOperationRequestSettings; }; -// Settings for describe resource request. +// Settings for describe topic request. struct TDescribeTopicSettings : public TOperationRequestSettings<TDescribeTopicSettings> { using TSelf = TDescribeTopicSettings; FLUENT_SETTING_DEFAULT(bool, IncludeStats, false); + + FLUENT_SETTING_DEFAULT(bool, IncludeLocation, false); }; -// Settings for describe resource request. +// Settings for describe consumer request. struct TDescribeConsumerSettings : public TOperationRequestSettings<TDescribeConsumerSettings> { using TSelf = TDescribeConsumerSettings; FLUENT_SETTING_DEFAULT(bool, IncludeStats, false); + + FLUENT_SETTING_DEFAULT(bool, IncludeLocation, false); +}; + +// Settings for describe partition request. +struct TDescribePartitionSettings: public TOperationRequestSettings<TDescribePartitionSettings> { + using TSelf = TDescribePartitionSettings; + + FLUENT_SETTING_DEFAULT(bool, IncludeStats, false); + + FLUENT_SETTING_DEFAULT(bool, IncludeLocation, false); }; // Settings for commit offset request. @@ -1675,12 +1729,14 @@ public: // Delete a topic. TAsyncStatus DropTopic(const TString& path, const TDropTopicSettings& settings = {}); - // Describe settings of topic. + // Describe a topic. TAsyncDescribeTopicResult DescribeTopic(const TString& path, const TDescribeTopicSettings& settings = {}); - // Describe settings of topic's consumer. - TAsyncDescribeConsumerResult DescribeConsumer(const TString& path, const TString& consumer, - const TDescribeConsumerSettings& settings = {}); + // Describe a topic consumer. + TAsyncDescribeConsumerResult DescribeConsumer(const TString& path, const TString& consumer, const TDescribeConsumerSettings& settings = {}); + + // Describe a topic partition + TAsyncDescribePartitionResult DescribePartition(const TString& path, i64 partitionId, const TDescribePartitionSettings& settings = {}); //! Create read session. std::shared_ptr<IReadSession> CreateReadSession(const TReadSessionSettings& settings); diff --git a/ydb/public/sdk/cpp/client/ydb_topic/ut/describe_topic_ut.cpp b/ydb/public/sdk/cpp/client/ydb_topic/ut/describe_topic_ut.cpp index 7add9826037..a89ed6f1710 100644 --- a/ydb/public/sdk/cpp/client/ydb_topic/ut/describe_topic_ut.cpp +++ b/ydb/public/sdk/cpp/client/ydb_topic/ut/describe_topic_ut.cpp @@ -14,102 +14,313 @@ namespace NYdb::NTopic::NTests { - Y_UNIT_TEST_SUITE(DescribeTopic) { - Y_UNIT_TEST(Basic) { - auto setup = std::make_shared<NPersQueue::NTests::TPersQueueYdbSdkTestSetup>(TEST_CASE_NAME); - TTopicClient client(setup->GetDriver()); + Y_UNIT_TEST_SUITE(Describe) { + + void DescribeTopic(NPersQueue::NTests::TPersQueueYdbSdkTestSetup& setup, TTopicClient& client, bool requireStats, bool requireNonEmptyStats, bool requireLocation, bool killTablets) + { + TDescribeTopicSettings settings; + settings.IncludeStats(requireStats); + settings.IncludeLocation(requireLocation); - // DescribeTopic { - auto result = client.DescribeTopic(setup->GetTestTopicPath()).GetValueSync(); + auto result = client.DescribeTopic(setup.GetTestTopicPath(), settings).GetValueSync(); UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), NYdb::EStatus::SUCCESS, result.GetIssues().ToString()); const auto& description = result.GetTopicDescription(); - auto& partitions = description.GetPartitions(); + const auto& partitions = description.GetPartitions(); UNIT_ASSERT_VALUES_EQUAL(partitions.size(), 1); - auto& partition = partitions[0]; + const auto& partition = partitions[0]; UNIT_ASSERT(partition.GetActive()); UNIT_ASSERT_VALUES_EQUAL(partition.GetPartitionId(), 0); + + if (requireStats) + { + const auto& stats = description.GetTopicStats(); + + if (requireNonEmptyStats) + { + UNIT_ASSERT_GT(stats.GetStoreSizeBytes(), 0); + UNIT_ASSERT_GT(stats.GetBytesWrittenPerMinute(), 0); + UNIT_ASSERT_GT(stats.GetBytesWrittenPerHour(), 0); + UNIT_ASSERT_GT(stats.GetBytesWrittenPerDay(), 0); + UNIT_ASSERT_GT(stats.GetMaxWriteTimeLag(), TDuration::Zero()); + UNIT_ASSERT_GT(stats.GetMinLastWriteTime(), TInstant::Zero()); + } else { + UNIT_ASSERT_VALUES_EQUAL(stats.GetStoreSizeBytes(), 0); + } + } + + if (requireLocation) + { + UNIT_ASSERT(partition.GetPartitionLocation()); + const auto& partitionLocation = *partition.GetPartitionLocation(); + UNIT_ASSERT_GT(partitionLocation.GetNodeId(), 0); + UNIT_ASSERT_GE(partitionLocation.GetGeneration(), 0); // greater-or-equal 0 + } } - // DescribeConsumer + if (killTablets) { - auto result = client.DescribeConsumer(setup->GetTestTopicPath(), setup->GetTestConsumer()).GetValueSync(); + setup.KillTopicTablets(setup.GetTestTopicPath()); + + auto result = client.DescribeTopic(setup.GetTestTopicPath(), settings).GetValueSync(); UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), NYdb::EStatus::SUCCESS, result.GetIssues().ToString()); - auto& description = result.GetConsumerDescription(); + const auto& description = result.GetTopicDescription(); - auto& partitions = description.GetPartitions(); + const auto& partitions = description.GetPartitions(); UNIT_ASSERT_VALUES_EQUAL(partitions.size(), 1); - auto& partition = partitions[0]; + const auto& partition = partitions[0]; UNIT_ASSERT(partition.GetActive()); UNIT_ASSERT_VALUES_EQUAL(partition.GetPartitionId(), 0); + + if (requireStats) + { + const auto& stats = description.GetTopicStats(); + + if (requireNonEmptyStats) + { + UNIT_ASSERT_GT(stats.GetStoreSizeBytes(), 0); + UNIT_ASSERT_GT(stats.GetBytesWrittenPerMinute(), 0); + UNIT_ASSERT_GT(stats.GetBytesWrittenPerHour(), 0); + UNIT_ASSERT_GT(stats.GetBytesWrittenPerDay(), 0); + UNIT_ASSERT_GT(stats.GetMaxWriteTimeLag(), TDuration::Zero()); + UNIT_ASSERT_GT(stats.GetMinLastWriteTime(), TInstant::Zero()); + } else { + UNIT_ASSERT_VALUES_EQUAL(stats.GetStoreSizeBytes(), 0); + } + } + + if (requireLocation) + { + UNIT_ASSERT(partition.GetPartitionLocation()); + const auto& partitionLocation = *partition.GetPartitionLocation(); + UNIT_ASSERT_GT(partitionLocation.GetNodeId(), 0); + UNIT_ASSERT_GT(partitionLocation.GetGeneration(), 0); // greater-then 0 after tablet restart + } } } - Y_UNIT_TEST(Statistics) { - auto setup = std::make_shared<NPersQueue::NTests::TPersQueueYdbSdkTestSetup>(TEST_CASE_NAME); - TTopicClient client(setup->GetDriver()); + void DescribeConsumer(NPersQueue::NTests::TPersQueueYdbSdkTestSetup& setup, TTopicClient& client, bool requireStats, bool requireNonEmptyStats, bool requireLocation, bool killTablets) + { + TDescribeConsumerSettings settings; + settings.IncludeStats(requireStats); + settings.IncludeLocation(requireLocation); - TDescribeTopicSettings settings; - settings.IncludeStats(true); + { + auto result = client.DescribeConsumer(setup.GetTestTopicPath(), ::NPersQueue::SDKTestSetup::GetTestConsumer(), settings).GetValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), NYdb::EStatus::SUCCESS, result.GetIssues().ToString()); + + const auto& description = result.GetConsumerDescription(); + + const auto& partitions = description.GetPartitions(); + UNIT_ASSERT_VALUES_EQUAL(partitions.size(), 1); + + const auto& partition = partitions[0]; + UNIT_ASSERT(partition.GetActive()); + UNIT_ASSERT_VALUES_EQUAL(partition.GetPartitionId(), 0); + + if (requireStats) + { + const auto& stats = partition.GetPartitionStats(); + const auto& consumerStats = partition.GetPartitionConsumerStats(); + UNIT_ASSERT(stats); + UNIT_ASSERT(consumerStats); + + if (requireNonEmptyStats) + { + UNIT_ASSERT_GE(stats->GetStartOffset(), 0); + UNIT_ASSERT_GE(stats->GetEndOffset(), 0); + UNIT_ASSERT_GT(stats->GetStoreSizeBytes(), 0); + UNIT_ASSERT_GT(stats->GetLastWriteTime(), TInstant::Zero()); + UNIT_ASSERT_GT(stats->GetMaxWriteTimeLag(), TDuration::Zero()); + UNIT_ASSERT_GT(stats->GetBytesWrittenPerMinute(), 0); + UNIT_ASSERT_GT(stats->GetBytesWrittenPerHour(), 0); + UNIT_ASSERT_GT(stats->GetBytesWrittenPerDay(), 0); + + UNIT_ASSERT_GT(consumerStats->GetLastReadOffset(), 0); + UNIT_ASSERT_GT(consumerStats->GetCommittedOffset(), 0); + UNIT_ASSERT_GE(consumerStats->GetReadSessionId(), 0); + UNIT_ASSERT_VALUES_EQUAL(consumerStats->GetReaderName(), ""); + } else { + UNIT_ASSERT_VALUES_EQUAL(stats->GetStartOffset(), 0); + UNIT_ASSERT_VALUES_EQUAL(consumerStats->GetLastReadOffset(), 0); + } + } + + if (requireLocation) + { + UNIT_ASSERT(partition.GetPartitionLocation()); + const auto& partitionLocation = *partition.GetPartitionLocation(); + UNIT_ASSERT_GT(partitionLocation.GetNodeId(), 0); + UNIT_ASSERT_GE(partitionLocation.GetGeneration(), 0); // greater-or-equal 0 + } + } + + if (killTablets) + { + setup.KillTopicTablets(setup.GetTestTopicPath()); + + auto result = client.DescribeConsumer(setup.GetTestTopicPath(), ::NPersQueue::SDKTestSetup::GetTestConsumer(), settings).GetValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), NYdb::EStatus::SUCCESS, result.GetIssues().ToString()); + + const auto& description = result.GetConsumerDescription(); + + const auto& partitions = description.GetPartitions(); + UNIT_ASSERT_VALUES_EQUAL(partitions.size(), 1); + + const auto& partition = partitions[0]; + UNIT_ASSERT(partition.GetActive()); + UNIT_ASSERT_VALUES_EQUAL(partition.GetPartitionId(), 0); - // Get empty topic description + if (requireLocation) + { + UNIT_ASSERT(partition.GetPartitionLocation()); + const auto& partitionLocation = *partition.GetPartitionLocation(); + UNIT_ASSERT_GT(partitionLocation.GetNodeId(), 0); + UNIT_ASSERT_GT(partitionLocation.GetGeneration(), 0); // greater-then 0 after tablet restart + } + } + } + + void DescribePartition(NPersQueue::NTests::TPersQueueYdbSdkTestSetup& setup, TTopicClient& client, bool requireStats, bool requireNonEmptyStats, bool requireLocation, bool killTablets) + { + TDescribePartitionSettings settings; + settings.IncludeStats(requireStats); + settings.IncludeLocation(requireLocation); + + i64 testPartitionId = 0; + + { + auto result = client.DescribePartition(setup.GetTestTopicPath(), testPartitionId, settings).GetValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), NYdb::EStatus::SUCCESS, result.GetIssues().ToString()); + + const auto& description = result.GetPartitionDescription(); + + const auto& partition = description.GetPartition(); + UNIT_ASSERT(partition.GetActive()); + UNIT_ASSERT_VALUES_EQUAL(partition.GetPartitionId(), testPartitionId); + + if (requireStats) + { + const auto& stats = partition.GetPartitionStats(); + UNIT_ASSERT(stats); + + if (requireNonEmptyStats) + { + UNIT_ASSERT_GE(stats->GetStartOffset(), 0); + UNIT_ASSERT_GE(stats->GetEndOffset(), 0); + UNIT_ASSERT_GT(stats->GetStoreSizeBytes(), 0); + UNIT_ASSERT_GT(stats->GetLastWriteTime(), TInstant::Zero()); + UNIT_ASSERT_GT(stats->GetMaxWriteTimeLag(), TDuration::Zero()); + UNIT_ASSERT_GT(stats->GetBytesWrittenPerMinute(), 0); + UNIT_ASSERT_GT(stats->GetBytesWrittenPerHour(), 0); + UNIT_ASSERT_GT(stats->GetBytesWrittenPerDay(), 0); + } else { + UNIT_ASSERT_VALUES_EQUAL(stats->GetStoreSizeBytes(), 0); + } + } + + if (requireLocation) + { + UNIT_ASSERT(partition.GetPartitionLocation()); + const auto& partitionLocation = *partition.GetPartitionLocation(); + UNIT_ASSERT_GT(partitionLocation.GetNodeId(), 0); + UNIT_ASSERT_GE(partitionLocation.GetGeneration(), 0); // greater-or-equal 0 + } + } + + if (killTablets) { - auto result = client.DescribeTopic(setup->GetTestTopicPath(), settings).GetValueSync(); + setup.KillTopicTablets(setup.GetTestTopicPath()); + + auto result = client.DescribePartition(setup.GetTestTopicPath(), testPartitionId, settings).GetValueSync(); UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), NYdb::EStatus::SUCCESS, result.GetIssues().ToString()); - auto& description = result.GetTopicDescription(); - UNIT_ASSERT_VALUES_EQUAL(description.GetPartitions().size(), 1); + const auto& description = result.GetPartitionDescription(); + + const auto& partition = description.GetPartition(); + UNIT_ASSERT(partition.GetActive()); + UNIT_ASSERT_VALUES_EQUAL(partition.GetPartitionId(), testPartitionId); - auto& stats = description.GetTopicStats(); - UNIT_ASSERT_VALUES_EQUAL(stats.GetStoreSizeBytes(), 0); + if (requireLocation) + { + UNIT_ASSERT(partition.GetPartitionLocation()); + const auto& partitionLocation = *partition.GetPartitionLocation(); + UNIT_ASSERT_GT(partitionLocation.GetNodeId(), 0); + UNIT_ASSERT_GT(partitionLocation.GetGeneration(), 0); // greater-then 0 after tablet restart + } } + } + + Y_UNIT_TEST(Basic) { + NPersQueue::NTests::TPersQueueYdbSdkTestSetup setup(TEST_CASE_NAME); + TTopicClient client(setup.GetDriver()); + + DescribeTopic(setup, client, false, false, false, false); + DescribeConsumer(setup, client, false, false, false, false); + DescribePartition(setup, client, false, false, false, false); + } + + Y_UNIT_TEST(Statistics) { + NPersQueue::NTests::TPersQueueYdbSdkTestSetup setup(TEST_CASE_NAME); + TTopicClient client(setup.GetDriver()); + + // Get empty description + DescribeTopic(setup, client, true, false, false, false); + DescribeConsumer(setup, client, true, false, false, false); + DescribePartition(setup, client, true, false, false, false); // Write a message { - auto writeSettings = TWriteSessionSettings().Path(setup->GetTestTopic()).MessageGroupId(::NPersQueue::SDKTestSetup::GetTestMessageGroupId()); + auto writeSettings = TWriteSessionSettings().Path(setup.GetTestTopicPath()).MessageGroupId(::NPersQueue::SDKTestSetup::GetTestMessageGroupId()); auto writeSession = client.CreateSimpleBlockingWriteSession(writeSettings); std::string message(10_KB, 'x'); UNIT_ASSERT(writeSession->Write(message)); writeSession->Close(); } - // Get non-empty topic description + // Read a message { - auto result = client.DescribeTopic(setup->GetTestTopicPath(), settings).GetValueSync(); - UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), NYdb::EStatus::SUCCESS, result.GetIssues().ToString()); + auto readSettings = TReadSessionSettings() + .ConsumerName(setup.GetTestConsumer()) + .AppendTopics(setup.GetTestTopicPath()); + auto readSession = client.CreateReadSession(readSettings); - auto& description = result.GetTopicDescription(); + auto event = readSession->GetEvent(true); + UNIT_ASSERT(event.Defined()); - auto& stats = description.GetTopicStats(); - UNIT_ASSERT_GT(stats.GetStoreSizeBytes(), 0); - UNIT_ASSERT_GT(stats.GetBytesWrittenPerMinute(), 0); - UNIT_ASSERT_GT(stats.GetBytesWrittenPerHour(), 0); - UNIT_ASSERT_GT(stats.GetBytesWrittenPerDay(), 0); - UNIT_ASSERT_GT(stats.GetMaxWriteTimeLag(), TDuration::Zero()); - UNIT_ASSERT_GT(stats.GetMinLastWriteTime(), TInstant::Zero()); + auto& startPartitionSession = std::get<TReadSessionEvent::TStartPartitionSessionEvent>(*event); + startPartitionSession.Confirm(); - auto& partitions = description.GetPartitions(); - UNIT_ASSERT_VALUES_EQUAL(partitions.size(), 1); + event = readSession->GetEvent(true); + UNIT_ASSERT(event.Defined()); - auto& partition = partitions[0]; - UNIT_ASSERT(partition.GetActive()); - UNIT_ASSERT_VALUES_EQUAL(partition.GetPartitionId(), 0); - - auto& partitionStats = *partition.GetPartitionStats(); - UNIT_ASSERT_VALUES_EQUAL(partitionStats.GetStartOffset(), 0); - UNIT_ASSERT_GT(partitionStats.GetEndOffset(), 0); - UNIT_ASSERT_GT(partitionStats.GetStoreSizeBytes(), 0); - UNIT_ASSERT_GT(partitionStats.GetBytesWrittenPerMinute(), 0); - UNIT_ASSERT_GT(partitionStats.GetBytesWrittenPerHour(), 0); - UNIT_ASSERT_GT(partitionStats.GetBytesWrittenPerDay(), 0); - UNIT_ASSERT_GT(partitionStats.GetMaxWriteTimeLag(), TDuration::Zero()); - UNIT_ASSERT_GT(partitionStats.GetLastWriteTime(), TInstant::Zero()); + auto& dataReceived = std::get<TReadSessionEvent::TDataReceivedEvent>(*event); + dataReceived.Commit(); } + + // Get non-empty description + DescribeTopic(setup, client, true, true, false, false); + DescribeConsumer(setup, client, true, true, false, false); + DescribePartition(setup, client, true, true, false, false); + } + + Y_UNIT_TEST(Location) { + NPersQueue::NTests::TPersQueueYdbSdkTestSetup setup(TEST_CASE_NAME); + TTopicClient client(setup.GetDriver()); + + DescribeTopic(setup, client, false, false, true, false); + DescribeConsumer(setup, client, false, false, true, false); + DescribePartition(setup, client, false, false, true, false); + + // Describe with KillTablets + DescribeTopic(setup, client, false, false, true, true); + DescribeConsumer(setup, client, false, false, true, true); + DescribePartition(setup, client, false, false, true, true); } } } diff --git a/ydb/services/persqueue_v1/ut/describes_ut/describe_topic_ut.cpp b/ydb/services/persqueue_v1/ut/describes_ut/describe_topic_ut.cpp index 382aee6829f..66a332f89ee 100644 --- a/ydb/services/persqueue_v1/ut/describes_ut/describe_topic_ut.cpp +++ b/ydb/services/persqueue_v1/ut/describes_ut/describe_topic_ut.cpp @@ -178,13 +178,11 @@ Y_UNIT_TEST_SUITE(TTopicApiDescribes) { auto pqGroup = server.AnnoyingClient->Ls(TString("/Root/PQ/" + topicName))->Record.GetPathDescription() .GetPersQueueGroup(); - THashSet<ui64> tablets; - for (const auto& p : pqGroup.GetPartitions()) { - auto res = tablets.insert(p.GetTabletId()); - if (res.second) { + THashSet<ui64> restartedTablets; + for (const auto& p : pqGroup.GetPartitions()) + if (restartedTablets.insert(p.GetTabletId()).second) server.AnnoyingClient->KillTablet(*server.CleverServer, p.GetTabletId()); - } - } + server.CleverServer->GetRuntime()->DispatchEvents(); } |