aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorshmel1k <shmel1k@ydb.tech>2022-09-06 16:22:58 +0300
committershmel1k <shmel1k@ydb.tech>2022-09-06 16:22:58 +0300
commite2c2814a65d6078ad550c13277d44f59d9c49ec1 (patch)
treea57ac29f9e9146a7730d5f864577480ddd27536b
parentfd3caafaa86763347c93e949299cd9440201d79b (diff)
downloadydb-e2c2814a65d6078ad550c13277d44f59d9c49ec1.tar.gz
[] migrate to new topic service
-rw-r--r--ydb/public/lib/ydb_cli/commands/ydb_service_topic.cpp56
-rw-r--r--ydb/public/lib/ydb_cli/commands/ydb_service_topic.h66
-rw-r--r--ydb/public/lib/ydb_cli/topic/topic_write.cpp30
-rw-r--r--ydb/public/lib/ydb_cli/topic/topic_write.h16
4 files changed, 39 insertions, 129 deletions
diff --git a/ydb/public/lib/ydb_cli/commands/ydb_service_topic.cpp b/ydb/public/lib/ydb_cli/commands/ydb_service_topic.cpp
index e8cae0e4123..25be32e94ec 100644
--- a/ydb/public/lib/ydb_cli/commands/ydb_service_topic.cpp
+++ b/ydb/public/lib/ydb_cli/commands/ydb_service_topic.cpp
@@ -164,6 +164,7 @@ namespace NYdb::NConsoleClient {
AddCommand(std::make_unique<TCommandTopicDrop>());
AddCommand(std::make_unique<TCommandTopicConsumer>());
AddCommand(std::make_unique<TCommandTopicRead>());
+ AddCommand(std::make_unique<TCommandTopicWrite>());
}
TCommandTopicCreate::TCommandTopicCreate()
@@ -404,11 +405,6 @@ namespace NYdb::NConsoleClient {
return EXIT_SUCCESS;
}
- TCommandTopicInternal::TCommandTopicInternal()
- : TClientCommandTree("topic", {}, "Experimental topic operations") {
- AddCommand(std::make_unique<TCommandTopicWrite>());
- }
-
TCommandTopicRead::TCommandTopicRead()
: TYdbCommand("read", {}, "Read from a topic to the client filesystem or terminal") {
}
@@ -648,7 +644,7 @@ namespace NYdb::NConsoleClient {
// EOutputFormat::JsonRawStreamConcat,
// EOutputFormat::JsonRawArray,
});
- AddAllowedCodecs(config);
+ AddAllowedCodecs(config, {NTopic::ECodec::RAW});
// TODO(shmel1k@): improve help.
config.Opts->AddLongOption('d', "delimiter", "Delimiter to split messages")
@@ -678,7 +674,7 @@ namespace NYdb::NConsoleClient {
TYdbCommand::Parse(config);
ParseTopicName(config, 0);
ParseMessagingFormats();
- ParseCodec();
+ ParseCodecs();
if (Delimiter_.Defined() && MessagingFormat != EMessagingFormat::SingleMessage) {
throw TMisuseException() << "Both mutually exclusive options \"delimiter\"(\"--delimiter\", \"-d\" "
@@ -686,13 +682,13 @@ namespace NYdb::NConsoleClient {
}
}
- NPersQueue::TWriteSessionSettings TCommandTopicWrite::PrepareWriteSessionSettings() {
- NPersQueue::TWriteSessionSettings settings;
- settings.Codec(GetCodec()); // TODO(shmel1k@): codecs?
+ NTopic::TWriteSessionSettings TCommandTopicWrite::PrepareWriteSessionSettings() {
+ NTopic::TWriteSessionSettings settings;
+// settings.Codec(GetCodecs()[0]);
+ settings.Codec(NTopic::ECodec::GZIP);
settings.Path(TopicName);
//settings.BatchFlushInterval(BatchDuration_);
//settings.BatchFlushSizeBytes(BatchSize_);
- settings.ClusterDiscoveryMode(NPersQueue::EClusterDiscoveryMode::Auto);
if (!MessageGroupId_.Defined()) {
const TString rnd = ToString(TInstant::Now().NanoSeconds());
@@ -708,49 +704,21 @@ namespace NYdb::NConsoleClient {
}
settings.MessageGroupId(*MessageGroupId_);
+ settings.ProducerId(*MessageGroupId_);
return settings;
}
- void TCommandTopicWrite::CheckOptions(NPersQueue::TPersQueueClient& persQueueClient) {
- NPersQueue::TAsyncDescribeTopicResult descriptionFuture = persQueueClient.DescribeTopic(TopicName);
- descriptionFuture.Wait(TDuration::Seconds(1));
- NPersQueue::TDescribeTopicResult description = descriptionFuture.GetValueSync();
- ThrowOnError(description);
-
- NPersQueue::ECodec codec = GetCodec();
- bool exists = false;
- for (const auto c : description.TopicSettings().SupportedCodecs()) {
- if (c == codec) {
- exists = true;
- break;
- }
- }
- if (exists) {
- return;
- }
- TStringStream errorMessage;
- errorMessage << "Codec \"" << (TStringBuilder() << codec) << "\" is not available for topic. Available codecs:\n";
- for (const auto c : description.TopicSettings().SupportedCodecs()) {
- errorMessage << " " << (TStringBuilder() << c) << "\n";
- }
- throw TMisuseException() << errorMessage.Str();
- }
-
int TCommandTopicWrite::Run(TConfig& config) {
SetInterruptHandlers();
auto driver = std::make_unique<TDriver>(CreateDriver(config));
- NPersQueue::TPersQueueClient persQueueClient(*driver);
-
- CheckOptions(persQueueClient);
-
- // TODO(shmel1k@): return back after IWriteSession in TopicService SDK
- // TTopicInitializationChecker checker = TTopicInitializationChecker(persQueueClient);
- // checker.CheckTopicExistence(TopicName);
+ NTopic::TTopicClient persQueueClient(*driver);
+ TTopicInitializationChecker checker = TTopicInitializationChecker(persQueueClient);
+ checker.CheckTopicExistence(TopicName);
{
- auto writeSession = NPersQueue::TPersQueueClient(*driver).CreateWriteSession(std::move(PrepareWriteSessionSettings()));
+ auto writeSession = NTopic::TTopicClient(*driver).CreateWriteSession(std::move(PrepareWriteSessionSettings()));
auto writer = TTopicWriter(writeSession, std::move(TTopicWriterParams(
MessagingFormat,
Delimiter_,
diff --git a/ydb/public/lib/ydb_cli/commands/ydb_service_topic.h b/ydb/public/lib/ydb_cli/commands/ydb_service_topic.h
index 87475f5901e..baea8b9dd1d 100644
--- a/ydb/public/lib/ydb_cli/commands/ydb_service_topic.h
+++ b/ydb/public/lib/ydb_cli/commands/ydb_service_topic.h
@@ -154,71 +154,11 @@ namespace NYdb::NConsoleClient {
NTopic::TReadSessionSettings PrepareReadSessionSettings();
};
- namespace {
- const THashMap<NYdb::NPersQueue::ECodec, TString> CodecsDescriptionsMigration = {
- {NYdb::NPersQueue::ECodec::RAW, "Raw codec. No data compression(default)"},
- {NYdb::NPersQueue::ECodec::GZIP, "GZIP codec. Data is compressed with GZIP compression algorithm"},
- {NYdb::NPersQueue::ECodec::LZOP, "LZOP codec. Data is compressed with LZOP compression algorithm"},
- {NYdb::NPersQueue::ECodec::ZSTD, "ZSTD codec. Data is compressed with ZSTD compression algorithm"},
- };
- }
-
- const TVector<NYdb::NPersQueue::ECodec> AllowedCodecsMigration = {
- NPersQueue::ECodec::RAW,
- NPersQueue::ECodec::GZIP,
- NPersQueue::ECodec::ZSTD,
- };
-
- class TCommandWithCodecMigration {
- // TODO(shmel1k@): remove after TopicService C++ SDK supports IWriteSession
- protected:
- void AddAllowedCodecs(TClientCommand::TConfig& config) {
- TStringStream description;
- description << "Codec used for writing. Available codecs: ";
- NColorizer::TColors colors = NColorizer::AutoColors(Cout);
- for (const auto& codec : AllowedCodecsMigration) {
- auto findResult = CodecsDescriptionsMigration.find(codec);
- Y_VERIFY(findResult != CodecsDescriptionsMigration.end(),
- "Couldn't find description for %s codec", (TStringBuilder() << codec).c_str());
- description << "\n " << colors.BoldColor() << codec << colors.OldColor()
- << "\n " << findResult->second;
- }
- config.Opts->AddLongOption("codec", description.Str())
- .RequiredArgument("STRING")
- .StoreResult(&CodecStr_);
- }
-
- void ParseCodec() {
- TString codec = to_lower(CodecStr_);
- if (codec == "raw") {
- Codec_ = NPersQueue::ECodec::RAW;
- }
- if (codec == "gzip") {
- Codec_ = NPersQueue::ECodec::GZIP;
- }
- if (codec = "zstd") {
- Codec_ = NPersQueue::ECodec::ZSTD;
- }
- }
-
- NPersQueue::ECodec GetCodec() const {
- if (CodecStr_ == "") {
- return DefaultCodec_;
- }
- return Codec_;
- }
-
- private:
- NPersQueue::ECodec DefaultCodec_ = NPersQueue::ECodec::RAW;
- TString CodecStr_;
- NPersQueue::ECodec Codec_;
- };
-
class TCommandTopicWrite: public TYdbCommand,
public TCommandWithFormat,
public TInterruptibleCommand,
public TCommandWithTopicName,
- public TCommandWithCodecMigration {
+ public TCommandWithSupportedCodecs {
public:
TCommandTopicWrite();
void Config(TConfig& config) override;
@@ -238,9 +178,9 @@ namespace NYdb::NConsoleClient {
ui64 MessageSizeLimit_ = 0;
void ParseMessageSizeLimit();
- void CheckOptions(NPersQueue::TPersQueueClient& persQueueClient);
+ void CheckOptions(NTopic::TTopicClient& persQueueClient);
private:
- NPersQueue::TWriteSessionSettings PrepareWriteSessionSettings();
+ NTopic::TWriteSessionSettings PrepareWriteSessionSettings();
};
} // namespace NYdb::NConsoleClient
diff --git a/ydb/public/lib/ydb_cli/topic/topic_write.cpp b/ydb/public/lib/ydb_cli/topic/topic_write.cpp
index f4bbdf5cb60..ca74f4acf05 100644
--- a/ydb/public/lib/ydb_cli/topic/topic_write.cpp
+++ b/ydb/public/lib/ydb_cli/topic/topic_write.cpp
@@ -56,7 +56,7 @@ namespace NYdb::NConsoleClient {
TTopicWriter::TTopicWriter() {
}
- TTopicWriter::TTopicWriter(std::shared_ptr<NYdb::NPersQueue::IWriteSession> writeSession,
+ TTopicWriter::TTopicWriter(std::shared_ptr<NYdb::NTopic::IWriteSession> writeSession,
TTopicWriterParams params)
: WriteSession_(writeSession)
, WriterParams_(params) {
@@ -78,6 +78,9 @@ namespace NYdb::NConsoleClient {
if (!initSeqNo.HasValue()) {
// TODO(shmel1k@): logging
+ if (initSeqNo.HasException()) {
+ initSeqNo.TryRethrow();
+ }
return EXIT_FAILURE;
}
@@ -85,30 +88,30 @@ namespace NYdb::NConsoleClient {
return EXIT_SUCCESS;
}
- int TTopicWriter::HandleAcksEvent(const NPersQueue::TWriteSessionEvent::TAcksEvent& event) {
+ int TTopicWriter::HandleAcksEvent(const NTopic::TWriteSessionEvent::TAcksEvent& event) {
Y_UNUSED(event);
return EXIT_SUCCESS;
}
- int TTopicWriter::HandleReadyToAcceptEvent(NPersQueue::TWriteSessionEvent::TReadyToAcceptEvent& event) {
+ int TTopicWriter::HandleReadyToAcceptEvent(NTopic::TWriteSessionEvent::TReadyToAcceptEvent& event) {
ContinuationToken_ = std::move(event.ContinuationToken);
return EXIT_SUCCESS;
}
- int TTopicWriter::HandleSessionClosedEvent(const NPersQueue::TSessionClosedEvent& event) {
+ int TTopicWriter::HandleSessionClosedEvent(const NTopic::TSessionClosedEvent& event) {
ThrowOnError(event);
return EXIT_FAILURE;
}
- int TTopicWriter::HandleEvent(NPersQueue::TWriteSessionEvent::TEvent& event) {
+ int TTopicWriter::HandleEvent(NTopic::TWriteSessionEvent::TEvent& event) {
return std::visit(TOverloaded{
- [&](const NPersQueue::TWriteSessionEvent::TAcksEvent& event) {
+ [&](const NTopic::TWriteSessionEvent::TAcksEvent& event) {
return HandleAcksEvent(event);
},
- [&](NPersQueue::TWriteSessionEvent::TReadyToAcceptEvent& event) {
+ [&](NTopic::TWriteSessionEvent::TReadyToAcceptEvent& event) {
return HandleReadyToAcceptEvent(event);
},
- [&](const NPersQueue::TSessionClosedEvent& event) {
+ [&](const NTopic::TSessionClosedEvent& event) {
return HandleSessionClosedEvent(event);
},
},
@@ -150,7 +153,7 @@ namespace NYdb::NConsoleClient {
bool continueSending = true;
while (continueSending) {
while (!ContinuationToken_.Defined()) {
- TMaybe<NPersQueue::TWriteSessionEvent::TEvent> event = WriteSession_->GetEvent(true);
+ TMaybe<NTopic::TWriteSessionEvent::TEvent> event = WriteSession_->GetEvent(true);
if (event.Empty()) {
continue;
}
@@ -176,25 +179,24 @@ namespace NYdb::NConsoleClient {
if (WriteSession_->Close(TDuration::Hours(12))) {
return true;
}
- TVector<NPersQueue::TWriteSessionEvent::TEvent> events = WriteSession_->GetEvents(true);
+ TVector<NTopic::TWriteSessionEvent::TEvent> events = WriteSession_->GetEvents(true);
if (events.empty()) {
return false;
}
for (auto& evt : events) {
bool hasFailure = false;
std::visit(TOverloaded{
- [&](const NPersQueue::TWriteSessionEvent::TAcksEvent& event) {
+ [&](const NTopic::TWriteSessionEvent::TAcksEvent& event) {
Y_UNUSED(event);
},
- [&](NPersQueue::TWriteSessionEvent::TReadyToAcceptEvent& event) {
+ [&](NTopic::TWriteSessionEvent::TReadyToAcceptEvent& event) {
Y_UNUSED(event);
},
- [&](const NPersQueue::TSessionClosedEvent& event) {
+ [&](const NTopic::TSessionClosedEvent& event) {
int result = HandleSessionClosedEvent(event);
if (result == EXIT_FAILURE) {
hasFailure = true;
}
- Y_UNUSED(result);
},
},
evt);
diff --git a/ydb/public/lib/ydb_cli/topic/topic_write.h b/ydb/public/lib/ydb_cli/topic/topic_write.h
index 131c0a7ea26..78062c1abc1 100644
--- a/ydb/public/lib/ydb_cli/topic/topic_write.h
+++ b/ydb/public/lib/ydb_cli/topic/topic_write.h
@@ -1,7 +1,7 @@
#pragma once
#include "ydb/public/lib/ydb_cli/commands/ydb_command.h"
-#include <ydb/public/sdk/cpp/client/ydb_persqueue_public/persqueue.h>
+#include <ydb/public/sdk/cpp/client/ydb_topic/topic.h>
#include <ydb/public/lib/ydb_cli/common/format.h>
#include <ydb/public/lib/ydb_cli/common/interruptible.h>
@@ -48,7 +48,7 @@ namespace NYdb::NConsoleClient {
TTopicWriter();
TTopicWriter(const TTopicWriter&) = default;
TTopicWriter(TTopicWriter&&) = default;
- TTopicWriter(std::shared_ptr<NPersQueue::IWriteSession>, TTopicWriterParams);
+ TTopicWriter(std::shared_ptr<NTopic::IWriteSession>, TTopicWriterParams);
bool Close(TDuration closeTimeout = TDuration::Max());
int Init();
@@ -61,16 +61,16 @@ namespace NYdb::NConsoleClient {
bool ContinueSending = false;
};
- int HandleEvent(NPersQueue::TWriteSessionEvent::TEvent&);
- int HandleAcksEvent(const NPersQueue::TWriteSessionEvent::TAcksEvent&);
- int HandleReadyToAcceptEvent(NPersQueue::TWriteSessionEvent::TReadyToAcceptEvent&);
- int HandleSessionClosedEvent(const NPersQueue::TSessionClosedEvent&);
+ int HandleEvent(NTopic::TWriteSessionEvent::TEvent&);
+ int HandleAcksEvent(const NTopic::TWriteSessionEvent::TAcksEvent&);
+ int HandleReadyToAcceptEvent(NTopic::TWriteSessionEvent::TReadyToAcceptEvent&);
+ int HandleSessionClosedEvent(const NTopic::TSessionClosedEvent&);
TTopicWriter::TSendMessageData EnterMessage(IInputStream&); // TODO(shmel1k@): make static or like a helper function
- std::shared_ptr<NPersQueue::IWriteSession> WriteSession_;
+ std::shared_ptr<NTopic::IWriteSession> WriteSession_;
const TTopicWriterParams WriterParams_;
- TMaybe<NPersQueue::TContinuationToken> ContinuationToken_ = Nothing();
+ TMaybe<NTopic::TContinuationToken> ContinuationToken_ = Nothing();
ui64 CurrentSeqNo_ = 0;