diff options
author | shmel1k <shmel1k@ydb.tech> | 2022-09-09 12:55:53 +0300 |
---|---|---|
committer | shmel1k <shmel1k@ydb.tech> | 2022-09-09 12:55:53 +0300 |
commit | 422aefd03b121a1966b80756d8a06a86d9787e04 (patch) | |
tree | e90bdbb2ce02363373b929fae08f72dd308ce1b3 | |
parent | 75a5e826e2f818015c40fc7d76f3cdb4d5116151 (diff) | |
download | ydb-422aefd03b121a1966b80756d8a06a86d9787e04.tar.gz |
PR from branch users/shmel1k/LOGBROKER-7217_improve_writing
[] fix tests
[] improved writing: added transform option
[] improve 'topic write' options
-rw-r--r-- | ydb/public/lib/ydb_cli/commands/ydb_service_topic.cpp | 256 | ||||
-rw-r--r-- | ydb/public/lib/ydb_cli/commands/ydb_service_topic.h | 32 | ||||
-rw-r--r-- | ydb/public/lib/ydb_cli/topic/topic_util.h | 1 | ||||
-rw-r--r-- | ydb/public/lib/ydb_cli/topic/topic_write.cpp | 76 | ||||
-rw-r--r-- | ydb/public/lib/ydb_cli/topic/topic_write.h | 14 | ||||
-rw-r--r-- | ydb/public/lib/ydb_cli/topic/topic_write_ut.cpp | 85 |
6 files changed, 276 insertions, 188 deletions
diff --git a/ydb/public/lib/ydb_cli/commands/ydb_service_topic.cpp b/ydb/public/lib/ydb_cli/commands/ydb_service_topic.cpp index 7585a101331..2e4e0f59d18 100644 --- a/ydb/public/lib/ydb_cli/commands/ydb_service_topic.cpp +++ b/ydb/public/lib/ydb_cli/commands/ydb_service_topic.cpp @@ -15,10 +15,10 @@ namespace NYdb::NConsoleClient { namespace { THashMap<NYdb::NTopic::ECodec, TString> CodecsDescriptions = { - {NYdb::NTopic::ECodec::RAW, "Raw codec. No data compression"}, - {NYdb::NTopic::ECodec::GZIP, "GZIP codec. Data is compressed with GZIP compression algorithm"}, - {NYdb::NTopic::ECodec::LZOP, "LZOP codec. Data is compressed with LZOP compression algorithm"}, - {NYdb::NTopic::ECodec::ZSTD, "ZSTD codec. Data is compressed with ZSTD compression algorithm"}, + {NYdb::NTopic::ECodec::RAW, "Raw codec. No data compression."}, + {NYdb::NTopic::ECodec::GZIP, "GZIP codec. Data is compressed with GZIP compression algorithm."}, + {NYdb::NTopic::ECodec::LZOP, "LZOP codec. Data is compressed with LZOP compression algorithm."}, + {NYdb::NTopic::ECodec::ZSTD, "ZSTD codec. Data is compressed with ZSTD compression algorithm."}, }; THashMap<TString, NYdb::NTopic::ECodec> ExistingCodecs = { @@ -34,19 +34,19 @@ namespace NYdb::NConsoleClient { }; THashMap<NTopic::EMeteringMode, TString> MeteringModesDescriptions = { - std::pair<NTopic::EMeteringMode, TString>(NTopic::EMeteringMode::ReservedCapacity, "Throughput and storage limits on hourly basis, write operations"), - std::pair<NTopic::EMeteringMode, TString>(NTopic::EMeteringMode::RequestUnits, "Read/write operations valued in request units, storage usage on hourly basis"), + std::pair<NTopic::EMeteringMode, TString>(NTopic::EMeteringMode::ReservedCapacity, "Throughput and storage limits on hourly basis, write operations."), + std::pair<NTopic::EMeteringMode, TString>(NTopic::EMeteringMode::RequestUnits, "Read/write operations valued in request units, storage usage on hourly basis."), }; // TODO(shmel1k@): improve docs THashMap<ETopicMetadataField, TString> TopicMetadataFieldsDescriptions = { {ETopicMetadataField::Body, "Message data"}, - {ETopicMetadataField::WriteTime, "Message write time, a UNIX timestamp the message was written to server"}, - {ETopicMetadataField::CreateTime, "Message creation time, a UNIX timestamp provided by the publishing client"}, - {ETopicMetadataField::MessageGroupID, "Message group id. All messages with the same message group id are guaranteed to be read in FIFO order"}, - {ETopicMetadataField::Offset, "Message offset. Offset orders messages in each partition"}, - {ETopicMetadataField::SeqNo, "Message sequence number, used for message deduplication when publishing"}, - {ETopicMetadataField::Meta, "Message additional metadata"}, + {ETopicMetadataField::WriteTime, "Message write time, a UNIX timestamp the message was written to server."}, + {ETopicMetadataField::CreateTime, "Message creation time, a UNIX timestamp provided by the publishing client."}, + {ETopicMetadataField::MessageGroupID, "Message group id. All messages with the same message group id are guaranteed to be read in FIFO order."}, + {ETopicMetadataField::Offset, "Message offset. Offset orders messages in each partition."}, + {ETopicMetadataField::SeqNo, "Message sequence number, used for message deduplication when publishing."}, + {ETopicMetadataField::Meta, "Message additional metadata."}, }; const TVector<ETopicMetadataField> AllTopicMetadataFields = { @@ -75,20 +75,45 @@ namespace NYdb::NConsoleClient { }; constexpr TDuration DefaultIdleTimeout = TDuration::Seconds(1); + + bool IsStreamingFormat(EMessagingFormat format) { + return format == EMessagingFormat::NewlineDelimited || format == EMessagingFormat::Concatenated; + } } // namespace - void TCommandWithSupportedCodecs::AddAllowedCodecs(TClientCommand::TConfig& config, const TVector<NYdb::NTopic::ECodec>& supportedCodecs) { - TStringStream description; - description << "Comma-separated list of supported codecs. Available codecs: "; - NColorizer::TColors colors = NColorizer::AutoColors(Cout); - for (const auto& codec : supportedCodecs) { - auto findResult = CodecsDescriptions.find(codec); - Y_VERIFY(findResult != CodecsDescriptions.end(), - "Couldn't find description for %s codec", (TStringBuilder() << codec).c_str()); - description << "\n " << colors.BoldColor() << codec << colors.OldColor() - << "\n " << findResult->second; + namespace { + TString PrepareAllowedCodecsDescription(const TString& descriptionPrefix, const TVector<NTopic::ECodec>& codecs) { + TStringStream description; + description << descriptionPrefix << ". Available codecs: "; + NColorizer::TColors colors = NColorizer::AutoColors(Cout); + for (const auto& codec : codecs) { + auto findResult = CodecsDescriptions.find(codec); + Y_VERIFY(findResult != CodecsDescriptions.end(), + "Couldn't find description for %s codec", (TStringBuilder() << codec).c_str()); + description << "\n " << colors.BoldColor() << codec << colors.OldColor() + << "\n " << findResult->second; + } + + return description.Str(); } - config.Opts->AddLongOption("supported-codecs", description.Str()) + + NTopic::ECodec ParseCodec(const TString& codecStr, const TVector<NTopic::ECodec>& allowedCodecs) { + auto exists = ExistingCodecs.find(to_lower(codecStr)); + if (exists == ExistingCodecs.end()) { + throw TMisuseException() << "Codec " << codecStr << " is not available for this command"; + } + + if (std::find(allowedCodecs.begin(), allowedCodecs.end(), exists->second) == allowedCodecs.end()) { + throw TMisuseException() << "Codec " << codecStr << " is not available for this command"; + } + + return exists->second; + } + } + + void TCommandWithSupportedCodecs::AddAllowedCodecs(TClientCommand::TConfig& config, const TVector<NYdb::NTopic::ECodec>& supportedCodecs) { + TString description = PrepareAllowedCodecsDescription("Comma-separated list of supported codecs", supportedCodecs); + config.Opts->AddLongOption("supported-codecs", description) .RequiredArgument("STRING") .StoreResult(&SupportedCodecsStr_); AllowedCodecs_ = supportedCodecs; @@ -102,16 +127,7 @@ namespace NYdb::NConsoleClient { TVector<NTopic::ECodec> parsedCodecs; TVector<TString> split = SplitString(SupportedCodecsStr_, ","); for (const TString& codecStr : split) { - auto exists = ExistingCodecs.find(to_lower(codecStr)); - if (exists == ExistingCodecs.end()) { - throw TMisuseException() << "Supported codec " << codecStr << " is not available for this command"; - } - - if (std::find(AllowedCodecs_.begin(), AllowedCodecs_.end(), exists->second) == AllowedCodecs_.end()) { - throw TMisuseException() << "Supported codec " << codecStr << " is not available for this command"; - } - - SupportedCodecs_.push_back(exists->second); + SupportedCodecs_.push_back(::NYdb::NConsoleClient::ParseCodec(codecStr, AllowedCodecs_)); } } @@ -203,7 +219,7 @@ namespace NYdb::NConsoleClient { int TCommandTopicCreate::Run(TConfig& config) { TDriver driver = CreateDriver(config); - NYdb::NTopic::TTopicClient persQueueClient(driver); + NYdb::NTopic::TTopicClient topicClient(driver); auto settings = NYdb::NTopic::TCreateTopicSettings(); settings.PartitioningSettings(PartitionsCount_, PartitionsCount_); @@ -223,7 +239,7 @@ namespace NYdb::NConsoleClient { settings.RetentionPeriod(TDuration::Hours(RetentionPeriodHours_)); settings.RetentionStorageMb(RetentionStorageMb_ / 1_MB); - auto status = persQueueClient.CreateTopic(TopicName, settings).GetValueSync(); + auto status = topicClient.CreateTopic(TopicName, settings).GetValueSync(); ThrowOnError(status); return EXIT_SUCCESS; } @@ -342,7 +358,7 @@ namespace NYdb::NConsoleClient { void TCommandTopicConsumerAdd::Config(TConfig& config) { TYdbCommand::Config(config); - config.Opts->AddLongOption("consumer-name", "New consumer for topic") + config.Opts->AddLongOption("consumer", "New consumer for topic") .Required() .StoreResult(&ConsumerName_); config.Opts->AddLongOption("service-type", "Service type of reader") @@ -389,7 +405,7 @@ namespace NYdb::NConsoleClient { void TCommandTopicConsumerDrop::Config(TConfig& config) { TYdbCommand::Config(config); - config.Opts->AddLongOption("consumer-name", "Consumer which will be dropped") + config.Opts->AddLongOption("consumer", "Consumer which will be dropped") .Required() .StoreResult(&ConsumerName_); config.Opts->SetFreeArgsNum(1); @@ -413,6 +429,41 @@ namespace NYdb::NConsoleClient { return EXIT_SUCCESS; } + void TCommandWithTransformBody::AddTransform(TClientCommand::TConfig& config) { + TStringStream description; + description << "Conversion between a message data in the topic and the client filesystem/terminal. Available options: "; + NColorizer::TColors colors = NColorizer::AutoColors(Cout); + for (const auto& iter : TransformBodyDescriptions) { + description << "\n " << colors.BoldColor() << iter.first << colors.OldColor() << "\n " << iter.second; + } + + config.Opts->AddLongOption("transform", description.Str()) + .Optional() + .DefaultValue("none") + .StoreResult(&TransformStr_); + } + + void TCommandWithTransformBody::ParseTransform() { + if (TransformStr_.empty()) { + return; + } + + TString val = TransformStr_; + if (val == (TStringBuilder() << ETransformBody::None)) { + return; + } + if (val == (TStringBuilder() << ETransformBody::Base64)) { + Transform_ = ETransformBody::Base64; + return; + } + + throw TMisuseException() << "Transform " << TransformStr_ << " not found in available \"transform\" values"; + } + + ETransformBody TCommandWithTransformBody::GetTransform() const { + return Transform_; + } + TCommandTopicRead::TCommandTopicRead() : TYdbCommand("read", {}, "Read from a topic to the client filesystem or terminal") { } @@ -430,20 +481,6 @@ namespace NYdb::NConsoleClient { .StoreResult(&WithMetadataFields_); } - void TCommandTopicRead::AddAllowedTransformFormats(TConfig& config) { - TStringStream description; - description << "Conversion between a message data in the topic and the client filesystem/terminal. Available options: "; - NColorizer::TColors colors = NColorizer::AutoColors(Cout); - for (const auto& iter : TransformBodyDescriptions) { - description << "\n " << colors.BoldColor() << iter.first << colors.OldColor() << "\n " << iter.second; - } - - config.Opts->AddLongOption("transform", description.Str()) - .Optional() - .DefaultValue("none") - .StoreResult(&TransformStr_); - } - void TCommandTopicRead::Config(TConfig& config) { TYdbCommand::Config(config); config.Opts->SetFreeArgsNum(1); @@ -460,25 +497,9 @@ namespace NYdb::NConsoleClient { config.Opts->AddLongOption('c', "consumer", "Consumer name.") .Required() .StoreResult(&Consumer_); -// config.Opts->AddLongOption("offset", "Offset to start read from") -// .Optional() -// .StoreResult(&Offset_); -// config.Opts->AddLongOption("partition", "Partition to read from") -// .Optional() -// .StoreResult(&Partition_); config.Opts->AddLongOption('f', "file", "File to write data to. In not specified, data is written to the standard output.") .Optional() .StoreResult(&File_); -// NOTE(shmel1k@): temporary disabled options -// config.Opts->AddLongOption("flush-duration", "Duration for message flushing") -// .Optional() -// .StoreResult(&FlushDuration_); -// config.Opts->AddLongOption("flush-size", "Maximum flush size") // TODO(shmel1k@): improve -// .Optional() -// .StoreResult(&FlushSize_); -// config.Opts->AddLongOption("flush-messages-count", "") // TODO(shmel1k@): improve -// .Optional() -// .StoreResult(&FlushMessagesCount_); config.Opts->AddLongOption("idle-timeout", "Max wait duration for new messages. Topic is considered empty if no new messages arrive within this period.") .Optional() .DefaultValue(DefaultIdleTimeout) @@ -487,13 +508,6 @@ namespace NYdb::NConsoleClient { .Optional() .DefaultValue(true) .StoreResult(&Commit_); -// NOTE(shmel1k@): temporary disabled option -// config.Opts->AddLongOption("message-size-limit", "Maximum message size") -// .Optional() -// .StoreResult(&MessageSizeLimit_); -// config.Opts->AddLongOption("discard-above-limits", "Do not print messages with size more than defined in 'message-size-limit' option") -// .Optional() -// .StoreResult(&DiscardAboveLimits_); config.Opts->AddLongOption("limit", "Limit on message count to read, 0 - unlimited. " "If avobe 0, processing stops when either topic is empty, or the specified limit reached. " "Must be above 0 for pretty output format." @@ -509,7 +523,7 @@ namespace NYdb::NConsoleClient { .StoreResult(&Timestamp_); AddAllowedMetadataFields(config); - AddAllowedTransformFormats(config); + AddTransform(config); } void TCommandTopicRead::ParseMetadataFields() { @@ -548,29 +562,12 @@ namespace NYdb::NConsoleClient { MetadataFields_ = result; } - void TCommandTopicRead::ParseTransformFormat() { - if (!TransformStr_.Defined()) { - return; - } - - TString val = *TransformStr_; - if (val == (TStringBuilder() << ETransformBody::None)) { - return; - } - if (val == (TStringBuilder() << ETransformBody::Base64)) { - Transform_ = ETransformBody::Base64; - return; - } - - throw TMisuseException() << "Transform " << *TransformStr_ << " not found in available \"transform\" values"; - } - void TCommandTopicRead::Parse(TConfig& config) { TYdbCommand::Parse(config); ParseTopicName(config, 0); ParseMessagingFormats(); ParseMetadataFields(); - ParseTransformFormat(); + ParseTransform(); } NTopic::TReadSessionSettings TCommandTopicRead::PrepareReadSessionSettings() { @@ -590,9 +587,9 @@ namespace NYdb::NConsoleClient { void TCommandTopicRead::ValidateConfig() { // TODO(shmel1k@): add more formats. - if (MessagingFormat != EMessagingFormat::SingleMessage && (Limit_.Defined() && (Limit_ < 0 || Limit_ > 500))) { + if (!IsStreamingFormat(MessagingFormat) && (Limit_.Defined() && (Limit_ <= 0 || Limit_ > 500))) { throw TMisuseException() << "OutputFormat " << OutputFormat << " is not compatible with " - << "limit equal '0' or more than '500': '" << *Limit_ << "' was given"; + << "limit less and equal '0' or more than '500': '" << *Limit_ << "' was given"; } } @@ -600,10 +597,10 @@ namespace NYdb::NConsoleClient { ValidateConfig(); auto driver = std::make_unique<TDriver>(CreateDriver(config)); - NTopic::TTopicClient persQueueClient(*driver); - auto readSession = persQueueClient.CreateReadSession(PrepareReadSessionSettings()); + NTopic::TTopicClient topicClient(*driver); + auto readSession = topicClient.CreateReadSession(PrepareReadSessionSettings()); - TTopicInitializationChecker checker = TTopicInitializationChecker(persQueueClient); + TTopicInitializationChecker checker = TTopicInitializationChecker(topicClient); checker.CheckTopicExistence(TopicName, Consumer_); { @@ -613,7 +610,7 @@ namespace NYdb::NConsoleClient { Wait_, MessagingFormat, MetadataFields_, - Transform_, + GetTransform(), IdleTimeout_)); reader.Init(); @@ -637,6 +634,27 @@ namespace NYdb::NConsoleClient { return EXIT_SUCCESS; } + void TCommandWithCodec::AddAllowedCodecs(TClientCommand::TConfig& config, const TVector<NTopic::ECodec>& allowedCodecs) { + TString description = PrepareAllowedCodecsDescription("Client-side compression algorithm. When read, data will be uncompressed transparently with a codec used on write", allowedCodecs); + config.Opts->AddLongOption("codec", description) + .Optional() + .DefaultValue((TStringBuilder() << NTopic::ECodec::RAW)) + .StoreResult(&CodecStr_); + AllowedCodecs_ = allowedCodecs; + } + + void TCommandWithCodec::ParseCodec() { + if (CodecStr_.empty()) { + return; + } + + Codec_ = ::NYdb::NConsoleClient::ParseCodec(CodecStr_, AllowedCodecs_); + } + + NTopic::ECodec TCommandWithCodec::GetCodec() const { + return Codec_; + } + TCommandTopicWrite::TCommandTopicWrite() : TYdbCommand("write", {}, "Write to topic command") { } @@ -652,7 +670,7 @@ namespace NYdb::NConsoleClient { // EOutputFormat::JsonRawStreamConcat, // EOutputFormat::JsonRawArray, }); - AddAllowedCodecs(config, {NTopic::ECodec::RAW}); + AddAllowedCodecs(config, AllowedCodecs); // TODO(shmel1k@): improve help. config.Opts->AddLongOption('d', "delimiter", "Delimiter to split messages") @@ -661,28 +679,19 @@ namespace NYdb::NConsoleClient { config.Opts->AddLongOption('f', "file", "File to read data from") .Optional() .StoreResult(&File_); - config.Opts->AddLongOption("message-size-limit", "Size limit for a single message") - .Optional() - .StoreResult(&MessageSizeLimitStr_); - config.Opts->AddLongOption("batch-duration", "Duration for message batching") - .Optional() - .StoreResult(&BatchDuration_); - config.Opts->AddLongOption("batch-size", "Maximum batch size") // TODO(shmel1k@): improve - .Optional() - .StoreResult(&BatchSize_); - config.Opts->AddLongOption("batch-messages-count", "") // TODO(shmel1k@): improve - .Optional() - .StoreResult(&BatchMessagesCount_); - config.Opts->AddLongOption("message-group-id", "Message Group ID") // TODO(shmel1k@): improve + config.Opts->AddLongOption("message-group-id", "Message group identifier. If not set, all messages from input will get the same identifier based on hex string\nrepresentation of 3 random bytes") .Optional() .StoreResult(&MessageGroupId_); + + AddTransform(config); } void TCommandTopicWrite::Parse(TConfig& config) { TYdbCommand::Parse(config); ParseTopicName(config, 0); ParseMessagingFormats(); - ParseCodecs(); + ParseTransform(); + ParseCodec(); if (Delimiter_.Defined() && MessagingFormat != EMessagingFormat::SingleMessage) { throw TMisuseException() << "Both mutually exclusive options \"delimiter\"(\"--delimiter\", \"-d\" " @@ -692,11 +701,8 @@ namespace NYdb::NConsoleClient { NTopic::TWriteSessionSettings TCommandTopicWrite::PrepareWriteSessionSettings() { NTopic::TWriteSessionSettings settings; -// settings.Codec(GetCodecs()[0]); - settings.Codec(NTopic::ECodec::GZIP); + settings.Codec(GetCodec()); settings.Path(TopicName); - //settings.BatchFlushInterval(BatchDuration_); - //settings.BatchFlushSizeBytes(BatchSize_); if (!MessageGroupId_.Defined()) { const TString rnd = ToString(TInstant::Now().NanoSeconds()); @@ -721,19 +727,15 @@ namespace NYdb::NConsoleClient { SetInterruptHandlers(); auto driver = std::make_unique<TDriver>(CreateDriver(config)); - NTopic::TTopicClient persQueueClient(*driver); - TTopicInitializationChecker checker = TTopicInitializationChecker(persQueueClient); + NTopic::TTopicClient topicClient(*driver); + TTopicInitializationChecker checker = TTopicInitializationChecker(topicClient); checker.CheckTopicExistence(TopicName); { auto writeSession = NTopic::TTopicClient(*driver).CreateWriteSession(std::move(PrepareWriteSessionSettings())); - auto writer = TTopicWriter(writeSession, std::move(TTopicWriterParams( - MessagingFormat, - Delimiter_, - MessageSizeLimit_, - BatchDuration_, - BatchSize_, - BatchMessagesCount_))); + auto writer = + TTopicWriter(writeSession, std::move(TTopicWriterParams(MessagingFormat, Delimiter_, MessageSizeLimit_, BatchDuration_, + BatchSize_, BatchMessagesCount_, GetTransform()))); if (int status = writer.Init(); status) { return status; diff --git a/ydb/public/lib/ydb_cli/commands/ydb_service_topic.h b/ydb/public/lib/ydb_cli/commands/ydb_service_topic.h index 3cfde53b576..fb8a40ae264 100644 --- a/ydb/public/lib/ydb_cli/commands/ydb_service_topic.h +++ b/ydb/public/lib/ydb_cli/commands/ydb_service_topic.h @@ -108,15 +108,22 @@ namespace NYdb::NConsoleClient { TString ConsumerName_; }; - class TCommandTopicInternal: public TClientCommandTree { - public: - TCommandTopicInternal(); + class TCommandWithTransformBody { + protected: + void AddTransform(TClientCommand::TConfig& config); + void ParseTransform(); + ETransformBody GetTransform() const; + + private: + TString TransformStr_; + ETransformBody Transform_ = ETransformBody::None; }; class TCommandTopicRead: public TYdbCommand, public TCommandWithFormat, public TInterruptibleCommand, - public TCommandWithTopicName { + public TCommandWithTopicName, + public TCommandWithTransformBody { public: TCommandTopicRead(); void Config(TConfig& config) override; @@ -156,11 +163,24 @@ namespace NYdb::NConsoleClient { NTopic::TReadSessionSettings PrepareReadSessionSettings(); }; + class TCommandWithCodec { + protected: + void AddAllowedCodecs(TClientCommand::TConfig& config, const TVector<NTopic::ECodec>& allowedCodecs); + void ParseCodec(); + NTopic::ECodec GetCodec() const; + + private: + TVector<NTopic::ECodec> AllowedCodecs_; + TString CodecStr_; + NTopic::ECodec Codec_ = NTopic::ECodec::RAW; + }; + class TCommandTopicWrite: public TYdbCommand, public TCommandWithFormat, public TInterruptibleCommand, public TCommandWithTopicName, - public TCommandWithSupportedCodecs { + public TCommandWithCodec, + public TCommandWithTransformBody { public: TCommandTopicWrite(); void Config(TConfig& config) override; @@ -180,7 +200,7 @@ namespace NYdb::NConsoleClient { ui64 MessageSizeLimit_ = 0; void ParseMessageSizeLimit(); - void CheckOptions(NTopic::TTopicClient& persQueueClient); + void CheckOptions(NTopic::TTopicClient& topicClient); private: NTopic::TWriteSessionSettings PrepareWriteSessionSettings(); diff --git a/ydb/public/lib/ydb_cli/topic/topic_util.h b/ydb/public/lib/ydb_cli/topic/topic_util.h index 54fd9ee7dec..5fef3334c5a 100644 --- a/ydb/public/lib/ydb_cli/topic/topic_util.h +++ b/ydb/public/lib/ydb_cli/topic/topic_util.h @@ -1,4 +1,5 @@ #include <ydb/public/lib/ydb_cli/commands/ydb_common.h> +#include <ydb/public/lib/ydb_cli/common/format.h> #include <ydb/public/sdk/cpp/client/ydb_topic/topic.h> namespace NYdb::NConsoleClient { diff --git a/ydb/public/lib/ydb_cli/topic/topic_write.cpp b/ydb/public/lib/ydb_cli/topic/topic_write.cpp index ca74f4acf05..045b211f4bc 100644 --- a/ydb/public/lib/ydb_cli/topic/topic_write.cpp +++ b/ydb/public/lib/ydb_cli/topic/topic_write.cpp @@ -4,6 +4,7 @@ #include <util/generic/overloaded.h> #include <util/stream/tokenizer.h> #include <util/string/hex.h> +#include <library/cpp/string_utils/base64/base64.h> namespace NYdb::NConsoleClient { namespace { @@ -13,13 +14,14 @@ namespace NYdb::NConsoleClient { TTopicWriterParams::TTopicWriterParams() { } - TTopicWriterParams::TTopicWriterParams(EMessagingFormat inputFormat, TMaybe<TString> delimiter, - ui64 messageSizeLimit, TMaybe<TDuration> batchDuration, - TMaybe<ui64> batchSize, TMaybe<ui64> batchMessagesCount) + TTopicWriterParams::TTopicWriterParams(EMessagingFormat inputFormat, TMaybe<TString> delimiter, ui64 messageSizeLimit, + TMaybe<TDuration> batchDuration, TMaybe<ui64> batchSize, TMaybe<ui64> batchMessagesCount, + ETransformBody transform) : MessagingFormat_(inputFormat) , BatchDuration_(batchDuration) , BatchSize_(batchSize) , BatchMessagesCount_(batchMessagesCount) + , Transform_(transform) , MessageSizeLimit_(messageSizeLimit) { if (inputFormat == EMessagingFormat::NewlineDelimited || inputFormat == EMessagingFormat::Concatenated) { Delimiter_ = TMaybe<char>('\n'); @@ -88,34 +90,41 @@ namespace NYdb::NConsoleClient { return EXIT_SUCCESS; } - int TTopicWriter::HandleAcksEvent(const NTopic::TWriteSessionEvent::TAcksEvent& event) { + int TTopicWriter::HandleAcksEvent(const NTopic::TWriteSessionEvent::TAcksEvent* event) { Y_UNUSED(event); return EXIT_SUCCESS; } - int TTopicWriter::HandleReadyToAcceptEvent(NTopic::TWriteSessionEvent::TReadyToAcceptEvent& event) { - ContinuationToken_ = std::move(event.ContinuationToken); + int TTopicWriter::HandleReadyToAcceptEvent(NTopic::TWriteSessionEvent::TReadyToAcceptEvent* event) { + ContinuationToken_ = std::move(event->ContinuationToken); return EXIT_SUCCESS; } - int TTopicWriter::HandleSessionClosedEvent(const NTopic::TSessionClosedEvent& event) { - ThrowOnError(event); + int TTopicWriter::HandleSessionClosedEvent(const NTopic::TSessionClosedEvent* event) { + ThrowOnError(*event); return EXIT_FAILURE; } int TTopicWriter::HandleEvent(NTopic::TWriteSessionEvent::TEvent& event) { - return std::visit(TOverloaded{ - [&](const NTopic::TWriteSessionEvent::TAcksEvent& event) { - return HandleAcksEvent(event); - }, - [&](NTopic::TWriteSessionEvent::TReadyToAcceptEvent& event) { - return HandleReadyToAcceptEvent(event); - }, - [&](const NTopic::TSessionClosedEvent& event) { - return HandleSessionClosedEvent(event); - }, - }, - event); + if (auto* acksEvent = std::get_if<NTopic::TWriteSessionEvent::TAcksEvent>(&event)) { + return HandleAcksEvent(acksEvent); + } else if (auto* readyToAcceptEvent = std::get_if<NTopic::TWriteSessionEvent::TReadyToAcceptEvent>(&event)) { + return HandleReadyToAcceptEvent(readyToAcceptEvent); + } else if (auto* sessionClosedEvent = std::get_if<NTopic::TSessionClosedEvent>(&event)) { + return HandleSessionClosedEvent(sessionClosedEvent); + } + + return EXIT_SUCCESS; + } + + namespace { + TString TransformBody(const TString& body, ETransformBody transform) { + if (transform == ETransformBody::None) { + return body; + } + + return Base64Decode(body); + } } TTopicWriter::TSendMessageData TTopicWriter::EnterMessage(IInputStream& input) { @@ -123,9 +132,9 @@ namespace NYdb::NConsoleClient { // TODO(shmel1k@): add JSONStreamReader & etc interfaces. // TODO(shmel1k@): add stream parsing here & improve performance. if (!WriterParams_.Delimiter().Defined()) { - // TODO(shmel1k@): interruption? + TString body = input.ReadAll(); return TSendMessageData{ - .Data = input.ReadAll(), + .Data = TransformBody(body, WriterParams_.Transform()), .NeedSend = true, .ContinueSending = false, }; @@ -142,7 +151,7 @@ namespace NYdb::NConsoleClient { }; } return TSendMessageData{ - .Data = buffer, + .Data = TransformBody(buffer, WriterParams_.Transform()), .NeedSend = true, .ContinueSending = true, }; @@ -175,8 +184,7 @@ namespace NYdb::NConsoleClient { } bool TTopicWriter::Close(TDuration closeTimeout) { - Y_UNUSED(closeTimeout); - if (WriteSession_->Close(TDuration::Hours(12))) { + if (WriteSession_->Close(closeTimeout)) { return true; } TVector<NTopic::TWriteSessionEvent::TEvent> events = WriteSession_->GetEvents(true); @@ -184,23 +192,7 @@ namespace NYdb::NConsoleClient { return false; } for (auto& evt : events) { - bool hasFailure = false; - std::visit(TOverloaded{ - [&](const NTopic::TWriteSessionEvent::TAcksEvent& event) { - Y_UNUSED(event); - }, - [&](NTopic::TWriteSessionEvent::TReadyToAcceptEvent& event) { - Y_UNUSED(event); - }, - [&](const NTopic::TSessionClosedEvent& event) { - int result = HandleSessionClosedEvent(event); - if (result == EXIT_FAILURE) { - hasFailure = true; - } - }, - }, - evt); - if (hasFailure) { + if (HandleEvent(evt)) { return false; } } diff --git a/ydb/public/lib/ydb_cli/topic/topic_write.h b/ydb/public/lib/ydb_cli/topic/topic_write.h index 78062c1abc1..822acc98c0a 100644 --- a/ydb/public/lib/ydb_cli/topic/topic_write.h +++ b/ydb/public/lib/ydb_cli/topic/topic_write.h @@ -4,6 +4,7 @@ #include <ydb/public/sdk/cpp/client/ydb_topic/topic.h> #include <ydb/public/lib/ydb_cli/common/format.h> #include <ydb/public/lib/ydb_cli/common/interruptible.h> +#include <ydb/public/lib/ydb_cli/topic/topic_metadata_fields.h> namespace NYdb::NConsoleClient { #define GETTER(TYPE, NAME) \ @@ -16,7 +17,8 @@ namespace NYdb::NConsoleClient { TTopicWriterParams(); TTopicWriterParams(EMessagingFormat inputFormat, TMaybe<TString> delimiter, ui64 messageSizeLimit, TMaybe<TDuration> batchDuration, - TMaybe<ui64> batchSize, TMaybe<ui64> batchMessagesCount); + TMaybe<ui64> batchSize, TMaybe<ui64> batchMessagesCount, + ETransformBody transform); TTopicWriterParams(const TTopicWriterParams&) = default; TTopicWriterParams(TTopicWriterParams&&) = default; @@ -26,6 +28,8 @@ namespace NYdb::NConsoleClient { GETTER(TMaybe<ui64>, BatchMessagesCount); GETTER(ui64, MessageSizeLimit); GETTER(EMessagingFormat, MessagingFormat); + GETTER(NTopic::ECodec, Codec); + GETTER(ETransformBody, Transform); private: TMaybe<TString> File_; @@ -36,6 +40,8 @@ namespace NYdb::NConsoleClient { TMaybe<TDuration> BatchDuration_; TMaybe<ui64> BatchSize_; TMaybe<ui64> BatchMessagesCount_; + NTopic::ECodec Codec_ = NTopic::ECodec::RAW; + ETransformBody Transform_ = ETransformBody::None; ui64 MessageSizeLimit_ = 0; }; @@ -62,9 +68,9 @@ namespace NYdb::NConsoleClient { }; int HandleEvent(NTopic::TWriteSessionEvent::TEvent&); - int HandleAcksEvent(const NTopic::TWriteSessionEvent::TAcksEvent&); - int HandleReadyToAcceptEvent(NTopic::TWriteSessionEvent::TReadyToAcceptEvent&); - int HandleSessionClosedEvent(const NTopic::TSessionClosedEvent&); + int HandleAcksEvent(const NTopic::TWriteSessionEvent::TAcksEvent*); + int HandleReadyToAcceptEvent(NTopic::TWriteSessionEvent::TReadyToAcceptEvent*); + int HandleSessionClosedEvent(const NTopic::TSessionClosedEvent*); TTopicWriter::TSendMessageData EnterMessage(IInputStream&); // TODO(shmel1k@): make static or like a helper function std::shared_ptr<NTopic::IWriteSession> WriteSession_; diff --git a/ydb/public/lib/ydb_cli/topic/topic_write_ut.cpp b/ydb/public/lib/ydb_cli/topic/topic_write_ut.cpp index 57dbc5e32f0..aaefbaaf597 100644 --- a/ydb/public/lib/ydb_cli/topic/topic_write_ut.cpp +++ b/ydb/public/lib/ydb_cli/topic/topic_write_ut.cpp @@ -15,6 +15,10 @@ namespace NYdb::NConsoleClient { UNIT_TEST(TestEnterMessage_SomeBinaryData); UNIT_TEST(TestEnterMessage_ZeroSymbol_Delimited); UNIT_TEST(TestEnterMessage_Custom_Delimiter_Delimited); + UNIT_TEST(TestEnterMessage_No_Base64_Transform); + UNIT_TEST(TestEnterMessage_With_Base64_Transform_Invalid_Encode); + UNIT_TEST(TestEnterMessage_With_Base64_Transform); + UNIT_TEST(TestEnterMessage_With_Base64_Transform_NewlineDelimited); UNIT_TEST(TestTopicWriterParams_Format_NewlineDelimited); UNIT_TEST(TestTopicWriterParams_Format_Concatenated); @@ -36,7 +40,7 @@ namespace NYdb::NConsoleClient { void TestEnterMessage_OnlyDelimiters() { TStringStream str = TString("\n") * 6; - TTopicWriter wr(nullptr, TTopicWriterParams(EMessagingFormat::SingleMessage, "\n", 0, Nothing(), Nothing(), Nothing())); + TTopicWriter wr(nullptr, TTopicWriterParams(EMessagingFormat::SingleMessage, "\n", 0, Nothing(), Nothing(), Nothing(), ETransformBody::None)); TMessages got = EnterMessageHelper(wr, str); AssertMessagesEqual({ { @@ -97,7 +101,7 @@ namespace NYdb::NConsoleClient { void TestEnterMessage_1KiB_Newline_Delimiter() { TStringStream str = TString("a") * 512 + "\n" + TString("b") * 512; - TTopicWriter wr(nullptr, TTopicWriterParams(EMessagingFormat::SingleMessage, "\n", 0, Nothing(), Nothing(), Nothing())); + TTopicWriter wr(nullptr, TTopicWriterParams(EMessagingFormat::SingleMessage, "\n", 0, Nothing(), Nothing(), Nothing(), ETransformBody::None)); TMessages got = EnterMessageHelper(wr, str); AssertMessagesEqual({ { @@ -121,7 +125,8 @@ namespace NYdb::NConsoleClient { void TestEnterMessage_1KiB_Newline_Delimited_With_Two_Delimiters_In_A_Row() { TStringStream str = TString("a") * 512 + "\n\n" + TString("b") * 512; - TTopicWriter wr(nullptr, TTopicWriterParams(EMessagingFormat::NewlineDelimited, Nothing(), 0, Nothing(), Nothing(), Nothing())); + TTopicWriter wr(nullptr, TTopicWriterParams(EMessagingFormat::NewlineDelimited, Nothing(), 0, Nothing(), Nothing(), Nothing(), + ETransformBody::None)); TMessages got = EnterMessageHelper(wr, str); AssertMessagesEqual({ { @@ -165,7 +170,7 @@ namespace NYdb::NConsoleClient { void TestEnterMessage_ZeroSymbol_Delimited() { auto& s = "\0\0\0\0\n\nprivet"; TStringStream str = TString(std::begin(s), std::end(s)); - TTopicWriter wr(nullptr, TTopicWriterParams(EMessagingFormat::SingleMessage, "\0", 0, Nothing(), Nothing(), Nothing())); + TTopicWriter wr(nullptr, TTopicWriterParams(EMessagingFormat::SingleMessage, "\0", 0, Nothing(), Nothing(), Nothing(), ETransformBody::None)); TMessages got = EnterMessageHelper(wr, str); AssertMessagesEqual({ { @@ -204,7 +209,7 @@ namespace NYdb::NConsoleClient { void TestEnterMessage_Custom_Delimiter_Delimited() { TStringStream str = TString("privet_vasya_kak_dela?"); - TTopicWriter wr(nullptr, TTopicWriterParams(EMessagingFormat::SingleMessage, "_", 0, Nothing(), Nothing(), Nothing())); + TTopicWriter wr(nullptr, TTopicWriterParams(EMessagingFormat::SingleMessage, "_", 0, Nothing(), Nothing(), Nothing(), ETransformBody::None)); TMessages got = EnterMessageHelper(wr, str); AssertMessagesEqual({ { @@ -236,26 +241,88 @@ namespace NYdb::NConsoleClient { got); } + void TestEnterMessage_No_Base64_Transform() { + TStringStream str = TString("privet_vasya_kak_dela?"); + TTopicWriter wr(nullptr, TTopicWriterParams(EMessagingFormat::SingleMessage, Nothing(), 0, Nothing(), Nothing(), Nothing(), ETransformBody::None)); + TMessages got = EnterMessageHelper(wr, str); + + AssertMessagesEqual({ + { + .Data = "privet_vasya_kak_dela?", + .NeedSend = true, + .ContinueSending = false, + }, + }, got); + } + + void TestEnterMessage_With_Base64_Transform_Invalid_Encode() { + TStringStream str = TString("cHJpdmV0X3Zhc3lhX2tha19kZWxPw=="); + TTopicWriter wr(nullptr, TTopicWriterParams(EMessagingFormat::SingleMessage, Nothing(), 0, Nothing(), Nothing(), Nothing(), ETransformBody::Base64)); + UNIT_ASSERT_EXCEPTION(EnterMessageHelper(wr, str), yexception); + } + + void TestEnterMessage_With_Base64_Transform() { + TStringStream str = TString("cHJpdmV0X3Zhc3lhX2tha19kZWxhPw=="); + TTopicWriter wr(nullptr, TTopicWriterParams(EMessagingFormat::SingleMessage, Nothing(), 0, Nothing(), Nothing(), Nothing(), ETransformBody::Base64)); + TMessages got = EnterMessageHelper(wr, str); + + AssertMessagesEqual( + { + { + .Data = "privet_vasya_kak_dela?", + .NeedSend = true, + .ContinueSending = false, + }, + }, + got); + } + + void TestEnterMessage_With_Base64_Transform_NewlineDelimited() { + TStringStream str = TString("aG93IGRvIHlvdSBkbw==\ncHJpdmV0X3Zhc3lhX2tha19kZWxhPw=="); + TTopicWriter wr(nullptr, TTopicWriterParams(EMessagingFormat::NewlineDelimited, Nothing(), 0, Nothing(), Nothing(), Nothing(), ETransformBody::Base64)); + TMessages got = EnterMessageHelper(wr, str); + + AssertMessagesEqual( + { + { + .Data = "how do you do", + .NeedSend = true, + .ContinueSending = true, + }, + { + .Data = "privet_vasya_kak_dela?", + .NeedSend = true, + .ContinueSending = true, + }, + { + .Data = "", + .NeedSend = false, + .ContinueSending = false, + }, + }, + got); + } + void TestTopicWriterParams_Format_NewlineDelimited() { - TTopicWriterParams p = TTopicWriterParams(EMessagingFormat::NewlineDelimited, Nothing(), 0, Nothing(), Nothing(), Nothing()); + TTopicWriterParams p = TTopicWriterParams(EMessagingFormat::NewlineDelimited, Nothing(), 0, Nothing(), Nothing(), Nothing(), ETransformBody::None); UNIT_ASSERT_VALUES_EQUAL(p.MessagingFormat(), EMessagingFormat::NewlineDelimited); UNIT_ASSERT_VALUES_EQUAL(p.Delimiter(), '\n'); } void TestTopicWriterParams_Format_Concatenated() { - TTopicWriterParams p = TTopicWriterParams(EMessagingFormat::Concatenated, Nothing(), 0, Nothing(), Nothing(), Nothing()); + TTopicWriterParams p = TTopicWriterParams(EMessagingFormat::Concatenated, Nothing(), 0, Nothing(), Nothing(), Nothing(), ETransformBody::None); UNIT_ASSERT_VALUES_EQUAL(p.MessagingFormat(), EMessagingFormat::Concatenated); UNIT_ASSERT_VALUES_EQUAL(p.Delimiter(), '\n'); } void TestTopicWriterParams_No_Delimiter() { - TTopicWriterParams p = TTopicWriterParams(EMessagingFormat::SingleMessage, Nothing(), 0, Nothing(), Nothing(), Nothing()); + TTopicWriterParams p = TTopicWriterParams(EMessagingFormat::SingleMessage, Nothing(), 0, Nothing(), Nothing(), Nothing(), ETransformBody::None); UNIT_ASSERT_VALUES_EQUAL(p.MessagingFormat(), EMessagingFormat::SingleMessage); UNIT_ASSERT_VALUES_EQUAL(p.Delimiter(), Nothing()); } void TestTopicWriterParams_InvalidDelimiter() { - UNIT_ASSERT_EXCEPTION(TTopicWriterParams(EMessagingFormat::SingleMessage, "invalid", 0, Nothing(), Nothing(), Nothing()), yexception); + UNIT_ASSERT_EXCEPTION(TTopicWriterParams(EMessagingFormat::SingleMessage, "invalid", 0, Nothing(), Nothing(), Nothing(), ETransformBody::None), yexception); } private: |