diff options
author | shmel1k <shmel1k@ydb.tech> | 2022-08-15 18:40:07 +0300 |
---|---|---|
committer | shmel1k <shmel1k@ydb.tech> | 2022-08-15 18:40:07 +0300 |
commit | 7d61eb045d194499b22019bae65d1d9bfb3f5128 (patch) | |
tree | 01a5b6ceeb26d700621d2ff75b5c3bf2bc71fa9d | |
parent | 34207d58634f03de42cd726816d6598bd719600d (diff) | |
download | ydb-7d61eb045d194499b22019bae65d1d9bfb3f5128.tar.gz |
[] improved 'topic read' documentation
-rw-r--r-- | ydb/public/lib/ydb_cli/commands/ydb_service_topic.cpp | 98 | ||||
-rw-r--r-- | ydb/public/lib/ydb_cli/commands/ydb_service_topic.h | 2 | ||||
-rw-r--r-- | ydb/public/lib/ydb_cli/topic/topic_metadata_fields.h | 2 | ||||
-rw-r--r-- | ydb/public/lib/ydb_cli/topic/topic_read.cpp | 20 | ||||
-rw-r--r-- | ydb/public/lib/ydb_cli/topic/topic_read.h | 6 |
5 files changed, 65 insertions, 63 deletions
diff --git a/ydb/public/lib/ydb_cli/commands/ydb_service_topic.cpp b/ydb/public/lib/ydb_cli/commands/ydb_service_topic.cpp index 9766a08ff8..39b0bc6c35 100644 --- a/ydb/public/lib/ydb_cli/commands/ydb_service_topic.cpp +++ b/ydb/public/lib/ydb_cli/commands/ydb_service_topic.cpp @@ -29,39 +29,39 @@ namespace NYdb::NConsoleClient { }; // TODO(shmel1k@): improve docs - THashMap<EStreamMetadataField, TString> StreamMetadataFieldsDescriptions = { - {EStreamMetadataField::Body, "message content"}, - {EStreamMetadataField::WriteTime, "message write time"}, - {EStreamMetadataField::CreateTime, "message creation time"}, - {EStreamMetadataField::MessageGroupID, "message group id"}, - {EStreamMetadataField::Offset, "offset"}, - {EStreamMetadataField::SeqNo, "seqno"}, - {EStreamMetadataField::Meta, "meta"}, + THashMap<ETopicMetadataField, TString> TopicMetadataFieldsDescriptions = { + {ETopicMetadataField::Body, "Message data"}, + {ETopicMetadataField::WriteTime, "Message write time. This time defines the UNIX timestamp the messages was written to server"}, + {ETopicMetadataField::CreateTime, "Message creation time. This time defines the UNIX timestamp the message was created by client"}, + {ETopicMetadataField::MessageGroupID, "Message group id. This identifier is used to stick to concrete partition using round-robin algorithm. All messages with the same message group id are guaranteed to be read in FIFO order"}, + {ETopicMetadataField::Offset, "Message offset. Offset defines unique message number in his partition"}, + {ETopicMetadataField::SeqNo, "Message sequence number, which is used for message deduplication"}, + {ETopicMetadataField::Meta, "Message additional metadata"}, }; - const TVector<EStreamMetadataField> AllStreamMetadataFields = { - EStreamMetadataField::Body, - EStreamMetadataField::WriteTime, - EStreamMetadataField::CreateTime, - EStreamMetadataField::MessageGroupID, - EStreamMetadataField::Offset, - EStreamMetadataField::SeqNo, - EStreamMetadataField::Meta, + const TVector<ETopicMetadataField> AllTopicMetadataFields = { + ETopicMetadataField::Body, + ETopicMetadataField::WriteTime, + ETopicMetadataField::CreateTime, + ETopicMetadataField::MessageGroupID, + ETopicMetadataField::Offset, + ETopicMetadataField::SeqNo, + ETopicMetadataField::Meta, }; - const THashMap<TString, EStreamMetadataField> StreamMetadataFieldsMap = { - {"body", EStreamMetadataField::Body}, - {"write_time", EStreamMetadataField::WriteTime}, - {"create_time", EStreamMetadataField::CreateTime}, - {"message_group_id", EStreamMetadataField::MessageGroupID}, - {"offset", EStreamMetadataField::Offset}, - {"seq_no", EStreamMetadataField::SeqNo}, - {"meta", EStreamMetadataField::Meta}, + const THashMap<TString, ETopicMetadataField> TopicMetadataFieldsMap = { + {"body", ETopicMetadataField::Body}, + {"write_time", ETopicMetadataField::WriteTime}, + {"create_time", ETopicMetadataField::CreateTime}, + {"message_group_id", ETopicMetadataField::MessageGroupID}, + {"offset", ETopicMetadataField::Offset}, + {"seq_no", ETopicMetadataField::SeqNo}, + {"meta", ETopicMetadataField::Meta}, }; THashMap<ETransformBody, TString> TransformBodyDescriptions = { - {ETransformBody::None, "do not transform body to any format"}, - {ETransformBody::Base64, "transform body to base64 format"}, + {ETransformBody::None, "Do not transform body to any format"}, + {ETransformBody::Base64, "Transform body to base64 format"}, }; constexpr TDuration DefaultIdleTimeout = TDuration::Seconds(1); @@ -258,7 +258,7 @@ namespace NYdb::NConsoleClient { } void TCommandTopicConsumerAdd::Config(TConfig& config) { - TYdbCommand::Parse(config); + TYdbCommand::Config(config); config.Opts->AddLongOption("consumer-name", "New consumer for topic") .Required() .StoreResult(&ConsumerName_); @@ -343,7 +343,7 @@ namespace NYdb::NConsoleClient { TStringStream description; description << "Comma-separated list of message fields to print. Available fields: "; NColorizer::TColors colors = NColorizer::AutoColors(Cout); - for (const auto& iter : StreamMetadataFieldsDescriptions) { + for (const auto& iter : TopicMetadataFieldsDescriptions) { description << "\n " << colors.BoldColor() << iter.first << colors.OldColor() << "\n " << iter.second; } @@ -368,7 +368,7 @@ namespace NYdb::NConsoleClient { void TCommandTopicRead::Config(TConfig& config) { TYdbCommand::Config(config); config.Opts->SetFreeArgsNum(1); - SetFreeArgTitle(0, "<stream-path>", "Stream to read data"); + SetFreeArgTitle(0, "<topic-path>", "Topic to read data"); AddFormats(config, { EOutputFormat::Pretty, @@ -389,15 +389,16 @@ namespace NYdb::NConsoleClient { config.Opts->AddLongOption('f', "file", "File to write data to") .Optional() .StoreResult(&File_); - config.Opts->AddLongOption("flush-duration", "Duration for message flushing") - .Optional() - .StoreResult(&FlushDuration_); - config.Opts->AddLongOption("flush-size", "Maximum flush size") // TODO(shmel1k@): improve - .Optional() - .StoreResult(&FlushSize_); - config.Opts->AddLongOption("flush-messages-count", "") // TODO(shmel1k@): improve - .Optional() - .StoreResult(&FlushMessagesCount_); +// NOTE(shmel1k@): temporary disabled options +// config.Opts->AddLongOption("flush-duration", "Duration for message flushing") +// .Optional() +// .StoreResult(&FlushDuration_); +// config.Opts->AddLongOption("flush-size", "Maximum flush size") // TODO(shmel1k@): improve +// .Optional() +// .StoreResult(&FlushSize_); +// config.Opts->AddLongOption("flush-messages-count", "") // TODO(shmel1k@): improve +// .Optional() +// .StoreResult(&FlushMessagesCount_); config.Opts->AddLongOption("idle-timeout", "Max wait duration for new messages") .Optional() .DefaultValue(DefaultIdleTimeout) @@ -406,9 +407,10 @@ namespace NYdb::NConsoleClient { .Optional() .DefaultValue(true) .StoreResult(&Commit_); - config.Opts->AddLongOption("message-size-limit", "Maximum message size") - .Optional() - .StoreResult(&MessageSizeLimit_); +// NOTE(shmel1k@): temporary disabled option +// config.Opts->AddLongOption("message-size-limit", "Maximum message size") +// .Optional() +// .StoreResult(&MessageSizeLimit_); config.Opts->AddLongOption("discard-above-limits", "Do not print messages with size more than defined in 'message-size-limit' option") .Optional() .StoreResult(&DiscardAboveLimits_); @@ -428,7 +430,7 @@ namespace NYdb::NConsoleClient { } void TCommandTopicRead::ParseMetadataFields() { - MetadataFields_ = AllStreamMetadataFields; + MetadataFields_ = AllTopicMetadataFields; // TODO(shmel1k@): discuss: disable all fields? if (WithMetadataFields_ == "all") { @@ -440,21 +442,21 @@ namespace NYdb::NConsoleClient { return; } - TSet<EStreamMetadataField> set; + TSet<ETopicMetadataField> set; for (const auto& field : split) { - auto f = StreamMetadataFieldsMap.find(field); - if (f == StreamMetadataFieldsMap.end()) { + auto f = TopicMetadataFieldsMap.find(field); + if (f == TopicMetadataFieldsMap.end()) { throw TMisuseException() << "Field " << field << " not found in available fields"; // TODO(shmel1k@): improve message. } set.insert(f->second); } - TVector<EStreamMetadataField> result; + TVector<ETopicMetadataField> result; result.reserve(set.size()); // NOTE(shmel1k@): preserving the order from AllMetadataFields for (const auto metadataField : set) { - auto f = std::find(AllStreamMetadataFields.begin(), AllStreamMetadataFields.end(), metadataField); - if (f == AllStreamMetadataFields.end()) { + auto f = std::find(AllTopicMetadataFields.begin(), AllTopicMetadataFields.end(), metadataField); + if (f == AllTopicMetadataFields.end()) { continue; } result.push_back(metadataField); diff --git a/ydb/public/lib/ydb_cli/commands/ydb_service_topic.h b/ydb/public/lib/ydb_cli/commands/ydb_service_topic.h index 95a3f97f11..57c2446894 100644 --- a/ydb/public/lib/ydb_cli/commands/ydb_service_topic.h +++ b/ydb/public/lib/ydb_cli/commands/ydb_service_topic.h @@ -121,7 +121,7 @@ namespace NYdb::NConsoleClient { TDuration IdleTimeout_; TString WithMetadataFields_ = "all"; // TODO(shmel1k@): improve. - TVector<EStreamMetadataField> MetadataFields_; + TVector<ETopicMetadataField> MetadataFields_; TMaybe<ui64> MessageSizeLimit_; TMaybe<i64> Limit_ = Nothing(); diff --git a/ydb/public/lib/ydb_cli/topic/topic_metadata_fields.h b/ydb/public/lib/ydb_cli/topic/topic_metadata_fields.h index 7ecc1bbda0..4347c35ede 100644 --- a/ydb/public/lib/ydb_cli/topic/topic_metadata_fields.h +++ b/ydb/public/lib/ydb_cli/topic/topic_metadata_fields.h @@ -3,7 +3,7 @@ #include <util/string/vector.h> namespace NYdb::NConsoleClient { - enum class EStreamMetadataField { + enum class ETopicMetadataField { Body /* "body" */, WriteTime /* "write_time" */, CreateTime /* "create_time" */, diff --git a/ydb/public/lib/ydb_cli/topic/topic_read.cpp b/ydb/public/lib/ydb_cli/topic/topic_read.cpp index 34a213b6f9..734b3c99fc 100644 --- a/ydb/public/lib/ydb_cli/topic/topic_read.cpp +++ b/ydb/public/lib/ydb_cli/topic/topic_read.cpp @@ -25,7 +25,7 @@ namespace NYdb::NConsoleClient { bool commit, bool wait, EOutputFormat format, - TVector<EStreamMetadataField> metadataFields, + TVector<ETopicMetadataField> metadataFields, ETransformBody transform, TDuration idleTimeout) : MetadataFields_(metadataFields) @@ -83,9 +83,9 @@ namespace NYdb::NConsoleClient { using TReceivedMessage = NTopic::TReadSessionEvent::TDataReceivedEvent::TMessage; - void AddMetadataFieldToRow(TPrettyTable::TRow& row, const TReceivedMessage& message, EOutputFormat format, ETransformBody transform, EStreamMetadataField f, size_t idx) { + void AddMetadataFieldToRow(TPrettyTable::TRow& row, const TReceivedMessage& message, EOutputFormat format, ETransformBody transform, ETopicMetadataField f, size_t idx) { switch (f) { - case EStreamMetadataField::Body: + case ETopicMetadataField::Body: if (format == EOutputFormat::PrettyBase64) { row.Column(idx, FormatBody(message.GetData(), transform)); } @@ -97,22 +97,22 @@ namespace NYdb::NConsoleClient { } break; - case EStreamMetadataField::CreateTime: + case ETopicMetadataField::CreateTime: row.Column(idx, message.GetCreateTime()); break; - case EStreamMetadataField::MessageGroupID: + case ETopicMetadataField::MessageGroupID: row.Column(idx, message.GetMessageGroupId()); break; - case EStreamMetadataField::Offset: + case ETopicMetadataField::Offset: row.Column(idx, message.GetOffset()); break; - case EStreamMetadataField::WriteTime: + case ETopicMetadataField::WriteTime: row.Column(idx, message.GetWriteTime()); // improve for pretty break; - case EStreamMetadataField::SeqNo: + case ETopicMetadataField::SeqNo: row.Column(idx, message.GetSeqNo()); break; - case EStreamMetadataField::Meta: + case ETopicMetadataField::Meta: NJson::TJsonValue json; for (auto const& [k, v] : message.GetMeta()->Fields) { json[k] = v; @@ -127,7 +127,7 @@ namespace NYdb::NConsoleClient { for (const auto& message : ReceivedMessages_) { TPrettyTable::TRow& row = OutputTable_->AddRow(); for (size_t i = 0; i < ReaderParams_.MetadataFields().size(); ++i) { - EStreamMetadataField f = ReaderParams_.MetadataFields()[i]; + ETopicMetadataField f = ReaderParams_.MetadataFields()[i]; AddMetadataFieldToRow(row, message, ReaderParams_.OutputFormat(), ReaderParams_.Transform(), f, i); } } diff --git a/ydb/public/lib/ydb_cli/topic/topic_read.h b/ydb/public/lib/ydb_cli/topic/topic_read.h index f3297007bd..5afdb6d9f5 100644 --- a/ydb/public/lib/ydb_cli/topic/topic_read.h +++ b/ydb/public/lib/ydb_cli/topic/topic_read.h @@ -21,7 +21,7 @@ namespace NYdb::NConsoleClient { bool commit, bool wait, EOutputFormat format, - TVector<EStreamMetadataField> metadataFields, + TVector<ETopicMetadataField> metadataFields, ETransformBody transform, TDuration idleTimeout); @@ -29,7 +29,7 @@ namespace NYdb::NConsoleClient { TTopicReaderSettings(const TTopicReaderSettings&) = default; TTopicReaderSettings(TTopicReaderSettings&&) = default; - GETTER(TVector<EStreamMetadataField>, MetadataFields); + GETTER(TVector<ETopicMetadataField>, MetadataFields); GETTER(bool, Commit); GETTER(TMaybe<i64>, Limit); GETTER(bool, Wait); @@ -39,7 +39,7 @@ namespace NYdb::NConsoleClient { // TODO(shmel1k@): add batching settings. private: - TVector<EStreamMetadataField> MetadataFields_; + TVector<ETopicMetadataField> MetadataFields_; TMaybe<TDuration> FlushDuration_; TMaybe<int> FlushSize_; TMaybe<int> FlushMessagesCount_; |