aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authoralexv-smirnov <alex@ydb.tech>2022-08-26 11:42:19 +0300
committeralexv-smirnov <alex@ydb.tech>2022-08-26 11:42:19 +0300
commita9736534c949447c5d1a9cf9c7d65b9d1fe04279 (patch)
tree5ef9cfb917b6ae2f2e3e8f4720107bea59d9ce87
parent76a1e556daf2542e3b3d55d7971cc1ed0abb1f50 (diff)
downloadydb-a9736534c949447c5d1a9cf9c7d65b9d1fe04279.tar.gz
fix ydb cli topic read help messages and formats
-rw-r--r--ydb/public/lib/ydb_cli/commands/ydb_service_topic.cpp66
-rw-r--r--ydb/public/lib/ydb_cli/common/format.cpp48
-rw-r--r--ydb/public/lib/ydb_cli/common/format.h4
-rw-r--r--ydb/public/lib/ydb_cli/common/formats.h16
-rw-r--r--ydb/public/lib/ydb_cli/topic/topic_read.cpp49
-rw-r--r--ydb/public/lib/ydb_cli/topic/topic_read.h6
-rw-r--r--ydb/public/lib/ydb_cli/topic/topic_read_ut.cpp12
-rw-r--r--ydb/public/lib/ydb_cli/topic/topic_write.cpp6
-rw-r--r--ydb/public/lib/ydb_cli/topic/topic_write.h6
-rw-r--r--ydb/public/lib/ydb_cli/topic/topic_write_ut.cpp24
10 files changed, 127 insertions, 110 deletions
diff --git a/ydb/public/lib/ydb_cli/commands/ydb_service_topic.cpp b/ydb/public/lib/ydb_cli/commands/ydb_service_topic.cpp
index 58c400542cb..e70da397703 100644
--- a/ydb/public/lib/ydb_cli/commands/ydb_service_topic.cpp
+++ b/ydb/public/lib/ydb_cli/commands/ydb_service_topic.cpp
@@ -31,11 +31,11 @@ namespace NYdb::NConsoleClient {
// TODO(shmel1k@): improve docs
THashMap<ETopicMetadataField, TString> TopicMetadataFieldsDescriptions = {
{ETopicMetadataField::Body, "Message data"},
- {ETopicMetadataField::WriteTime, "Message write time. This time defines the UNIX timestamp the messages was written to server"},
- {ETopicMetadataField::CreateTime, "Message creation time. This time defines the UNIX timestamp the message was created by client"},
- {ETopicMetadataField::MessageGroupID, "Message group id. This identifier is used to stick to concrete partition using round-robin algorithm. All messages with the same message group id are guaranteed to be read in FIFO order"},
- {ETopicMetadataField::Offset, "Message offset. Offset defines unique message number in his partition"},
- {ETopicMetadataField::SeqNo, "Message sequence number, which is used for message deduplication"},
+ {ETopicMetadataField::WriteTime, "Message write time, a UNIX timestamp the message was written to server"},
+ {ETopicMetadataField::CreateTime, "Message creation time, a UNIX timestamp provided by the publishing client"},
+ {ETopicMetadataField::MessageGroupID, "Message group id. All messages with the same message group id are guaranteed to be read in FIFO order"},
+ {ETopicMetadataField::Offset, "Message offset. Offset orders messages in each partition"},
+ {ETopicMetadataField::SeqNo, "Message sequence number, used for message deduplication when publishing"},
{ETopicMetadataField::Meta, "Message additional metadata"},
};
@@ -60,8 +60,8 @@ namespace NYdb::NConsoleClient {
};
THashMap<ETransformBody, TString> TransformBodyDescriptions = {
- {ETransformBody::None, "Do not transform body to any format"},
- {ETransformBody::Base64, "Transform body to base64 format"},
+ {ETransformBody::None, "No conversions, binary data on the client is exactly the same as it is in the topic message."},
+ {ETransformBody::Base64, "Message on the client is a base64-encoded representation of the topic message."},
};
constexpr TDuration DefaultIdleTimeout = TDuration::Seconds(1);
@@ -336,12 +336,12 @@ namespace NYdb::NConsoleClient {
}
TCommandTopicRead::TCommandTopicRead()
- : TYdbCommand("read", {}, "Read from topic command") {
+ : TYdbCommand("read", {}, "Read from a topic to the client filesystem or terminal") {
}
void TCommandTopicRead::AddAllowedMetadataFields(TConfig& config) {
TStringStream description;
- description << "Comma-separated list of message fields to print. Available fields: ";
+ description << "Comma-separated list of message fields to print in Pretty format. If not specified, all fields are printed. Available fields: ";
NColorizer::TColors colors = NColorizer::AutoColors(Cout);
for (const auto& iter : TopicMetadataFieldsDescriptions) {
description << "\n " << colors.BoldColor() << iter.first << colors.OldColor() << "\n " << iter.second;
@@ -354,7 +354,7 @@ namespace NYdb::NConsoleClient {
void TCommandTopicRead::AddAllowedTransformFormats(TConfig& config) {
TStringStream description;
- description << "Format to transform received body: Available \"transform\" values: ";
+ description << "Conversion between a message data in the topic and the client filesystem/terminal. Available options: ";
NColorizer::TColors colors = NColorizer::AutoColors(Cout);
for (const auto& iter : TransformBodyDescriptions) {
description << "\n " << colors.BoldColor() << iter.first << colors.OldColor() << "\n " << iter.second;
@@ -362,22 +362,24 @@ namespace NYdb::NConsoleClient {
config.Opts->AddLongOption("transform", description.Str())
.Optional()
+ .DefaultValue("none")
.StoreResult(&TransformStr_);
}
void TCommandTopicRead::Config(TConfig& config) {
TYdbCommand::Config(config);
config.Opts->SetFreeArgsNum(1);
- SetFreeArgTitle(0, "<topic-path>", "Topic to read data");
+ SetFreeArgTitle(0, "<topic-path>", "Topic to read data from");
- AddFormats(config, {
- EOutputFormat::Pretty,
- EOutputFormat::NewlineDelimited,
- EOutputFormat::Concatenated,
+ AddMessagingFormats(config, {
+ EMessagingFormat::SingleMessage,
+ EMessagingFormat::Pretty,
+ EMessagingFormat::NewlineDelimited,
+ EMessagingFormat::Concatenated,
});
// TODO(shmel1k@): improve help.
- config.Opts->AddLongOption('c', "consumer", "Consumer name")
+ config.Opts->AddLongOption('c', "consumer", "Consumer name.")
.Required()
.StoreResult(&Consumer_);
// config.Opts->AddLongOption("offset", "Offset to start read from")
@@ -386,7 +388,7 @@ namespace NYdb::NConsoleClient {
// config.Opts->AddLongOption("partition", "Partition to read from")
// .Optional()
// .StoreResult(&Partition_);
- config.Opts->AddLongOption('f', "file", "File to write data to")
+ config.Opts->AddLongOption('f', "file", "File to write data to. In not specified, data is written to the standard output.")
.Optional()
.StoreResult(&File_);
// NOTE(shmel1k@): temporary disabled options
@@ -399,7 +401,7 @@ namespace NYdb::NConsoleClient {
// config.Opts->AddLongOption("flush-messages-count", "") // TODO(shmel1k@): improve
// .Optional()
// .StoreResult(&FlushMessagesCount_);
- config.Opts->AddLongOption("idle-timeout", "Max wait duration for new messages")
+ config.Opts->AddLongOption("idle-timeout", "Max wait duration for new messages. Topic is considered empty if no new messages arrive within this period.")
.Optional()
.DefaultValue(DefaultIdleTimeout)
.StoreResult(&IdleTimeout_);
@@ -414,14 +416,17 @@ namespace NYdb::NConsoleClient {
// config.Opts->AddLongOption("discard-above-limits", "Do not print messages with size more than defined in 'message-size-limit' option")
// .Optional()
// .StoreResult(&DiscardAboveLimits_);
- config.Opts->AddLongOption("limit", "Messages count to read")
+ config.Opts->AddLongOption("limit", "Limit on message count to read, 0 - unlimited. "
+ "If avobe 0, processing stops when either topic is empty, or the specified limit reached. "
+ "Must be above 0 for pretty output format."
+ "\nDefault is 10 for pretty format, unlimited for streaming formats.")
.Optional()
.StoreResult(&Limit_);
- config.Opts->AddLongOption('w', "wait", "Wait for infinite time for first message received")
+ config.Opts->AddLongOption('w', "wait", "Wait indefinitely for a first message received. If not specified, command exits on empty topic returning no data to the output.")
.Optional()
.NoArgument()
.StoreValue(&Wait_, true);
- config.Opts->AddLongOption("timestamp", "Timestamp from which messages will be read")
+ config.Opts->AddLongOption("timestamp", "Timestamp from which messages will be read. If not specified, messages are read from the last commit point for the chosen consumer.")
.Optional()
.StoreResult(&Timestamp_);
@@ -485,7 +490,7 @@ namespace NYdb::NConsoleClient {
void TCommandTopicRead::Parse(TConfig& config) {
TYdbCommand::Parse(config);
ParseTopicName(config, 0);
- ParseFormats();
+ ParseMessagingFormats();
ParseMetadataFields();
ParseTransformFormat();
}
@@ -507,7 +512,7 @@ namespace NYdb::NConsoleClient {
void TCommandTopicRead::ValidateConfig() {
// TODO(shmel1k@): add more formats.
- if (OutputFormat != EOutputFormat::Default && (Limit_.Defined() && (Limit_ < 0 || Limit_ > 500))) {
+ if (MessagingFormat != EMessagingFormat::SingleMessage && (Limit_.Defined() && (Limit_ < 0 || Limit_ > 500))) {
throw TMisuseException() << "OutputFormat " << OutputFormat << " is not compatible with "
<< "limit equal '0' or more than '500': '" << *Limit_ << "' was given";
}
@@ -528,7 +533,7 @@ namespace NYdb::NConsoleClient {
Limit_,
Commit_,
Wait_,
- OutputFormat,
+ MessagingFormat,
MetadataFields_,
Transform_,
IdleTimeout_));
@@ -563,12 +568,11 @@ namespace NYdb::NConsoleClient {
config.Opts->SetFreeArgsNum(1);
SetFreeArgTitle(0, "<topic-path>", "Topic to write data");
- AddInputFormats(config, {
- EOutputFormat::NewlineDelimited,
- EOutputFormat::SingleMessage,
+ AddMessagingFormats(config, {
+ EMessagingFormat::NewlineDelimited,
+ EMessagingFormat::SingleMessage,
// EOutputFormat::JsonRawStreamConcat,
// EOutputFormat::JsonRawArray,
- EOutputFormat::SingleMessage,
});
AddAllowedCodecs(config);
@@ -599,10 +603,10 @@ namespace NYdb::NConsoleClient {
void TCommandTopicWrite::Parse(TConfig& config) {
TYdbCommand::Parse(config);
ParseTopicName(config, 0);
- ParseFormats();
+ ParseMessagingFormats();
ParseCodec();
- if (Delimiter_.Defined() && InputFormat != EOutputFormat::Default) {
+ if (Delimiter_.Defined() && MessagingFormat != EMessagingFormat::SingleMessage) {
throw TMisuseException() << "Both mutually exclusive options \"delimiter\"(\"--delimiter\", \"-d\" "
<< "and \"input format\"(\"--input-format\") were provided.";
}
@@ -674,7 +678,7 @@ namespace NYdb::NConsoleClient {
{
auto writeSession = NPersQueue::TPersQueueClient(*driver).CreateWriteSession(std::move(PrepareWriteSessionSettings()));
auto writer = TTopicWriter(writeSession, std::move(TTopicWriterParams(
- InputFormat,
+ MessagingFormat,
Delimiter_,
MessageSizeLimit_,
BatchDuration_,
diff --git a/ydb/public/lib/ydb_cli/common/format.cpp b/ydb/public/lib/ydb_cli/common/format.cpp
index 4c2bff7c00b..0db3a40d1f5 100644
--- a/ydb/public/lib/ydb_cli/common/format.cpp
+++ b/ydb/public/lib/ydb_cli/common/format.cpp
@@ -15,17 +15,10 @@ namespace {
{ EOutputFormat::JsonBase64, "Input in json format, binary strings are decoded with base64" },
{ EOutputFormat::Csv, "Input in csv format" },
{ EOutputFormat::Tsv, "Input in tsv format" },
- { EOutputFormat::SingleMessage, "Input data as a single message"}, // TODO
- { EOutputFormat::NewlineDelimited, "Input data is '\\n' delimited"}, // TODO
-// { EOutputFormat::JsonRawStreamConcat, ""}, // TODO,
-// { EOutputFormat::JsonRawArray, ""}, // TODO
};
THashMap<EOutputFormat, TString> FormatDescriptions = {
{ EOutputFormat::Pretty, "Human readable output" },
- { EOutputFormat::PrettyRaw, "Human readable format with raw data printed"}, // TODO(shmel1k@): improve
- { EOutputFormat::PrettyUnicode, "Human readable format with data printed as unicode"}, // TODO(shmel1k@): improve
- { EOutputFormat::PrettyBase64, "Human readable format with data printed as base64 encoded"}, // TODO(shmel1k@): improve
{ EOutputFormat::Json, "Output in json format" },
{ EOutputFormat::JsonUnicode, "Output in json format, binary strings are encoded with unicode characters. "
"Every row is a separate json on a separate line." },
@@ -42,13 +35,15 @@ namespace {
"Every row is a separate binary data on a separate line"},
{ EOutputFormat::ProtoJsonBase64, "Output result protobuf in json format, binary strings are encoded with base64" },
{ EOutputFormat::Csv, "Output in csv format" },
- { EOutputFormat::SingleMessage, "Output result is presented as a single message"}, // TODO(shmel1k@): improve
- { EOutputFormat::NewlineDelimited, "Newline delimited"}, // TODO(shmel1k@): improve
- { EOutputFormat::NewlineBase64, "Newline base64"}, // TODO(shmel1k@): improve,
- { EOutputFormat::Concatenated, "Concatenated"}, // TODO(shmel1k@): improve,
- { EOutputFormat::JsonBase64StreamConcat, "Concatenated base64 stream"}, // TODO(shmel1k@): improve,
- { EOutputFormat::JsonUnicodeStreamConcat, "Concatenated unicode stream"}, // TODO(shmel1k@): improve,
- { EOutputFormat::JsonRawStreamConcat, "Concated raw stream"}, // TODO(shmel1k@): improve
+ };
+
+ THashMap<EMessagingFormat, TString> MessagingFormatDescriptions = {
+ { EMessagingFormat::Pretty, "Human readable output with metadata." },
+ { EMessagingFormat::SingleMessage, "Single message."}, // TODO(shmel1k@): improve
+ { EMessagingFormat::NewlineDelimited, "Newline delimited stream of messages."}, // TODO(shmel1k@): improve
+ { EMessagingFormat::Concatenated, "Concatenated output stream of messages."}, // TODO(shmel1k@): improve,
+ { EMessagingFormat::JsonStreamConcat, "Concatenated Json stream of envelopes with metadata and messages in the ""body"" attribute." }, // TODO(shmel1k@): improve,
+ { EMessagingFormat::JsonArray, "Json array of envelopes with metadata and messages in the ""body"" attribute." }, // TODO(shmel1k@): improve,
};
}
@@ -114,6 +109,23 @@ void TCommandWithFormat::AddFormats(TClientCommand::TConfig& config, const TVect
AllowedFormats = allowedFormats;
}
+void TCommandWithFormat::AddMessagingFormats(TClientCommand::TConfig& config, const TVector<EMessagingFormat>& allowedFormats) {
+ TStringStream description;
+ description << "Client-side format. Available options: ";
+ NColorizer::TColors colors = NColorizer::AutoColors(Cout);
+ for (const auto& format : allowedFormats) {
+ auto findResult = MessagingFormatDescriptions.find(format);
+ Y_VERIFY(findResult != MessagingFormatDescriptions.end(),
+ "Couldn't find description for %s output format", (TStringBuilder() << format).c_str());
+ description << "\n " << colors.BoldColor() << format << colors.OldColor()
+ << "\n " << findResult->second;
+ }
+ config.Opts->AddLongOption("format", description.Str())
+ .DefaultValue( "single-message" )
+ .RequiredArgument("STRING").StoreResult(&MessagingFormat);
+ AllowedMessagingFormats = allowedFormats;
+}
+
void TCommandWithFormat::ParseFormats() {
if (InputFormat != EOutputFormat::Default
&& std::find(AllowedInputFormats.begin(), AllowedInputFormats.end(), InputFormat) == AllowedInputFormats.end()) {
@@ -129,6 +141,14 @@ void TCommandWithFormat::ParseFormats() {
}
}
+void TCommandWithFormat::ParseMessagingFormats() {
+ if (MessagingFormat != EMessagingFormat::SingleMessage
+ && std::find(AllowedMessagingFormats.begin(), AllowedMessagingFormats.end(), MessagingFormat) == AllowedMessagingFormats.end()) {
+ throw TMisuseException() << "Messaging format " << MessagingFormat << " is not available for this command";
+ }
+}
+
+
void TQueryPlanPrinter::Print(const TString& plan) {
switch (Format) {
case EOutputFormat::Default:
diff --git a/ydb/public/lib/ydb_cli/common/format.h b/ydb/public/lib/ydb_cli/common/format.h
index c8fa91a3246..bc571f7e0c5 100644
--- a/ydb/public/lib/ydb_cli/common/format.h
+++ b/ydb/public/lib/ydb_cli/common/format.h
@@ -23,7 +23,9 @@ class TCommandWithFormat {
protected:
void AddInputFormats(TClientCommand::TConfig& config, const TVector<EOutputFormat>& allowedFormats);
void AddFormats(TClientCommand::TConfig& config, const TVector<EOutputFormat>& allowedFormats);
+ void AddMessagingFormats(TClientCommand::TConfig& config, const TVector<EMessagingFormat>& allowedFormats);
void ParseFormats();
+ void ParseMessagingFormats();
// Deprecated
void AddDeprecatedJsonOption(TClientCommand::TConfig& config,
@@ -33,10 +35,12 @@ protected:
protected:
EOutputFormat OutputFormat = EOutputFormat::Default;
EOutputFormat InputFormat = EOutputFormat::Default;
+ EMessagingFormat MessagingFormat = EMessagingFormat::SingleMessage;
private:
TVector<EOutputFormat> AllowedInputFormats;
TVector<EOutputFormat> AllowedFormats;
+ TVector<EMessagingFormat> AllowedMessagingFormats;
bool DeprecatedOptionUsed = false;
};
diff --git a/ydb/public/lib/ydb_cli/common/formats.h b/ydb/public/lib/ydb_cli/common/formats.h
index 05fd8b6ba0b..09a893db58e 100644
--- a/ydb/public/lib/ydb_cli/common/formats.h
+++ b/ydb/public/lib/ydb_cli/common/formats.h
@@ -3,12 +3,10 @@
namespace NYdb {
namespace NConsoleClient {
+// EOutputFormat to be used in operations related to structured data
enum class EOutputFormat {
Default /* "default" */,
Pretty /* "pretty" */,
- PrettyRaw /* "pretty-raw" */,
- PrettyUnicode /* "pretty-unicode" */,
- PrettyBase64 /* "pretty-base64" */,
Json /* "json" */,
JsonUnicode /* "json-unicode" */,
JsonUnicodeArray /* "json-unicode-array" */,
@@ -18,14 +16,18 @@ enum class EOutputFormat {
ProtoJsonBase64 /* "proto-json-base64" */,
Csv /* "csv" */,
Tsv /* "tsv" */,
+};
+
+// EMessagingFormat to be used in both input and output when working with files/pipes in operations related to messaging
+// This format defines rules for data transformation, framing, metadata envelope format
+enum class EMessagingFormat {
+ Pretty /* "pretty" */,
SingleMessage /* "single-message" */,
NewlineDelimited /* "newline-delimited" */,
- NewlineBase64 /* "newline-base64" */,
Concatenated /* "concatenated" */,
- JsonBase64StreamConcat /* "json-base64-stream-concat" */,
- JsonUnicodeStreamConcat /* "json-unicode-stream-concat" */,
- JsonRawStreamConcat /* "json-raw-stream-concat" */,
+ JsonStreamConcat /* "json-stream-concat" */,
+ JsonArray /* "json-array" */,
};
}
diff --git a/ydb/public/lib/ydb_cli/topic/topic_read.cpp b/ydb/public/lib/ydb_cli/topic/topic_read.cpp
index e8660548f13..ecbb258efa7 100644
--- a/ydb/public/lib/ydb_cli/topic/topic_read.cpp
+++ b/ydb/public/lib/ydb_cli/topic/topic_read.cpp
@@ -12,8 +12,8 @@ namespace NYdb::NConsoleClient {
constexpr i64 MessagesLimitDefaultPrettyFormat = 10;
constexpr i64 MessagesLimitDefaultJsonArrayFormat = 500;
- bool IsStreamingFormat(EOutputFormat format) {
- return format == EOutputFormat::NewlineBase64 || format == EOutputFormat::NewlineDelimited || format == EOutputFormat::Concatenated;
+ bool IsStreamingFormat(EMessagingFormat format) {
+ return format == EMessagingFormat::NewlineDelimited || format == EMessagingFormat::Concatenated;
}
}
@@ -24,13 +24,13 @@ namespace NYdb::NConsoleClient {
TMaybe<i64> limit,
bool commit,
bool wait,
- EOutputFormat format,
+ EMessagingFormat format,
TVector<ETopicMetadataField> metadataFields,
ETransformBody transform,
TDuration idleTimeout)
: MetadataFields_(metadataFields)
, IdleTimeout_(idleTimeout)
- , OutputFormat_(format)
+ , MessagingFormat_(format)
, Transform_(transform)
, Limit_(limit)
, Commit_(commit)
@@ -53,20 +53,20 @@ namespace NYdb::NConsoleClient {
OutputTable_ = std::make_unique<TPrettyTable>(table);
if (!ReaderParams_.Limit().Defined()) {
- if (IsStreamingFormat(ReaderParams_.OutputFormat())) {
+ if (IsStreamingFormat(ReaderParams_.MessagingFormat())) {
MessagesLeft_ = MessagesLimitUnlimited;
}
- if (ReaderParams_.OutputFormat() == EOutputFormat::Pretty) {
+ if (ReaderParams_.MessagingFormat() == EMessagingFormat::Pretty) {
MessagesLeft_ = MessagesLimitDefaultPrettyFormat;
}
- if (ReaderParams_.OutputFormat() == EOutputFormat::JsonRawArray) {
+ if (ReaderParams_.MessagingFormat() == EMessagingFormat::JsonArray) {
MessagesLeft_ = MessagesLimitDefaultJsonArrayFormat;
}
return;
}
i64 limit = *(ReaderParams_.Limit());
- if (IsStreamingFormat(ReaderParams_.OutputFormat()) && limit == 0) {
+ if (IsStreamingFormat(ReaderParams_.MessagingFormat()) && limit == 0) {
limit = -1;
}
MessagesLeft_ = limit;
@@ -83,20 +83,11 @@ namespace NYdb::NConsoleClient {
using TReceivedMessage = NTopic::TReadSessionEvent::TDataReceivedEvent::TMessage;
- void AddMetadataFieldToRow(TPrettyTable::TRow& row, const TReceivedMessage& message, EOutputFormat format, ETransformBody transform, ETopicMetadataField f, size_t idx) {
+ void AddMetadataFieldToRow(TPrettyTable::TRow& row, const TReceivedMessage& message, ETransformBody transform, ETopicMetadataField f, size_t idx) {
switch (f) {
case ETopicMetadataField::Body:
- if (format == EOutputFormat::PrettyBase64) {
- row.Column(idx, FormatBody(message.GetData(), transform));
- }
- if (format == EOutputFormat::Pretty || format == EOutputFormat::PrettyRaw) {
- row.Column(idx, FormatBody(message.GetData(), transform));
- }
- if (format == EOutputFormat::PrettyUnicode) {
- row.Column(idx, message.GetData());
- }
+ row.Column(idx, FormatBody(message.GetData(), transform));
break;
-
case ETopicMetadataField::CreateTime:
row.Column(idx, message.GetCreateTime());
break;
@@ -128,7 +119,7 @@ namespace NYdb::NConsoleClient {
TPrettyTable::TRow& row = OutputTable_->AddRow();
for (size_t i = 0; i < ReaderParams_.MetadataFields().size(); ++i) {
ETopicMetadataField f = ReaderParams_.MetadataFields()[i];
- AddMetadataFieldToRow(row, message, ReaderParams_.OutputFormat(), ReaderParams_.Transform(), f, i);
+ AddMetadataFieldToRow(row, message, ReaderParams_.Transform(), f, i);
}
}
@@ -141,14 +132,10 @@ namespace NYdb::NConsoleClient {
}
void TTopicReader::Close(IOutputStream& output, TDuration closeTimeout) {
- if (ReaderParams_.OutputFormat() == EOutputFormat::Pretty ||
- ReaderParams_.OutputFormat() == EOutputFormat::PrettyBase64 ||
- ReaderParams_.OutputFormat() == EOutputFormat::PrettyRaw) {
+ if (ReaderParams_.MessagingFormat() == EMessagingFormat::Pretty) {
PrintMessagesInPrettyFormat(output);
}
- if (ReaderParams_.OutputFormat() == EOutputFormat::JsonRawArray ||
- ReaderParams_.OutputFormat() == EOutputFormat::JsonBase64Array ||
- ReaderParams_.OutputFormat() == EOutputFormat::JsonUnicodeArray) {
+ if (ReaderParams_.MessagingFormat() == EMessagingFormat::JsonArray) {
PrintMessagesInJsonArrayFormat(output);
}
output.Flush();
@@ -159,19 +146,19 @@ namespace NYdb::NConsoleClient {
}
void TTopicReader::HandleReceivedMessage(const TReceivedMessage& message, IOutputStream& output) {
- EOutputFormat outputFormat = ReaderParams_.OutputFormat();
- if (outputFormat == EOutputFormat::Default || outputFormat == EOutputFormat::Concatenated) {
+ EMessagingFormat MessagingFormat = ReaderParams_.MessagingFormat();
+ if (MessagingFormat == EMessagingFormat::SingleMessage || MessagingFormat == EMessagingFormat::Concatenated) {
output << FormatBody(message.GetData(), ReaderParams_.Transform());
output.Flush();
return;
}
- if (outputFormat == EOutputFormat::NewlineDelimited) {
+ if (MessagingFormat == EMessagingFormat::NewlineDelimited) {
output << FormatBody(message.GetData(), ReaderParams_.Transform());
output << "\n";
output.Flush();
return;
}
- if (outputFormat == EOutputFormat::Default) {
+ if (MessagingFormat == EMessagingFormat::SingleMessage) {
output << FormatBody(message.GetData(), ReaderParams_.Transform());
return;
}
@@ -239,7 +226,7 @@ namespace NYdb::NConsoleClient {
int TTopicReader::Run(IOutputStream& output) {
LastMessageReceivedTs_ = TInstant::Now();
- bool waitForever = ReaderParams_.Wait() && (ReaderParams_.OutputFormat() == EOutputFormat::NewlineDelimited || ReaderParams_.OutputFormat() == EOutputFormat::Concatenated);
+ bool waitForever = ReaderParams_.Wait() && (ReaderParams_.MessagingFormat() == EMessagingFormat::NewlineDelimited || ReaderParams_.MessagingFormat() == EMessagingFormat::Concatenated);
while ((MessagesLeft_ > 0 || MessagesLeft_ == -1) && !IsInterrupted()) {
TInstant messageReceiveDeadline = LastMessageReceivedTs_ + ReaderParams_.IdleTimeout();
diff --git a/ydb/public/lib/ydb_cli/topic/topic_read.h b/ydb/public/lib/ydb_cli/topic/topic_read.h
index 5afdb6d9f59..cc6278c77b2 100644
--- a/ydb/public/lib/ydb_cli/topic/topic_read.h
+++ b/ydb/public/lib/ydb_cli/topic/topic_read.h
@@ -20,7 +20,7 @@ namespace NYdb::NConsoleClient {
TMaybe<i64> limit,
bool commit,
bool wait,
- EOutputFormat format,
+ EMessagingFormat format,
TVector<ETopicMetadataField> metadataFields,
ETransformBody transform,
TDuration idleTimeout);
@@ -33,7 +33,7 @@ namespace NYdb::NConsoleClient {
GETTER(bool, Commit);
GETTER(TMaybe<i64>, Limit);
GETTER(bool, Wait);
- GETTER(EOutputFormat, OutputFormat);
+ GETTER(EMessagingFormat, MessagingFormat);
GETTER(ETransformBody, Transform);
GETTER(TDuration, IdleTimeout);
// TODO(shmel1k@): add batching settings.
@@ -45,7 +45,7 @@ namespace NYdb::NConsoleClient {
TMaybe<int> FlushMessagesCount_;
TDuration IdleTimeout_;
- EOutputFormat OutputFormat_ = EOutputFormat::Default;
+ EMessagingFormat MessagingFormat_ = EMessagingFormat::SingleMessage;
ETransformBody Transform_ = ETransformBody::None;
TMaybe<i64> Limit_ = Nothing();
bool Commit_ = false;
diff --git a/ydb/public/lib/ydb_cli/topic/topic_read_ut.cpp b/ydb/public/lib/ydb_cli/topic/topic_read_ut.cpp
index d18bc53c4d4..cbe1ae66c24 100644
--- a/ydb/public/lib/ydb_cli/topic/topic_read_ut.cpp
+++ b/ydb/public/lib/ydb_cli/topic/topic_read_ut.cpp
@@ -24,7 +24,7 @@ namespace NYdb::NConsoleClient {
{
"some simple message",
},
- "", TTopicReaderSettings(Nothing(), false, false, EOutputFormat::Default, {}, ETransformBody::None, TDuration::Seconds(1)));
+ "", TTopicReaderSettings(Nothing(), false, false, EMessagingFormat::SingleMessage, {}, ETransformBody::None, TDuration::Seconds(1)));
}
void TestRun_ReadTwoMessages_With_Limit_1() {
@@ -35,7 +35,7 @@ namespace NYdb::NConsoleClient {
{
"message1",
},
- "", TTopicReaderSettings(1, false, false, EOutputFormat::Default, {}, ETransformBody::None, TDuration::Seconds(1)));
+ "", TTopicReaderSettings(1, false, false, EMessagingFormat::SingleMessage, {}, ETransformBody::None, TDuration::Seconds(1)));
}
void TestRun_ReadMoreMessagesThanLimit_Without_Wait_NewlineDelimited() {
@@ -51,7 +51,7 @@ namespace NYdb::NConsoleClient {
"message2",
"message3",
},
- "\n", TTopicReaderSettings(limit, false, false, EOutputFormat::NewlineDelimited, {}, ETransformBody::None, TDuration::Seconds(1)));
+ "\n", TTopicReaderSettings(limit, false, false, EMessagingFormat::NewlineDelimited, {}, ETransformBody::None, TDuration::Seconds(1)));
}
void TestRun_ReadMoreMessagesThanLimit_Without_Wait_NoDelimiter() {
@@ -64,7 +64,7 @@ namespace NYdb::NConsoleClient {
{
"message1message2message3message4",
},
- "", TTopicReaderSettings(limit, false, false, EOutputFormat::Default, {}, ETransformBody::None, TDuration::Seconds(1)));
+ "", TTopicReaderSettings(limit, false, false, EMessagingFormat::SingleMessage, {}, ETransformBody::None, TDuration::Seconds(1)));
}
void TestRun_ReadMessages_Output_Base64() {
@@ -80,7 +80,7 @@ namespace NYdb::NConsoleClient {
"bWVzc2FnZTI=",
"bWVzc2FnZTM=",
},
- "\n", TTopicReaderSettings(limit, false, false, EOutputFormat::NewlineDelimited, {}, ETransformBody::Base64, TDuration::Seconds(1)));
+ "\n", TTopicReaderSettings(limit, false, false, EMessagingFormat::NewlineDelimited, {}, ETransformBody::Base64, TDuration::Seconds(1)));
}
void TestRun_Read_Less_Messages_Than_Sent() {
@@ -94,7 +94,7 @@ namespace NYdb::NConsoleClient {
{
"message1message2",
},
- "", TTopicReaderSettings(limit, false, false, EOutputFormat::Default, {}, ETransformBody::None, TDuration::Seconds(1)));
+ "", TTopicReaderSettings(limit, false, false, EMessagingFormat::SingleMessage, {}, ETransformBody::None, TDuration::Seconds(1)));
}
private:
diff --git a/ydb/public/lib/ydb_cli/topic/topic_write.cpp b/ydb/public/lib/ydb_cli/topic/topic_write.cpp
index e05ef1976f7..f4bbdf5cb60 100644
--- a/ydb/public/lib/ydb_cli/topic/topic_write.cpp
+++ b/ydb/public/lib/ydb_cli/topic/topic_write.cpp
@@ -13,15 +13,15 @@ namespace NYdb::NConsoleClient {
TTopicWriterParams::TTopicWriterParams() {
}
- TTopicWriterParams::TTopicWriterParams(EOutputFormat inputFormat, TMaybe<TString> delimiter,
+ TTopicWriterParams::TTopicWriterParams(EMessagingFormat inputFormat, TMaybe<TString> delimiter,
ui64 messageSizeLimit, TMaybe<TDuration> batchDuration,
TMaybe<ui64> batchSize, TMaybe<ui64> batchMessagesCount)
- : InputFormat_(inputFormat)
+ : MessagingFormat_(inputFormat)
, BatchDuration_(batchDuration)
, BatchSize_(batchSize)
, BatchMessagesCount_(batchMessagesCount)
, MessageSizeLimit_(messageSizeLimit) {
- if (inputFormat == EOutputFormat::NewlineDelimited || inputFormat == EOutputFormat::Concatenated) {
+ if (inputFormat == EMessagingFormat::NewlineDelimited || inputFormat == EMessagingFormat::Concatenated) {
Delimiter_ = TMaybe<char>('\n');
}
if (delimiter.Defined()) {
diff --git a/ydb/public/lib/ydb_cli/topic/topic_write.h b/ydb/public/lib/ydb_cli/topic/topic_write.h
index 445e3c9016f..131c0a7ea26 100644
--- a/ydb/public/lib/ydb_cli/topic/topic_write.h
+++ b/ydb/public/lib/ydb_cli/topic/topic_write.h
@@ -14,7 +14,7 @@ namespace NYdb::NConsoleClient {
class TTopicWriterParams {
public:
TTopicWriterParams();
- TTopicWriterParams(EOutputFormat inputFormat, TMaybe<TString> delimiter,
+ TTopicWriterParams(EMessagingFormat inputFormat, TMaybe<TString> delimiter,
ui64 messageSizeLimit, TMaybe<TDuration> batchDuration,
TMaybe<ui64> batchSize, TMaybe<ui64> batchMessagesCount);
TTopicWriterParams(const TTopicWriterParams&) = default;
@@ -25,12 +25,12 @@ namespace NYdb::NConsoleClient {
GETTER(TMaybe<ui64>, BatchSize);
GETTER(TMaybe<ui64>, BatchMessagesCount);
GETTER(ui64, MessageSizeLimit);
- GETTER(EOutputFormat, InputFormat);
+ GETTER(EMessagingFormat, MessagingFormat);
private:
TMaybe<TString> File_;
TMaybe<char> Delimiter_;
- EOutputFormat InputFormat_ = EOutputFormat::Default;
+ EMessagingFormat MessagingFormat_ = EMessagingFormat::SingleMessage;
// TODO(shmel1k@): move to 'TWithBatchingCommand' or something like that.
TMaybe<TDuration> BatchDuration_;
diff --git a/ydb/public/lib/ydb_cli/topic/topic_write_ut.cpp b/ydb/public/lib/ydb_cli/topic/topic_write_ut.cpp
index 8540e647780..57dbc5e32f0 100644
--- a/ydb/public/lib/ydb_cli/topic/topic_write_ut.cpp
+++ b/ydb/public/lib/ydb_cli/topic/topic_write_ut.cpp
@@ -36,7 +36,7 @@ namespace NYdb::NConsoleClient {
void TestEnterMessage_OnlyDelimiters() {
TStringStream str = TString("\n") * 6;
- TTopicWriter wr(nullptr, TTopicWriterParams(EOutputFormat::Default, "\n", 0, Nothing(), Nothing(), Nothing()));
+ TTopicWriter wr(nullptr, TTopicWriterParams(EMessagingFormat::SingleMessage, "\n", 0, Nothing(), Nothing(), Nothing()));
TMessages got = EnterMessageHelper(wr, str);
AssertMessagesEqual({
{
@@ -97,7 +97,7 @@ namespace NYdb::NConsoleClient {
void TestEnterMessage_1KiB_Newline_Delimiter() {
TStringStream str = TString("a") * 512 + "\n" + TString("b") * 512;
- TTopicWriter wr(nullptr, TTopicWriterParams(EOutputFormat::Default, "\n", 0, Nothing(), Nothing(), Nothing()));
+ TTopicWriter wr(nullptr, TTopicWriterParams(EMessagingFormat::SingleMessage, "\n", 0, Nothing(), Nothing(), Nothing()));
TMessages got = EnterMessageHelper(wr, str);
AssertMessagesEqual({
{
@@ -121,7 +121,7 @@ namespace NYdb::NConsoleClient {
void TestEnterMessage_1KiB_Newline_Delimited_With_Two_Delimiters_In_A_Row() {
TStringStream str = TString("a") * 512 + "\n\n" + TString("b") * 512;
- TTopicWriter wr(nullptr, TTopicWriterParams(EOutputFormat::NewlineDelimited, Nothing(), 0, Nothing(), Nothing(), Nothing()));
+ TTopicWriter wr(nullptr, TTopicWriterParams(EMessagingFormat::NewlineDelimited, Nothing(), 0, Nothing(), Nothing(), Nothing()));
TMessages got = EnterMessageHelper(wr, str);
AssertMessagesEqual({
{
@@ -165,7 +165,7 @@ namespace NYdb::NConsoleClient {
void TestEnterMessage_ZeroSymbol_Delimited() {
auto& s = "\0\0\0\0\n\nprivet";
TStringStream str = TString(std::begin(s), std::end(s));
- TTopicWriter wr(nullptr, TTopicWriterParams(EOutputFormat::Default, "\0", 0, Nothing(), Nothing(), Nothing()));
+ TTopicWriter wr(nullptr, TTopicWriterParams(EMessagingFormat::SingleMessage, "\0", 0, Nothing(), Nothing(), Nothing()));
TMessages got = EnterMessageHelper(wr, str);
AssertMessagesEqual({
{
@@ -204,7 +204,7 @@ namespace NYdb::NConsoleClient {
void TestEnterMessage_Custom_Delimiter_Delimited() {
TStringStream str = TString("privet_vasya_kak_dela?");
- TTopicWriter wr(nullptr, TTopicWriterParams(EOutputFormat::Default, "_", 0, Nothing(), Nothing(), Nothing()));
+ TTopicWriter wr(nullptr, TTopicWriterParams(EMessagingFormat::SingleMessage, "_", 0, Nothing(), Nothing(), Nothing()));
TMessages got = EnterMessageHelper(wr, str);
AssertMessagesEqual({
{
@@ -237,25 +237,25 @@ namespace NYdb::NConsoleClient {
}
void TestTopicWriterParams_Format_NewlineDelimited() {
- TTopicWriterParams p = TTopicWriterParams(EOutputFormat::NewlineDelimited, Nothing(), 0, Nothing(), Nothing(), Nothing());
- UNIT_ASSERT_VALUES_EQUAL(p.InputFormat(), EOutputFormat::NewlineDelimited);
+ TTopicWriterParams p = TTopicWriterParams(EMessagingFormat::NewlineDelimited, Nothing(), 0, Nothing(), Nothing(), Nothing());
+ UNIT_ASSERT_VALUES_EQUAL(p.MessagingFormat(), EMessagingFormat::NewlineDelimited);
UNIT_ASSERT_VALUES_EQUAL(p.Delimiter(), '\n');
}
void TestTopicWriterParams_Format_Concatenated() {
- TTopicWriterParams p = TTopicWriterParams(EOutputFormat::Concatenated, Nothing(), 0, Nothing(), Nothing(), Nothing());
- UNIT_ASSERT_VALUES_EQUAL(p.InputFormat(), EOutputFormat::Concatenated);
+ TTopicWriterParams p = TTopicWriterParams(EMessagingFormat::Concatenated, Nothing(), 0, Nothing(), Nothing(), Nothing());
+ UNIT_ASSERT_VALUES_EQUAL(p.MessagingFormat(), EMessagingFormat::Concatenated);
UNIT_ASSERT_VALUES_EQUAL(p.Delimiter(), '\n');
}
void TestTopicWriterParams_No_Delimiter() {
- TTopicWriterParams p = TTopicWriterParams(EOutputFormat::Default, Nothing(), 0, Nothing(), Nothing(), Nothing());
- UNIT_ASSERT_VALUES_EQUAL(p.InputFormat(), EOutputFormat::Default);
+ TTopicWriterParams p = TTopicWriterParams(EMessagingFormat::SingleMessage, Nothing(), 0, Nothing(), Nothing(), Nothing());
+ UNIT_ASSERT_VALUES_EQUAL(p.MessagingFormat(), EMessagingFormat::SingleMessage);
UNIT_ASSERT_VALUES_EQUAL(p.Delimiter(), Nothing());
}
void TestTopicWriterParams_InvalidDelimiter() {
- UNIT_ASSERT_EXCEPTION(TTopicWriterParams(EOutputFormat::Default, "invalid", 0, Nothing(), Nothing(), Nothing()), yexception);
+ UNIT_ASSERT_EXCEPTION(TTopicWriterParams(EMessagingFormat::SingleMessage, "invalid", 0, Nothing(), Nothing(), Nothing()), yexception);
}
private: