diff options
author | shmel1k <shmel1k@ydb.tech> | 2022-09-06 16:22:58 +0300 |
---|---|---|
committer | shmel1k <shmel1k@ydb.tech> | 2022-09-06 16:22:58 +0300 |
commit | e2c2814a65d6078ad550c13277d44f59d9c49ec1 (patch) | |
tree | a57ac29f9e9146a7730d5f864577480ddd27536b | |
parent | fd3caafaa86763347c93e949299cd9440201d79b (diff) | |
download | ydb-e2c2814a65d6078ad550c13277d44f59d9c49ec1.tar.gz |
[] migrate to new topic service
-rw-r--r-- | ydb/public/lib/ydb_cli/commands/ydb_service_topic.cpp | 56 | ||||
-rw-r--r-- | ydb/public/lib/ydb_cli/commands/ydb_service_topic.h | 66 | ||||
-rw-r--r-- | ydb/public/lib/ydb_cli/topic/topic_write.cpp | 30 | ||||
-rw-r--r-- | ydb/public/lib/ydb_cli/topic/topic_write.h | 16 |
4 files changed, 39 insertions, 129 deletions
diff --git a/ydb/public/lib/ydb_cli/commands/ydb_service_topic.cpp b/ydb/public/lib/ydb_cli/commands/ydb_service_topic.cpp index e8cae0e4123..25be32e94ec 100644 --- a/ydb/public/lib/ydb_cli/commands/ydb_service_topic.cpp +++ b/ydb/public/lib/ydb_cli/commands/ydb_service_topic.cpp @@ -164,6 +164,7 @@ namespace NYdb::NConsoleClient { AddCommand(std::make_unique<TCommandTopicDrop>()); AddCommand(std::make_unique<TCommandTopicConsumer>()); AddCommand(std::make_unique<TCommandTopicRead>()); + AddCommand(std::make_unique<TCommandTopicWrite>()); } TCommandTopicCreate::TCommandTopicCreate() @@ -404,11 +405,6 @@ namespace NYdb::NConsoleClient { return EXIT_SUCCESS; } - TCommandTopicInternal::TCommandTopicInternal() - : TClientCommandTree("topic", {}, "Experimental topic operations") { - AddCommand(std::make_unique<TCommandTopicWrite>()); - } - TCommandTopicRead::TCommandTopicRead() : TYdbCommand("read", {}, "Read from a topic to the client filesystem or terminal") { } @@ -648,7 +644,7 @@ namespace NYdb::NConsoleClient { // EOutputFormat::JsonRawStreamConcat, // EOutputFormat::JsonRawArray, }); - AddAllowedCodecs(config); + AddAllowedCodecs(config, {NTopic::ECodec::RAW}); // TODO(shmel1k@): improve help. config.Opts->AddLongOption('d', "delimiter", "Delimiter to split messages") @@ -678,7 +674,7 @@ namespace NYdb::NConsoleClient { TYdbCommand::Parse(config); ParseTopicName(config, 0); ParseMessagingFormats(); - ParseCodec(); + ParseCodecs(); if (Delimiter_.Defined() && MessagingFormat != EMessagingFormat::SingleMessage) { throw TMisuseException() << "Both mutually exclusive options \"delimiter\"(\"--delimiter\", \"-d\" " @@ -686,13 +682,13 @@ namespace NYdb::NConsoleClient { } } - NPersQueue::TWriteSessionSettings TCommandTopicWrite::PrepareWriteSessionSettings() { - NPersQueue::TWriteSessionSettings settings; - settings.Codec(GetCodec()); // TODO(shmel1k@): codecs? + NTopic::TWriteSessionSettings TCommandTopicWrite::PrepareWriteSessionSettings() { + NTopic::TWriteSessionSettings settings; +// settings.Codec(GetCodecs()[0]); + settings.Codec(NTopic::ECodec::GZIP); settings.Path(TopicName); //settings.BatchFlushInterval(BatchDuration_); //settings.BatchFlushSizeBytes(BatchSize_); - settings.ClusterDiscoveryMode(NPersQueue::EClusterDiscoveryMode::Auto); if (!MessageGroupId_.Defined()) { const TString rnd = ToString(TInstant::Now().NanoSeconds()); @@ -708,49 +704,21 @@ namespace NYdb::NConsoleClient { } settings.MessageGroupId(*MessageGroupId_); + settings.ProducerId(*MessageGroupId_); return settings; } - void TCommandTopicWrite::CheckOptions(NPersQueue::TPersQueueClient& persQueueClient) { - NPersQueue::TAsyncDescribeTopicResult descriptionFuture = persQueueClient.DescribeTopic(TopicName); - descriptionFuture.Wait(TDuration::Seconds(1)); - NPersQueue::TDescribeTopicResult description = descriptionFuture.GetValueSync(); - ThrowOnError(description); - - NPersQueue::ECodec codec = GetCodec(); - bool exists = false; - for (const auto c : description.TopicSettings().SupportedCodecs()) { - if (c == codec) { - exists = true; - break; - } - } - if (exists) { - return; - } - TStringStream errorMessage; - errorMessage << "Codec \"" << (TStringBuilder() << codec) << "\" is not available for topic. Available codecs:\n"; - for (const auto c : description.TopicSettings().SupportedCodecs()) { - errorMessage << " " << (TStringBuilder() << c) << "\n"; - } - throw TMisuseException() << errorMessage.Str(); - } - int TCommandTopicWrite::Run(TConfig& config) { SetInterruptHandlers(); auto driver = std::make_unique<TDriver>(CreateDriver(config)); - NPersQueue::TPersQueueClient persQueueClient(*driver); - - CheckOptions(persQueueClient); - - // TODO(shmel1k@): return back after IWriteSession in TopicService SDK - // TTopicInitializationChecker checker = TTopicInitializationChecker(persQueueClient); - // checker.CheckTopicExistence(TopicName); + NTopic::TTopicClient persQueueClient(*driver); + TTopicInitializationChecker checker = TTopicInitializationChecker(persQueueClient); + checker.CheckTopicExistence(TopicName); { - auto writeSession = NPersQueue::TPersQueueClient(*driver).CreateWriteSession(std::move(PrepareWriteSessionSettings())); + auto writeSession = NTopic::TTopicClient(*driver).CreateWriteSession(std::move(PrepareWriteSessionSettings())); auto writer = TTopicWriter(writeSession, std::move(TTopicWriterParams( MessagingFormat, Delimiter_, diff --git a/ydb/public/lib/ydb_cli/commands/ydb_service_topic.h b/ydb/public/lib/ydb_cli/commands/ydb_service_topic.h index 87475f5901e..baea8b9dd1d 100644 --- a/ydb/public/lib/ydb_cli/commands/ydb_service_topic.h +++ b/ydb/public/lib/ydb_cli/commands/ydb_service_topic.h @@ -154,71 +154,11 @@ namespace NYdb::NConsoleClient { NTopic::TReadSessionSettings PrepareReadSessionSettings(); }; - namespace { - const THashMap<NYdb::NPersQueue::ECodec, TString> CodecsDescriptionsMigration = { - {NYdb::NPersQueue::ECodec::RAW, "Raw codec. No data compression(default)"}, - {NYdb::NPersQueue::ECodec::GZIP, "GZIP codec. Data is compressed with GZIP compression algorithm"}, - {NYdb::NPersQueue::ECodec::LZOP, "LZOP codec. Data is compressed with LZOP compression algorithm"}, - {NYdb::NPersQueue::ECodec::ZSTD, "ZSTD codec. Data is compressed with ZSTD compression algorithm"}, - }; - } - - const TVector<NYdb::NPersQueue::ECodec> AllowedCodecsMigration = { - NPersQueue::ECodec::RAW, - NPersQueue::ECodec::GZIP, - NPersQueue::ECodec::ZSTD, - }; - - class TCommandWithCodecMigration { - // TODO(shmel1k@): remove after TopicService C++ SDK supports IWriteSession - protected: - void AddAllowedCodecs(TClientCommand::TConfig& config) { - TStringStream description; - description << "Codec used for writing. Available codecs: "; - NColorizer::TColors colors = NColorizer::AutoColors(Cout); - for (const auto& codec : AllowedCodecsMigration) { - auto findResult = CodecsDescriptionsMigration.find(codec); - Y_VERIFY(findResult != CodecsDescriptionsMigration.end(), - "Couldn't find description for %s codec", (TStringBuilder() << codec).c_str()); - description << "\n " << colors.BoldColor() << codec << colors.OldColor() - << "\n " << findResult->second; - } - config.Opts->AddLongOption("codec", description.Str()) - .RequiredArgument("STRING") - .StoreResult(&CodecStr_); - } - - void ParseCodec() { - TString codec = to_lower(CodecStr_); - if (codec == "raw") { - Codec_ = NPersQueue::ECodec::RAW; - } - if (codec == "gzip") { - Codec_ = NPersQueue::ECodec::GZIP; - } - if (codec = "zstd") { - Codec_ = NPersQueue::ECodec::ZSTD; - } - } - - NPersQueue::ECodec GetCodec() const { - if (CodecStr_ == "") { - return DefaultCodec_; - } - return Codec_; - } - - private: - NPersQueue::ECodec DefaultCodec_ = NPersQueue::ECodec::RAW; - TString CodecStr_; - NPersQueue::ECodec Codec_; - }; - class TCommandTopicWrite: public TYdbCommand, public TCommandWithFormat, public TInterruptibleCommand, public TCommandWithTopicName, - public TCommandWithCodecMigration { + public TCommandWithSupportedCodecs { public: TCommandTopicWrite(); void Config(TConfig& config) override; @@ -238,9 +178,9 @@ namespace NYdb::NConsoleClient { ui64 MessageSizeLimit_ = 0; void ParseMessageSizeLimit(); - void CheckOptions(NPersQueue::TPersQueueClient& persQueueClient); + void CheckOptions(NTopic::TTopicClient& persQueueClient); private: - NPersQueue::TWriteSessionSettings PrepareWriteSessionSettings(); + NTopic::TWriteSessionSettings PrepareWriteSessionSettings(); }; } // namespace NYdb::NConsoleClient diff --git a/ydb/public/lib/ydb_cli/topic/topic_write.cpp b/ydb/public/lib/ydb_cli/topic/topic_write.cpp index f4bbdf5cb60..ca74f4acf05 100644 --- a/ydb/public/lib/ydb_cli/topic/topic_write.cpp +++ b/ydb/public/lib/ydb_cli/topic/topic_write.cpp @@ -56,7 +56,7 @@ namespace NYdb::NConsoleClient { TTopicWriter::TTopicWriter() { } - TTopicWriter::TTopicWriter(std::shared_ptr<NYdb::NPersQueue::IWriteSession> writeSession, + TTopicWriter::TTopicWriter(std::shared_ptr<NYdb::NTopic::IWriteSession> writeSession, TTopicWriterParams params) : WriteSession_(writeSession) , WriterParams_(params) { @@ -78,6 +78,9 @@ namespace NYdb::NConsoleClient { if (!initSeqNo.HasValue()) { // TODO(shmel1k@): logging + if (initSeqNo.HasException()) { + initSeqNo.TryRethrow(); + } return EXIT_FAILURE; } @@ -85,30 +88,30 @@ namespace NYdb::NConsoleClient { return EXIT_SUCCESS; } - int TTopicWriter::HandleAcksEvent(const NPersQueue::TWriteSessionEvent::TAcksEvent& event) { + int TTopicWriter::HandleAcksEvent(const NTopic::TWriteSessionEvent::TAcksEvent& event) { Y_UNUSED(event); return EXIT_SUCCESS; } - int TTopicWriter::HandleReadyToAcceptEvent(NPersQueue::TWriteSessionEvent::TReadyToAcceptEvent& event) { + int TTopicWriter::HandleReadyToAcceptEvent(NTopic::TWriteSessionEvent::TReadyToAcceptEvent& event) { ContinuationToken_ = std::move(event.ContinuationToken); return EXIT_SUCCESS; } - int TTopicWriter::HandleSessionClosedEvent(const NPersQueue::TSessionClosedEvent& event) { + int TTopicWriter::HandleSessionClosedEvent(const NTopic::TSessionClosedEvent& event) { ThrowOnError(event); return EXIT_FAILURE; } - int TTopicWriter::HandleEvent(NPersQueue::TWriteSessionEvent::TEvent& event) { + int TTopicWriter::HandleEvent(NTopic::TWriteSessionEvent::TEvent& event) { return std::visit(TOverloaded{ - [&](const NPersQueue::TWriteSessionEvent::TAcksEvent& event) { + [&](const NTopic::TWriteSessionEvent::TAcksEvent& event) { return HandleAcksEvent(event); }, - [&](NPersQueue::TWriteSessionEvent::TReadyToAcceptEvent& event) { + [&](NTopic::TWriteSessionEvent::TReadyToAcceptEvent& event) { return HandleReadyToAcceptEvent(event); }, - [&](const NPersQueue::TSessionClosedEvent& event) { + [&](const NTopic::TSessionClosedEvent& event) { return HandleSessionClosedEvent(event); }, }, @@ -150,7 +153,7 @@ namespace NYdb::NConsoleClient { bool continueSending = true; while (continueSending) { while (!ContinuationToken_.Defined()) { - TMaybe<NPersQueue::TWriteSessionEvent::TEvent> event = WriteSession_->GetEvent(true); + TMaybe<NTopic::TWriteSessionEvent::TEvent> event = WriteSession_->GetEvent(true); if (event.Empty()) { continue; } @@ -176,25 +179,24 @@ namespace NYdb::NConsoleClient { if (WriteSession_->Close(TDuration::Hours(12))) { return true; } - TVector<NPersQueue::TWriteSessionEvent::TEvent> events = WriteSession_->GetEvents(true); + TVector<NTopic::TWriteSessionEvent::TEvent> events = WriteSession_->GetEvents(true); if (events.empty()) { return false; } for (auto& evt : events) { bool hasFailure = false; std::visit(TOverloaded{ - [&](const NPersQueue::TWriteSessionEvent::TAcksEvent& event) { + [&](const NTopic::TWriteSessionEvent::TAcksEvent& event) { Y_UNUSED(event); }, - [&](NPersQueue::TWriteSessionEvent::TReadyToAcceptEvent& event) { + [&](NTopic::TWriteSessionEvent::TReadyToAcceptEvent& event) { Y_UNUSED(event); }, - [&](const NPersQueue::TSessionClosedEvent& event) { + [&](const NTopic::TSessionClosedEvent& event) { int result = HandleSessionClosedEvent(event); if (result == EXIT_FAILURE) { hasFailure = true; } - Y_UNUSED(result); }, }, evt); diff --git a/ydb/public/lib/ydb_cli/topic/topic_write.h b/ydb/public/lib/ydb_cli/topic/topic_write.h index 131c0a7ea26..78062c1abc1 100644 --- a/ydb/public/lib/ydb_cli/topic/topic_write.h +++ b/ydb/public/lib/ydb_cli/topic/topic_write.h @@ -1,7 +1,7 @@ #pragma once #include "ydb/public/lib/ydb_cli/commands/ydb_command.h" -#include <ydb/public/sdk/cpp/client/ydb_persqueue_public/persqueue.h> +#include <ydb/public/sdk/cpp/client/ydb_topic/topic.h> #include <ydb/public/lib/ydb_cli/common/format.h> #include <ydb/public/lib/ydb_cli/common/interruptible.h> @@ -48,7 +48,7 @@ namespace NYdb::NConsoleClient { TTopicWriter(); TTopicWriter(const TTopicWriter&) = default; TTopicWriter(TTopicWriter&&) = default; - TTopicWriter(std::shared_ptr<NPersQueue::IWriteSession>, TTopicWriterParams); + TTopicWriter(std::shared_ptr<NTopic::IWriteSession>, TTopicWriterParams); bool Close(TDuration closeTimeout = TDuration::Max()); int Init(); @@ -61,16 +61,16 @@ namespace NYdb::NConsoleClient { bool ContinueSending = false; }; - int HandleEvent(NPersQueue::TWriteSessionEvent::TEvent&); - int HandleAcksEvent(const NPersQueue::TWriteSessionEvent::TAcksEvent&); - int HandleReadyToAcceptEvent(NPersQueue::TWriteSessionEvent::TReadyToAcceptEvent&); - int HandleSessionClosedEvent(const NPersQueue::TSessionClosedEvent&); + int HandleEvent(NTopic::TWriteSessionEvent::TEvent&); + int HandleAcksEvent(const NTopic::TWriteSessionEvent::TAcksEvent&); + int HandleReadyToAcceptEvent(NTopic::TWriteSessionEvent::TReadyToAcceptEvent&); + int HandleSessionClosedEvent(const NTopic::TSessionClosedEvent&); TTopicWriter::TSendMessageData EnterMessage(IInputStream&); // TODO(shmel1k@): make static or like a helper function - std::shared_ptr<NPersQueue::IWriteSession> WriteSession_; + std::shared_ptr<NTopic::IWriteSession> WriteSession_; const TTopicWriterParams WriterParams_; - TMaybe<NPersQueue::TContinuationToken> ContinuationToken_ = Nothing(); + TMaybe<NTopic::TContinuationToken> ContinuationToken_ = Nothing(); ui64 CurrentSeqNo_ = 0; |