aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorshmel1k <shmel1k@ydb.tech>2022-09-09 12:55:53 +0300
committershmel1k <shmel1k@ydb.tech>2022-09-09 12:55:53 +0300
commit422aefd03b121a1966b80756d8a06a86d9787e04 (patch)
treee90bdbb2ce02363373b929fae08f72dd308ce1b3
parent75a5e826e2f818015c40fc7d76f3cdb4d5116151 (diff)
downloadydb-422aefd03b121a1966b80756d8a06a86d9787e04.tar.gz
PR from branch users/shmel1k/LOGBROKER-7217_improve_writing
[] fix tests [] improved writing: added transform option [] improve 'topic write' options
-rw-r--r--ydb/public/lib/ydb_cli/commands/ydb_service_topic.cpp256
-rw-r--r--ydb/public/lib/ydb_cli/commands/ydb_service_topic.h32
-rw-r--r--ydb/public/lib/ydb_cli/topic/topic_util.h1
-rw-r--r--ydb/public/lib/ydb_cli/topic/topic_write.cpp76
-rw-r--r--ydb/public/lib/ydb_cli/topic/topic_write.h14
-rw-r--r--ydb/public/lib/ydb_cli/topic/topic_write_ut.cpp85
6 files changed, 276 insertions, 188 deletions
diff --git a/ydb/public/lib/ydb_cli/commands/ydb_service_topic.cpp b/ydb/public/lib/ydb_cli/commands/ydb_service_topic.cpp
index 7585a101331..2e4e0f59d18 100644
--- a/ydb/public/lib/ydb_cli/commands/ydb_service_topic.cpp
+++ b/ydb/public/lib/ydb_cli/commands/ydb_service_topic.cpp
@@ -15,10 +15,10 @@
namespace NYdb::NConsoleClient {
namespace {
THashMap<NYdb::NTopic::ECodec, TString> CodecsDescriptions = {
- {NYdb::NTopic::ECodec::RAW, "Raw codec. No data compression"},
- {NYdb::NTopic::ECodec::GZIP, "GZIP codec. Data is compressed with GZIP compression algorithm"},
- {NYdb::NTopic::ECodec::LZOP, "LZOP codec. Data is compressed with LZOP compression algorithm"},
- {NYdb::NTopic::ECodec::ZSTD, "ZSTD codec. Data is compressed with ZSTD compression algorithm"},
+ {NYdb::NTopic::ECodec::RAW, "Raw codec. No data compression."},
+ {NYdb::NTopic::ECodec::GZIP, "GZIP codec. Data is compressed with GZIP compression algorithm."},
+ {NYdb::NTopic::ECodec::LZOP, "LZOP codec. Data is compressed with LZOP compression algorithm."},
+ {NYdb::NTopic::ECodec::ZSTD, "ZSTD codec. Data is compressed with ZSTD compression algorithm."},
};
THashMap<TString, NYdb::NTopic::ECodec> ExistingCodecs = {
@@ -34,19 +34,19 @@ namespace NYdb::NConsoleClient {
};
THashMap<NTopic::EMeteringMode, TString> MeteringModesDescriptions = {
- std::pair<NTopic::EMeteringMode, TString>(NTopic::EMeteringMode::ReservedCapacity, "Throughput and storage limits on hourly basis, write operations"),
- std::pair<NTopic::EMeteringMode, TString>(NTopic::EMeteringMode::RequestUnits, "Read/write operations valued in request units, storage usage on hourly basis"),
+ std::pair<NTopic::EMeteringMode, TString>(NTopic::EMeteringMode::ReservedCapacity, "Throughput and storage limits on hourly basis, write operations."),
+ std::pair<NTopic::EMeteringMode, TString>(NTopic::EMeteringMode::RequestUnits, "Read/write operations valued in request units, storage usage on hourly basis."),
};
// TODO(shmel1k@): improve docs
THashMap<ETopicMetadataField, TString> TopicMetadataFieldsDescriptions = {
{ETopicMetadataField::Body, "Message data"},
- {ETopicMetadataField::WriteTime, "Message write time, a UNIX timestamp the message was written to server"},
- {ETopicMetadataField::CreateTime, "Message creation time, a UNIX timestamp provided by the publishing client"},
- {ETopicMetadataField::MessageGroupID, "Message group id. All messages with the same message group id are guaranteed to be read in FIFO order"},
- {ETopicMetadataField::Offset, "Message offset. Offset orders messages in each partition"},
- {ETopicMetadataField::SeqNo, "Message sequence number, used for message deduplication when publishing"},
- {ETopicMetadataField::Meta, "Message additional metadata"},
+ {ETopicMetadataField::WriteTime, "Message write time, a UNIX timestamp the message was written to server."},
+ {ETopicMetadataField::CreateTime, "Message creation time, a UNIX timestamp provided by the publishing client."},
+ {ETopicMetadataField::MessageGroupID, "Message group id. All messages with the same message group id are guaranteed to be read in FIFO order."},
+ {ETopicMetadataField::Offset, "Message offset. Offset orders messages in each partition."},
+ {ETopicMetadataField::SeqNo, "Message sequence number, used for message deduplication when publishing."},
+ {ETopicMetadataField::Meta, "Message additional metadata."},
};
const TVector<ETopicMetadataField> AllTopicMetadataFields = {
@@ -75,20 +75,45 @@ namespace NYdb::NConsoleClient {
};
constexpr TDuration DefaultIdleTimeout = TDuration::Seconds(1);
+
+ bool IsStreamingFormat(EMessagingFormat format) {
+ return format == EMessagingFormat::NewlineDelimited || format == EMessagingFormat::Concatenated;
+ }
} // namespace
- void TCommandWithSupportedCodecs::AddAllowedCodecs(TClientCommand::TConfig& config, const TVector<NYdb::NTopic::ECodec>& supportedCodecs) {
- TStringStream description;
- description << "Comma-separated list of supported codecs. Available codecs: ";
- NColorizer::TColors colors = NColorizer::AutoColors(Cout);
- for (const auto& codec : supportedCodecs) {
- auto findResult = CodecsDescriptions.find(codec);
- Y_VERIFY(findResult != CodecsDescriptions.end(),
- "Couldn't find description for %s codec", (TStringBuilder() << codec).c_str());
- description << "\n " << colors.BoldColor() << codec << colors.OldColor()
- << "\n " << findResult->second;
+ namespace {
+ TString PrepareAllowedCodecsDescription(const TString& descriptionPrefix, const TVector<NTopic::ECodec>& codecs) {
+ TStringStream description;
+ description << descriptionPrefix << ". Available codecs: ";
+ NColorizer::TColors colors = NColorizer::AutoColors(Cout);
+ for (const auto& codec : codecs) {
+ auto findResult = CodecsDescriptions.find(codec);
+ Y_VERIFY(findResult != CodecsDescriptions.end(),
+ "Couldn't find description for %s codec", (TStringBuilder() << codec).c_str());
+ description << "\n " << colors.BoldColor() << codec << colors.OldColor()
+ << "\n " << findResult->second;
+ }
+
+ return description.Str();
}
- config.Opts->AddLongOption("supported-codecs", description.Str())
+
+ NTopic::ECodec ParseCodec(const TString& codecStr, const TVector<NTopic::ECodec>& allowedCodecs) {
+ auto exists = ExistingCodecs.find(to_lower(codecStr));
+ if (exists == ExistingCodecs.end()) {
+ throw TMisuseException() << "Codec " << codecStr << " is not available for this command";
+ }
+
+ if (std::find(allowedCodecs.begin(), allowedCodecs.end(), exists->second) == allowedCodecs.end()) {
+ throw TMisuseException() << "Codec " << codecStr << " is not available for this command";
+ }
+
+ return exists->second;
+ }
+ }
+
+ void TCommandWithSupportedCodecs::AddAllowedCodecs(TClientCommand::TConfig& config, const TVector<NYdb::NTopic::ECodec>& supportedCodecs) {
+ TString description = PrepareAllowedCodecsDescription("Comma-separated list of supported codecs", supportedCodecs);
+ config.Opts->AddLongOption("supported-codecs", description)
.RequiredArgument("STRING")
.StoreResult(&SupportedCodecsStr_);
AllowedCodecs_ = supportedCodecs;
@@ -102,16 +127,7 @@ namespace NYdb::NConsoleClient {
TVector<NTopic::ECodec> parsedCodecs;
TVector<TString> split = SplitString(SupportedCodecsStr_, ",");
for (const TString& codecStr : split) {
- auto exists = ExistingCodecs.find(to_lower(codecStr));
- if (exists == ExistingCodecs.end()) {
- throw TMisuseException() << "Supported codec " << codecStr << " is not available for this command";
- }
-
- if (std::find(AllowedCodecs_.begin(), AllowedCodecs_.end(), exists->second) == AllowedCodecs_.end()) {
- throw TMisuseException() << "Supported codec " << codecStr << " is not available for this command";
- }
-
- SupportedCodecs_.push_back(exists->second);
+ SupportedCodecs_.push_back(::NYdb::NConsoleClient::ParseCodec(codecStr, AllowedCodecs_));
}
}
@@ -203,7 +219,7 @@ namespace NYdb::NConsoleClient {
int TCommandTopicCreate::Run(TConfig& config) {
TDriver driver = CreateDriver(config);
- NYdb::NTopic::TTopicClient persQueueClient(driver);
+ NYdb::NTopic::TTopicClient topicClient(driver);
auto settings = NYdb::NTopic::TCreateTopicSettings();
settings.PartitioningSettings(PartitionsCount_, PartitionsCount_);
@@ -223,7 +239,7 @@ namespace NYdb::NConsoleClient {
settings.RetentionPeriod(TDuration::Hours(RetentionPeriodHours_));
settings.RetentionStorageMb(RetentionStorageMb_ / 1_MB);
- auto status = persQueueClient.CreateTopic(TopicName, settings).GetValueSync();
+ auto status = topicClient.CreateTopic(TopicName, settings).GetValueSync();
ThrowOnError(status);
return EXIT_SUCCESS;
}
@@ -342,7 +358,7 @@ namespace NYdb::NConsoleClient {
void TCommandTopicConsumerAdd::Config(TConfig& config) {
TYdbCommand::Config(config);
- config.Opts->AddLongOption("consumer-name", "New consumer for topic")
+ config.Opts->AddLongOption("consumer", "New consumer for topic")
.Required()
.StoreResult(&ConsumerName_);
config.Opts->AddLongOption("service-type", "Service type of reader")
@@ -389,7 +405,7 @@ namespace NYdb::NConsoleClient {
void TCommandTopicConsumerDrop::Config(TConfig& config) {
TYdbCommand::Config(config);
- config.Opts->AddLongOption("consumer-name", "Consumer which will be dropped")
+ config.Opts->AddLongOption("consumer", "Consumer which will be dropped")
.Required()
.StoreResult(&ConsumerName_);
config.Opts->SetFreeArgsNum(1);
@@ -413,6 +429,41 @@ namespace NYdb::NConsoleClient {
return EXIT_SUCCESS;
}
+ void TCommandWithTransformBody::AddTransform(TClientCommand::TConfig& config) {
+ TStringStream description;
+ description << "Conversion between a message data in the topic and the client filesystem/terminal. Available options: ";
+ NColorizer::TColors colors = NColorizer::AutoColors(Cout);
+ for (const auto& iter : TransformBodyDescriptions) {
+ description << "\n " << colors.BoldColor() << iter.first << colors.OldColor() << "\n " << iter.second;
+ }
+
+ config.Opts->AddLongOption("transform", description.Str())
+ .Optional()
+ .DefaultValue("none")
+ .StoreResult(&TransformStr_);
+ }
+
+ void TCommandWithTransformBody::ParseTransform() {
+ if (TransformStr_.empty()) {
+ return;
+ }
+
+ TString val = TransformStr_;
+ if (val == (TStringBuilder() << ETransformBody::None)) {
+ return;
+ }
+ if (val == (TStringBuilder() << ETransformBody::Base64)) {
+ Transform_ = ETransformBody::Base64;
+ return;
+ }
+
+ throw TMisuseException() << "Transform " << TransformStr_ << " not found in available \"transform\" values";
+ }
+
+ ETransformBody TCommandWithTransformBody::GetTransform() const {
+ return Transform_;
+ }
+
TCommandTopicRead::TCommandTopicRead()
: TYdbCommand("read", {}, "Read from a topic to the client filesystem or terminal") {
}
@@ -430,20 +481,6 @@ namespace NYdb::NConsoleClient {
.StoreResult(&WithMetadataFields_);
}
- void TCommandTopicRead::AddAllowedTransformFormats(TConfig& config) {
- TStringStream description;
- description << "Conversion between a message data in the topic and the client filesystem/terminal. Available options: ";
- NColorizer::TColors colors = NColorizer::AutoColors(Cout);
- for (const auto& iter : TransformBodyDescriptions) {
- description << "\n " << colors.BoldColor() << iter.first << colors.OldColor() << "\n " << iter.second;
- }
-
- config.Opts->AddLongOption("transform", description.Str())
- .Optional()
- .DefaultValue("none")
- .StoreResult(&TransformStr_);
- }
-
void TCommandTopicRead::Config(TConfig& config) {
TYdbCommand::Config(config);
config.Opts->SetFreeArgsNum(1);
@@ -460,25 +497,9 @@ namespace NYdb::NConsoleClient {
config.Opts->AddLongOption('c', "consumer", "Consumer name.")
.Required()
.StoreResult(&Consumer_);
-// config.Opts->AddLongOption("offset", "Offset to start read from")
-// .Optional()
-// .StoreResult(&Offset_);
-// config.Opts->AddLongOption("partition", "Partition to read from")
-// .Optional()
-// .StoreResult(&Partition_);
config.Opts->AddLongOption('f', "file", "File to write data to. In not specified, data is written to the standard output.")
.Optional()
.StoreResult(&File_);
-// NOTE(shmel1k@): temporary disabled options
-// config.Opts->AddLongOption("flush-duration", "Duration for message flushing")
-// .Optional()
-// .StoreResult(&FlushDuration_);
-// config.Opts->AddLongOption("flush-size", "Maximum flush size") // TODO(shmel1k@): improve
-// .Optional()
-// .StoreResult(&FlushSize_);
-// config.Opts->AddLongOption("flush-messages-count", "") // TODO(shmel1k@): improve
-// .Optional()
-// .StoreResult(&FlushMessagesCount_);
config.Opts->AddLongOption("idle-timeout", "Max wait duration for new messages. Topic is considered empty if no new messages arrive within this period.")
.Optional()
.DefaultValue(DefaultIdleTimeout)
@@ -487,13 +508,6 @@ namespace NYdb::NConsoleClient {
.Optional()
.DefaultValue(true)
.StoreResult(&Commit_);
-// NOTE(shmel1k@): temporary disabled option
-// config.Opts->AddLongOption("message-size-limit", "Maximum message size")
-// .Optional()
-// .StoreResult(&MessageSizeLimit_);
-// config.Opts->AddLongOption("discard-above-limits", "Do not print messages with size more than defined in 'message-size-limit' option")
-// .Optional()
-// .StoreResult(&DiscardAboveLimits_);
config.Opts->AddLongOption("limit", "Limit on message count to read, 0 - unlimited. "
"If avobe 0, processing stops when either topic is empty, or the specified limit reached. "
"Must be above 0 for pretty output format."
@@ -509,7 +523,7 @@ namespace NYdb::NConsoleClient {
.StoreResult(&Timestamp_);
AddAllowedMetadataFields(config);
- AddAllowedTransformFormats(config);
+ AddTransform(config);
}
void TCommandTopicRead::ParseMetadataFields() {
@@ -548,29 +562,12 @@ namespace NYdb::NConsoleClient {
MetadataFields_ = result;
}
- void TCommandTopicRead::ParseTransformFormat() {
- if (!TransformStr_.Defined()) {
- return;
- }
-
- TString val = *TransformStr_;
- if (val == (TStringBuilder() << ETransformBody::None)) {
- return;
- }
- if (val == (TStringBuilder() << ETransformBody::Base64)) {
- Transform_ = ETransformBody::Base64;
- return;
- }
-
- throw TMisuseException() << "Transform " << *TransformStr_ << " not found in available \"transform\" values";
- }
-
void TCommandTopicRead::Parse(TConfig& config) {
TYdbCommand::Parse(config);
ParseTopicName(config, 0);
ParseMessagingFormats();
ParseMetadataFields();
- ParseTransformFormat();
+ ParseTransform();
}
NTopic::TReadSessionSettings TCommandTopicRead::PrepareReadSessionSettings() {
@@ -590,9 +587,9 @@ namespace NYdb::NConsoleClient {
void TCommandTopicRead::ValidateConfig() {
// TODO(shmel1k@): add more formats.
- if (MessagingFormat != EMessagingFormat::SingleMessage && (Limit_.Defined() && (Limit_ < 0 || Limit_ > 500))) {
+ if (!IsStreamingFormat(MessagingFormat) && (Limit_.Defined() && (Limit_ <= 0 || Limit_ > 500))) {
throw TMisuseException() << "OutputFormat " << OutputFormat << " is not compatible with "
- << "limit equal '0' or more than '500': '" << *Limit_ << "' was given";
+ << "limit less and equal '0' or more than '500': '" << *Limit_ << "' was given";
}
}
@@ -600,10 +597,10 @@ namespace NYdb::NConsoleClient {
ValidateConfig();
auto driver = std::make_unique<TDriver>(CreateDriver(config));
- NTopic::TTopicClient persQueueClient(*driver);
- auto readSession = persQueueClient.CreateReadSession(PrepareReadSessionSettings());
+ NTopic::TTopicClient topicClient(*driver);
+ auto readSession = topicClient.CreateReadSession(PrepareReadSessionSettings());
- TTopicInitializationChecker checker = TTopicInitializationChecker(persQueueClient);
+ TTopicInitializationChecker checker = TTopicInitializationChecker(topicClient);
checker.CheckTopicExistence(TopicName, Consumer_);
{
@@ -613,7 +610,7 @@ namespace NYdb::NConsoleClient {
Wait_,
MessagingFormat,
MetadataFields_,
- Transform_,
+ GetTransform(),
IdleTimeout_));
reader.Init();
@@ -637,6 +634,27 @@ namespace NYdb::NConsoleClient {
return EXIT_SUCCESS;
}
+ void TCommandWithCodec::AddAllowedCodecs(TClientCommand::TConfig& config, const TVector<NTopic::ECodec>& allowedCodecs) {
+ TString description = PrepareAllowedCodecsDescription("Client-side compression algorithm. When read, data will be uncompressed transparently with a codec used on write", allowedCodecs);
+ config.Opts->AddLongOption("codec", description)
+ .Optional()
+ .DefaultValue((TStringBuilder() << NTopic::ECodec::RAW))
+ .StoreResult(&CodecStr_);
+ AllowedCodecs_ = allowedCodecs;
+ }
+
+ void TCommandWithCodec::ParseCodec() {
+ if (CodecStr_.empty()) {
+ return;
+ }
+
+ Codec_ = ::NYdb::NConsoleClient::ParseCodec(CodecStr_, AllowedCodecs_);
+ }
+
+ NTopic::ECodec TCommandWithCodec::GetCodec() const {
+ return Codec_;
+ }
+
TCommandTopicWrite::TCommandTopicWrite()
: TYdbCommand("write", {}, "Write to topic command") {
}
@@ -652,7 +670,7 @@ namespace NYdb::NConsoleClient {
// EOutputFormat::JsonRawStreamConcat,
// EOutputFormat::JsonRawArray,
});
- AddAllowedCodecs(config, {NTopic::ECodec::RAW});
+ AddAllowedCodecs(config, AllowedCodecs);
// TODO(shmel1k@): improve help.
config.Opts->AddLongOption('d', "delimiter", "Delimiter to split messages")
@@ -661,28 +679,19 @@ namespace NYdb::NConsoleClient {
config.Opts->AddLongOption('f', "file", "File to read data from")
.Optional()
.StoreResult(&File_);
- config.Opts->AddLongOption("message-size-limit", "Size limit for a single message")
- .Optional()
- .StoreResult(&MessageSizeLimitStr_);
- config.Opts->AddLongOption("batch-duration", "Duration for message batching")
- .Optional()
- .StoreResult(&BatchDuration_);
- config.Opts->AddLongOption("batch-size", "Maximum batch size") // TODO(shmel1k@): improve
- .Optional()
- .StoreResult(&BatchSize_);
- config.Opts->AddLongOption("batch-messages-count", "") // TODO(shmel1k@): improve
- .Optional()
- .StoreResult(&BatchMessagesCount_);
- config.Opts->AddLongOption("message-group-id", "Message Group ID") // TODO(shmel1k@): improve
+ config.Opts->AddLongOption("message-group-id", "Message group identifier. If not set, all messages from input will get the same identifier based on hex string\nrepresentation of 3 random bytes")
.Optional()
.StoreResult(&MessageGroupId_);
+
+ AddTransform(config);
}
void TCommandTopicWrite::Parse(TConfig& config) {
TYdbCommand::Parse(config);
ParseTopicName(config, 0);
ParseMessagingFormats();
- ParseCodecs();
+ ParseTransform();
+ ParseCodec();
if (Delimiter_.Defined() && MessagingFormat != EMessagingFormat::SingleMessage) {
throw TMisuseException() << "Both mutually exclusive options \"delimiter\"(\"--delimiter\", \"-d\" "
@@ -692,11 +701,8 @@ namespace NYdb::NConsoleClient {
NTopic::TWriteSessionSettings TCommandTopicWrite::PrepareWriteSessionSettings() {
NTopic::TWriteSessionSettings settings;
-// settings.Codec(GetCodecs()[0]);
- settings.Codec(NTopic::ECodec::GZIP);
+ settings.Codec(GetCodec());
settings.Path(TopicName);
- //settings.BatchFlushInterval(BatchDuration_);
- //settings.BatchFlushSizeBytes(BatchSize_);
if (!MessageGroupId_.Defined()) {
const TString rnd = ToString(TInstant::Now().NanoSeconds());
@@ -721,19 +727,15 @@ namespace NYdb::NConsoleClient {
SetInterruptHandlers();
auto driver = std::make_unique<TDriver>(CreateDriver(config));
- NTopic::TTopicClient persQueueClient(*driver);
- TTopicInitializationChecker checker = TTopicInitializationChecker(persQueueClient);
+ NTopic::TTopicClient topicClient(*driver);
+ TTopicInitializationChecker checker = TTopicInitializationChecker(topicClient);
checker.CheckTopicExistence(TopicName);
{
auto writeSession = NTopic::TTopicClient(*driver).CreateWriteSession(std::move(PrepareWriteSessionSettings()));
- auto writer = TTopicWriter(writeSession, std::move(TTopicWriterParams(
- MessagingFormat,
- Delimiter_,
- MessageSizeLimit_,
- BatchDuration_,
- BatchSize_,
- BatchMessagesCount_)));
+ auto writer =
+ TTopicWriter(writeSession, std::move(TTopicWriterParams(MessagingFormat, Delimiter_, MessageSizeLimit_, BatchDuration_,
+ BatchSize_, BatchMessagesCount_, GetTransform())));
if (int status = writer.Init(); status) {
return status;
diff --git a/ydb/public/lib/ydb_cli/commands/ydb_service_topic.h b/ydb/public/lib/ydb_cli/commands/ydb_service_topic.h
index 3cfde53b576..fb8a40ae264 100644
--- a/ydb/public/lib/ydb_cli/commands/ydb_service_topic.h
+++ b/ydb/public/lib/ydb_cli/commands/ydb_service_topic.h
@@ -108,15 +108,22 @@ namespace NYdb::NConsoleClient {
TString ConsumerName_;
};
- class TCommandTopicInternal: public TClientCommandTree {
- public:
- TCommandTopicInternal();
+ class TCommandWithTransformBody {
+ protected:
+ void AddTransform(TClientCommand::TConfig& config);
+ void ParseTransform();
+ ETransformBody GetTransform() const;
+
+ private:
+ TString TransformStr_;
+ ETransformBody Transform_ = ETransformBody::None;
};
class TCommandTopicRead: public TYdbCommand,
public TCommandWithFormat,
public TInterruptibleCommand,
- public TCommandWithTopicName {
+ public TCommandWithTopicName,
+ public TCommandWithTransformBody {
public:
TCommandTopicRead();
void Config(TConfig& config) override;
@@ -156,11 +163,24 @@ namespace NYdb::NConsoleClient {
NTopic::TReadSessionSettings PrepareReadSessionSettings();
};
+ class TCommandWithCodec {
+ protected:
+ void AddAllowedCodecs(TClientCommand::TConfig& config, const TVector<NTopic::ECodec>& allowedCodecs);
+ void ParseCodec();
+ NTopic::ECodec GetCodec() const;
+
+ private:
+ TVector<NTopic::ECodec> AllowedCodecs_;
+ TString CodecStr_;
+ NTopic::ECodec Codec_ = NTopic::ECodec::RAW;
+ };
+
class TCommandTopicWrite: public TYdbCommand,
public TCommandWithFormat,
public TInterruptibleCommand,
public TCommandWithTopicName,
- public TCommandWithSupportedCodecs {
+ public TCommandWithCodec,
+ public TCommandWithTransformBody {
public:
TCommandTopicWrite();
void Config(TConfig& config) override;
@@ -180,7 +200,7 @@ namespace NYdb::NConsoleClient {
ui64 MessageSizeLimit_ = 0;
void ParseMessageSizeLimit();
- void CheckOptions(NTopic::TTopicClient& persQueueClient);
+ void CheckOptions(NTopic::TTopicClient& topicClient);
private:
NTopic::TWriteSessionSettings PrepareWriteSessionSettings();
diff --git a/ydb/public/lib/ydb_cli/topic/topic_util.h b/ydb/public/lib/ydb_cli/topic/topic_util.h
index 54fd9ee7dec..5fef3334c5a 100644
--- a/ydb/public/lib/ydb_cli/topic/topic_util.h
+++ b/ydb/public/lib/ydb_cli/topic/topic_util.h
@@ -1,4 +1,5 @@
#include <ydb/public/lib/ydb_cli/commands/ydb_common.h>
+#include <ydb/public/lib/ydb_cli/common/format.h>
#include <ydb/public/sdk/cpp/client/ydb_topic/topic.h>
namespace NYdb::NConsoleClient {
diff --git a/ydb/public/lib/ydb_cli/topic/topic_write.cpp b/ydb/public/lib/ydb_cli/topic/topic_write.cpp
index ca74f4acf05..045b211f4bc 100644
--- a/ydb/public/lib/ydb_cli/topic/topic_write.cpp
+++ b/ydb/public/lib/ydb_cli/topic/topic_write.cpp
@@ -4,6 +4,7 @@
#include <util/generic/overloaded.h>
#include <util/stream/tokenizer.h>
#include <util/string/hex.h>
+#include <library/cpp/string_utils/base64/base64.h>
namespace NYdb::NConsoleClient {
namespace {
@@ -13,13 +14,14 @@ namespace NYdb::NConsoleClient {
TTopicWriterParams::TTopicWriterParams() {
}
- TTopicWriterParams::TTopicWriterParams(EMessagingFormat inputFormat, TMaybe<TString> delimiter,
- ui64 messageSizeLimit, TMaybe<TDuration> batchDuration,
- TMaybe<ui64> batchSize, TMaybe<ui64> batchMessagesCount)
+ TTopicWriterParams::TTopicWriterParams(EMessagingFormat inputFormat, TMaybe<TString> delimiter, ui64 messageSizeLimit,
+ TMaybe<TDuration> batchDuration, TMaybe<ui64> batchSize, TMaybe<ui64> batchMessagesCount,
+ ETransformBody transform)
: MessagingFormat_(inputFormat)
, BatchDuration_(batchDuration)
, BatchSize_(batchSize)
, BatchMessagesCount_(batchMessagesCount)
+ , Transform_(transform)
, MessageSizeLimit_(messageSizeLimit) {
if (inputFormat == EMessagingFormat::NewlineDelimited || inputFormat == EMessagingFormat::Concatenated) {
Delimiter_ = TMaybe<char>('\n');
@@ -88,34 +90,41 @@ namespace NYdb::NConsoleClient {
return EXIT_SUCCESS;
}
- int TTopicWriter::HandleAcksEvent(const NTopic::TWriteSessionEvent::TAcksEvent& event) {
+ int TTopicWriter::HandleAcksEvent(const NTopic::TWriteSessionEvent::TAcksEvent* event) {
Y_UNUSED(event);
return EXIT_SUCCESS;
}
- int TTopicWriter::HandleReadyToAcceptEvent(NTopic::TWriteSessionEvent::TReadyToAcceptEvent& event) {
- ContinuationToken_ = std::move(event.ContinuationToken);
+ int TTopicWriter::HandleReadyToAcceptEvent(NTopic::TWriteSessionEvent::TReadyToAcceptEvent* event) {
+ ContinuationToken_ = std::move(event->ContinuationToken);
return EXIT_SUCCESS;
}
- int TTopicWriter::HandleSessionClosedEvent(const NTopic::TSessionClosedEvent& event) {
- ThrowOnError(event);
+ int TTopicWriter::HandleSessionClosedEvent(const NTopic::TSessionClosedEvent* event) {
+ ThrowOnError(*event);
return EXIT_FAILURE;
}
int TTopicWriter::HandleEvent(NTopic::TWriteSessionEvent::TEvent& event) {
- return std::visit(TOverloaded{
- [&](const NTopic::TWriteSessionEvent::TAcksEvent& event) {
- return HandleAcksEvent(event);
- },
- [&](NTopic::TWriteSessionEvent::TReadyToAcceptEvent& event) {
- return HandleReadyToAcceptEvent(event);
- },
- [&](const NTopic::TSessionClosedEvent& event) {
- return HandleSessionClosedEvent(event);
- },
- },
- event);
+ if (auto* acksEvent = std::get_if<NTopic::TWriteSessionEvent::TAcksEvent>(&event)) {
+ return HandleAcksEvent(acksEvent);
+ } else if (auto* readyToAcceptEvent = std::get_if<NTopic::TWriteSessionEvent::TReadyToAcceptEvent>(&event)) {
+ return HandleReadyToAcceptEvent(readyToAcceptEvent);
+ } else if (auto* sessionClosedEvent = std::get_if<NTopic::TSessionClosedEvent>(&event)) {
+ return HandleSessionClosedEvent(sessionClosedEvent);
+ }
+
+ return EXIT_SUCCESS;
+ }
+
+ namespace {
+ TString TransformBody(const TString& body, ETransformBody transform) {
+ if (transform == ETransformBody::None) {
+ return body;
+ }
+
+ return Base64Decode(body);
+ }
}
TTopicWriter::TSendMessageData TTopicWriter::EnterMessage(IInputStream& input) {
@@ -123,9 +132,9 @@ namespace NYdb::NConsoleClient {
// TODO(shmel1k@): add JSONStreamReader & etc interfaces.
// TODO(shmel1k@): add stream parsing here & improve performance.
if (!WriterParams_.Delimiter().Defined()) {
- // TODO(shmel1k@): interruption?
+ TString body = input.ReadAll();
return TSendMessageData{
- .Data = input.ReadAll(),
+ .Data = TransformBody(body, WriterParams_.Transform()),
.NeedSend = true,
.ContinueSending = false,
};
@@ -142,7 +151,7 @@ namespace NYdb::NConsoleClient {
};
}
return TSendMessageData{
- .Data = buffer,
+ .Data = TransformBody(buffer, WriterParams_.Transform()),
.NeedSend = true,
.ContinueSending = true,
};
@@ -175,8 +184,7 @@ namespace NYdb::NConsoleClient {
}
bool TTopicWriter::Close(TDuration closeTimeout) {
- Y_UNUSED(closeTimeout);
- if (WriteSession_->Close(TDuration::Hours(12))) {
+ if (WriteSession_->Close(closeTimeout)) {
return true;
}
TVector<NTopic::TWriteSessionEvent::TEvent> events = WriteSession_->GetEvents(true);
@@ -184,23 +192,7 @@ namespace NYdb::NConsoleClient {
return false;
}
for (auto& evt : events) {
- bool hasFailure = false;
- std::visit(TOverloaded{
- [&](const NTopic::TWriteSessionEvent::TAcksEvent& event) {
- Y_UNUSED(event);
- },
- [&](NTopic::TWriteSessionEvent::TReadyToAcceptEvent& event) {
- Y_UNUSED(event);
- },
- [&](const NTopic::TSessionClosedEvent& event) {
- int result = HandleSessionClosedEvent(event);
- if (result == EXIT_FAILURE) {
- hasFailure = true;
- }
- },
- },
- evt);
- if (hasFailure) {
+ if (HandleEvent(evt)) {
return false;
}
}
diff --git a/ydb/public/lib/ydb_cli/topic/topic_write.h b/ydb/public/lib/ydb_cli/topic/topic_write.h
index 78062c1abc1..822acc98c0a 100644
--- a/ydb/public/lib/ydb_cli/topic/topic_write.h
+++ b/ydb/public/lib/ydb_cli/topic/topic_write.h
@@ -4,6 +4,7 @@
#include <ydb/public/sdk/cpp/client/ydb_topic/topic.h>
#include <ydb/public/lib/ydb_cli/common/format.h>
#include <ydb/public/lib/ydb_cli/common/interruptible.h>
+#include <ydb/public/lib/ydb_cli/topic/topic_metadata_fields.h>
namespace NYdb::NConsoleClient {
#define GETTER(TYPE, NAME) \
@@ -16,7 +17,8 @@ namespace NYdb::NConsoleClient {
TTopicWriterParams();
TTopicWriterParams(EMessagingFormat inputFormat, TMaybe<TString> delimiter,
ui64 messageSizeLimit, TMaybe<TDuration> batchDuration,
- TMaybe<ui64> batchSize, TMaybe<ui64> batchMessagesCount);
+ TMaybe<ui64> batchSize, TMaybe<ui64> batchMessagesCount,
+ ETransformBody transform);
TTopicWriterParams(const TTopicWriterParams&) = default;
TTopicWriterParams(TTopicWriterParams&&) = default;
@@ -26,6 +28,8 @@ namespace NYdb::NConsoleClient {
GETTER(TMaybe<ui64>, BatchMessagesCount);
GETTER(ui64, MessageSizeLimit);
GETTER(EMessagingFormat, MessagingFormat);
+ GETTER(NTopic::ECodec, Codec);
+ GETTER(ETransformBody, Transform);
private:
TMaybe<TString> File_;
@@ -36,6 +40,8 @@ namespace NYdb::NConsoleClient {
TMaybe<TDuration> BatchDuration_;
TMaybe<ui64> BatchSize_;
TMaybe<ui64> BatchMessagesCount_;
+ NTopic::ECodec Codec_ = NTopic::ECodec::RAW;
+ ETransformBody Transform_ = ETransformBody::None;
ui64 MessageSizeLimit_ = 0;
};
@@ -62,9 +68,9 @@ namespace NYdb::NConsoleClient {
};
int HandleEvent(NTopic::TWriteSessionEvent::TEvent&);
- int HandleAcksEvent(const NTopic::TWriteSessionEvent::TAcksEvent&);
- int HandleReadyToAcceptEvent(NTopic::TWriteSessionEvent::TReadyToAcceptEvent&);
- int HandleSessionClosedEvent(const NTopic::TSessionClosedEvent&);
+ int HandleAcksEvent(const NTopic::TWriteSessionEvent::TAcksEvent*);
+ int HandleReadyToAcceptEvent(NTopic::TWriteSessionEvent::TReadyToAcceptEvent*);
+ int HandleSessionClosedEvent(const NTopic::TSessionClosedEvent*);
TTopicWriter::TSendMessageData EnterMessage(IInputStream&); // TODO(shmel1k@): make static or like a helper function
std::shared_ptr<NTopic::IWriteSession> WriteSession_;
diff --git a/ydb/public/lib/ydb_cli/topic/topic_write_ut.cpp b/ydb/public/lib/ydb_cli/topic/topic_write_ut.cpp
index 57dbc5e32f0..aaefbaaf597 100644
--- a/ydb/public/lib/ydb_cli/topic/topic_write_ut.cpp
+++ b/ydb/public/lib/ydb_cli/topic/topic_write_ut.cpp
@@ -15,6 +15,10 @@ namespace NYdb::NConsoleClient {
UNIT_TEST(TestEnterMessage_SomeBinaryData);
UNIT_TEST(TestEnterMessage_ZeroSymbol_Delimited);
UNIT_TEST(TestEnterMessage_Custom_Delimiter_Delimited);
+ UNIT_TEST(TestEnterMessage_No_Base64_Transform);
+ UNIT_TEST(TestEnterMessage_With_Base64_Transform_Invalid_Encode);
+ UNIT_TEST(TestEnterMessage_With_Base64_Transform);
+ UNIT_TEST(TestEnterMessage_With_Base64_Transform_NewlineDelimited);
UNIT_TEST(TestTopicWriterParams_Format_NewlineDelimited);
UNIT_TEST(TestTopicWriterParams_Format_Concatenated);
@@ -36,7 +40,7 @@ namespace NYdb::NConsoleClient {
void TestEnterMessage_OnlyDelimiters() {
TStringStream str = TString("\n") * 6;
- TTopicWriter wr(nullptr, TTopicWriterParams(EMessagingFormat::SingleMessage, "\n", 0, Nothing(), Nothing(), Nothing()));
+ TTopicWriter wr(nullptr, TTopicWriterParams(EMessagingFormat::SingleMessage, "\n", 0, Nothing(), Nothing(), Nothing(), ETransformBody::None));
TMessages got = EnterMessageHelper(wr, str);
AssertMessagesEqual({
{
@@ -97,7 +101,7 @@ namespace NYdb::NConsoleClient {
void TestEnterMessage_1KiB_Newline_Delimiter() {
TStringStream str = TString("a") * 512 + "\n" + TString("b") * 512;
- TTopicWriter wr(nullptr, TTopicWriterParams(EMessagingFormat::SingleMessage, "\n", 0, Nothing(), Nothing(), Nothing()));
+ TTopicWriter wr(nullptr, TTopicWriterParams(EMessagingFormat::SingleMessage, "\n", 0, Nothing(), Nothing(), Nothing(), ETransformBody::None));
TMessages got = EnterMessageHelper(wr, str);
AssertMessagesEqual({
{
@@ -121,7 +125,8 @@ namespace NYdb::NConsoleClient {
void TestEnterMessage_1KiB_Newline_Delimited_With_Two_Delimiters_In_A_Row() {
TStringStream str = TString("a") * 512 + "\n\n" + TString("b") * 512;
- TTopicWriter wr(nullptr, TTopicWriterParams(EMessagingFormat::NewlineDelimited, Nothing(), 0, Nothing(), Nothing(), Nothing()));
+ TTopicWriter wr(nullptr, TTopicWriterParams(EMessagingFormat::NewlineDelimited, Nothing(), 0, Nothing(), Nothing(), Nothing(),
+ ETransformBody::None));
TMessages got = EnterMessageHelper(wr, str);
AssertMessagesEqual({
{
@@ -165,7 +170,7 @@ namespace NYdb::NConsoleClient {
void TestEnterMessage_ZeroSymbol_Delimited() {
auto& s = "\0\0\0\0\n\nprivet";
TStringStream str = TString(std::begin(s), std::end(s));
- TTopicWriter wr(nullptr, TTopicWriterParams(EMessagingFormat::SingleMessage, "\0", 0, Nothing(), Nothing(), Nothing()));
+ TTopicWriter wr(nullptr, TTopicWriterParams(EMessagingFormat::SingleMessage, "\0", 0, Nothing(), Nothing(), Nothing(), ETransformBody::None));
TMessages got = EnterMessageHelper(wr, str);
AssertMessagesEqual({
{
@@ -204,7 +209,7 @@ namespace NYdb::NConsoleClient {
void TestEnterMessage_Custom_Delimiter_Delimited() {
TStringStream str = TString("privet_vasya_kak_dela?");
- TTopicWriter wr(nullptr, TTopicWriterParams(EMessagingFormat::SingleMessage, "_", 0, Nothing(), Nothing(), Nothing()));
+ TTopicWriter wr(nullptr, TTopicWriterParams(EMessagingFormat::SingleMessage, "_", 0, Nothing(), Nothing(), Nothing(), ETransformBody::None));
TMessages got = EnterMessageHelper(wr, str);
AssertMessagesEqual({
{
@@ -236,26 +241,88 @@ namespace NYdb::NConsoleClient {
got);
}
+ void TestEnterMessage_No_Base64_Transform() {
+ TStringStream str = TString("privet_vasya_kak_dela?");
+ TTopicWriter wr(nullptr, TTopicWriterParams(EMessagingFormat::SingleMessage, Nothing(), 0, Nothing(), Nothing(), Nothing(), ETransformBody::None));
+ TMessages got = EnterMessageHelper(wr, str);
+
+ AssertMessagesEqual({
+ {
+ .Data = "privet_vasya_kak_dela?",
+ .NeedSend = true,
+ .ContinueSending = false,
+ },
+ }, got);
+ }
+
+ void TestEnterMessage_With_Base64_Transform_Invalid_Encode() {
+ TStringStream str = TString("cHJpdmV0X3Zhc3lhX2tha19kZWxPw==");
+ TTopicWriter wr(nullptr, TTopicWriterParams(EMessagingFormat::SingleMessage, Nothing(), 0, Nothing(), Nothing(), Nothing(), ETransformBody::Base64));
+ UNIT_ASSERT_EXCEPTION(EnterMessageHelper(wr, str), yexception);
+ }
+
+ void TestEnterMessage_With_Base64_Transform() {
+ TStringStream str = TString("cHJpdmV0X3Zhc3lhX2tha19kZWxhPw==");
+ TTopicWriter wr(nullptr, TTopicWriterParams(EMessagingFormat::SingleMessage, Nothing(), 0, Nothing(), Nothing(), Nothing(), ETransformBody::Base64));
+ TMessages got = EnterMessageHelper(wr, str);
+
+ AssertMessagesEqual(
+ {
+ {
+ .Data = "privet_vasya_kak_dela?",
+ .NeedSend = true,
+ .ContinueSending = false,
+ },
+ },
+ got);
+ }
+
+ void TestEnterMessage_With_Base64_Transform_NewlineDelimited() {
+ TStringStream str = TString("aG93IGRvIHlvdSBkbw==\ncHJpdmV0X3Zhc3lhX2tha19kZWxhPw==");
+ TTopicWriter wr(nullptr, TTopicWriterParams(EMessagingFormat::NewlineDelimited, Nothing(), 0, Nothing(), Nothing(), Nothing(), ETransformBody::Base64));
+ TMessages got = EnterMessageHelper(wr, str);
+
+ AssertMessagesEqual(
+ {
+ {
+ .Data = "how do you do",
+ .NeedSend = true,
+ .ContinueSending = true,
+ },
+ {
+ .Data = "privet_vasya_kak_dela?",
+ .NeedSend = true,
+ .ContinueSending = true,
+ },
+ {
+ .Data = "",
+ .NeedSend = false,
+ .ContinueSending = false,
+ },
+ },
+ got);
+ }
+
void TestTopicWriterParams_Format_NewlineDelimited() {
- TTopicWriterParams p = TTopicWriterParams(EMessagingFormat::NewlineDelimited, Nothing(), 0, Nothing(), Nothing(), Nothing());
+ TTopicWriterParams p = TTopicWriterParams(EMessagingFormat::NewlineDelimited, Nothing(), 0, Nothing(), Nothing(), Nothing(), ETransformBody::None);
UNIT_ASSERT_VALUES_EQUAL(p.MessagingFormat(), EMessagingFormat::NewlineDelimited);
UNIT_ASSERT_VALUES_EQUAL(p.Delimiter(), '\n');
}
void TestTopicWriterParams_Format_Concatenated() {
- TTopicWriterParams p = TTopicWriterParams(EMessagingFormat::Concatenated, Nothing(), 0, Nothing(), Nothing(), Nothing());
+ TTopicWriterParams p = TTopicWriterParams(EMessagingFormat::Concatenated, Nothing(), 0, Nothing(), Nothing(), Nothing(), ETransformBody::None);
UNIT_ASSERT_VALUES_EQUAL(p.MessagingFormat(), EMessagingFormat::Concatenated);
UNIT_ASSERT_VALUES_EQUAL(p.Delimiter(), '\n');
}
void TestTopicWriterParams_No_Delimiter() {
- TTopicWriterParams p = TTopicWriterParams(EMessagingFormat::SingleMessage, Nothing(), 0, Nothing(), Nothing(), Nothing());
+ TTopicWriterParams p = TTopicWriterParams(EMessagingFormat::SingleMessage, Nothing(), 0, Nothing(), Nothing(), Nothing(), ETransformBody::None);
UNIT_ASSERT_VALUES_EQUAL(p.MessagingFormat(), EMessagingFormat::SingleMessage);
UNIT_ASSERT_VALUES_EQUAL(p.Delimiter(), Nothing());
}
void TestTopicWriterParams_InvalidDelimiter() {
- UNIT_ASSERT_EXCEPTION(TTopicWriterParams(EMessagingFormat::SingleMessage, "invalid", 0, Nothing(), Nothing(), Nothing()), yexception);
+ UNIT_ASSERT_EXCEPTION(TTopicWriterParams(EMessagingFormat::SingleMessage, "invalid", 0, Nothing(), Nothing(), Nothing(), ETransformBody::None), yexception);
}
private: