diff options
author | alexnick <alexnick@ydb.tech> | 2023-03-31 13:37:12 +0300 |
---|---|---|
committer | alexnick <alexnick@ydb.tech> | 2023-03-31 13:37:12 +0300 |
commit | 4f0f8b51b11c0b0da236b27a4b4d91dd8c6e7ffb (patch) | |
tree | c6a88397d4cb25ed311deaa6721bdd2848441a69 | |
parent | e7e32304b29c9377e76848838fd9e8efb6e1689a (diff) | |
download | ydb-4f0f8b51b11c0b0da236b27a4b4d91dd8c6e7ffb.tar.gz |
full topic describe
full topic describe
-rw-r--r-- | ydb/public/lib/ydb_cli/commands/ydb_service_scheme.cpp | 80 | ||||
-rw-r--r-- | ydb/public/lib/ydb_cli/commands/ydb_service_scheme.h | 2 | ||||
-rw-r--r-- | ydb/public/lib/ydb_cli/commands/ydb_service_topic.cpp | 44 | ||||
-rw-r--r-- | ydb/public/lib/ydb_cli/commands/ydb_service_topic.h | 19 | ||||
-rw-r--r-- | ydb/public/lib/ydb_cli/common/print_utils.cpp | 5 | ||||
-rw-r--r-- | ydb/public/lib/ydb_cli/common/print_utils.h | 1 | ||||
-rw-r--r-- | ydb/public/sdk/cpp/client/ydb_topic/impl/topic.cpp | 72 | ||||
-rw-r--r-- | ydb/public/sdk/cpp/client/ydb_topic/topic.h | 39 |
8 files changed, 252 insertions, 10 deletions
diff --git a/ydb/public/lib/ydb_cli/commands/ydb_service_scheme.cpp b/ydb/public/lib/ydb_cli/commands/ydb_service_scheme.cpp index bbaf60f7e55..41440aa6162 100644 --- a/ydb/public/lib/ydb_cli/commands/ydb_service_scheme.cpp +++ b/ydb/public/lib/ydb_cli/commands/ydb_service_scheme.cpp @@ -144,8 +144,8 @@ void TCommandDescribe::Config(TConfig& config) { // Table options config.Opts->AddLongOption("partition-boundaries", "[Table] Show partition key boundaries").StoreTrue(&ShowKeyShardBoundaries) .AddLongName("shard-boundaries"); - config.Opts->AddLongOption("stats", "[Table] Show table statistics").StoreTrue(&ShowTableStats); - config.Opts->AddLongOption("partition-stats", "[Table] Show partition statistics").StoreTrue(&ShowPartitionStats); + config.Opts->AddLongOption("stats", "[Table|Topic] Show table/topic statistics").StoreTrue(&ShowStats); + config.Opts->AddLongOption("partition-stats", "[Table|Topic] Show partition statistics").StoreTrue(&ShowPartitionStats); AddDeprecatedJsonOption(config, "(Deprecated, will be removed soon. Use --format option instead) [Table] Output in json format"); AddFormats(config, { EOutputFormat::Pretty, EOutputFormat::ProtoJsonBase64 }); @@ -213,6 +213,56 @@ namespace { } } +namespace { + + void PrintStatistics(const NTopic::TTopicDescription& topicDescription) { + Cout << Endl << "Topic stats:" << Endl; + auto& topicStats = topicDescription.GetTopicStats(); + Cout << "Approximate size of topic: " << PrettySize(topicStats.GetStoreSizeBytes()) << Endl; + Cout << "Max partitions write time lag: " << FormatDuration(topicStats.GetMaxWriteTimeLag()) << Endl; + Cout << "Min partitions last write time: " << FormatTime(topicStats.GetMinLastWriteTime()) << Endl; + Cout << "Written size per minute: " << PrettySize(topicStats.GetBytesWrittenPerMinute()) << Endl; + Cout << "Written size per hour: " << PrettySize(topicStats.GetBytesWrittenPerHour()) << Endl; + Cout << "Written size per day: " << PrettySize(topicStats.GetBytesWrittenPerDay()) << Endl; + + } + + void PrintPartitionStatistics(const NTopic::TTopicDescription& topicDescription) { + Cout << Endl << "Topic partitions stats:" << Endl; + + TVector<TString> columnNames = { "#" }; + columnNames.push_back("Active"); + columnNames.push_back("Start offset"); + columnNames.push_back("End offset"); + columnNames.push_back("Size"); + columnNames.push_back("Last write time"); + columnNames.push_back("Max write time lag"); + columnNames.push_back("Written size per minute"); + columnNames.push_back("Written size per hour"); + columnNames.push_back("Written size per day"); + + TPrettyTable table(columnNames); + for (const auto& part : topicDescription.GetPartitions()) { + auto& row = table.AddRow(); + row.Column(0, part.GetPartitionId()); + row.Column(1, part.GetActive()); + const auto& partStats = part.GetPartitionStats(); + if (partStats) { + row.Column(2, partStats->GetStartOffset()); + row.Column(3, partStats->GetEndOffset()); + row.Column(4, PrettySize(partStats->GetStoreSizeBytes())); + row.Column(5, FormatTime(partStats->GetLastWriteTime())); + row.Column(6, FormatDuration(partStats->GetMaxWriteTimeLag())); + row.Column(7, PrettySize(partStats->GetBytesWrittenPerMinute())); + row.Column(8, PrettySize(partStats->GetBytesWrittenPerHour())); + row.Column(9, PrettySize(partStats->GetBytesWrittenPerDay())); + } + } + Cout << table; + } + +} + int TCommandDescribe::PrintTopicResponsePretty(const NYdb::NTopic::TTopicDescription& description) { Cout << Endl << "RetentionPeriod: " << description.GetRetentionPeriod().Hours() << " hours"; if (description.GetRetentionStorageMb().Defined()) { @@ -225,7 +275,14 @@ int TCommandDescribe::PrintTopicResponsePretty(const NYdb::NTopic::TTopicDescrip Cout << Endl << "SupportedCodecs: " << FormatCodecs(description.GetSupportedCodecs()) << Endl; } PrintTopicConsumers(description.GetConsumers()); - Cout << Endl; + + if (ShowStats) { + PrintStatistics(description); + } + if (ShowPartitionStats){ + PrintPartitionStatistics(description); + } + return EXIT_SUCCESS; } @@ -269,7 +326,10 @@ int TCommandDescribe::PrintTopicResponse(const NYdb::NTopic::TDescribeTopicResul int TCommandDescribe::DescribeTopic(TDriver& driver) { NYdb::NTopic::TTopicClient topicClient(driver); - auto describeResult = topicClient.DescribeTopic(Path).GetValueSync(); + NYdb::NTopic::TDescribeTopicSettings settings; + settings.IncludeStats(ShowStats || ShowPartitionStats); + + auto describeResult = topicClient.DescribeTopic(Path, settings).GetValueSync(); ThrowOnError(describeResult); return PrintTopicResponse(describeResult); } @@ -283,7 +343,7 @@ int TCommandDescribe::DescribeTable(TDriver& driver) { FillSettings( NTable::TDescribeTableSettings() .WithKeyShardBoundary(ShowKeyShardBoundaries) - .WithTableStatistics(ShowTableStats || ShowPartitionStats) + .WithTableStatistics(ShowStats || ShowPartitionStats) .WithPartitionStatistics(ShowPartitionStats) ) ).GetValueSync(); @@ -299,7 +359,7 @@ int TCommandDescribe::DescribeColumnTable(TDriver& driver) { Path, FillSettings( NTable::TDescribeTableSettings() - .WithTableStatistics(ShowTableStats) + .WithTableStatistics(ShowStats) ) ).GetValueSync(); ThrowOnError(result); @@ -539,6 +599,8 @@ namespace { Cout << "Created: " << FormatTime(tableDescription.GetCreationTime()) << Endl; } + + void PrintPartitionInfo(const NTable::TTableDescription& tableDescription, bool showBoundaries, bool showStats) { const TVector<NTable::TKeyRange>& ranges = tableDescription.GetKeyRanges(); const TVector<NTable::TPartitionStats>& stats = tableDescription.GetPartitionStats(); @@ -669,7 +731,7 @@ void TCommandDescribe::PrintResponsePretty(const NTable::TTableDescription& tabl tableDescription.GetEffectivePermissions() ); } - if (ShowTableStats) { + if (ShowStats) { PrintStatistics(tableDescription); } if (ShowKeyShardBoundaries || ShowPartitionStats) { @@ -696,12 +758,12 @@ int TCommandDescribe::PrintResponseProtoJsonBase64(const NTable::TTableDescripti } void TCommandDescribe::WarnAboutTableOptions() { - if (ShowKeyShardBoundaries || ShowTableStats || ShowPartitionStats || OutputFormat != EOutputFormat::Default) { + if (ShowKeyShardBoundaries || ShowStats || ShowPartitionStats || OutputFormat != EOutputFormat::Default) { TVector<TString> options; if (ShowKeyShardBoundaries) { options.emplace_back("\"partition-boundaries\"(\"shard-boundaries\")"); } - if (ShowTableStats) { + if (ShowStats) { options.emplace_back("\"stats\""); } if (ShowPartitionStats) { diff --git a/ydb/public/lib/ydb_cli/commands/ydb_service_scheme.h b/ydb/public/lib/ydb_cli/commands/ydb_service_scheme.h index bc6f2e867c2..24f545d245f 100644 --- a/ydb/public/lib/ydb_cli/commands/ydb_service_scheme.h +++ b/ydb/public/lib/ydb_cli/commands/ydb_service_scheme.h @@ -62,7 +62,7 @@ private: bool ShowPermissions = false; // Table options bool ShowKeyShardBoundaries = false; - bool ShowTableStats = false; + bool ShowStats = false; bool ShowPartitionStats = false; }; diff --git a/ydb/public/lib/ydb_cli/commands/ydb_service_topic.cpp b/ydb/public/lib/ydb_cli/commands/ydb_service_topic.cpp index 7fd4c247ff9..ea0b2d85845 100644 --- a/ydb/public/lib/ydb_cli/commands/ydb_service_topic.cpp +++ b/ydb/public/lib/ydb_cli/commands/ydb_service_topic.cpp @@ -365,8 +365,15 @@ namespace NYdb::NConsoleClient { : TClientCommandTree("consumer", {}, "Consumer operations") { AddCommand(std::make_unique<TCommandTopicConsumerAdd>()); AddCommand(std::make_unique<TCommandTopicConsumerDrop>()); + AddCommand(std::make_unique<TCommandTopicConsumerOffset>()); } + TCommandTopicConsumerOffset::TCommandTopicConsumerOffset() + : TClientCommandTree("offset", {}, "Consumer offset operations") { + AddCommand(std::make_unique<TCommandTopicConsumerCommitOffset>()); + } + + TCommandTopicConsumerAdd::TCommandTopicConsumerAdd() : TYdbCommand("add", {}, "Consumer add operation") { } @@ -443,6 +450,43 @@ namespace NYdb::NConsoleClient { return EXIT_SUCCESS; } + + TCommandTopicConsumerCommitOffset::TCommandTopicConsumerCommitOffset() + : TYdbCommand("commit", {}, "Commit offset for consumer") { + } + + void TCommandTopicConsumerCommitOffset::Config(TConfig& config) { + TYdbCommand::Config(config); + config.Opts->AddLongOption("consumer", "Consumer which offset will be changed") + .Required() + .StoreResult(&ConsumerName_); + + config.Opts->AddLongOption("partition", "Partition which offset will be changed") + .Required() + .StoreResult(&PartitionId_); + + config.Opts->AddLongOption("offset", "Partition offset to be setted for desired consumer") + .Required() + .StoreResult(&Offset_); + + config.Opts->SetFreeArgsNum(1); + SetFreeArgTitle(0, "<topic-path>", "Topic from which consumer will be dropped"); + } + + void TCommandTopicConsumerCommitOffset::Parse(TConfig& config) { + TYdbCommand::Parse(config); + ParseTopicName(config, 0); + } + + int TCommandTopicConsumerCommitOffset::Run(TConfig& config) { + TDriver driver = CreateDriver(config); + NYdb::NTopic::TTopicClient topicClient(driver); + + TStatus status = topicClient.CommitOffset(TopicName, PartitionId_, ConsumerName_, Offset_).GetValueSync(); + ThrowOnError(status); + return EXIT_SUCCESS; + } + void TCommandWithTransformBody::AddTransform(TClientCommand::TConfig& config) { TStringStream description; description << "Conversion between a message data in the topic and the client filesystem/terminal. Available options: "; diff --git a/ydb/public/lib/ydb_cli/commands/ydb_service_topic.h b/ydb/public/lib/ydb_cli/commands/ydb_service_topic.h index 46c52fc76bd..5daa65a4a5e 100644 --- a/ydb/public/lib/ydb_cli/commands/ydb_service_topic.h +++ b/ydb/public/lib/ydb_cli/commands/ydb_service_topic.h @@ -84,6 +84,11 @@ namespace NYdb::NConsoleClient { TCommandTopicConsumer(); }; + class TCommandTopicConsumerOffset: public TClientCommandTree { + public: + TCommandTopicConsumerOffset(); + }; + class TCommandTopicConsumerAdd: public TYdbCommand, public TCommandWithTopicName, public TCommandWithSupportedCodecs { public: TCommandTopicConsumerAdd(); @@ -107,6 +112,20 @@ namespace NYdb::NConsoleClient { TString ConsumerName_; }; + class TCommandTopicConsumerCommitOffset: public TYdbCommand, public TCommandWithTopicName { + public: + TCommandTopicConsumerCommitOffset(); + void Config(TConfig& config) override; + void Parse(TConfig& config) override; + int Run(TConfig& config) override; + + private: + TString ConsumerName_; + ui64 PartitionId_; + ui64 Offset_; + }; + + class TCommandWithTransformBody { protected: void AddTransform(TClientCommand::TConfig& config); diff --git a/ydb/public/lib/ydb_cli/common/print_utils.cpp b/ydb/public/lib/ydb_cli/common/print_utils.cpp index 43957de6159..bcc44138416 100644 --- a/ydb/public/lib/ydb_cli/common/print_utils.cpp +++ b/ydb/public/lib/ydb_cli/common/print_utils.cpp @@ -66,6 +66,11 @@ TString FormatTime(TInstant time) { } }; +TString FormatDuration(TDuration duration) { + return Sprintf("%.02f seconds",(duration.MilliSeconds() * 0.001)); +}; + + TString EntryTypeToString(NScheme::ESchemeEntryType entry) { switch (entry) { case NScheme::ESchemeEntryType::Directory: diff --git a/ydb/public/lib/ydb_cli/common/print_utils.h b/ydb/public/lib/ydb_cli/common/print_utils.h index 79cd1162d0a..0b5bfa81ad5 100644 --- a/ydb/public/lib/ydb_cli/common/print_utils.h +++ b/ydb/public/lib/ydb_cli/common/print_utils.h @@ -8,6 +8,7 @@ namespace NConsoleClient { void PrintSchemeEntry(IOutputStream& o, const NScheme::TSchemeEntry& entry, NColorizer::TColors colors); TString FormatTime(TInstant time); +TString FormatDuration(TDuration duration); TString PrettySize(size_t size); TString EntryTypeToString(NScheme::ESchemeEntryType entry); diff --git a/ydb/public/sdk/cpp/client/ydb_topic/impl/topic.cpp b/ydb/public/sdk/cpp/client/ydb_topic/impl/topic.cpp index bfa05a487c8..4b02afae62c 100644 --- a/ydb/public/sdk/cpp/client/ydb_topic/impl/topic.cpp +++ b/ydb/public/sdk/cpp/client/ydb_topic/impl/topic.cpp @@ -45,6 +45,7 @@ TTopicDescription::TTopicDescription(Ydb::Topic::DescribeTopicResult&& result) , PartitionWriteSpeedBytesPerSecond_(Proto_.partition_write_speed_bytes_per_second()) , PartitionWriteBurstBytes_(Proto_.partition_write_burst_bytes()) , MeteringMode_(TProtoAccessor::FromProto(Proto_.metering_mode())) + , TopicStats_(Proto_.topic_stats()) { Owner_ = Proto_.self().owner(); PermissionToSchemeEntry(Proto_.self().permissions(), &Permissions_); @@ -176,6 +177,10 @@ const TString& TTopicDescription::GetOwner() const { return Owner_; } +const TTopicStats& TTopicDescription::GetTopicStats() const { + return TopicStats_; +} + const TVector<NScheme::TPermissions>& TTopicDescription::GetPermissions() const { return Permissions_; } @@ -197,9 +202,51 @@ ui64 TPartitioningSettings::GetPartitionCountLimit() const { return PartitionCountLimit_; } +TTopicStats::TTopicStats(const Ydb::Topic::DescribeTopicResult::TopicStats& topicStats) + : StoreSizeBytes_(topicStats.store_size_bytes()) + , MinLastWriteTime_(TInstant::Seconds(topicStats.min_last_write_time().seconds())) + , MaxWriteTimeLag_(TDuration::Seconds(topicStats.max_write_time_lag().seconds()) + TDuration::MicroSeconds(topicStats.max_write_time_lag().nanos() / 1000)) + , BytesWrittenPerMinute_(topicStats.bytes_written().per_minute()) + , BytesWrittenPerHour_(topicStats.bytes_written().per_hour()) + , BytesWrittenPerDay_(topicStats.bytes_written().per_day()) +{ +} + +ui64 TTopicStats::GetStoreSizeBytes() const { + return StoreSizeBytes_; +} + +TInstant TTopicStats::GetMinLastWriteTime() const { + return MinLastWriteTime_; +} + +TDuration TTopicStats::GetMaxWriteTimeLag() const { + return MaxWriteTimeLag_; +} + +ui64 TTopicStats::GetBytesWrittenPerMinute() const { + return BytesWrittenPerMinute_; +} + +ui64 TTopicStats::GetBytesWrittenPerHour() const { + return BytesWrittenPerHour_; +} + +ui64 TTopicStats::GetBytesWrittenPerDay() const { + return BytesWrittenPerDay_; +} + + TPartitionStats::TPartitionStats(const Ydb::Topic::PartitionStats& partitionStats) : StartOffset_(partitionStats.partition_offsets().start()) , EndOffset_(partitionStats.partition_offsets().end()) + , StoreSizeBytes_(partitionStats.store_size_bytes()) + , LastWriteTime_(TInstant::Seconds(partitionStats.last_write_time().seconds())) + , MaxWriteTimeLag_(TDuration::Seconds(partitionStats.max_write_time_lag().seconds()) + TDuration::MicroSeconds(partitionStats.max_write_time_lag().nanos() / 1000)) + , BytesWrittenPerMinute_(partitionStats.bytes_written().per_minute()) + , BytesWrittenPerHour_(partitionStats.bytes_written().per_hour()) + , BytesWrittenPerDay_(partitionStats.bytes_written().per_day()) + {} ui64 TPartitionStats::GetStartOffset() const { @@ -210,6 +257,31 @@ ui64 TPartitionStats::GetEndOffset() const { return EndOffset_; } +ui64 TPartitionStats::GetStoreSizeBytes() const { + return StoreSizeBytes_; +} + +TInstant TPartitionStats::GetLastWriteTime() const { + return LastWriteTime_; +} + +TDuration TPartitionStats::GetMaxWriteTimeLag() const { + return MaxWriteTimeLag_; +} + +ui64 TPartitionStats::GetBytesWrittenPerMinute() const { + return BytesWrittenPerMinute_; +} + +ui64 TPartitionStats::GetBytesWrittenPerHour() const { + return BytesWrittenPerHour_; +} + +ui64 TPartitionStats::GetBytesWrittenPerDay() const { + return BytesWrittenPerDay_; +} + + TPartitionConsumerStats::TPartitionConsumerStats(const Ydb::Topic::DescribeConsumerResult::PartitionConsumerStats& partitionStats) : CommittedOffset_(partitionStats.committed_offset()) {} diff --git a/ydb/public/sdk/cpp/client/ydb_topic/topic.h b/ydb/public/sdk/cpp/client/ydb_topic/topic.h index e857f9ce4ae..4ca694b04cd 100644 --- a/ydb/public/sdk/cpp/client/ydb_topic/topic.h +++ b/ydb/public/sdk/cpp/client/ydb_topic/topic.h @@ -62,15 +62,51 @@ private: TVector<ECodec> SupportedCodecs_; }; + +class TTopicStats { +public: + TTopicStats(const Ydb::Topic::DescribeTopicResult::TopicStats& topicStats); + + ui64 GetStoreSizeBytes() const; + TDuration GetMaxWriteTimeLag() const; + TInstant GetMinLastWriteTime() const; + ui64 GetBytesWrittenPerMinute() const; + ui64 GetBytesWrittenPerHour() const; + ui64 GetBytesWrittenPerDay() const; + +private: + ui64 StoreSizeBytes_; + TInstant MinLastWriteTime_; + TDuration MaxWriteTimeLag_; + ui64 BytesWrittenPerMinute_; + ui64 BytesWrittenPerHour_; + ui64 BytesWrittenPerDay_; +}; + + class TPartitionStats { public: TPartitionStats(const Ydb::Topic::PartitionStats& partitionStats); ui64 GetStartOffset() const; ui64 GetEndOffset() const; + ui64 GetStoreSizeBytes() const; + TDuration GetMaxWriteTimeLag() const; + TInstant GetLastWriteTime() const; + ui64 GetBytesWrittenPerMinute() const; + ui64 GetBytesWrittenPerHour() const; + ui64 GetBytesWrittenPerDay() const; + private: ui64 StartOffset_; ui64 EndOffset_; + ui64 StoreSizeBytes_; + TInstant LastWriteTime_; + TDuration MaxWriteTimeLag_; + ui64 BytesWrittenPerMinute_; + ui64 BytesWrittenPerHour_; + ui64 BytesWrittenPerDay_; + }; class TPartitionConsumerStats { @@ -152,6 +188,8 @@ public: EMeteringMode GetMeteringMode() const; + const TTopicStats& GetTopicStats() const; + void SerializeTo(Ydb::Topic::CreateTopicRequest& request) const; private: @@ -169,6 +207,7 @@ private: EMeteringMode MeteringMode_; TMap<TString, TString> Attributes_; TVector<TConsumer> Consumers_; + TTopicStats TopicStats_; TString Owner_; TVector<NScheme::TPermissions> Permissions_; |