aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorshmel1k <shmel1k@ydb.tech>2022-08-15 18:40:07 +0300
committershmel1k <shmel1k@ydb.tech>2022-08-15 18:40:07 +0300
commit7d61eb045d194499b22019bae65d1d9bfb3f5128 (patch)
tree01a5b6ceeb26d700621d2ff75b5c3bf2bc71fa9d
parent34207d58634f03de42cd726816d6598bd719600d (diff)
downloadydb-7d61eb045d194499b22019bae65d1d9bfb3f5128.tar.gz
[] improved 'topic read' documentation
-rw-r--r--ydb/public/lib/ydb_cli/commands/ydb_service_topic.cpp98
-rw-r--r--ydb/public/lib/ydb_cli/commands/ydb_service_topic.h2
-rw-r--r--ydb/public/lib/ydb_cli/topic/topic_metadata_fields.h2
-rw-r--r--ydb/public/lib/ydb_cli/topic/topic_read.cpp20
-rw-r--r--ydb/public/lib/ydb_cli/topic/topic_read.h6
5 files changed, 65 insertions, 63 deletions
diff --git a/ydb/public/lib/ydb_cli/commands/ydb_service_topic.cpp b/ydb/public/lib/ydb_cli/commands/ydb_service_topic.cpp
index 9766a08ff8..39b0bc6c35 100644
--- a/ydb/public/lib/ydb_cli/commands/ydb_service_topic.cpp
+++ b/ydb/public/lib/ydb_cli/commands/ydb_service_topic.cpp
@@ -29,39 +29,39 @@ namespace NYdb::NConsoleClient {
};
// TODO(shmel1k@): improve docs
- THashMap<EStreamMetadataField, TString> StreamMetadataFieldsDescriptions = {
- {EStreamMetadataField::Body, "message content"},
- {EStreamMetadataField::WriteTime, "message write time"},
- {EStreamMetadataField::CreateTime, "message creation time"},
- {EStreamMetadataField::MessageGroupID, "message group id"},
- {EStreamMetadataField::Offset, "offset"},
- {EStreamMetadataField::SeqNo, "seqno"},
- {EStreamMetadataField::Meta, "meta"},
+ THashMap<ETopicMetadataField, TString> TopicMetadataFieldsDescriptions = {
+ {ETopicMetadataField::Body, "Message data"},
+ {ETopicMetadataField::WriteTime, "Message write time. This time defines the UNIX timestamp the messages was written to server"},
+ {ETopicMetadataField::CreateTime, "Message creation time. This time defines the UNIX timestamp the message was created by client"},
+ {ETopicMetadataField::MessageGroupID, "Message group id. This identifier is used to stick to concrete partition using round-robin algorithm. All messages with the same message group id are guaranteed to be read in FIFO order"},
+ {ETopicMetadataField::Offset, "Message offset. Offset defines unique message number in his partition"},
+ {ETopicMetadataField::SeqNo, "Message sequence number, which is used for message deduplication"},
+ {ETopicMetadataField::Meta, "Message additional metadata"},
};
- const TVector<EStreamMetadataField> AllStreamMetadataFields = {
- EStreamMetadataField::Body,
- EStreamMetadataField::WriteTime,
- EStreamMetadataField::CreateTime,
- EStreamMetadataField::MessageGroupID,
- EStreamMetadataField::Offset,
- EStreamMetadataField::SeqNo,
- EStreamMetadataField::Meta,
+ const TVector<ETopicMetadataField> AllTopicMetadataFields = {
+ ETopicMetadataField::Body,
+ ETopicMetadataField::WriteTime,
+ ETopicMetadataField::CreateTime,
+ ETopicMetadataField::MessageGroupID,
+ ETopicMetadataField::Offset,
+ ETopicMetadataField::SeqNo,
+ ETopicMetadataField::Meta,
};
- const THashMap<TString, EStreamMetadataField> StreamMetadataFieldsMap = {
- {"body", EStreamMetadataField::Body},
- {"write_time", EStreamMetadataField::WriteTime},
- {"create_time", EStreamMetadataField::CreateTime},
- {"message_group_id", EStreamMetadataField::MessageGroupID},
- {"offset", EStreamMetadataField::Offset},
- {"seq_no", EStreamMetadataField::SeqNo},
- {"meta", EStreamMetadataField::Meta},
+ const THashMap<TString, ETopicMetadataField> TopicMetadataFieldsMap = {
+ {"body", ETopicMetadataField::Body},
+ {"write_time", ETopicMetadataField::WriteTime},
+ {"create_time", ETopicMetadataField::CreateTime},
+ {"message_group_id", ETopicMetadataField::MessageGroupID},
+ {"offset", ETopicMetadataField::Offset},
+ {"seq_no", ETopicMetadataField::SeqNo},
+ {"meta", ETopicMetadataField::Meta},
};
THashMap<ETransformBody, TString> TransformBodyDescriptions = {
- {ETransformBody::None, "do not transform body to any format"},
- {ETransformBody::Base64, "transform body to base64 format"},
+ {ETransformBody::None, "Do not transform body to any format"},
+ {ETransformBody::Base64, "Transform body to base64 format"},
};
constexpr TDuration DefaultIdleTimeout = TDuration::Seconds(1);
@@ -258,7 +258,7 @@ namespace NYdb::NConsoleClient {
}
void TCommandTopicConsumerAdd::Config(TConfig& config) {
- TYdbCommand::Parse(config);
+ TYdbCommand::Config(config);
config.Opts->AddLongOption("consumer-name", "New consumer for topic")
.Required()
.StoreResult(&ConsumerName_);
@@ -343,7 +343,7 @@ namespace NYdb::NConsoleClient {
TStringStream description;
description << "Comma-separated list of message fields to print. Available fields: ";
NColorizer::TColors colors = NColorizer::AutoColors(Cout);
- for (const auto& iter : StreamMetadataFieldsDescriptions) {
+ for (const auto& iter : TopicMetadataFieldsDescriptions) {
description << "\n " << colors.BoldColor() << iter.first << colors.OldColor() << "\n " << iter.second;
}
@@ -368,7 +368,7 @@ namespace NYdb::NConsoleClient {
void TCommandTopicRead::Config(TConfig& config) {
TYdbCommand::Config(config);
config.Opts->SetFreeArgsNum(1);
- SetFreeArgTitle(0, "<stream-path>", "Stream to read data");
+ SetFreeArgTitle(0, "<topic-path>", "Topic to read data");
AddFormats(config, {
EOutputFormat::Pretty,
@@ -389,15 +389,16 @@ namespace NYdb::NConsoleClient {
config.Opts->AddLongOption('f', "file", "File to write data to")
.Optional()
.StoreResult(&File_);
- config.Opts->AddLongOption("flush-duration", "Duration for message flushing")
- .Optional()
- .StoreResult(&FlushDuration_);
- config.Opts->AddLongOption("flush-size", "Maximum flush size") // TODO(shmel1k@): improve
- .Optional()
- .StoreResult(&FlushSize_);
- config.Opts->AddLongOption("flush-messages-count", "") // TODO(shmel1k@): improve
- .Optional()
- .StoreResult(&FlushMessagesCount_);
+// NOTE(shmel1k@): temporary disabled options
+// config.Opts->AddLongOption("flush-duration", "Duration for message flushing")
+// .Optional()
+// .StoreResult(&FlushDuration_);
+// config.Opts->AddLongOption("flush-size", "Maximum flush size") // TODO(shmel1k@): improve
+// .Optional()
+// .StoreResult(&FlushSize_);
+// config.Opts->AddLongOption("flush-messages-count", "") // TODO(shmel1k@): improve
+// .Optional()
+// .StoreResult(&FlushMessagesCount_);
config.Opts->AddLongOption("idle-timeout", "Max wait duration for new messages")
.Optional()
.DefaultValue(DefaultIdleTimeout)
@@ -406,9 +407,10 @@ namespace NYdb::NConsoleClient {
.Optional()
.DefaultValue(true)
.StoreResult(&Commit_);
- config.Opts->AddLongOption("message-size-limit", "Maximum message size")
- .Optional()
- .StoreResult(&MessageSizeLimit_);
+// NOTE(shmel1k@): temporary disabled option
+// config.Opts->AddLongOption("message-size-limit", "Maximum message size")
+// .Optional()
+// .StoreResult(&MessageSizeLimit_);
config.Opts->AddLongOption("discard-above-limits", "Do not print messages with size more than defined in 'message-size-limit' option")
.Optional()
.StoreResult(&DiscardAboveLimits_);
@@ -428,7 +430,7 @@ namespace NYdb::NConsoleClient {
}
void TCommandTopicRead::ParseMetadataFields() {
- MetadataFields_ = AllStreamMetadataFields;
+ MetadataFields_ = AllTopicMetadataFields;
// TODO(shmel1k@): discuss: disable all fields?
if (WithMetadataFields_ == "all") {
@@ -440,21 +442,21 @@ namespace NYdb::NConsoleClient {
return;
}
- TSet<EStreamMetadataField> set;
+ TSet<ETopicMetadataField> set;
for (const auto& field : split) {
- auto f = StreamMetadataFieldsMap.find(field);
- if (f == StreamMetadataFieldsMap.end()) {
+ auto f = TopicMetadataFieldsMap.find(field);
+ if (f == TopicMetadataFieldsMap.end()) {
throw TMisuseException() << "Field " << field << " not found in available fields"; // TODO(shmel1k@): improve message.
}
set.insert(f->second);
}
- TVector<EStreamMetadataField> result;
+ TVector<ETopicMetadataField> result;
result.reserve(set.size());
// NOTE(shmel1k@): preserving the order from AllMetadataFields
for (const auto metadataField : set) {
- auto f = std::find(AllStreamMetadataFields.begin(), AllStreamMetadataFields.end(), metadataField);
- if (f == AllStreamMetadataFields.end()) {
+ auto f = std::find(AllTopicMetadataFields.begin(), AllTopicMetadataFields.end(), metadataField);
+ if (f == AllTopicMetadataFields.end()) {
continue;
}
result.push_back(metadataField);
diff --git a/ydb/public/lib/ydb_cli/commands/ydb_service_topic.h b/ydb/public/lib/ydb_cli/commands/ydb_service_topic.h
index 95a3f97f11..57c2446894 100644
--- a/ydb/public/lib/ydb_cli/commands/ydb_service_topic.h
+++ b/ydb/public/lib/ydb_cli/commands/ydb_service_topic.h
@@ -121,7 +121,7 @@ namespace NYdb::NConsoleClient {
TDuration IdleTimeout_;
TString WithMetadataFields_ = "all"; // TODO(shmel1k@): improve.
- TVector<EStreamMetadataField> MetadataFields_;
+ TVector<ETopicMetadataField> MetadataFields_;
TMaybe<ui64> MessageSizeLimit_;
TMaybe<i64> Limit_ = Nothing();
diff --git a/ydb/public/lib/ydb_cli/topic/topic_metadata_fields.h b/ydb/public/lib/ydb_cli/topic/topic_metadata_fields.h
index 7ecc1bbda0..4347c35ede 100644
--- a/ydb/public/lib/ydb_cli/topic/topic_metadata_fields.h
+++ b/ydb/public/lib/ydb_cli/topic/topic_metadata_fields.h
@@ -3,7 +3,7 @@
#include <util/string/vector.h>
namespace NYdb::NConsoleClient {
- enum class EStreamMetadataField {
+ enum class ETopicMetadataField {
Body /* "body" */,
WriteTime /* "write_time" */,
CreateTime /* "create_time" */,
diff --git a/ydb/public/lib/ydb_cli/topic/topic_read.cpp b/ydb/public/lib/ydb_cli/topic/topic_read.cpp
index 34a213b6f9..734b3c99fc 100644
--- a/ydb/public/lib/ydb_cli/topic/topic_read.cpp
+++ b/ydb/public/lib/ydb_cli/topic/topic_read.cpp
@@ -25,7 +25,7 @@ namespace NYdb::NConsoleClient {
bool commit,
bool wait,
EOutputFormat format,
- TVector<EStreamMetadataField> metadataFields,
+ TVector<ETopicMetadataField> metadataFields,
ETransformBody transform,
TDuration idleTimeout)
: MetadataFields_(metadataFields)
@@ -83,9 +83,9 @@ namespace NYdb::NConsoleClient {
using TReceivedMessage = NTopic::TReadSessionEvent::TDataReceivedEvent::TMessage;
- void AddMetadataFieldToRow(TPrettyTable::TRow& row, const TReceivedMessage& message, EOutputFormat format, ETransformBody transform, EStreamMetadataField f, size_t idx) {
+ void AddMetadataFieldToRow(TPrettyTable::TRow& row, const TReceivedMessage& message, EOutputFormat format, ETransformBody transform, ETopicMetadataField f, size_t idx) {
switch (f) {
- case EStreamMetadataField::Body:
+ case ETopicMetadataField::Body:
if (format == EOutputFormat::PrettyBase64) {
row.Column(idx, FormatBody(message.GetData(), transform));
}
@@ -97,22 +97,22 @@ namespace NYdb::NConsoleClient {
}
break;
- case EStreamMetadataField::CreateTime:
+ case ETopicMetadataField::CreateTime:
row.Column(idx, message.GetCreateTime());
break;
- case EStreamMetadataField::MessageGroupID:
+ case ETopicMetadataField::MessageGroupID:
row.Column(idx, message.GetMessageGroupId());
break;
- case EStreamMetadataField::Offset:
+ case ETopicMetadataField::Offset:
row.Column(idx, message.GetOffset());
break;
- case EStreamMetadataField::WriteTime:
+ case ETopicMetadataField::WriteTime:
row.Column(idx, message.GetWriteTime()); // improve for pretty
break;
- case EStreamMetadataField::SeqNo:
+ case ETopicMetadataField::SeqNo:
row.Column(idx, message.GetSeqNo());
break;
- case EStreamMetadataField::Meta:
+ case ETopicMetadataField::Meta:
NJson::TJsonValue json;
for (auto const& [k, v] : message.GetMeta()->Fields) {
json[k] = v;
@@ -127,7 +127,7 @@ namespace NYdb::NConsoleClient {
for (const auto& message : ReceivedMessages_) {
TPrettyTable::TRow& row = OutputTable_->AddRow();
for (size_t i = 0; i < ReaderParams_.MetadataFields().size(); ++i) {
- EStreamMetadataField f = ReaderParams_.MetadataFields()[i];
+ ETopicMetadataField f = ReaderParams_.MetadataFields()[i];
AddMetadataFieldToRow(row, message, ReaderParams_.OutputFormat(), ReaderParams_.Transform(), f, i);
}
}
diff --git a/ydb/public/lib/ydb_cli/topic/topic_read.h b/ydb/public/lib/ydb_cli/topic/topic_read.h
index f3297007bd..5afdb6d9f5 100644
--- a/ydb/public/lib/ydb_cli/topic/topic_read.h
+++ b/ydb/public/lib/ydb_cli/topic/topic_read.h
@@ -21,7 +21,7 @@ namespace NYdb::NConsoleClient {
bool commit,
bool wait,
EOutputFormat format,
- TVector<EStreamMetadataField> metadataFields,
+ TVector<ETopicMetadataField> metadataFields,
ETransformBody transform,
TDuration idleTimeout);
@@ -29,7 +29,7 @@ namespace NYdb::NConsoleClient {
TTopicReaderSettings(const TTopicReaderSettings&) = default;
TTopicReaderSettings(TTopicReaderSettings&&) = default;
- GETTER(TVector<EStreamMetadataField>, MetadataFields);
+ GETTER(TVector<ETopicMetadataField>, MetadataFields);
GETTER(bool, Commit);
GETTER(TMaybe<i64>, Limit);
GETTER(bool, Wait);
@@ -39,7 +39,7 @@ namespace NYdb::NConsoleClient {
// TODO(shmel1k@): add batching settings.
private:
- TVector<EStreamMetadataField> MetadataFields_;
+ TVector<ETopicMetadataField> MetadataFields_;
TMaybe<TDuration> FlushDuration_;
TMaybe<int> FlushSize_;
TMaybe<int> FlushMessagesCount_;