aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authoralexnick <alexnick@ydb.tech>2023-03-31 13:37:12 +0300
committeralexnick <alexnick@ydb.tech>2023-03-31 13:37:12 +0300
commit4f0f8b51b11c0b0da236b27a4b4d91dd8c6e7ffb (patch)
treec6a88397d4cb25ed311deaa6721bdd2848441a69
parente7e32304b29c9377e76848838fd9e8efb6e1689a (diff)
downloadydb-4f0f8b51b11c0b0da236b27a4b4d91dd8c6e7ffb.tar.gz
full topic describe
full topic describe
-rw-r--r--ydb/public/lib/ydb_cli/commands/ydb_service_scheme.cpp80
-rw-r--r--ydb/public/lib/ydb_cli/commands/ydb_service_scheme.h2
-rw-r--r--ydb/public/lib/ydb_cli/commands/ydb_service_topic.cpp44
-rw-r--r--ydb/public/lib/ydb_cli/commands/ydb_service_topic.h19
-rw-r--r--ydb/public/lib/ydb_cli/common/print_utils.cpp5
-rw-r--r--ydb/public/lib/ydb_cli/common/print_utils.h1
-rw-r--r--ydb/public/sdk/cpp/client/ydb_topic/impl/topic.cpp72
-rw-r--r--ydb/public/sdk/cpp/client/ydb_topic/topic.h39
8 files changed, 252 insertions, 10 deletions
diff --git a/ydb/public/lib/ydb_cli/commands/ydb_service_scheme.cpp b/ydb/public/lib/ydb_cli/commands/ydb_service_scheme.cpp
index bbaf60f7e55..41440aa6162 100644
--- a/ydb/public/lib/ydb_cli/commands/ydb_service_scheme.cpp
+++ b/ydb/public/lib/ydb_cli/commands/ydb_service_scheme.cpp
@@ -144,8 +144,8 @@ void TCommandDescribe::Config(TConfig& config) {
// Table options
config.Opts->AddLongOption("partition-boundaries", "[Table] Show partition key boundaries").StoreTrue(&ShowKeyShardBoundaries)
.AddLongName("shard-boundaries");
- config.Opts->AddLongOption("stats", "[Table] Show table statistics").StoreTrue(&ShowTableStats);
- config.Opts->AddLongOption("partition-stats", "[Table] Show partition statistics").StoreTrue(&ShowPartitionStats);
+ config.Opts->AddLongOption("stats", "[Table|Topic] Show table/topic statistics").StoreTrue(&ShowStats);
+ config.Opts->AddLongOption("partition-stats", "[Table|Topic] Show partition statistics").StoreTrue(&ShowPartitionStats);
AddDeprecatedJsonOption(config, "(Deprecated, will be removed soon. Use --format option instead) [Table] Output in json format");
AddFormats(config, { EOutputFormat::Pretty, EOutputFormat::ProtoJsonBase64 });
@@ -213,6 +213,56 @@ namespace {
}
}
+namespace {
+
+ void PrintStatistics(const NTopic::TTopicDescription& topicDescription) {
+ Cout << Endl << "Topic stats:" << Endl;
+ auto& topicStats = topicDescription.GetTopicStats();
+ Cout << "Approximate size of topic: " << PrettySize(topicStats.GetStoreSizeBytes()) << Endl;
+ Cout << "Max partitions write time lag: " << FormatDuration(topicStats.GetMaxWriteTimeLag()) << Endl;
+ Cout << "Min partitions last write time: " << FormatTime(topicStats.GetMinLastWriteTime()) << Endl;
+ Cout << "Written size per minute: " << PrettySize(topicStats.GetBytesWrittenPerMinute()) << Endl;
+ Cout << "Written size per hour: " << PrettySize(topicStats.GetBytesWrittenPerHour()) << Endl;
+ Cout << "Written size per day: " << PrettySize(topicStats.GetBytesWrittenPerDay()) << Endl;
+
+ }
+
+ void PrintPartitionStatistics(const NTopic::TTopicDescription& topicDescription) {
+ Cout << Endl << "Topic partitions stats:" << Endl;
+
+ TVector<TString> columnNames = { "#" };
+ columnNames.push_back("Active");
+ columnNames.push_back("Start offset");
+ columnNames.push_back("End offset");
+ columnNames.push_back("Size");
+ columnNames.push_back("Last write time");
+ columnNames.push_back("Max write time lag");
+ columnNames.push_back("Written size per minute");
+ columnNames.push_back("Written size per hour");
+ columnNames.push_back("Written size per day");
+
+ TPrettyTable table(columnNames);
+ for (const auto& part : topicDescription.GetPartitions()) {
+ auto& row = table.AddRow();
+ row.Column(0, part.GetPartitionId());
+ row.Column(1, part.GetActive());
+ const auto& partStats = part.GetPartitionStats();
+ if (partStats) {
+ row.Column(2, partStats->GetStartOffset());
+ row.Column(3, partStats->GetEndOffset());
+ row.Column(4, PrettySize(partStats->GetStoreSizeBytes()));
+ row.Column(5, FormatTime(partStats->GetLastWriteTime()));
+ row.Column(6, FormatDuration(partStats->GetMaxWriteTimeLag()));
+ row.Column(7, PrettySize(partStats->GetBytesWrittenPerMinute()));
+ row.Column(8, PrettySize(partStats->GetBytesWrittenPerHour()));
+ row.Column(9, PrettySize(partStats->GetBytesWrittenPerDay()));
+ }
+ }
+ Cout << table;
+ }
+
+}
+
int TCommandDescribe::PrintTopicResponsePretty(const NYdb::NTopic::TTopicDescription& description) {
Cout << Endl << "RetentionPeriod: " << description.GetRetentionPeriod().Hours() << " hours";
if (description.GetRetentionStorageMb().Defined()) {
@@ -225,7 +275,14 @@ int TCommandDescribe::PrintTopicResponsePretty(const NYdb::NTopic::TTopicDescrip
Cout << Endl << "SupportedCodecs: " << FormatCodecs(description.GetSupportedCodecs()) << Endl;
}
PrintTopicConsumers(description.GetConsumers());
- Cout << Endl;
+
+ if (ShowStats) {
+ PrintStatistics(description);
+ }
+ if (ShowPartitionStats){
+ PrintPartitionStatistics(description);
+ }
+
return EXIT_SUCCESS;
}
@@ -269,7 +326,10 @@ int TCommandDescribe::PrintTopicResponse(const NYdb::NTopic::TDescribeTopicResul
int TCommandDescribe::DescribeTopic(TDriver& driver) {
NYdb::NTopic::TTopicClient topicClient(driver);
- auto describeResult = topicClient.DescribeTopic(Path).GetValueSync();
+ NYdb::NTopic::TDescribeTopicSettings settings;
+ settings.IncludeStats(ShowStats || ShowPartitionStats);
+
+ auto describeResult = topicClient.DescribeTopic(Path, settings).GetValueSync();
ThrowOnError(describeResult);
return PrintTopicResponse(describeResult);
}
@@ -283,7 +343,7 @@ int TCommandDescribe::DescribeTable(TDriver& driver) {
FillSettings(
NTable::TDescribeTableSettings()
.WithKeyShardBoundary(ShowKeyShardBoundaries)
- .WithTableStatistics(ShowTableStats || ShowPartitionStats)
+ .WithTableStatistics(ShowStats || ShowPartitionStats)
.WithPartitionStatistics(ShowPartitionStats)
)
).GetValueSync();
@@ -299,7 +359,7 @@ int TCommandDescribe::DescribeColumnTable(TDriver& driver) {
Path,
FillSettings(
NTable::TDescribeTableSettings()
- .WithTableStatistics(ShowTableStats)
+ .WithTableStatistics(ShowStats)
)
).GetValueSync();
ThrowOnError(result);
@@ -539,6 +599,8 @@ namespace {
Cout << "Created: " << FormatTime(tableDescription.GetCreationTime()) << Endl;
}
+
+
void PrintPartitionInfo(const NTable::TTableDescription& tableDescription, bool showBoundaries, bool showStats) {
const TVector<NTable::TKeyRange>& ranges = tableDescription.GetKeyRanges();
const TVector<NTable::TPartitionStats>& stats = tableDescription.GetPartitionStats();
@@ -669,7 +731,7 @@ void TCommandDescribe::PrintResponsePretty(const NTable::TTableDescription& tabl
tableDescription.GetEffectivePermissions()
);
}
- if (ShowTableStats) {
+ if (ShowStats) {
PrintStatistics(tableDescription);
}
if (ShowKeyShardBoundaries || ShowPartitionStats) {
@@ -696,12 +758,12 @@ int TCommandDescribe::PrintResponseProtoJsonBase64(const NTable::TTableDescripti
}
void TCommandDescribe::WarnAboutTableOptions() {
- if (ShowKeyShardBoundaries || ShowTableStats || ShowPartitionStats || OutputFormat != EOutputFormat::Default) {
+ if (ShowKeyShardBoundaries || ShowStats || ShowPartitionStats || OutputFormat != EOutputFormat::Default) {
TVector<TString> options;
if (ShowKeyShardBoundaries) {
options.emplace_back("\"partition-boundaries\"(\"shard-boundaries\")");
}
- if (ShowTableStats) {
+ if (ShowStats) {
options.emplace_back("\"stats\"");
}
if (ShowPartitionStats) {
diff --git a/ydb/public/lib/ydb_cli/commands/ydb_service_scheme.h b/ydb/public/lib/ydb_cli/commands/ydb_service_scheme.h
index bc6f2e867c2..24f545d245f 100644
--- a/ydb/public/lib/ydb_cli/commands/ydb_service_scheme.h
+++ b/ydb/public/lib/ydb_cli/commands/ydb_service_scheme.h
@@ -62,7 +62,7 @@ private:
bool ShowPermissions = false;
// Table options
bool ShowKeyShardBoundaries = false;
- bool ShowTableStats = false;
+ bool ShowStats = false;
bool ShowPartitionStats = false;
};
diff --git a/ydb/public/lib/ydb_cli/commands/ydb_service_topic.cpp b/ydb/public/lib/ydb_cli/commands/ydb_service_topic.cpp
index 7fd4c247ff9..ea0b2d85845 100644
--- a/ydb/public/lib/ydb_cli/commands/ydb_service_topic.cpp
+++ b/ydb/public/lib/ydb_cli/commands/ydb_service_topic.cpp
@@ -365,8 +365,15 @@ namespace NYdb::NConsoleClient {
: TClientCommandTree("consumer", {}, "Consumer operations") {
AddCommand(std::make_unique<TCommandTopicConsumerAdd>());
AddCommand(std::make_unique<TCommandTopicConsumerDrop>());
+ AddCommand(std::make_unique<TCommandTopicConsumerOffset>());
}
+ TCommandTopicConsumerOffset::TCommandTopicConsumerOffset()
+ : TClientCommandTree("offset", {}, "Consumer offset operations") {
+ AddCommand(std::make_unique<TCommandTopicConsumerCommitOffset>());
+ }
+
+
TCommandTopicConsumerAdd::TCommandTopicConsumerAdd()
: TYdbCommand("add", {}, "Consumer add operation") {
}
@@ -443,6 +450,43 @@ namespace NYdb::NConsoleClient {
return EXIT_SUCCESS;
}
+
+ TCommandTopicConsumerCommitOffset::TCommandTopicConsumerCommitOffset()
+ : TYdbCommand("commit", {}, "Commit offset for consumer") {
+ }
+
+ void TCommandTopicConsumerCommitOffset::Config(TConfig& config) {
+ TYdbCommand::Config(config);
+ config.Opts->AddLongOption("consumer", "Consumer which offset will be changed")
+ .Required()
+ .StoreResult(&ConsumerName_);
+
+ config.Opts->AddLongOption("partition", "Partition which offset will be changed")
+ .Required()
+ .StoreResult(&PartitionId_);
+
+ config.Opts->AddLongOption("offset", "Partition offset to be setted for desired consumer")
+ .Required()
+ .StoreResult(&Offset_);
+
+ config.Opts->SetFreeArgsNum(1);
+ SetFreeArgTitle(0, "<topic-path>", "Topic from which consumer will be dropped");
+ }
+
+ void TCommandTopicConsumerCommitOffset::Parse(TConfig& config) {
+ TYdbCommand::Parse(config);
+ ParseTopicName(config, 0);
+ }
+
+ int TCommandTopicConsumerCommitOffset::Run(TConfig& config) {
+ TDriver driver = CreateDriver(config);
+ NYdb::NTopic::TTopicClient topicClient(driver);
+
+ TStatus status = topicClient.CommitOffset(TopicName, PartitionId_, ConsumerName_, Offset_).GetValueSync();
+ ThrowOnError(status);
+ return EXIT_SUCCESS;
+ }
+
void TCommandWithTransformBody::AddTransform(TClientCommand::TConfig& config) {
TStringStream description;
description << "Conversion between a message data in the topic and the client filesystem/terminal. Available options: ";
diff --git a/ydb/public/lib/ydb_cli/commands/ydb_service_topic.h b/ydb/public/lib/ydb_cli/commands/ydb_service_topic.h
index 46c52fc76bd..5daa65a4a5e 100644
--- a/ydb/public/lib/ydb_cli/commands/ydb_service_topic.h
+++ b/ydb/public/lib/ydb_cli/commands/ydb_service_topic.h
@@ -84,6 +84,11 @@ namespace NYdb::NConsoleClient {
TCommandTopicConsumer();
};
+ class TCommandTopicConsumerOffset: public TClientCommandTree {
+ public:
+ TCommandTopicConsumerOffset();
+ };
+
class TCommandTopicConsumerAdd: public TYdbCommand, public TCommandWithTopicName, public TCommandWithSupportedCodecs {
public:
TCommandTopicConsumerAdd();
@@ -107,6 +112,20 @@ namespace NYdb::NConsoleClient {
TString ConsumerName_;
};
+ class TCommandTopicConsumerCommitOffset: public TYdbCommand, public TCommandWithTopicName {
+ public:
+ TCommandTopicConsumerCommitOffset();
+ void Config(TConfig& config) override;
+ void Parse(TConfig& config) override;
+ int Run(TConfig& config) override;
+
+ private:
+ TString ConsumerName_;
+ ui64 PartitionId_;
+ ui64 Offset_;
+ };
+
+
class TCommandWithTransformBody {
protected:
void AddTransform(TClientCommand::TConfig& config);
diff --git a/ydb/public/lib/ydb_cli/common/print_utils.cpp b/ydb/public/lib/ydb_cli/common/print_utils.cpp
index 43957de6159..bcc44138416 100644
--- a/ydb/public/lib/ydb_cli/common/print_utils.cpp
+++ b/ydb/public/lib/ydb_cli/common/print_utils.cpp
@@ -66,6 +66,11 @@ TString FormatTime(TInstant time) {
}
};
+TString FormatDuration(TDuration duration) {
+ return Sprintf("%.02f seconds",(duration.MilliSeconds() * 0.001));
+};
+
+
TString EntryTypeToString(NScheme::ESchemeEntryType entry) {
switch (entry) {
case NScheme::ESchemeEntryType::Directory:
diff --git a/ydb/public/lib/ydb_cli/common/print_utils.h b/ydb/public/lib/ydb_cli/common/print_utils.h
index 79cd1162d0a..0b5bfa81ad5 100644
--- a/ydb/public/lib/ydb_cli/common/print_utils.h
+++ b/ydb/public/lib/ydb_cli/common/print_utils.h
@@ -8,6 +8,7 @@ namespace NConsoleClient {
void PrintSchemeEntry(IOutputStream& o, const NScheme::TSchemeEntry& entry, NColorizer::TColors colors);
TString FormatTime(TInstant time);
+TString FormatDuration(TDuration duration);
TString PrettySize(size_t size);
TString EntryTypeToString(NScheme::ESchemeEntryType entry);
diff --git a/ydb/public/sdk/cpp/client/ydb_topic/impl/topic.cpp b/ydb/public/sdk/cpp/client/ydb_topic/impl/topic.cpp
index bfa05a487c8..4b02afae62c 100644
--- a/ydb/public/sdk/cpp/client/ydb_topic/impl/topic.cpp
+++ b/ydb/public/sdk/cpp/client/ydb_topic/impl/topic.cpp
@@ -45,6 +45,7 @@ TTopicDescription::TTopicDescription(Ydb::Topic::DescribeTopicResult&& result)
, PartitionWriteSpeedBytesPerSecond_(Proto_.partition_write_speed_bytes_per_second())
, PartitionWriteBurstBytes_(Proto_.partition_write_burst_bytes())
, MeteringMode_(TProtoAccessor::FromProto(Proto_.metering_mode()))
+ , TopicStats_(Proto_.topic_stats())
{
Owner_ = Proto_.self().owner();
PermissionToSchemeEntry(Proto_.self().permissions(), &Permissions_);
@@ -176,6 +177,10 @@ const TString& TTopicDescription::GetOwner() const {
return Owner_;
}
+const TTopicStats& TTopicDescription::GetTopicStats() const {
+ return TopicStats_;
+}
+
const TVector<NScheme::TPermissions>& TTopicDescription::GetPermissions() const {
return Permissions_;
}
@@ -197,9 +202,51 @@ ui64 TPartitioningSettings::GetPartitionCountLimit() const {
return PartitionCountLimit_;
}
+TTopicStats::TTopicStats(const Ydb::Topic::DescribeTopicResult::TopicStats& topicStats)
+ : StoreSizeBytes_(topicStats.store_size_bytes())
+ , MinLastWriteTime_(TInstant::Seconds(topicStats.min_last_write_time().seconds()))
+ , MaxWriteTimeLag_(TDuration::Seconds(topicStats.max_write_time_lag().seconds()) + TDuration::MicroSeconds(topicStats.max_write_time_lag().nanos() / 1000))
+ , BytesWrittenPerMinute_(topicStats.bytes_written().per_minute())
+ , BytesWrittenPerHour_(topicStats.bytes_written().per_hour())
+ , BytesWrittenPerDay_(topicStats.bytes_written().per_day())
+{
+}
+
+ui64 TTopicStats::GetStoreSizeBytes() const {
+ return StoreSizeBytes_;
+}
+
+TInstant TTopicStats::GetMinLastWriteTime() const {
+ return MinLastWriteTime_;
+}
+
+TDuration TTopicStats::GetMaxWriteTimeLag() const {
+ return MaxWriteTimeLag_;
+}
+
+ui64 TTopicStats::GetBytesWrittenPerMinute() const {
+ return BytesWrittenPerMinute_;
+}
+
+ui64 TTopicStats::GetBytesWrittenPerHour() const {
+ return BytesWrittenPerHour_;
+}
+
+ui64 TTopicStats::GetBytesWrittenPerDay() const {
+ return BytesWrittenPerDay_;
+}
+
+
TPartitionStats::TPartitionStats(const Ydb::Topic::PartitionStats& partitionStats)
: StartOffset_(partitionStats.partition_offsets().start())
, EndOffset_(partitionStats.partition_offsets().end())
+ , StoreSizeBytes_(partitionStats.store_size_bytes())
+ , LastWriteTime_(TInstant::Seconds(partitionStats.last_write_time().seconds()))
+ , MaxWriteTimeLag_(TDuration::Seconds(partitionStats.max_write_time_lag().seconds()) + TDuration::MicroSeconds(partitionStats.max_write_time_lag().nanos() / 1000))
+ , BytesWrittenPerMinute_(partitionStats.bytes_written().per_minute())
+ , BytesWrittenPerHour_(partitionStats.bytes_written().per_hour())
+ , BytesWrittenPerDay_(partitionStats.bytes_written().per_day())
+
{}
ui64 TPartitionStats::GetStartOffset() const {
@@ -210,6 +257,31 @@ ui64 TPartitionStats::GetEndOffset() const {
return EndOffset_;
}
+ui64 TPartitionStats::GetStoreSizeBytes() const {
+ return StoreSizeBytes_;
+}
+
+TInstant TPartitionStats::GetLastWriteTime() const {
+ return LastWriteTime_;
+}
+
+TDuration TPartitionStats::GetMaxWriteTimeLag() const {
+ return MaxWriteTimeLag_;
+}
+
+ui64 TPartitionStats::GetBytesWrittenPerMinute() const {
+ return BytesWrittenPerMinute_;
+}
+
+ui64 TPartitionStats::GetBytesWrittenPerHour() const {
+ return BytesWrittenPerHour_;
+}
+
+ui64 TPartitionStats::GetBytesWrittenPerDay() const {
+ return BytesWrittenPerDay_;
+}
+
+
TPartitionConsumerStats::TPartitionConsumerStats(const Ydb::Topic::DescribeConsumerResult::PartitionConsumerStats& partitionStats)
: CommittedOffset_(partitionStats.committed_offset())
{}
diff --git a/ydb/public/sdk/cpp/client/ydb_topic/topic.h b/ydb/public/sdk/cpp/client/ydb_topic/topic.h
index e857f9ce4ae..4ca694b04cd 100644
--- a/ydb/public/sdk/cpp/client/ydb_topic/topic.h
+++ b/ydb/public/sdk/cpp/client/ydb_topic/topic.h
@@ -62,15 +62,51 @@ private:
TVector<ECodec> SupportedCodecs_;
};
+
+class TTopicStats {
+public:
+ TTopicStats(const Ydb::Topic::DescribeTopicResult::TopicStats& topicStats);
+
+ ui64 GetStoreSizeBytes() const;
+ TDuration GetMaxWriteTimeLag() const;
+ TInstant GetMinLastWriteTime() const;
+ ui64 GetBytesWrittenPerMinute() const;
+ ui64 GetBytesWrittenPerHour() const;
+ ui64 GetBytesWrittenPerDay() const;
+
+private:
+ ui64 StoreSizeBytes_;
+ TInstant MinLastWriteTime_;
+ TDuration MaxWriteTimeLag_;
+ ui64 BytesWrittenPerMinute_;
+ ui64 BytesWrittenPerHour_;
+ ui64 BytesWrittenPerDay_;
+};
+
+
class TPartitionStats {
public:
TPartitionStats(const Ydb::Topic::PartitionStats& partitionStats);
ui64 GetStartOffset() const;
ui64 GetEndOffset() const;
+ ui64 GetStoreSizeBytes() const;
+ TDuration GetMaxWriteTimeLag() const;
+ TInstant GetLastWriteTime() const;
+ ui64 GetBytesWrittenPerMinute() const;
+ ui64 GetBytesWrittenPerHour() const;
+ ui64 GetBytesWrittenPerDay() const;
+
private:
ui64 StartOffset_;
ui64 EndOffset_;
+ ui64 StoreSizeBytes_;
+ TInstant LastWriteTime_;
+ TDuration MaxWriteTimeLag_;
+ ui64 BytesWrittenPerMinute_;
+ ui64 BytesWrittenPerHour_;
+ ui64 BytesWrittenPerDay_;
+
};
class TPartitionConsumerStats {
@@ -152,6 +188,8 @@ public:
EMeteringMode GetMeteringMode() const;
+ const TTopicStats& GetTopicStats() const;
+
void SerializeTo(Ydb::Topic::CreateTopicRequest& request) const;
private:
@@ -169,6 +207,7 @@ private:
EMeteringMode MeteringMode_;
TMap<TString, TString> Attributes_;
TVector<TConsumer> Consumers_;
+ TTopicStats TopicStats_;
TString Owner_;
TVector<NScheme::TPermissions> Permissions_;