diff options
author | alexv-smirnov <alex@ydb.tech> | 2022-08-26 11:42:19 +0300 |
---|---|---|
committer | alexv-smirnov <alex@ydb.tech> | 2022-08-26 11:42:19 +0300 |
commit | a9736534c949447c5d1a9cf9c7d65b9d1fe04279 (patch) | |
tree | 5ef9cfb917b6ae2f2e3e8f4720107bea59d9ce87 | |
parent | 76a1e556daf2542e3b3d55d7971cc1ed0abb1f50 (diff) | |
download | ydb-a9736534c949447c5d1a9cf9c7d65b9d1fe04279.tar.gz |
fix ydb cli topic read help messages and formats
-rw-r--r-- | ydb/public/lib/ydb_cli/commands/ydb_service_topic.cpp | 66 | ||||
-rw-r--r-- | ydb/public/lib/ydb_cli/common/format.cpp | 48 | ||||
-rw-r--r-- | ydb/public/lib/ydb_cli/common/format.h | 4 | ||||
-rw-r--r-- | ydb/public/lib/ydb_cli/common/formats.h | 16 | ||||
-rw-r--r-- | ydb/public/lib/ydb_cli/topic/topic_read.cpp | 49 | ||||
-rw-r--r-- | ydb/public/lib/ydb_cli/topic/topic_read.h | 6 | ||||
-rw-r--r-- | ydb/public/lib/ydb_cli/topic/topic_read_ut.cpp | 12 | ||||
-rw-r--r-- | ydb/public/lib/ydb_cli/topic/topic_write.cpp | 6 | ||||
-rw-r--r-- | ydb/public/lib/ydb_cli/topic/topic_write.h | 6 | ||||
-rw-r--r-- | ydb/public/lib/ydb_cli/topic/topic_write_ut.cpp | 24 |
10 files changed, 127 insertions, 110 deletions
diff --git a/ydb/public/lib/ydb_cli/commands/ydb_service_topic.cpp b/ydb/public/lib/ydb_cli/commands/ydb_service_topic.cpp index 58c400542cb..e70da397703 100644 --- a/ydb/public/lib/ydb_cli/commands/ydb_service_topic.cpp +++ b/ydb/public/lib/ydb_cli/commands/ydb_service_topic.cpp @@ -31,11 +31,11 @@ namespace NYdb::NConsoleClient { // TODO(shmel1k@): improve docs THashMap<ETopicMetadataField, TString> TopicMetadataFieldsDescriptions = { {ETopicMetadataField::Body, "Message data"}, - {ETopicMetadataField::WriteTime, "Message write time. This time defines the UNIX timestamp the messages was written to server"}, - {ETopicMetadataField::CreateTime, "Message creation time. This time defines the UNIX timestamp the message was created by client"}, - {ETopicMetadataField::MessageGroupID, "Message group id. This identifier is used to stick to concrete partition using round-robin algorithm. All messages with the same message group id are guaranteed to be read in FIFO order"}, - {ETopicMetadataField::Offset, "Message offset. Offset defines unique message number in his partition"}, - {ETopicMetadataField::SeqNo, "Message sequence number, which is used for message deduplication"}, + {ETopicMetadataField::WriteTime, "Message write time, a UNIX timestamp the message was written to server"}, + {ETopicMetadataField::CreateTime, "Message creation time, a UNIX timestamp provided by the publishing client"}, + {ETopicMetadataField::MessageGroupID, "Message group id. All messages with the same message group id are guaranteed to be read in FIFO order"}, + {ETopicMetadataField::Offset, "Message offset. Offset orders messages in each partition"}, + {ETopicMetadataField::SeqNo, "Message sequence number, used for message deduplication when publishing"}, {ETopicMetadataField::Meta, "Message additional metadata"}, }; @@ -60,8 +60,8 @@ namespace NYdb::NConsoleClient { }; THashMap<ETransformBody, TString> TransformBodyDescriptions = { - {ETransformBody::None, "Do not transform body to any format"}, - {ETransformBody::Base64, "Transform body to base64 format"}, + {ETransformBody::None, "No conversions, binary data on the client is exactly the same as it is in the topic message."}, + {ETransformBody::Base64, "Message on the client is a base64-encoded representation of the topic message."}, }; constexpr TDuration DefaultIdleTimeout = TDuration::Seconds(1); @@ -336,12 +336,12 @@ namespace NYdb::NConsoleClient { } TCommandTopicRead::TCommandTopicRead() - : TYdbCommand("read", {}, "Read from topic command") { + : TYdbCommand("read", {}, "Read from a topic to the client filesystem or terminal") { } void TCommandTopicRead::AddAllowedMetadataFields(TConfig& config) { TStringStream description; - description << "Comma-separated list of message fields to print. Available fields: "; + description << "Comma-separated list of message fields to print in Pretty format. If not specified, all fields are printed. Available fields: "; NColorizer::TColors colors = NColorizer::AutoColors(Cout); for (const auto& iter : TopicMetadataFieldsDescriptions) { description << "\n " << colors.BoldColor() << iter.first << colors.OldColor() << "\n " << iter.second; @@ -354,7 +354,7 @@ namespace NYdb::NConsoleClient { void TCommandTopicRead::AddAllowedTransformFormats(TConfig& config) { TStringStream description; - description << "Format to transform received body: Available \"transform\" values: "; + description << "Conversion between a message data in the topic and the client filesystem/terminal. Available options: "; NColorizer::TColors colors = NColorizer::AutoColors(Cout); for (const auto& iter : TransformBodyDescriptions) { description << "\n " << colors.BoldColor() << iter.first << colors.OldColor() << "\n " << iter.second; @@ -362,22 +362,24 @@ namespace NYdb::NConsoleClient { config.Opts->AddLongOption("transform", description.Str()) .Optional() + .DefaultValue("none") .StoreResult(&TransformStr_); } void TCommandTopicRead::Config(TConfig& config) { TYdbCommand::Config(config); config.Opts->SetFreeArgsNum(1); - SetFreeArgTitle(0, "<topic-path>", "Topic to read data"); + SetFreeArgTitle(0, "<topic-path>", "Topic to read data from"); - AddFormats(config, { - EOutputFormat::Pretty, - EOutputFormat::NewlineDelimited, - EOutputFormat::Concatenated, + AddMessagingFormats(config, { + EMessagingFormat::SingleMessage, + EMessagingFormat::Pretty, + EMessagingFormat::NewlineDelimited, + EMessagingFormat::Concatenated, }); // TODO(shmel1k@): improve help. - config.Opts->AddLongOption('c', "consumer", "Consumer name") + config.Opts->AddLongOption('c', "consumer", "Consumer name.") .Required() .StoreResult(&Consumer_); // config.Opts->AddLongOption("offset", "Offset to start read from") @@ -386,7 +388,7 @@ namespace NYdb::NConsoleClient { // config.Opts->AddLongOption("partition", "Partition to read from") // .Optional() // .StoreResult(&Partition_); - config.Opts->AddLongOption('f', "file", "File to write data to") + config.Opts->AddLongOption('f', "file", "File to write data to. In not specified, data is written to the standard output.") .Optional() .StoreResult(&File_); // NOTE(shmel1k@): temporary disabled options @@ -399,7 +401,7 @@ namespace NYdb::NConsoleClient { // config.Opts->AddLongOption("flush-messages-count", "") // TODO(shmel1k@): improve // .Optional() // .StoreResult(&FlushMessagesCount_); - config.Opts->AddLongOption("idle-timeout", "Max wait duration for new messages") + config.Opts->AddLongOption("idle-timeout", "Max wait duration for new messages. Topic is considered empty if no new messages arrive within this period.") .Optional() .DefaultValue(DefaultIdleTimeout) .StoreResult(&IdleTimeout_); @@ -414,14 +416,17 @@ namespace NYdb::NConsoleClient { // config.Opts->AddLongOption("discard-above-limits", "Do not print messages with size more than defined in 'message-size-limit' option") // .Optional() // .StoreResult(&DiscardAboveLimits_); - config.Opts->AddLongOption("limit", "Messages count to read") + config.Opts->AddLongOption("limit", "Limit on message count to read, 0 - unlimited. " + "If avobe 0, processing stops when either topic is empty, or the specified limit reached. " + "Must be above 0 for pretty output format." + "\nDefault is 10 for pretty format, unlimited for streaming formats.") .Optional() .StoreResult(&Limit_); - config.Opts->AddLongOption('w', "wait", "Wait for infinite time for first message received") + config.Opts->AddLongOption('w', "wait", "Wait indefinitely for a first message received. If not specified, command exits on empty topic returning no data to the output.") .Optional() .NoArgument() .StoreValue(&Wait_, true); - config.Opts->AddLongOption("timestamp", "Timestamp from which messages will be read") + config.Opts->AddLongOption("timestamp", "Timestamp from which messages will be read. If not specified, messages are read from the last commit point for the chosen consumer.") .Optional() .StoreResult(&Timestamp_); @@ -485,7 +490,7 @@ namespace NYdb::NConsoleClient { void TCommandTopicRead::Parse(TConfig& config) { TYdbCommand::Parse(config); ParseTopicName(config, 0); - ParseFormats(); + ParseMessagingFormats(); ParseMetadataFields(); ParseTransformFormat(); } @@ -507,7 +512,7 @@ namespace NYdb::NConsoleClient { void TCommandTopicRead::ValidateConfig() { // TODO(shmel1k@): add more formats. - if (OutputFormat != EOutputFormat::Default && (Limit_.Defined() && (Limit_ < 0 || Limit_ > 500))) { + if (MessagingFormat != EMessagingFormat::SingleMessage && (Limit_.Defined() && (Limit_ < 0 || Limit_ > 500))) { throw TMisuseException() << "OutputFormat " << OutputFormat << " is not compatible with " << "limit equal '0' or more than '500': '" << *Limit_ << "' was given"; } @@ -528,7 +533,7 @@ namespace NYdb::NConsoleClient { Limit_, Commit_, Wait_, - OutputFormat, + MessagingFormat, MetadataFields_, Transform_, IdleTimeout_)); @@ -563,12 +568,11 @@ namespace NYdb::NConsoleClient { config.Opts->SetFreeArgsNum(1); SetFreeArgTitle(0, "<topic-path>", "Topic to write data"); - AddInputFormats(config, { - EOutputFormat::NewlineDelimited, - EOutputFormat::SingleMessage, + AddMessagingFormats(config, { + EMessagingFormat::NewlineDelimited, + EMessagingFormat::SingleMessage, // EOutputFormat::JsonRawStreamConcat, // EOutputFormat::JsonRawArray, - EOutputFormat::SingleMessage, }); AddAllowedCodecs(config); @@ -599,10 +603,10 @@ namespace NYdb::NConsoleClient { void TCommandTopicWrite::Parse(TConfig& config) { TYdbCommand::Parse(config); ParseTopicName(config, 0); - ParseFormats(); + ParseMessagingFormats(); ParseCodec(); - if (Delimiter_.Defined() && InputFormat != EOutputFormat::Default) { + if (Delimiter_.Defined() && MessagingFormat != EMessagingFormat::SingleMessage) { throw TMisuseException() << "Both mutually exclusive options \"delimiter\"(\"--delimiter\", \"-d\" " << "and \"input format\"(\"--input-format\") were provided."; } @@ -674,7 +678,7 @@ namespace NYdb::NConsoleClient { { auto writeSession = NPersQueue::TPersQueueClient(*driver).CreateWriteSession(std::move(PrepareWriteSessionSettings())); auto writer = TTopicWriter(writeSession, std::move(TTopicWriterParams( - InputFormat, + MessagingFormat, Delimiter_, MessageSizeLimit_, BatchDuration_, diff --git a/ydb/public/lib/ydb_cli/common/format.cpp b/ydb/public/lib/ydb_cli/common/format.cpp index 4c2bff7c00b..0db3a40d1f5 100644 --- a/ydb/public/lib/ydb_cli/common/format.cpp +++ b/ydb/public/lib/ydb_cli/common/format.cpp @@ -15,17 +15,10 @@ namespace { { EOutputFormat::JsonBase64, "Input in json format, binary strings are decoded with base64" }, { EOutputFormat::Csv, "Input in csv format" }, { EOutputFormat::Tsv, "Input in tsv format" }, - { EOutputFormat::SingleMessage, "Input data as a single message"}, // TODO - { EOutputFormat::NewlineDelimited, "Input data is '\\n' delimited"}, // TODO -// { EOutputFormat::JsonRawStreamConcat, ""}, // TODO, -// { EOutputFormat::JsonRawArray, ""}, // TODO }; THashMap<EOutputFormat, TString> FormatDescriptions = { { EOutputFormat::Pretty, "Human readable output" }, - { EOutputFormat::PrettyRaw, "Human readable format with raw data printed"}, // TODO(shmel1k@): improve - { EOutputFormat::PrettyUnicode, "Human readable format with data printed as unicode"}, // TODO(shmel1k@): improve - { EOutputFormat::PrettyBase64, "Human readable format with data printed as base64 encoded"}, // TODO(shmel1k@): improve { EOutputFormat::Json, "Output in json format" }, { EOutputFormat::JsonUnicode, "Output in json format, binary strings are encoded with unicode characters. " "Every row is a separate json on a separate line." }, @@ -42,13 +35,15 @@ namespace { "Every row is a separate binary data on a separate line"}, { EOutputFormat::ProtoJsonBase64, "Output result protobuf in json format, binary strings are encoded with base64" }, { EOutputFormat::Csv, "Output in csv format" }, - { EOutputFormat::SingleMessage, "Output result is presented as a single message"}, // TODO(shmel1k@): improve - { EOutputFormat::NewlineDelimited, "Newline delimited"}, // TODO(shmel1k@): improve - { EOutputFormat::NewlineBase64, "Newline base64"}, // TODO(shmel1k@): improve, - { EOutputFormat::Concatenated, "Concatenated"}, // TODO(shmel1k@): improve, - { EOutputFormat::JsonBase64StreamConcat, "Concatenated base64 stream"}, // TODO(shmel1k@): improve, - { EOutputFormat::JsonUnicodeStreamConcat, "Concatenated unicode stream"}, // TODO(shmel1k@): improve, - { EOutputFormat::JsonRawStreamConcat, "Concated raw stream"}, // TODO(shmel1k@): improve + }; + + THashMap<EMessagingFormat, TString> MessagingFormatDescriptions = { + { EMessagingFormat::Pretty, "Human readable output with metadata." }, + { EMessagingFormat::SingleMessage, "Single message."}, // TODO(shmel1k@): improve + { EMessagingFormat::NewlineDelimited, "Newline delimited stream of messages."}, // TODO(shmel1k@): improve + { EMessagingFormat::Concatenated, "Concatenated output stream of messages."}, // TODO(shmel1k@): improve, + { EMessagingFormat::JsonStreamConcat, "Concatenated Json stream of envelopes with metadata and messages in the ""body"" attribute." }, // TODO(shmel1k@): improve, + { EMessagingFormat::JsonArray, "Json array of envelopes with metadata and messages in the ""body"" attribute." }, // TODO(shmel1k@): improve, }; } @@ -114,6 +109,23 @@ void TCommandWithFormat::AddFormats(TClientCommand::TConfig& config, const TVect AllowedFormats = allowedFormats; } +void TCommandWithFormat::AddMessagingFormats(TClientCommand::TConfig& config, const TVector<EMessagingFormat>& allowedFormats) { + TStringStream description; + description << "Client-side format. Available options: "; + NColorizer::TColors colors = NColorizer::AutoColors(Cout); + for (const auto& format : allowedFormats) { + auto findResult = MessagingFormatDescriptions.find(format); + Y_VERIFY(findResult != MessagingFormatDescriptions.end(), + "Couldn't find description for %s output format", (TStringBuilder() << format).c_str()); + description << "\n " << colors.BoldColor() << format << colors.OldColor() + << "\n " << findResult->second; + } + config.Opts->AddLongOption("format", description.Str()) + .DefaultValue( "single-message" ) + .RequiredArgument("STRING").StoreResult(&MessagingFormat); + AllowedMessagingFormats = allowedFormats; +} + void TCommandWithFormat::ParseFormats() { if (InputFormat != EOutputFormat::Default && std::find(AllowedInputFormats.begin(), AllowedInputFormats.end(), InputFormat) == AllowedInputFormats.end()) { @@ -129,6 +141,14 @@ void TCommandWithFormat::ParseFormats() { } } +void TCommandWithFormat::ParseMessagingFormats() { + if (MessagingFormat != EMessagingFormat::SingleMessage + && std::find(AllowedMessagingFormats.begin(), AllowedMessagingFormats.end(), MessagingFormat) == AllowedMessagingFormats.end()) { + throw TMisuseException() << "Messaging format " << MessagingFormat << " is not available for this command"; + } +} + + void TQueryPlanPrinter::Print(const TString& plan) { switch (Format) { case EOutputFormat::Default: diff --git a/ydb/public/lib/ydb_cli/common/format.h b/ydb/public/lib/ydb_cli/common/format.h index c8fa91a3246..bc571f7e0c5 100644 --- a/ydb/public/lib/ydb_cli/common/format.h +++ b/ydb/public/lib/ydb_cli/common/format.h @@ -23,7 +23,9 @@ class TCommandWithFormat { protected: void AddInputFormats(TClientCommand::TConfig& config, const TVector<EOutputFormat>& allowedFormats); void AddFormats(TClientCommand::TConfig& config, const TVector<EOutputFormat>& allowedFormats); + void AddMessagingFormats(TClientCommand::TConfig& config, const TVector<EMessagingFormat>& allowedFormats); void ParseFormats(); + void ParseMessagingFormats(); // Deprecated void AddDeprecatedJsonOption(TClientCommand::TConfig& config, @@ -33,10 +35,12 @@ protected: protected: EOutputFormat OutputFormat = EOutputFormat::Default; EOutputFormat InputFormat = EOutputFormat::Default; + EMessagingFormat MessagingFormat = EMessagingFormat::SingleMessage; private: TVector<EOutputFormat> AllowedInputFormats; TVector<EOutputFormat> AllowedFormats; + TVector<EMessagingFormat> AllowedMessagingFormats; bool DeprecatedOptionUsed = false; }; diff --git a/ydb/public/lib/ydb_cli/common/formats.h b/ydb/public/lib/ydb_cli/common/formats.h index 05fd8b6ba0b..09a893db58e 100644 --- a/ydb/public/lib/ydb_cli/common/formats.h +++ b/ydb/public/lib/ydb_cli/common/formats.h @@ -3,12 +3,10 @@ namespace NYdb { namespace NConsoleClient { +// EOutputFormat to be used in operations related to structured data enum class EOutputFormat { Default /* "default" */, Pretty /* "pretty" */, - PrettyRaw /* "pretty-raw" */, - PrettyUnicode /* "pretty-unicode" */, - PrettyBase64 /* "pretty-base64" */, Json /* "json" */, JsonUnicode /* "json-unicode" */, JsonUnicodeArray /* "json-unicode-array" */, @@ -18,14 +16,18 @@ enum class EOutputFormat { ProtoJsonBase64 /* "proto-json-base64" */, Csv /* "csv" */, Tsv /* "tsv" */, +}; + +// EMessagingFormat to be used in both input and output when working with files/pipes in operations related to messaging +// This format defines rules for data transformation, framing, metadata envelope format +enum class EMessagingFormat { + Pretty /* "pretty" */, SingleMessage /* "single-message" */, NewlineDelimited /* "newline-delimited" */, - NewlineBase64 /* "newline-base64" */, Concatenated /* "concatenated" */, - JsonBase64StreamConcat /* "json-base64-stream-concat" */, - JsonUnicodeStreamConcat /* "json-unicode-stream-concat" */, - JsonRawStreamConcat /* "json-raw-stream-concat" */, + JsonStreamConcat /* "json-stream-concat" */, + JsonArray /* "json-array" */, }; } diff --git a/ydb/public/lib/ydb_cli/topic/topic_read.cpp b/ydb/public/lib/ydb_cli/topic/topic_read.cpp index e8660548f13..ecbb258efa7 100644 --- a/ydb/public/lib/ydb_cli/topic/topic_read.cpp +++ b/ydb/public/lib/ydb_cli/topic/topic_read.cpp @@ -12,8 +12,8 @@ namespace NYdb::NConsoleClient { constexpr i64 MessagesLimitDefaultPrettyFormat = 10; constexpr i64 MessagesLimitDefaultJsonArrayFormat = 500; - bool IsStreamingFormat(EOutputFormat format) { - return format == EOutputFormat::NewlineBase64 || format == EOutputFormat::NewlineDelimited || format == EOutputFormat::Concatenated; + bool IsStreamingFormat(EMessagingFormat format) { + return format == EMessagingFormat::NewlineDelimited || format == EMessagingFormat::Concatenated; } } @@ -24,13 +24,13 @@ namespace NYdb::NConsoleClient { TMaybe<i64> limit, bool commit, bool wait, - EOutputFormat format, + EMessagingFormat format, TVector<ETopicMetadataField> metadataFields, ETransformBody transform, TDuration idleTimeout) : MetadataFields_(metadataFields) , IdleTimeout_(idleTimeout) - , OutputFormat_(format) + , MessagingFormat_(format) , Transform_(transform) , Limit_(limit) , Commit_(commit) @@ -53,20 +53,20 @@ namespace NYdb::NConsoleClient { OutputTable_ = std::make_unique<TPrettyTable>(table); if (!ReaderParams_.Limit().Defined()) { - if (IsStreamingFormat(ReaderParams_.OutputFormat())) { + if (IsStreamingFormat(ReaderParams_.MessagingFormat())) { MessagesLeft_ = MessagesLimitUnlimited; } - if (ReaderParams_.OutputFormat() == EOutputFormat::Pretty) { + if (ReaderParams_.MessagingFormat() == EMessagingFormat::Pretty) { MessagesLeft_ = MessagesLimitDefaultPrettyFormat; } - if (ReaderParams_.OutputFormat() == EOutputFormat::JsonRawArray) { + if (ReaderParams_.MessagingFormat() == EMessagingFormat::JsonArray) { MessagesLeft_ = MessagesLimitDefaultJsonArrayFormat; } return; } i64 limit = *(ReaderParams_.Limit()); - if (IsStreamingFormat(ReaderParams_.OutputFormat()) && limit == 0) { + if (IsStreamingFormat(ReaderParams_.MessagingFormat()) && limit == 0) { limit = -1; } MessagesLeft_ = limit; @@ -83,20 +83,11 @@ namespace NYdb::NConsoleClient { using TReceivedMessage = NTopic::TReadSessionEvent::TDataReceivedEvent::TMessage; - void AddMetadataFieldToRow(TPrettyTable::TRow& row, const TReceivedMessage& message, EOutputFormat format, ETransformBody transform, ETopicMetadataField f, size_t idx) { + void AddMetadataFieldToRow(TPrettyTable::TRow& row, const TReceivedMessage& message, ETransformBody transform, ETopicMetadataField f, size_t idx) { switch (f) { case ETopicMetadataField::Body: - if (format == EOutputFormat::PrettyBase64) { - row.Column(idx, FormatBody(message.GetData(), transform)); - } - if (format == EOutputFormat::Pretty || format == EOutputFormat::PrettyRaw) { - row.Column(idx, FormatBody(message.GetData(), transform)); - } - if (format == EOutputFormat::PrettyUnicode) { - row.Column(idx, message.GetData()); - } + row.Column(idx, FormatBody(message.GetData(), transform)); break; - case ETopicMetadataField::CreateTime: row.Column(idx, message.GetCreateTime()); break; @@ -128,7 +119,7 @@ namespace NYdb::NConsoleClient { TPrettyTable::TRow& row = OutputTable_->AddRow(); for (size_t i = 0; i < ReaderParams_.MetadataFields().size(); ++i) { ETopicMetadataField f = ReaderParams_.MetadataFields()[i]; - AddMetadataFieldToRow(row, message, ReaderParams_.OutputFormat(), ReaderParams_.Transform(), f, i); + AddMetadataFieldToRow(row, message, ReaderParams_.Transform(), f, i); } } @@ -141,14 +132,10 @@ namespace NYdb::NConsoleClient { } void TTopicReader::Close(IOutputStream& output, TDuration closeTimeout) { - if (ReaderParams_.OutputFormat() == EOutputFormat::Pretty || - ReaderParams_.OutputFormat() == EOutputFormat::PrettyBase64 || - ReaderParams_.OutputFormat() == EOutputFormat::PrettyRaw) { + if (ReaderParams_.MessagingFormat() == EMessagingFormat::Pretty) { PrintMessagesInPrettyFormat(output); } - if (ReaderParams_.OutputFormat() == EOutputFormat::JsonRawArray || - ReaderParams_.OutputFormat() == EOutputFormat::JsonBase64Array || - ReaderParams_.OutputFormat() == EOutputFormat::JsonUnicodeArray) { + if (ReaderParams_.MessagingFormat() == EMessagingFormat::JsonArray) { PrintMessagesInJsonArrayFormat(output); } output.Flush(); @@ -159,19 +146,19 @@ namespace NYdb::NConsoleClient { } void TTopicReader::HandleReceivedMessage(const TReceivedMessage& message, IOutputStream& output) { - EOutputFormat outputFormat = ReaderParams_.OutputFormat(); - if (outputFormat == EOutputFormat::Default || outputFormat == EOutputFormat::Concatenated) { + EMessagingFormat MessagingFormat = ReaderParams_.MessagingFormat(); + if (MessagingFormat == EMessagingFormat::SingleMessage || MessagingFormat == EMessagingFormat::Concatenated) { output << FormatBody(message.GetData(), ReaderParams_.Transform()); output.Flush(); return; } - if (outputFormat == EOutputFormat::NewlineDelimited) { + if (MessagingFormat == EMessagingFormat::NewlineDelimited) { output << FormatBody(message.GetData(), ReaderParams_.Transform()); output << "\n"; output.Flush(); return; } - if (outputFormat == EOutputFormat::Default) { + if (MessagingFormat == EMessagingFormat::SingleMessage) { output << FormatBody(message.GetData(), ReaderParams_.Transform()); return; } @@ -239,7 +226,7 @@ namespace NYdb::NConsoleClient { int TTopicReader::Run(IOutputStream& output) { LastMessageReceivedTs_ = TInstant::Now(); - bool waitForever = ReaderParams_.Wait() && (ReaderParams_.OutputFormat() == EOutputFormat::NewlineDelimited || ReaderParams_.OutputFormat() == EOutputFormat::Concatenated); + bool waitForever = ReaderParams_.Wait() && (ReaderParams_.MessagingFormat() == EMessagingFormat::NewlineDelimited || ReaderParams_.MessagingFormat() == EMessagingFormat::Concatenated); while ((MessagesLeft_ > 0 || MessagesLeft_ == -1) && !IsInterrupted()) { TInstant messageReceiveDeadline = LastMessageReceivedTs_ + ReaderParams_.IdleTimeout(); diff --git a/ydb/public/lib/ydb_cli/topic/topic_read.h b/ydb/public/lib/ydb_cli/topic/topic_read.h index 5afdb6d9f59..cc6278c77b2 100644 --- a/ydb/public/lib/ydb_cli/topic/topic_read.h +++ b/ydb/public/lib/ydb_cli/topic/topic_read.h @@ -20,7 +20,7 @@ namespace NYdb::NConsoleClient { TMaybe<i64> limit, bool commit, bool wait, - EOutputFormat format, + EMessagingFormat format, TVector<ETopicMetadataField> metadataFields, ETransformBody transform, TDuration idleTimeout); @@ -33,7 +33,7 @@ namespace NYdb::NConsoleClient { GETTER(bool, Commit); GETTER(TMaybe<i64>, Limit); GETTER(bool, Wait); - GETTER(EOutputFormat, OutputFormat); + GETTER(EMessagingFormat, MessagingFormat); GETTER(ETransformBody, Transform); GETTER(TDuration, IdleTimeout); // TODO(shmel1k@): add batching settings. @@ -45,7 +45,7 @@ namespace NYdb::NConsoleClient { TMaybe<int> FlushMessagesCount_; TDuration IdleTimeout_; - EOutputFormat OutputFormat_ = EOutputFormat::Default; + EMessagingFormat MessagingFormat_ = EMessagingFormat::SingleMessage; ETransformBody Transform_ = ETransformBody::None; TMaybe<i64> Limit_ = Nothing(); bool Commit_ = false; diff --git a/ydb/public/lib/ydb_cli/topic/topic_read_ut.cpp b/ydb/public/lib/ydb_cli/topic/topic_read_ut.cpp index d18bc53c4d4..cbe1ae66c24 100644 --- a/ydb/public/lib/ydb_cli/topic/topic_read_ut.cpp +++ b/ydb/public/lib/ydb_cli/topic/topic_read_ut.cpp @@ -24,7 +24,7 @@ namespace NYdb::NConsoleClient { { "some simple message", }, - "", TTopicReaderSettings(Nothing(), false, false, EOutputFormat::Default, {}, ETransformBody::None, TDuration::Seconds(1))); + "", TTopicReaderSettings(Nothing(), false, false, EMessagingFormat::SingleMessage, {}, ETransformBody::None, TDuration::Seconds(1))); } void TestRun_ReadTwoMessages_With_Limit_1() { @@ -35,7 +35,7 @@ namespace NYdb::NConsoleClient { { "message1", }, - "", TTopicReaderSettings(1, false, false, EOutputFormat::Default, {}, ETransformBody::None, TDuration::Seconds(1))); + "", TTopicReaderSettings(1, false, false, EMessagingFormat::SingleMessage, {}, ETransformBody::None, TDuration::Seconds(1))); } void TestRun_ReadMoreMessagesThanLimit_Without_Wait_NewlineDelimited() { @@ -51,7 +51,7 @@ namespace NYdb::NConsoleClient { "message2", "message3", }, - "\n", TTopicReaderSettings(limit, false, false, EOutputFormat::NewlineDelimited, {}, ETransformBody::None, TDuration::Seconds(1))); + "\n", TTopicReaderSettings(limit, false, false, EMessagingFormat::NewlineDelimited, {}, ETransformBody::None, TDuration::Seconds(1))); } void TestRun_ReadMoreMessagesThanLimit_Without_Wait_NoDelimiter() { @@ -64,7 +64,7 @@ namespace NYdb::NConsoleClient { { "message1message2message3message4", }, - "", TTopicReaderSettings(limit, false, false, EOutputFormat::Default, {}, ETransformBody::None, TDuration::Seconds(1))); + "", TTopicReaderSettings(limit, false, false, EMessagingFormat::SingleMessage, {}, ETransformBody::None, TDuration::Seconds(1))); } void TestRun_ReadMessages_Output_Base64() { @@ -80,7 +80,7 @@ namespace NYdb::NConsoleClient { "bWVzc2FnZTI=", "bWVzc2FnZTM=", }, - "\n", TTopicReaderSettings(limit, false, false, EOutputFormat::NewlineDelimited, {}, ETransformBody::Base64, TDuration::Seconds(1))); + "\n", TTopicReaderSettings(limit, false, false, EMessagingFormat::NewlineDelimited, {}, ETransformBody::Base64, TDuration::Seconds(1))); } void TestRun_Read_Less_Messages_Than_Sent() { @@ -94,7 +94,7 @@ namespace NYdb::NConsoleClient { { "message1message2", }, - "", TTopicReaderSettings(limit, false, false, EOutputFormat::Default, {}, ETransformBody::None, TDuration::Seconds(1))); + "", TTopicReaderSettings(limit, false, false, EMessagingFormat::SingleMessage, {}, ETransformBody::None, TDuration::Seconds(1))); } private: diff --git a/ydb/public/lib/ydb_cli/topic/topic_write.cpp b/ydb/public/lib/ydb_cli/topic/topic_write.cpp index e05ef1976f7..f4bbdf5cb60 100644 --- a/ydb/public/lib/ydb_cli/topic/topic_write.cpp +++ b/ydb/public/lib/ydb_cli/topic/topic_write.cpp @@ -13,15 +13,15 @@ namespace NYdb::NConsoleClient { TTopicWriterParams::TTopicWriterParams() { } - TTopicWriterParams::TTopicWriterParams(EOutputFormat inputFormat, TMaybe<TString> delimiter, + TTopicWriterParams::TTopicWriterParams(EMessagingFormat inputFormat, TMaybe<TString> delimiter, ui64 messageSizeLimit, TMaybe<TDuration> batchDuration, TMaybe<ui64> batchSize, TMaybe<ui64> batchMessagesCount) - : InputFormat_(inputFormat) + : MessagingFormat_(inputFormat) , BatchDuration_(batchDuration) , BatchSize_(batchSize) , BatchMessagesCount_(batchMessagesCount) , MessageSizeLimit_(messageSizeLimit) { - if (inputFormat == EOutputFormat::NewlineDelimited || inputFormat == EOutputFormat::Concatenated) { + if (inputFormat == EMessagingFormat::NewlineDelimited || inputFormat == EMessagingFormat::Concatenated) { Delimiter_ = TMaybe<char>('\n'); } if (delimiter.Defined()) { diff --git a/ydb/public/lib/ydb_cli/topic/topic_write.h b/ydb/public/lib/ydb_cli/topic/topic_write.h index 445e3c9016f..131c0a7ea26 100644 --- a/ydb/public/lib/ydb_cli/topic/topic_write.h +++ b/ydb/public/lib/ydb_cli/topic/topic_write.h @@ -14,7 +14,7 @@ namespace NYdb::NConsoleClient { class TTopicWriterParams { public: TTopicWriterParams(); - TTopicWriterParams(EOutputFormat inputFormat, TMaybe<TString> delimiter, + TTopicWriterParams(EMessagingFormat inputFormat, TMaybe<TString> delimiter, ui64 messageSizeLimit, TMaybe<TDuration> batchDuration, TMaybe<ui64> batchSize, TMaybe<ui64> batchMessagesCount); TTopicWriterParams(const TTopicWriterParams&) = default; @@ -25,12 +25,12 @@ namespace NYdb::NConsoleClient { GETTER(TMaybe<ui64>, BatchSize); GETTER(TMaybe<ui64>, BatchMessagesCount); GETTER(ui64, MessageSizeLimit); - GETTER(EOutputFormat, InputFormat); + GETTER(EMessagingFormat, MessagingFormat); private: TMaybe<TString> File_; TMaybe<char> Delimiter_; - EOutputFormat InputFormat_ = EOutputFormat::Default; + EMessagingFormat MessagingFormat_ = EMessagingFormat::SingleMessage; // TODO(shmel1k@): move to 'TWithBatchingCommand' or something like that. TMaybe<TDuration> BatchDuration_; diff --git a/ydb/public/lib/ydb_cli/topic/topic_write_ut.cpp b/ydb/public/lib/ydb_cli/topic/topic_write_ut.cpp index 8540e647780..57dbc5e32f0 100644 --- a/ydb/public/lib/ydb_cli/topic/topic_write_ut.cpp +++ b/ydb/public/lib/ydb_cli/topic/topic_write_ut.cpp @@ -36,7 +36,7 @@ namespace NYdb::NConsoleClient { void TestEnterMessage_OnlyDelimiters() { TStringStream str = TString("\n") * 6; - TTopicWriter wr(nullptr, TTopicWriterParams(EOutputFormat::Default, "\n", 0, Nothing(), Nothing(), Nothing())); + TTopicWriter wr(nullptr, TTopicWriterParams(EMessagingFormat::SingleMessage, "\n", 0, Nothing(), Nothing(), Nothing())); TMessages got = EnterMessageHelper(wr, str); AssertMessagesEqual({ { @@ -97,7 +97,7 @@ namespace NYdb::NConsoleClient { void TestEnterMessage_1KiB_Newline_Delimiter() { TStringStream str = TString("a") * 512 + "\n" + TString("b") * 512; - TTopicWriter wr(nullptr, TTopicWriterParams(EOutputFormat::Default, "\n", 0, Nothing(), Nothing(), Nothing())); + TTopicWriter wr(nullptr, TTopicWriterParams(EMessagingFormat::SingleMessage, "\n", 0, Nothing(), Nothing(), Nothing())); TMessages got = EnterMessageHelper(wr, str); AssertMessagesEqual({ { @@ -121,7 +121,7 @@ namespace NYdb::NConsoleClient { void TestEnterMessage_1KiB_Newline_Delimited_With_Two_Delimiters_In_A_Row() { TStringStream str = TString("a") * 512 + "\n\n" + TString("b") * 512; - TTopicWriter wr(nullptr, TTopicWriterParams(EOutputFormat::NewlineDelimited, Nothing(), 0, Nothing(), Nothing(), Nothing())); + TTopicWriter wr(nullptr, TTopicWriterParams(EMessagingFormat::NewlineDelimited, Nothing(), 0, Nothing(), Nothing(), Nothing())); TMessages got = EnterMessageHelper(wr, str); AssertMessagesEqual({ { @@ -165,7 +165,7 @@ namespace NYdb::NConsoleClient { void TestEnterMessage_ZeroSymbol_Delimited() { auto& s = "\0\0\0\0\n\nprivet"; TStringStream str = TString(std::begin(s), std::end(s)); - TTopicWriter wr(nullptr, TTopicWriterParams(EOutputFormat::Default, "\0", 0, Nothing(), Nothing(), Nothing())); + TTopicWriter wr(nullptr, TTopicWriterParams(EMessagingFormat::SingleMessage, "\0", 0, Nothing(), Nothing(), Nothing())); TMessages got = EnterMessageHelper(wr, str); AssertMessagesEqual({ { @@ -204,7 +204,7 @@ namespace NYdb::NConsoleClient { void TestEnterMessage_Custom_Delimiter_Delimited() { TStringStream str = TString("privet_vasya_kak_dela?"); - TTopicWriter wr(nullptr, TTopicWriterParams(EOutputFormat::Default, "_", 0, Nothing(), Nothing(), Nothing())); + TTopicWriter wr(nullptr, TTopicWriterParams(EMessagingFormat::SingleMessage, "_", 0, Nothing(), Nothing(), Nothing())); TMessages got = EnterMessageHelper(wr, str); AssertMessagesEqual({ { @@ -237,25 +237,25 @@ namespace NYdb::NConsoleClient { } void TestTopicWriterParams_Format_NewlineDelimited() { - TTopicWriterParams p = TTopicWriterParams(EOutputFormat::NewlineDelimited, Nothing(), 0, Nothing(), Nothing(), Nothing()); - UNIT_ASSERT_VALUES_EQUAL(p.InputFormat(), EOutputFormat::NewlineDelimited); + TTopicWriterParams p = TTopicWriterParams(EMessagingFormat::NewlineDelimited, Nothing(), 0, Nothing(), Nothing(), Nothing()); + UNIT_ASSERT_VALUES_EQUAL(p.MessagingFormat(), EMessagingFormat::NewlineDelimited); UNIT_ASSERT_VALUES_EQUAL(p.Delimiter(), '\n'); } void TestTopicWriterParams_Format_Concatenated() { - TTopicWriterParams p = TTopicWriterParams(EOutputFormat::Concatenated, Nothing(), 0, Nothing(), Nothing(), Nothing()); - UNIT_ASSERT_VALUES_EQUAL(p.InputFormat(), EOutputFormat::Concatenated); + TTopicWriterParams p = TTopicWriterParams(EMessagingFormat::Concatenated, Nothing(), 0, Nothing(), Nothing(), Nothing()); + UNIT_ASSERT_VALUES_EQUAL(p.MessagingFormat(), EMessagingFormat::Concatenated); UNIT_ASSERT_VALUES_EQUAL(p.Delimiter(), '\n'); } void TestTopicWriterParams_No_Delimiter() { - TTopicWriterParams p = TTopicWriterParams(EOutputFormat::Default, Nothing(), 0, Nothing(), Nothing(), Nothing()); - UNIT_ASSERT_VALUES_EQUAL(p.InputFormat(), EOutputFormat::Default); + TTopicWriterParams p = TTopicWriterParams(EMessagingFormat::SingleMessage, Nothing(), 0, Nothing(), Nothing(), Nothing()); + UNIT_ASSERT_VALUES_EQUAL(p.MessagingFormat(), EMessagingFormat::SingleMessage); UNIT_ASSERT_VALUES_EQUAL(p.Delimiter(), Nothing()); } void TestTopicWriterParams_InvalidDelimiter() { - UNIT_ASSERT_EXCEPTION(TTopicWriterParams(EOutputFormat::Default, "invalid", 0, Nothing(), Nothing(), Nothing()), yexception); + UNIT_ASSERT_EXCEPTION(TTopicWriterParams(EMessagingFormat::SingleMessage, "invalid", 0, Nothing(), Nothing(), Nothing()), yexception); } private: |