diff options
author | shmel1k <shmel1k@ydb.tech> | 2022-08-11 21:05:04 +0300 |
---|---|---|
committer | shmel1k <shmel1k@ydb.tech> | 2022-08-11 21:05:04 +0300 |
commit | d6afcd79c70c70a1c877508d75547fd4c36fc168 (patch) | |
tree | 0408f5c01c64fc8ee7739ebd2dc781bd3c866d21 | |
parent | 58e18112b989706d6bb7a46a25b6ec02400cc60e (diff) | |
download | ydb-d6afcd79c70c70a1c877508d75547fd4c36fc168.tar.gz |
fixed kikimr import
31 files changed, 2575 insertions, 285 deletions
diff --git a/CMakeLists.darwin.txt b/CMakeLists.darwin.txt index 11de0310a3..c6292a4cc1 100644 --- a/CMakeLists.darwin.txt +++ b/CMakeLists.darwin.txt @@ -984,12 +984,13 @@ add_subdirectory(library/cpp/bucket_quoter) add_subdirectory(ydb/public/lib/ydb_cli/dump/util) add_subdirectory(ydb/public/lib/ydb_cli/dump) add_subdirectory(ydb/public/lib/ydb_cli/import) -add_subdirectory(ydb/public/sdk/cpp/client/ydb_export) -add_subdirectory(ydb/public/sdk/cpp/client/ydb_import) -add_subdirectory(ydb/public/sdk/cpp/client/ydb_operation) +add_subdirectory(ydb/public/lib/ydb_cli/topic) add_subdirectory(ydb/public/sdk/cpp/client/ydb_persqueue_public) add_subdirectory(ydb/public/sdk/cpp/client/ydb_topic) add_subdirectory(ydb/public/sdk/cpp/client/ydb_topic/impl) +add_subdirectory(ydb/public/sdk/cpp/client/ydb_export) +add_subdirectory(ydb/public/sdk/cpp/client/ydb_import) +add_subdirectory(ydb/public/sdk/cpp/client/ydb_operation) add_subdirectory(ydb/core/testlib) add_subdirectory(library/cpp/testing/gmock_in_unittest) add_subdirectory(contrib/restricted/googletest/googlemock) @@ -1334,6 +1335,7 @@ add_subdirectory(ydb/library/yql/public/issue/ut) add_subdirectory(ydb/library/yql/public/udf/ut) add_subdirectory(ydb/public/lib/deprecated/kicli/ut) add_subdirectory(ydb/public/lib/ydb_cli/common/ut) +add_subdirectory(ydb/public/lib/ydb_cli/topic/ut) add_subdirectory(ydb/public/sdk/cpp/client/ydb_topic/ut) add_subdirectory(ydb/public/sdk/cpp/client/extensions/solomon_stats) add_subdirectory(ydb/public/sdk/cpp/client/extensions/discovery_mutator) diff --git a/CMakeLists.linux.txt b/CMakeLists.linux.txt index 9f4cc8e0ca..4d6c28d694 100644 --- a/CMakeLists.linux.txt +++ b/CMakeLists.linux.txt @@ -991,12 +991,13 @@ add_subdirectory(library/cpp/bucket_quoter) add_subdirectory(ydb/public/lib/ydb_cli/dump/util) add_subdirectory(ydb/public/lib/ydb_cli/dump) add_subdirectory(ydb/public/lib/ydb_cli/import) -add_subdirectory(ydb/public/sdk/cpp/client/ydb_export) 
-add_subdirectory(ydb/public/sdk/cpp/client/ydb_import) -add_subdirectory(ydb/public/sdk/cpp/client/ydb_operation) +add_subdirectory(ydb/public/lib/ydb_cli/topic) add_subdirectory(ydb/public/sdk/cpp/client/ydb_persqueue_public) add_subdirectory(ydb/public/sdk/cpp/client/ydb_topic) add_subdirectory(ydb/public/sdk/cpp/client/ydb_topic/impl) +add_subdirectory(ydb/public/sdk/cpp/client/ydb_export) +add_subdirectory(ydb/public/sdk/cpp/client/ydb_import) +add_subdirectory(ydb/public/sdk/cpp/client/ydb_operation) add_subdirectory(ydb/core/testlib) add_subdirectory(library/cpp/testing/gmock_in_unittest) add_subdirectory(contrib/restricted/googletest/googlemock) @@ -1341,6 +1342,7 @@ add_subdirectory(ydb/library/yql/public/issue/ut) add_subdirectory(ydb/library/yql/public/udf/ut) add_subdirectory(ydb/public/lib/deprecated/kicli/ut) add_subdirectory(ydb/public/lib/ydb_cli/common/ut) +add_subdirectory(ydb/public/lib/ydb_cli/topic/ut) add_subdirectory(ydb/public/sdk/cpp/client/ydb_topic/ut) add_subdirectory(ydb/public/sdk/cpp/client/extensions/solomon_stats) add_subdirectory(ydb/public/sdk/cpp/client/extensions/discovery_mutator) diff --git a/ydb/library/yql/providers/dq/task_runner/tasks_runner_pipe.cpp b/ydb/library/yql/providers/dq/task_runner/tasks_runner_pipe.cpp index c493d2190c..5040850ad7 100644 --- a/ydb/library/yql/providers/dq/task_runner/tasks_runner_pipe.cpp +++ b/ydb/library/yql/providers/dq/task_runner/tasks_runner_pipe.cpp @@ -186,13 +186,13 @@ protected: virtual void PrepareForExec() { ExecArgs.resize(Args.size() + 1, nullptr); - for (uint i = 0; i < Args.size(); ++i) { + for (size_t i = 0; i < Args.size(); ++i) { ExecArgs[i] = const_cast<char*>(Args[i].c_str()); } ExecEnv.resize(Env.size() + 1, nullptr); EnvElems.reserve(Env.size()); - uint i = 0; + size_t i = 0; for (const auto& [k, v] : Env) { EnvElems.push_back(k + "=" + v); ExecEnv[i++] = const_cast<char*>(EnvElems.back().c_str()); @@ -200,7 +200,7 @@ protected: } virtual void Exec() { - for (uint i = 3; 
i < 32768; ++i) { + for (int i = 3; i < 32768; ++i) { close(i); } @@ -410,13 +410,13 @@ private: "anon_limit=" + ToString(MemoryLimit) }; ExecArgs.resize(ArgsElems.size() + 1, nullptr); - for (uint i = 0; i < ArgsElems.size(); ++i) { + for (size_t i = 0; i < ArgsElems.size(); ++i) { ExecArgs[i] = const_cast<char*>(ArgsElems[i].c_str()); } } void Exec() override { - for (uint i = 3; i < 32768; ++i) { + for (int i = 3; i < 32768; ++i) { close(i); } diff --git a/ydb/library/yql/sql/pg/pg_sql.cpp b/ydb/library/yql/sql/pg/pg_sql.cpp index 7cfc7c3df0..b6a33eb5ff 100644 --- a/ydb/library/yql/sql/pg/pg_sql.cpp +++ b/ydb/library/yql/sql/pg/pg_sql.cpp @@ -1153,6 +1153,8 @@ public: if (!ParseAlias(value->alias, alias, colnames)) { return {}; } + } else { + alias = value->relname; } if (view) { diff --git a/ydb/public/lib/ydb_cli/commands/CMakeLists.txt b/ydb/public/lib/ydb_cli/commands/CMakeLists.txt index 268d9718d7..5892a1d144 100644 --- a/ydb/public/lib/ydb_cli/commands/CMakeLists.txt +++ b/ydb/public/lib/ydb_cli/commands/CMakeLists.txt @@ -21,6 +21,7 @@ target_link_libraries(clicommands PUBLIC common lib-ydb_cli-dump lib-ydb_cli-import + topic cpp-client-draft cpp-client-ydb_discovery cpp-client-ydb_export diff --git a/ydb/public/lib/ydb_cli/commands/ydb_service_topic.cpp b/ydb/public/lib/ydb_cli/commands/ydb_service_topic.cpp index ec47a843f9..c3a9004321 100644 --- a/ydb/public/lib/ydb_cli/commands/ydb_service_topic.cpp +++ b/ydb/public/lib/ydb_cli/commands/ydb_service_topic.cpp @@ -1,8 +1,15 @@ +#include <contrib/libs/openssl/include/openssl/sha.h> + #include "ydb_service_topic.h" #include <ydb/public/lib/ydb_cli/commands/ydb_command.h> #include <ydb/public/lib/ydb_cli/common/command.h> +#include <ydb/public/lib/ydb_cli/topic/topic_read.h> +#include <ydb/public/lib/ydb_cli/topic/topic_util.h> +#include <ydb/public/lib/ydb_cli/topic/topic_write.h> +#include <util/generic/set.h> #include <util/stream/str.h> +#include <util/string/hex.h> #include <util/string/vector.h> 
namespace NYdb::NConsoleClient { @@ -20,9 +27,47 @@ namespace NYdb::NConsoleClient { std::pair<TString, NYdb::NTopic::ECodec>("lzop", NYdb::NTopic::ECodec::LZOP), std::pair<TString, NYdb::NTopic::ECodec>("zstd", NYdb::NTopic::ECodec::ZSTD), }; + + // TODO(shmel1k@): improve docs + THashMap<EStreamMetadataField, TString> StreamMetadataFieldsDescriptions = { + {EStreamMetadataField::Body, "message content"}, + {EStreamMetadataField::WriteTime, "message write time"}, + {EStreamMetadataField::CreateTime, "message creation time"}, + {EStreamMetadataField::MessageGroupID, "message group id"}, + {EStreamMetadataField::Offset, "offset"}, + {EStreamMetadataField::SeqNo, "seqno"}, + {EStreamMetadataField::Meta, "meta"}, + }; + + const TVector<EStreamMetadataField> AllStreamMetadataFields = { + EStreamMetadataField::Body, + EStreamMetadataField::WriteTime, + EStreamMetadataField::CreateTime, + EStreamMetadataField::MessageGroupID, + EStreamMetadataField::Offset, + EStreamMetadataField::SeqNo, + EStreamMetadataField::Meta, + }; + + const THashMap<TString, EStreamMetadataField> StreamMetadataFieldsMap = { + {"body", EStreamMetadataField::Body}, + {"write_time", EStreamMetadataField::WriteTime}, + {"create_time", EStreamMetadataField::CreateTime}, + {"message_group_id", EStreamMetadataField::MessageGroupID}, + {"offset", EStreamMetadataField::Offset}, + {"seq_no", EStreamMetadataField::SeqNo}, + {"meta", EStreamMetadataField::Meta}, + }; + + THashMap<ETransformBody, TString> TransformBodyDescriptions = { + {ETransformBody::None, "do not transform body to any format"}, + {ETransformBody::Base64, "transform body to base64 format"}, + }; + + constexpr TDuration DefaultIdleTimeout = TDuration::Seconds(1); } // namespace - void TCommandWithSupportedCodecs::AddAllowedCodecs(TClientCommand::TConfig& config, const TVector<NTopic::ECodec>& supportedCodecs) { + void TCommandWithSupportedCodecs::AddAllowedCodecs(TClientCommand::TConfig& config, const TVector<NYdb::NTopic::ECodec>& 
supportedCodecs) { TStringStream description; description << "Comma-separated list of supported codecs. Available codecs: "; NColorizer::TColors colors = NColorizer::AutoColors(Cout); @@ -70,6 +115,7 @@ namespace NYdb::NConsoleClient { AddCommand(std::make_unique<TCommandTopicAlter>()); AddCommand(std::make_unique<TCommandTopicDrop>()); AddCommand(std::make_unique<TCommandTopicConsumer>()); + AddCommand(std::make_unique<TCommandTopicRead>()); } TCommandTopicCreate::TCommandTopicCreate() @@ -283,4 +329,379 @@ namespace NYdb::NConsoleClient { ThrowOnError(status); return EXIT_SUCCESS; } + + TCommandTopicInternal::TCommandTopicInternal() + : TClientCommandTree("topic", {}, "Experimental topic operations") { + AddCommand(std::make_unique<TCommandTopicWrite>()); + } + + TCommandTopicRead::TCommandTopicRead() + : TYdbCommand("read", {}, "Read from topic command") { + } + + void TCommandTopicRead::AddAllowedMetadataFields(TConfig& config) { + TStringStream description; + description << "Comma-separated list of message fields to print. 
Available fields: "; + NColorizer::TColors colors = NColorizer::AutoColors(Cout); + for (const auto& iter : StreamMetadataFieldsDescriptions) { + description << "\n " << colors.BoldColor() << iter.first << colors.OldColor() << "\n " << iter.second; + } + + config.Opts->AddLongOption("with-metadata-fields", description.Str()) + .Optional() + .StoreResult(&WithMetadataFields_); + } + + void TCommandTopicRead::AddAllowedTransformFormats(TConfig& config) { + TStringStream description; + description << "Format to transform received body: Available \"transform\" values: "; + NColorizer::TColors colors = NColorizer::AutoColors(Cout); + for (const auto& iter : TransformBodyDescriptions) { + description << "\n " << colors.BoldColor() << iter.first << colors.OldColor() << "\n " << iter.second; + } + + config.Opts->AddLongOption("transform", description.Str()) + .Optional() + .StoreResult(&TransformStr_); + } + + void TCommandTopicRead::Config(TConfig& config) { + TYdbCommand::Config(config); + config.Opts->SetFreeArgsNum(1); + SetFreeArgTitle(0, "<stream-path>", "Stream to read data"); + + AddFormats(config, { + EOutputFormat::Pretty, + EOutputFormat::NewlineDelimited, + EOutputFormat::Concatenated, + }); + + // TODO(shmel1k@): improve help. 
+ config.Opts->AddLongOption('c', "consumer", "Consumer name") + .Required() + .StoreResult(&Consumer_); + config.Opts->AddLongOption("offset", "Offset to start read from") + .Optional() + .StoreResult(&Offset_); + config.Opts->AddLongOption("partition", "Partition to read from") + .Optional() + .StoreResult(&Partition_); + config.Opts->AddLongOption('f', "file", "File to write data to") + .Optional() + .StoreResult(&File_); + config.Opts->AddLongOption("flush-duration", "Duration for message flushing") + .Optional() + .StoreResult(&FlushDuration_); + config.Opts->AddLongOption("flush-size", "Maximum flush size") // TODO(shmel1k@): improve + .Optional() + .StoreResult(&FlushSize_); + config.Opts->AddLongOption("flush-messages-count", "") // TODO(shmel1k@): improve + .Optional() + .StoreResult(&FlushMessagesCount_); + config.Opts->AddLongOption("idle-timeout", "Max wait duration for new messages") + .Optional() + .DefaultValue(DefaultIdleTimeout) + .StoreResult(&IdleTimeout_); + config.Opts->AddLongOption("commit", "Commit messages after successful read") + .Optional() + .DefaultValue(true) + .StoreResult(&Commit_); + config.Opts->AddLongOption("message-size-limit", "Maximum message size") + .Optional() + .StoreResult(&MessageSizeLimit_); + config.Opts->AddLongOption("discard-above-limits", "Do not print messages with size more than defined in 'message-size-limit' option") + .Optional() + .StoreResult(&DiscardAboveLimits_); + config.Opts->AddLongOption("limit", "Messages count to read") + .Optional() + .StoreResult(&Limit_); + config.Opts->AddLongOption('w', "wait", "Wait for infinite time for first message received") + .Optional() + .NoArgument() + .StoreValue(&Wait_, true); + config.Opts->AddLongOption("timestamp", "Timestamp from which messages will be read") + .Optional() + .StoreResult(&Timestamp_); + + AddAllowedMetadataFields(config); + AddAllowedTransformFormats(config); + } + + void TCommandTopicRead::ParseMetadataFields() { + MetadataFields_ = 
AllStreamMetadataFields; + + // TODO(shmel1k@): discuss: disable all fields? + if (WithMetadataFields_ == "all") { + return; + } + + TVector<TString> split = SplitString(WithMetadataFields_, ","); + if (split.empty()) { + return; + } + + TSet<EStreamMetadataField> set; + for (const auto& field : split) { + auto f = StreamMetadataFieldsMap.find(field); + if (f == StreamMetadataFieldsMap.end()) { + throw TMisuseException() << "Field " << field << " not found in available fields"; // TODO(shmel1k@): improve message. + } + set.insert(f->second); + } + + TVector<EStreamMetadataField> result; + result.reserve(set.size()); + // NOTE(shmel1k@): preserving the order from AllMetadataFields + for (const auto metadataField : set) { + auto f = std::find(AllStreamMetadataFields.begin(), AllStreamMetadataFields.end(), metadataField); + if (f == AllStreamMetadataFields.end()) { + continue; + } + result.push_back(metadataField); + } + + MetadataFields_ = result; + } + + void TCommandTopicRead::ParseTransformFormat() { + if (!TransformStr_.Defined()) { + return; + } + + TString val = *TransformStr_; + if (val == (TStringBuilder() << ETransformBody::None)) { + return; + } + if (val == (TStringBuilder() << ETransformBody::Base64)) { + Transform_ = ETransformBody::Base64; + return; + } + + throw TMisuseException() << "Transform " << *TransformStr_ << " not found in available \"transform\" values"; + } + + void TCommandTopicRead::Parse(TConfig& config) { + TYdbCommand::Parse(config); + ParseTopicName(config, 0); + ParseFormats(); + ParseMetadataFields(); + ParseTransformFormat(); + } + + NTopic::TReadSessionSettings TCommandTopicRead::PrepareReadSessionSettings() { + NTopic::TReadSessionSettings settings; + settings.ConsumerName(Consumer_); + // settings.ReadAll(); // TODO(shmel1k@): change to read only original? + if (Timestamp_.Defined()) { + settings.ReadFromTimestamp(TInstant::Seconds(*(Timestamp_.Get()))); + } + + // TODO(shmel1k@): partition can be added here. 
+ NTopic::TTopicReadSettings readSettings; + readSettings.Path(TopicName); + settings.AppendTopics(std::move(readSettings)); + return settings; + } + + void TCommandTopicRead::ValidateConfig() { + // TODO(shmel1k@): add more formats. + if (OutputFormat != EOutputFormat::Default && (Limit_.Defined() && (Limit_ < 0 || Limit_ > 500))) { + throw TMisuseException() << "OutputFormat " << OutputFormat << " is not compatible with " + << "limit equal '0' or more than '500': '" << *Limit_ << "' was given"; + } + } + + int TCommandTopicRead::Run(TConfig& config) { + ValidateConfig(); + + auto driver = std::make_unique<TDriver>(CreateDriver(config)); + NTopic::TTopicClient persQueueClient(*driver); + auto readSession = persQueueClient.CreateReadSession(PrepareReadSessionSettings()); + + TTopicInitializationChecker checker = TTopicInitializationChecker(persQueueClient); + checker.CheckTopicExistence(TopicName, Consumer_); + + { + TTopicReader reader = TTopicReader(std::move(readSession), TTopicReaderSettings( + Limit_, + Commit_, + Wait_, + OutputFormat, + MetadataFields_, + Transform_, + IdleTimeout_)); + + reader.Init(); + + int status = 0; + if (File_.Defined()) { + TFileOutput out(*File_); + status = reader.Run(out); + reader.Close(out); + } else { + status = reader.Run(Cout); + reader.Close(Cout); + } + if (status) { + return status; + } + } + + driver->Stop(true); + + return EXIT_SUCCESS; + } + + TCommandTopicWrite::TCommandTopicWrite() + : TYdbCommand("write", {}, "Write to topic command") { + } + + void TCommandTopicWrite::Config(TConfig& config) { + TYdbCommand::Config(config); + config.Opts->SetFreeArgsNum(1); + SetFreeArgTitle(0, "<topic-path>", "Topic to write data"); + + AddInputFormats(config, { + EOutputFormat::NewlineDelimited, + EOutputFormat::SingleMessage, + // EOutputFormat::JsonRawStreamConcat, + // EOutputFormat::JsonRawArray, + EOutputFormat::SingleMessage, + }); + AddAllowedCodecs(config); + + // TODO(shmel1k@): improve help. 
+ config.Opts->AddLongOption('d', "delimiter", "Delimiter to split messages") + .Optional() + .StoreResult(&Delimiter_); + config.Opts->AddLongOption('f', "file", "File to read data from") + .Optional() + .StoreResult(&File_); + config.Opts->AddLongOption("message-size-limit", "Size limit for a single message") + .Optional() + .StoreResult(&MessageSizeLimitStr_); + config.Opts->AddLongOption("batch-duration", "Duration for message batching") + .Optional() + .StoreResult(&BatchDuration_); + config.Opts->AddLongOption("batch-size", "Maximum batch size") // TODO(shmel1k@): improve + .Optional() + .StoreResult(&BatchSize_); + config.Opts->AddLongOption("batch-messages-count", "") // TODO(shmel1k@): improve + .Optional() + .StoreResult(&BatchMessagesCount_); + config.Opts->AddLongOption("message-group-id", "Message Group ID") // TODO(shmel1k@): improve + .Optional() + .StoreResult(&MessageGroupId_); + } + + void TCommandTopicWrite::Parse(TConfig& config) { + TYdbCommand::Parse(config); + ParseTopicName(config, 0); + ParseFormats(); + ParseCodec(); + + if (Delimiter_.Defined() && InputFormat != EOutputFormat::Default) { + throw TMisuseException() << "Both mutually exclusive options \"delimiter\"(\"--delimiter\", \"-d\" " + << "and \"input format\"(\"--input-format\") were provided."; + } + } + + NPersQueue::TWriteSessionSettings TCommandTopicWrite::PrepareWriteSessionSettings() { + NPersQueue::TWriteSessionSettings settings; + settings.Codec(GetCodec()); // TODO(shmel1k@): codecs? 
+ settings.Path(TopicName); + //settings.BatchFlushInterval(BatchDuration_); + //settings.BatchFlushSizeBytes(BatchSize_); + settings.ClusterDiscoveryMode(NPersQueue::EClusterDiscoveryMode::Auto); + + if (!MessageGroupId_.Defined()) { + const TString rnd = ToString(TInstant::Now().NanoSeconds()); + SHA_CTX ctx; + SHA1_Init(&ctx); + SHA1_Update(&ctx, rnd.data(), rnd.size()); + unsigned char sha1[SHA_DIGEST_LENGTH]; + SHA1_Final(sha1, &ctx); + + TString hex = HexEncode(TString(reinterpret_cast<const char*>(sha1), SHA_DIGEST_LENGTH)); + hex.to_lower(); + MessageGroupId_ = TString(hex.begin(), hex.begin() + 6); + } + + settings.MessageGroupId(*MessageGroupId_); + + return settings; + } + + void TCommandTopicWrite::CheckOptions(NPersQueue::TPersQueueClient& persQueueClient) { + NPersQueue::TAsyncDescribeTopicResult descriptionFuture = persQueueClient.DescribeTopic(TopicName); + descriptionFuture.Wait(TDuration::Seconds(1)); + NPersQueue::TDescribeTopicResult description = descriptionFuture.GetValueSync(); + ThrowOnError(description); + + NPersQueue::ECodec codec = GetCodec(); + bool exists = false; + for (const auto c : description.TopicSettings().SupportedCodecs()) { + if (c == codec) { + exists = true; + break; + } + } + if (exists) { + return; + } + TStringStream errorMessage; + errorMessage << "Codec \"" << (TStringBuilder() << codec) << "\" is not available for topic. 
Available codecs:\n"; + for (const auto c : description.TopicSettings().SupportedCodecs()) { + errorMessage << " " << (TStringBuilder() << c) << "\n"; + } + throw TMisuseException() << errorMessage.Str(); + } + + int TCommandTopicWrite::Run(TConfig& config) { + SetInterruptHandlers(); + + auto driver = std::make_unique<TDriver>(CreateDriver(config)); + NPersQueue::TPersQueueClient persQueueClient(*driver); + + CheckOptions(persQueueClient); + + // TODO(shmel1k@): return back after IWriteSession in TopicService SDK + // TTopicInitializationChecker checker = TTopicInitializationChecker(persQueueClient); + // checker.CheckTopicExistence(TopicName); + + { + auto writeSession = NPersQueue::TPersQueueClient(*driver).CreateWriteSession(std::move(PrepareWriteSessionSettings())); + auto writer = TTopicWriter(writeSession, std::move(TTopicWriterParams( + InputFormat, + Delimiter_, + MessageSizeLimit_, + BatchDuration_, + BatchSize_, + BatchMessagesCount_))); + + if (int status = writer.Init(); status) { + return status; + } + + int status = 0; + if (File_.Defined()) { + TFileInput input(*File_); + status = writer.Run(input); + } else { + status = writer.Run(Cin); + } + if (status) { + return status; + } + + if (!writer.Close()) { + Cerr << "Failed to close session" << Endl; + return EXIT_FAILURE; + } + } + + driver->Stop(true); + return EXIT_SUCCESS; + } + } // namespace NYdb::NConsoleClient diff --git a/ydb/public/lib/ydb_cli/commands/ydb_service_topic.h b/ydb/public/lib/ydb_cli/commands/ydb_service_topic.h index 555af2f99e..95a3f97f11 100644 --- a/ydb/public/lib/ydb_cli/commands/ydb_service_topic.h +++ b/ydb/public/lib/ydb_cli/commands/ydb_service_topic.h @@ -3,8 +3,12 @@ #include "ydb_command.h" #include "ydb_common.h" +#include <ydb/public/lib/ydb_cli/common/interruptible.h> +#include <ydb/public/lib/ydb_cli/topic/topic_read.h> #include <ydb/public/sdk/cpp/client/ydb_topic/topic.h> +#include <ydb/public/sdk/cpp/client/ydb_persqueue_public/persqueue.h> + namespace 
NYdb::NConsoleClient { TVector<NYdb::NTopic::ECodec> InitAllowedCodecs(); const TVector<NYdb::NTopic::ECodec> AllowedCodecs = InitAllowedCodecs(); @@ -87,4 +91,142 @@ namespace NYdb::NConsoleClient { private: TString ConsumerName_; }; + + class TCommandTopicInternal: public TClientCommandTree { + public: + TCommandTopicInternal(); + }; + + class TCommandTopicRead: public TYdbCommand, + public TCommandWithFormat, + public TInterruptibleCommand, + public TCommandWithTopicName { + public: + TCommandTopicRead(); + void Config(TConfig& config) override; + void Parse(TConfig& config) override; + int Run(TConfig& config) override; + + private: + TString Consumer_ = ""; + TMaybe<uint32_t> Offset_; + TMaybe<uint32_t> Partition_; + TMaybe<ui64> Timestamp_; + TMaybe<TString> File_; + TMaybe<TString> TransformStr_; + + TMaybe<TDuration> FlushDuration_; + TMaybe<int> FlushSize_; + TMaybe<int> FlushMessagesCount_; + TDuration IdleTimeout_; + + TString WithMetadataFields_ = "all"; // TODO(shmel1k@): improve. + TVector<EStreamMetadataField> MetadataFields_; + + TMaybe<ui64> MessageSizeLimit_; + TMaybe<i64> Limit_ = Nothing(); + ETransformBody Transform_ = ETransformBody::None; + + bool Commit_ = false; + bool DiscardAboveLimits_ = false; + bool Wait_ = false; + + private: + void ValidateConfig(); + void AddAllowedMetadataFields(TConfig& config); + void ParseMetadataFields(); + void AddAllowedTransformFormats(TConfig& config); + void ParseTransformFormat(); + NTopic::TReadSessionSettings PrepareReadSessionSettings(); + }; + + namespace { + const THashMap<NYdb::NPersQueue::ECodec, TString> CodecsDescriptionsMigration = { + {NYdb::NPersQueue::ECodec::RAW, "Raw codec. No data compression(default)"}, + {NYdb::NPersQueue::ECodec::GZIP, "GZIP codec. Data is compressed with GZIP compression algorithm"}, + {NYdb::NPersQueue::ECodec::LZOP, "LZOP codec. Data is compressed with LZOP compression algorithm"}, + {NYdb::NPersQueue::ECodec::ZSTD, "ZSTD codec. 
Data is compressed with ZSTD compression algorithm"}, + }; + } + + const TVector<NYdb::NPersQueue::ECodec> AllowedCodecsMigration = { + NPersQueue::ECodec::RAW, + NPersQueue::ECodec::GZIP, + NPersQueue::ECodec::ZSTD, + }; + + class TCommandWithCodecMigration { + // TODO(shmel1k@): remove after TopicService C++ SDK supports IWriteSession + protected: + void AddAllowedCodecs(TClientCommand::TConfig& config) { + TStringStream description; + description << "Codec used for writing. Available codecs: "; + NColorizer::TColors colors = NColorizer::AutoColors(Cout); + for (const auto& codec : AllowedCodecsMigration) { + auto findResult = CodecsDescriptionsMigration.find(codec); + Y_VERIFY(findResult != CodecsDescriptionsMigration.end(), + "Couldn't find description for %s codec", (TStringBuilder() << codec).c_str()); + description << "\n " << colors.BoldColor() << codec << colors.OldColor() + << "\n " << findResult->second; + } + config.Opts->AddLongOption("codec", description.Str()) + .RequiredArgument("STRING") + .StoreResult(&CodecStr_); + } + + void ParseCodec() { + TString codec = to_lower(CodecStr_); + if (codec == "raw") { + Codec_ = NPersQueue::ECodec::RAW; + } + if (codec == "gzip") { + Codec_ = NPersQueue::ECodec::GZIP; + } + if (codec == "zstd") { + Codec_ = NPersQueue::ECodec::ZSTD; + } + } + + NPersQueue::ECodec GetCodec() const { + if (CodecStr_ == "") { + return DefaultCodec_; + } + return Codec_; + } + + private: + NPersQueue::ECodec DefaultCodec_ = NPersQueue::ECodec::RAW; + TString CodecStr_; + NPersQueue::ECodec Codec_; + }; + + class TCommandTopicWrite: public TYdbCommand, + public TCommandWithFormat, + public TInterruptibleCommand, + public TCommandWithTopicName, + public TCommandWithCodecMigration { + public: + TCommandTopicWrite(); + void Config(TConfig& config) override; + void Parse(TConfig& config) override; + int Run(TConfig& config) override; + + private: + TMaybe<TString> File_; + TMaybe<TString> Delimiter_; + TMaybe<TString>
MessageSizeLimitStr_; // TODO(shmel1k@): think how to parse + + // TODO(shmel1k@): move to 'TWithBatchingCommand' or something like that. + TMaybe<TDuration> BatchDuration_; + TMaybe<ui64> BatchSize_; + TMaybe<ui64> BatchMessagesCount_; + TMaybe<TString> MessageGroupId_; + + ui64 MessageSizeLimit_ = 0; + void ParseMessageSizeLimit(); + void CheckOptions(NPersQueue::TPersQueueClient& persQueueClient); + + private: + NPersQueue::TWriteSessionSettings PrepareWriteSessionSettings(); + }; } // namespace NYdb::NConsoleClient diff --git a/ydb/public/lib/ydb_cli/topic/CMakeLists.txt b/ydb/public/lib/ydb_cli/topic/CMakeLists.txt new file mode 100644 index 0000000000..84b0725ec1 --- /dev/null +++ b/ydb/public/lib/ydb_cli/topic/CMakeLists.txt @@ -0,0 +1,28 @@ + +# This file was gererated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. 
+ + + +add_library(topic) +target_link_libraries(topic PUBLIC + contrib-libs-cxxsupp + yutil + common + cpp-client-ydb_proto + cpp-client-ydb_persqueue_public + cpp-client-ydb_topic + tools-enum_parser-enum_serialization_runtime +) +target_sources(topic PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/public/lib/ydb_cli/topic/topic_read.cpp + ${CMAKE_SOURCE_DIR}/ydb/public/lib/ydb_cli/topic/topic_write.cpp +) +generate_enum_serilization(topic + ${CMAKE_SOURCE_DIR}/ydb/public/lib/ydb_cli/topic/topic_metadata_fields.h + INCLUDE_HEADERS + ydb/public/lib/ydb_cli/topic/topic_metadata_fields.h +) diff --git a/ydb/public/lib/ydb_cli/topic/topic_metadata_fields.h b/ydb/public/lib/ydb_cli/topic/topic_metadata_fields.h new file mode 100644 index 0000000000..7ecc1bbda0 --- /dev/null +++ b/ydb/public/lib/ydb_cli/topic/topic_metadata_fields.h @@ -0,0 +1,20 @@ +#pragma once + +#include <util/string/vector.h> + +namespace NYdb::NConsoleClient { + enum class EStreamMetadataField { + Body /* "body" */, + WriteTime /* "write_time" */, + CreateTime /* "create_time" */, + MessageGroupID /* "message_group_id" */, + Offset /* "offset" */, + SeqNo /* "seq_no" */, + Meta /* "meta" */, + }; + + enum class ETransformBody { + None = 0 /* "none" */, + Base64 = 1 /* "base64" */, + }; +} // namespace NYdb::NConsoleClient
\ No newline at end of file diff --git a/ydb/public/lib/ydb_cli/topic/topic_read.cpp b/ydb/public/lib/ydb_cli/topic/topic_read.cpp new file mode 100644 index 0000000000..34a213b6f9 --- /dev/null +++ b/ydb/public/lib/ydb_cli/topic/topic_read.cpp @@ -0,0 +1,278 @@ +#include "topic_metadata_fields.h" +#include "topic_read.h" +#include "topic_util.h" +#include "topic_write.h" +#include <library/cpp/json/json_reader.h> +#include <library/cpp/string_utils/base64/base64.h> +#include <util/generic/set.h> + +namespace NYdb::NConsoleClient { + namespace { + constexpr i64 MessagesLimitUnlimited = -1; + constexpr i64 MessagesLimitDefaultPrettyFormat = 10; + constexpr i64 MessagesLimitDefaultJsonArrayFormat = 500; + + bool IsStreamingFormat(EOutputFormat format) { + return format == EOutputFormat::NewlineBase64 || format == EOutputFormat::NewlineDelimited || format == EOutputFormat::Concatenated; + } + } + + TTopicReaderSettings::TTopicReaderSettings() { + } + + TTopicReaderSettings::TTopicReaderSettings( + TMaybe<i64> limit, + bool commit, + bool wait, + EOutputFormat format, + TVector<EStreamMetadataField> metadataFields, + ETransformBody transform, + TDuration idleTimeout) + : MetadataFields_(metadataFields) + , IdleTimeout_(idleTimeout) + , OutputFormat_(format) + , Transform_(transform) + , Limit_(limit) + , Commit_(commit) + , Wait_(wait) { + } + + TTopicReader::TTopicReader( + std::shared_ptr<NTopic::IReadSession> readSession, + TTopicReaderSettings params) + : ReadSession_(readSession) + , ReaderParams_(params) { + } + + void TTopicReader::Init() { + TVector<TString> tableFields; + for (const auto& f : ReaderParams_.MetadataFields()) { + tableFields.emplace_back(TStringBuilder() << f); + } + TPrettyTable table(tableFields); + OutputTable_ = std::make_unique<TPrettyTable>(table); + + if (!ReaderParams_.Limit().Defined()) { + if (IsStreamingFormat(ReaderParams_.OutputFormat())) { + MessagesLeft_ = MessagesLimitUnlimited; + } + if (ReaderParams_.OutputFormat() == 
EOutputFormat::Pretty) { + MessagesLeft_ = MessagesLimitDefaultPrettyFormat; + } + if (ReaderParams_.OutputFormat() == EOutputFormat::JsonRawArray) { + MessagesLeft_ = MessagesLimitDefaultJsonArrayFormat; + } + return; + } + + i64 limit = *(ReaderParams_.Limit()); + if (IsStreamingFormat(ReaderParams_.OutputFormat()) && limit == 0) { + limit = -1; + } + MessagesLeft_ = limit; + } + + namespace { + const TString FormatBody(const TString& body, ETransformBody transform) { + if (transform == ETransformBody::Base64) { + return Base64Encode(body); + } + + return body; + } + + using TReceivedMessage = NTopic::TReadSessionEvent::TDataReceivedEvent::TMessage; + + void AddMetadataFieldToRow(TPrettyTable::TRow& row, const TReceivedMessage& message, EOutputFormat format, ETransformBody transform, EStreamMetadataField f, size_t idx) { + switch (f) { + case EStreamMetadataField::Body: + if (format == EOutputFormat::PrettyBase64) { + row.Column(idx, FormatBody(message.GetData(), transform)); + } + if (format == EOutputFormat::Pretty || format == EOutputFormat::PrettyRaw) { + row.Column(idx, FormatBody(message.GetData(), transform)); + } + if (format == EOutputFormat::PrettyUnicode) { + row.Column(idx, message.GetData()); + } + break; + + case EStreamMetadataField::CreateTime: + row.Column(idx, message.GetCreateTime()); + break; + case EStreamMetadataField::MessageGroupID: + row.Column(idx, message.GetMessageGroupId()); + break; + case EStreamMetadataField::Offset: + row.Column(idx, message.GetOffset()); + break; + case EStreamMetadataField::WriteTime: + row.Column(idx, message.GetWriteTime()); // improve for pretty + break; + case EStreamMetadataField::SeqNo: + row.Column(idx, message.GetSeqNo()); + break; + case EStreamMetadataField::Meta: + NJson::TJsonValue json; + for (auto const& [k, v] : message.GetMeta()->Fields) { + json[k] = v; + } + row.Column(idx, json); + break; + } + } + } // namespace + + void TTopicReader::PrintMessagesInPrettyFormat(IOutputStream& output) { + for 
(const auto& message : ReceivedMessages_) { + TPrettyTable::TRow& row = OutputTable_->AddRow(); + for (size_t i = 0; i < ReaderParams_.MetadataFields().size(); ++i) { + EStreamMetadataField f = ReaderParams_.MetadataFields()[i]; + AddMetadataFieldToRow(row, message, ReaderParams_.OutputFormat(), ReaderParams_.Transform(), f, i); + } + } + + OutputTable_->Print(output); + } + + void TTopicReader::PrintMessagesInJsonArrayFormat(IOutputStream& output) { + // TODO(shmel1k@): not implemented yet. + Y_UNUSED(output); + } + + void TTopicReader::Close(IOutputStream& output, TDuration closeTimeout) { + if (ReaderParams_.OutputFormat() == EOutputFormat::Pretty || + ReaderParams_.OutputFormat() == EOutputFormat::PrettyBase64 || + ReaderParams_.OutputFormat() == EOutputFormat::PrettyRaw) { + PrintMessagesInPrettyFormat(output); + } + if (ReaderParams_.OutputFormat() == EOutputFormat::JsonRawArray || + ReaderParams_.OutputFormat() == EOutputFormat::JsonBase64Array || + ReaderParams_.OutputFormat() == EOutputFormat::JsonUnicodeArray) { + PrintMessagesInJsonArrayFormat(output); + } + output.Flush(); + bool success = ReadSession_->Close(closeTimeout); + if (!success) { + throw yexception() << "Failed to close read session\n"; + } + } + + void TTopicReader::HandleReceivedMessage(const TReceivedMessage& message, IOutputStream& output) { + EOutputFormat outputFormat = ReaderParams_.OutputFormat(); + if (outputFormat == EOutputFormat::Default || outputFormat == EOutputFormat::Concatenated) { + output << FormatBody(message.GetData(), ReaderParams_.Transform()); + output.Flush(); + return; + } + if (outputFormat == EOutputFormat::NewlineDelimited) { + output << FormatBody(message.GetData(), ReaderParams_.Transform()); + output << "\n"; + output.Flush(); + return; + } + if (outputFormat == EOutputFormat::Default) { + output << FormatBody(message.GetData(), ReaderParams_.Transform()); + return; + } + + ReceivedMessages_.push_back(message); + } + + int 
TTopicReader::HandleDataReceivedEvent(NYdb::NTopic::TReadSessionEvent::TDataReceivedEvent* event, IOutputStream& output) { + HasFirstMessage_ = true; + + NTopic::TDeferredCommit defCommit; + for (const auto& message : event->GetMessages()) { + HandleReceivedMessage(message, output); + if (ReaderParams_.Commit()) { + defCommit.Add(message); + } + + if (MessagesLeft_ == MessagesLimitUnlimited) { + continue; + } + + --MessagesLeft_; + if (MessagesLeft_ == 0) { + break; + } + } + + if (ReaderParams_.Commit()) { + defCommit.Commit(); + } + LastMessageReceivedTs_ = Now(); + return EXIT_SUCCESS; + } + + int TTopicReader::HandleStartPartitionSessionEvent(NYdb::NTopic::TReadSessionEvent::TStartPartitionSessionEvent* event) { + event->Confirm(); + + return EXIT_SUCCESS; + } + + int TTopicReader::HandleCommitOffsetAcknowledgementEvent(NTopic::TReadSessionEvent::TCommitOffsetAcknowledgementEvent* event) { + Y_UNUSED(event); + + return EXIT_SUCCESS; + } + + int TTopicReader::HandleEvent(TMaybe<NYdb::NTopic::TReadSessionEvent::TEvent>& event, IOutputStream& output) { + if (!event) { + return 0; + } + + if (auto* dataEvent = std::get_if<NTopic::TReadSessionEvent::TDataReceivedEvent>(&*event)) { + return HandleDataReceivedEvent(dataEvent, output); + } else if (auto* createPartitionStreamEvent = std::get_if<NTopic::TReadSessionEvent::TStartPartitionSessionEvent>(&*event)) { + return HandleStartPartitionSessionEvent(createPartitionStreamEvent); + } else if (auto* commitEvent = std::get_if<NTopic::TReadSessionEvent::TCommitOffsetAcknowledgementEvent>(&*event)) { + return HandleCommitOffsetAcknowledgementEvent(commitEvent); + } else if (auto* sessionClosedEvent = std::get_if<NTopic::TSessionClosedEvent>(&*event)) { + ThrowOnError(*sessionClosedEvent); + return 1; + } + return 0; + } + + int TTopicReader::Run(IOutputStream& output) { + // TODO(shmel1k@): improve behavior according to documentation + constexpr TDuration MaxWaitTime = TDuration::Seconds(1); // TODO(shmel1k@): to consts + 
+ LastMessageReceivedTs_ = TInstant::Now(); + + bool waitForever = ReaderParams_.Wait() && (ReaderParams_.OutputFormat() == EOutputFormat::NewlineDelimited || ReaderParams_.OutputFormat() == EOutputFormat::Concatenated); + + while ((MessagesLeft_ > 0 || MessagesLeft_ == -1) && !IsInterrupted()) { + TInstant messageReceiveDeadline = LastMessageReceivedTs_ + MaxWaitTime; + NThreading::TFuture<void> future = ReadSession_->WaitEvent(); + future.Wait(messageReceiveDeadline); + if (!future.HasValue()) { + if (ReaderParams_.Wait() && !HasFirstMessage_) { + LastMessageReceivedTs_ = Now(); + continue; + } + + if (waitForever) { + continue; + } + + return EXIT_SUCCESS; + } + + // TODO(shmel1k@): throttling? + // TODO(shmel1k@): think about limiting size of events + TMaybe<NYdb::NTopic::TReadSessionEvent::TEvent> event = ReadSession_->GetEvent(true); + if (!event) { + // TODO(shmel1k@): does it work properly? + continue; + } + + if (int status = HandleEvent(event, output); status) { + return status; + } + } + return EXIT_SUCCESS; + } +} // namespace NYdb::NConsoleClient
\ No newline at end of file diff --git a/ydb/public/lib/ydb_cli/topic/topic_read.h b/ydb/public/lib/ydb_cli/topic/topic_read.h new file mode 100644 index 0000000000..f3297007bd --- /dev/null +++ b/ydb/public/lib/ydb_cli/topic/topic_read.h @@ -0,0 +1,92 @@ +#pragma once + +#include "topic_metadata_fields.h" +#include "ydb/public/lib/ydb_cli/commands/ydb_command.h" +#include <util/stream/null.h> +#include <ydb/public/lib/ydb_cli/common/format.h> +#include <ydb/public/lib/ydb_cli/common/interruptible.h> +#include <ydb/public/lib/ydb_cli/common/pretty_table.h> +#include <ydb/public/sdk/cpp/client/ydb_topic/topic.h> + +namespace NYdb::NConsoleClient { +#define GETTER(TYPE, NAME) \ + TYPE NAME() const { \ + return NAME##_; \ + } + + class TTopicReaderSettings { + public: + TTopicReaderSettings( + TMaybe<i64> limit, + bool commit, + bool wait, + EOutputFormat format, + TVector<EStreamMetadataField> metadataFields, + ETransformBody transform, + TDuration idleTimeout); + + TTopicReaderSettings(); + TTopicReaderSettings(const TTopicReaderSettings&) = default; + TTopicReaderSettings(TTopicReaderSettings&&) = default; + + GETTER(TVector<EStreamMetadataField>, MetadataFields); + GETTER(bool, Commit); + GETTER(TMaybe<i64>, Limit); + GETTER(bool, Wait); + GETTER(EOutputFormat, OutputFormat); + GETTER(ETransformBody, Transform); + GETTER(TDuration, IdleTimeout); + // TODO(shmel1k@): add batching settings. + + private: + TVector<EStreamMetadataField> MetadataFields_; + TMaybe<TDuration> FlushDuration_; + TMaybe<int> FlushSize_; + TMaybe<int> FlushMessagesCount_; + TDuration IdleTimeout_; + + EOutputFormat OutputFormat_ = EOutputFormat::Default; + ETransformBody Transform_ = ETransformBody::None; + TMaybe<i64> Limit_ = Nothing(); + bool Commit_ = false; + bool Wait_ = false; + }; + + class TTopicReaderTests; + + // TODO(shmel1k@): think about interruption here. 
+ class TTopicReader: public TInterruptibleCommand { + using TReceivedMessage = NTopic::TReadSessionEvent::TDataReceivedEvent::TMessage; + + public: + TTopicReader(std::shared_ptr<NTopic::IReadSession>, TTopicReaderSettings); + + void Close(IOutputStream& output, TDuration closeTimeout = TDuration::Max()); + void Init(); + int Run(IOutputStream&); + + void HandleReceivedMessage(const TReceivedMessage& message, IOutputStream& output); + + int HandleStartPartitionSessionEvent(NTopic::TReadSessionEvent::TStartPartitionSessionEvent*); + int HandleDataReceivedEvent(NTopic::TReadSessionEvent::TDataReceivedEvent*, IOutputStream&); + int HandleCommitOffsetAcknowledgementEvent(NTopic::TReadSessionEvent::TCommitOffsetAcknowledgementEvent*); + int HandleEvent(TMaybe<NTopic::TReadSessionEvent::TEvent>&, IOutputStream&); + + private: + void PrintMessagesInPrettyFormat(IOutputStream& output); + void PrintMessagesInJsonArrayFormat(IOutputStream& output); + + private: + std::shared_ptr<NTopic::IReadSession> ReadSession_; + const TTopicReaderSettings ReaderParams_; + + i64 MessagesLeft_ = 1; // Messages left to read. -1 means 'unlimited' + bool HasFirstMessage_ = false; + TInstant LastMessageReceivedTs_; + + std::unique_ptr<TPrettyTable> OutputTable_; + TVector<TReceivedMessage> ReceivedMessages_; + + friend class TTopicReaderTests; + }; +} // namespace NYdb::NConsoleClient
\ No newline at end of file diff --git a/ydb/public/lib/ydb_cli/topic/topic_read_ut.cpp b/ydb/public/lib/ydb_cli/topic/topic_read_ut.cpp new file mode 100644 index 0000000000..d18bc53c4d --- /dev/null +++ b/ydb/public/lib/ydb_cli/topic/topic_read_ut.cpp @@ -0,0 +1,160 @@ +#include "topic_read.h" +#include <library/cpp/testing/unittest/registar.h> +#include <ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/ut_utils/data_plane_helpers.h> +#include <ydb/services/persqueue_v1/ut/persqueue_test_fixture.h> + +namespace NYdb::NConsoleClient { + class TTopicReaderTests: public NUnitTest::TTestBase { + using IReadSession = std::shared_ptr<NYdb::NTopic::IReadSession>; + + public: + UNIT_TEST_SUITE(TTopicReaderTests) + UNIT_TEST(TestRun_ReadOneMessage); + UNIT_TEST(TestRun_ReadTwoMessages_With_Limit_1); + UNIT_TEST(TestRun_ReadMoreMessagesThanLimit_Without_Wait_NewlineDelimited); + UNIT_TEST(TestRun_ReadMoreMessagesThanLimit_Without_Wait_NoDelimiter); + UNIT_TEST(TestRun_ReadMessages_Output_Base64); + UNIT_TEST(TestRun_Read_Less_Messages_Than_Sent); + UNIT_TEST_SUITE_END(); + + void TestRun_ReadOneMessage() { + RunTest({ + "some simple message", + }, + { + "some simple message", + }, + "", TTopicReaderSettings(Nothing(), false, false, EOutputFormat::Default, {}, ETransformBody::None, TDuration::Seconds(1))); + } + + void TestRun_ReadTwoMessages_With_Limit_1() { + RunTest({ + "message1", + "message2", + }, + { + "message1", + }, + "", TTopicReaderSettings(1, false, false, EOutputFormat::Default, {}, ETransformBody::None, TDuration::Seconds(1))); + } + + void TestRun_ReadMoreMessagesThanLimit_Without_Wait_NewlineDelimited() { + ui64 limit = 4; + RunTest( + { + "message1", + "message2", + "message3", + }, + { + "message1", + "message2", + "message3", + }, + "\n", TTopicReaderSettings(limit, false, false, EOutputFormat::NewlineDelimited, {}, ETransformBody::None, TDuration::Seconds(1))); + } + + void TestRun_ReadMoreMessagesThanLimit_Without_Wait_NoDelimiter() { + ui64 limit = 5; 
+ RunTest( + {"message1", + "message2", + "message3", + "message4"}, + { + "message1message2message3message4", + }, + "", TTopicReaderSettings(limit, false, false, EOutputFormat::Default, {}, ETransformBody::None, TDuration::Seconds(1))); + } + + void TestRun_ReadMessages_Output_Base64() { + ui64 limit = 3; + RunTest( + { + "message1", + "message2", + "message3", + }, + { + "bWVzc2FnZTE=", + "bWVzc2FnZTI=", + "bWVzc2FnZTM=", + }, + "\n", TTopicReaderSettings(limit, false, false, EOutputFormat::NewlineDelimited, {}, ETransformBody::Base64, TDuration::Seconds(1))); + } + + void TestRun_Read_Less_Messages_Than_Sent() { + ui64 limit = 2; + RunTest( + { + "message1", + "message2", + "message3", + }, + { + "message1message2", + }, + "", TTopicReaderSettings(limit, false, false, EOutputFormat::Default, {}, ETransformBody::None, TDuration::Seconds(1))); + } + + private: + void WriteTestData(NYdb::TDriver* driver, const TString& topicPath, const TVector<TString>& data) { + auto writer = CreateSimpleWriter(*driver, topicPath, "source1", {}, TString("raw")); + for (size_t i = 1; i <= data.size(); ++i) { + UNIT_ASSERT(writer->Write(data[i - 1], i)); + } + writer->Close(); + } + + void RunTest( + const TVector<TString>& dataToWrite, + const TVector<TString>& expected, + const TString& delimiter, + TTopicReaderSettings&& settings) { + Cerr << "=== Starting PQ server\n"; + // NOTE(shmel1k@): old PQ configuration. Waiting for topicservice. 
+ TPersQueueV1TestServer server; + Cerr << "=== Started PQ server\n"; + + SET_LOCALS; + + const TString topicPath = server.GetTopic(); + auto driver = server.Server->AnnoyingClient->GetDriver(); + server.Server->AnnoyingClient->CreateConsumer("cli"); + NPersQueue::TPersQueueClient persQueueClient(*driver); + + WriteTestData(driver, topicPath, dataToWrite); + NTopic::TReadSessionSettings readSessionSettings = PrepareReadSessionSettings(topicPath); + IReadSession sess = CreateTopicReader(*driver, readSessionSettings); + TTopicReader reader(sess, settings); + reader.Init(); + + TStringStream output; + int status = reader.Run(output); + UNIT_ASSERT_EQUAL(status, 0); + + TVector<TString> split; + Split(output.Str(), delimiter, split); + + UNIT_ASSERT_VALUES_EQUAL(split.size(), expected.size()); + for (size_t i = 0; i < split.size(); ++i) { + UNIT_ASSERT_VALUES_EQUAL(split[i], expected[i]); + } + } + + NTopic::TReadSessionSettings PrepareReadSessionSettings(const TString& topicPath) { + NTopic::TReadSessionSettings settings; + settings.ConsumerName("cli"); + settings.AppendTopics(topicPath); + + return settings; + } + + private: + IReadSession CreateTopicReader(NYdb::TDriver& driver, NYdb::NTopic::TReadSessionSettings& settings) { + return NTopic::TTopicClient(driver).CreateReadSession(settings); + } + }; + + UNIT_TEST_SUITE_REGISTRATION(TTopicReaderTests); +} // namespace NYdb::NConsoleClient diff --git a/ydb/public/lib/ydb_cli/topic/topic_util.h b/ydb/public/lib/ydb_cli/topic/topic_util.h new file mode 100644 index 0000000000..54fd9ee7de --- /dev/null +++ b/ydb/public/lib/ydb_cli/topic/topic_util.h @@ -0,0 +1,41 @@ +#include <ydb/public/lib/ydb_cli/commands/ydb_common.h> +#include <ydb/public/sdk/cpp/client/ydb_topic/topic.h> + +namespace NYdb::NConsoleClient { + namespace { + constexpr TDuration DefaultCheckTopicExistenceTimeout = TDuration::Seconds(1); + } + + class TTopicInitializationChecker { + public: + TTopicInitializationChecker(NTopic::TTopicClient& 
topicClient) + : TopicClient_(topicClient) { + } + + void CheckTopicExistence(TString& topicPath, TString consumer = "", TDuration timeout = DefaultCheckTopicExistenceTimeout) { + NTopic::TAsyncDescribeTopicResult descriptionFuture = TopicClient_.DescribeTopic(topicPath); + descriptionFuture.Wait(timeout); + NTopic::TDescribeTopicResult description = descriptionFuture.GetValueSync(); + ThrowOnError(description); + + if (consumer == "") { + return; + } + + bool hasConsumer = false; + for (const auto& rr : description.GetTopicDescription().GetConsumers()) { + if (rr.GetConsumerName() == consumer) { + hasConsumer = true; + break; + } + } + + if (!hasConsumer) { + throw yexception() << "No consumer \"" << consumer << "\" found"; + } + } + + private: + NTopic::TTopicClient TopicClient_; + }; +} // namespace NYdb::NConsoleClient
\ No newline at end of file diff --git a/ydb/public/lib/ydb_cli/topic/topic_write.cpp b/ydb/public/lib/ydb_cli/topic/topic_write.cpp new file mode 100644 index 0000000000..93d8c09eb6 --- /dev/null +++ b/ydb/public/lib/ydb_cli/topic/topic_write.cpp @@ -0,0 +1,207 @@ +#include "topic_util.h" +#include "topic_write.h" +#include <contrib/libs/openssl/include/openssl/sha.h> +#include <util/generic/overloaded.h> +#include <util/stream/tokenizer.h> +#include <util/string/hex.h> + +namespace NYdb::NConsoleClient { + namespace { + constexpr TDuration DefaultMessagesWaitTimeout = TDuration::Seconds(1); + } + + TTopicWriterParams::TTopicWriterParams() { + } + + TTopicWriterParams::TTopicWriterParams(EOutputFormat inputFormat, TMaybe<TString> delimiter, + ui64 messageSizeLimit, TMaybe<TDuration> batchDuration, + TMaybe<ui64> batchSize, TMaybe<ui64> batchMessagesCount) + : InputFormat_(inputFormat) + , BatchDuration_(batchDuration) + , BatchSize_(batchSize) + , BatchMessagesCount_(batchMessagesCount) + , MessageSizeLimit_(messageSizeLimit) { + if (inputFormat == EOutputFormat::NewlineDelimited || inputFormat == EOutputFormat::Concatenated) { + Delimiter_ = TMaybe<char>('\n'); + } + if (delimiter.Defined()) { + // TODO(shmel1k@): remove when string delimiter is supported. + // TODO(shmel1k@): think about better way. 
+ if (delimiter == "\\n") { + Delimiter_ = TMaybe<char>('\n'); + return; + } + if (delimiter == "\\t") { + Delimiter_ = TMaybe<char>('\t'); + return; + } + if (delimiter == "\r") { + Delimiter_ = TMaybe<char>('\r'); + return; + } + if (delimiter == "\0") { + Delimiter_ = TMaybe<char>('\0'); + return; + } + if (delimiter == "") { + Delimiter_ = Nothing(); + return; + } + + Y_ENSURE(delimiter->Size() == 1, "Invalid delimiter size, should be <= 1"); + Delimiter_ = TMaybe<char>(delimiter->at(0)); + } + } + + TTopicWriter::TTopicWriter() { + } + + TTopicWriter::TTopicWriter(std::shared_ptr<NYdb::NPersQueue::IWriteSession> writeSession, + TTopicWriterParams params) + : WriteSession_(writeSession) + , WriterParams_(params) { + } + + int TTopicWriter::Init() { + TInstant endPreparationTime = Now() + DefaultMessagesWaitTimeout; + NThreading::TFuture<ui64> initSeqNo = WriteSession_->GetInitSeqNo(); + + while (Now() < endPreparationTime) { + // TODO(shmel1k@): handle situation if seqNo already exists but with exception. 
+ if (!initSeqNo.HasValue() && !initSeqNo.Wait(TDuration::Seconds(1))) { + // TODO(shmel1k@): change logs + Cerr << "no init seqno yet" << Endl; + continue; + } + break; + } + + if (!initSeqNo.HasValue()) { + // TODO(shmel1k@): logging + return EXIT_FAILURE; + } + + CurrentSeqNo_ = initSeqNo.GetValue() + 1; + return EXIT_SUCCESS; + } + + int TTopicWriter::HandleAcksEvent(const NPersQueue::TWriteSessionEvent::TAcksEvent& event) { + Y_UNUSED(event); + return EXIT_SUCCESS; + } + + int TTopicWriter::HandleReadyToAcceptEvent(NPersQueue::TWriteSessionEvent::TReadyToAcceptEvent& event) { + ContinuationToken_ = std::move(event.ContinuationToken); + return EXIT_SUCCESS; + } + + int TTopicWriter::HandleSessionClosedEvent(const NPersQueue::TSessionClosedEvent& event) { + ThrowOnError(event); + return EXIT_FAILURE; + } + + int TTopicWriter::HandleEvent(NPersQueue::TWriteSessionEvent::TEvent& event) { + return std::visit(TOverloaded{ + [&](const NPersQueue::TWriteSessionEvent::TAcksEvent& event) { + return HandleAcksEvent(event); + }, + [&](NPersQueue::TWriteSessionEvent::TReadyToAcceptEvent& event) { + return HandleReadyToAcceptEvent(event); + }, + [&](const NPersQueue::TSessionClosedEvent& event) { + return HandleSessionClosedEvent(event); + }, + }, + event); + } + + TTopicWriter::TSendMessageData TTopicWriter::EnterMessage(IInputStream& input) { + // TODO(shmel1k@): add interruption here. + // TODO(shmel1k@): add JSONStreamReader & etc interfaces. + // TODO(shmel1k@): add stream parsing here & improve performance. + if (!WriterParams_.Delimiter().Defined()) { + // TODO(shmel1k@): interruption? 
+ return TSendMessageData{ + .Data = input.ReadAll(), + .NeedSend = true, + .ContinueSending = false, + }; + } + + TString buffer; + char delimiter = *(WriterParams_.Delimiter()); + size_t read = input.ReadTo(buffer, delimiter); + if (read == 0) { + return TSendMessageData{ + .Data = "", + .NeedSend = false, + .ContinueSending = false, + }; + } + return TSendMessageData{ + .Data = buffer, + .NeedSend = true, + .ContinueSending = true, + }; + } + + int TTopicWriter::Run(IInputStream& input) { + // TODO(shmel1k@): add notificator about failures. + bool continueSending = true; + while (continueSending) { + while (!ContinuationToken_.Defined()) { + TMaybe<NPersQueue::TWriteSessionEvent::TEvent> event = WriteSession_->GetEvent(true); + if (event.Empty()) { + continue; + } + if (int status = HandleEvent(*event); status) { + return status; + } + } + TTopicWriter::TSendMessageData message = EnterMessage(input); + continueSending = message.ContinueSending; + if (!message.NeedSend) { + continue; + } + + WriteSession_->Write(std::move(*ContinuationToken_), std::move(message.Data), CurrentSeqNo_++); + ContinuationToken_ = Nothing(); + } + + return EXIT_SUCCESS; + } + + bool TTopicWriter::Close(TDuration closeTimeout) { + Y_UNUSED(closeTimeout); + if (WriteSession_->Close(TDuration::Hours(12))) { + return true; + } + TVector<NPersQueue::TWriteSessionEvent::TEvent> events = WriteSession_->GetEvents(true); + if (events.empty()) { + return false; + } + for (auto& evt : events) { + bool hasFailure = false; + std::visit(TOverloaded{ + [&](const NPersQueue::TWriteSessionEvent::TAcksEvent& event) { + Y_UNUSED(event); + }, + [&](NPersQueue::TWriteSessionEvent::TReadyToAcceptEvent& event) { + Y_UNUSED(event); + }, + [&](const NPersQueue::TSessionClosedEvent& event) { + int result = HandleSessionClosedEvent(event); + if (result == EXIT_FAILURE) { + hasFailure = true; + } + Y_UNUSED(result); + }, + }, + evt); + if (hasFailure) { + return false; + } + } + return true; + } +} // namespace 
NYdb::NConsoleClient diff --git a/ydb/public/lib/ydb_cli/topic/topic_write.h b/ydb/public/lib/ydb_cli/topic/topic_write.h new file mode 100644 index 0000000000..445e3c9016 --- /dev/null +++ b/ydb/public/lib/ydb_cli/topic/topic_write.h @@ -0,0 +1,79 @@ +#pragma once + +#include "ydb/public/lib/ydb_cli/commands/ydb_command.h" +#include <ydb/public/sdk/cpp/client/ydb_persqueue_public/persqueue.h> +#include <ydb/public/lib/ydb_cli/common/format.h> +#include <ydb/public/lib/ydb_cli/common/interruptible.h> + +namespace NYdb::NConsoleClient { +#define GETTER(TYPE, NAME) \ + TYPE NAME() const { \ + return NAME##_; \ + } + + class TTopicWriterParams { + public: + TTopicWriterParams(); + TTopicWriterParams(EOutputFormat inputFormat, TMaybe<TString> delimiter, + ui64 messageSizeLimit, TMaybe<TDuration> batchDuration, + TMaybe<ui64> batchSize, TMaybe<ui64> batchMessagesCount); + TTopicWriterParams(const TTopicWriterParams&) = default; + TTopicWriterParams(TTopicWriterParams&&) = default; + + GETTER(TMaybe<char>, Delimiter); + GETTER(TMaybe<TDuration>, BatchDuration); + GETTER(TMaybe<ui64>, BatchSize); + GETTER(TMaybe<ui64>, BatchMessagesCount); + GETTER(ui64, MessageSizeLimit); + GETTER(EOutputFormat, InputFormat); + + private: + TMaybe<TString> File_; + TMaybe<char> Delimiter_; + EOutputFormat InputFormat_ = EOutputFormat::Default; + + // TODO(shmel1k@): move to 'TWithBatchingCommand' or something like that. + TMaybe<TDuration> BatchDuration_; + TMaybe<ui64> BatchSize_; + TMaybe<ui64> BatchMessagesCount_; + + ui64 MessageSizeLimit_ = 0; + }; + + class TTopicWriterTests; + + // TODO(shmel1k@): think about interruption here. 
+ class TTopicWriter { + public: + TTopicWriter(); + TTopicWriter(const TTopicWriter&) = default; + TTopicWriter(TTopicWriter&&) = default; + TTopicWriter(std::shared_ptr<NPersQueue::IWriteSession>, TTopicWriterParams); + + bool Close(TDuration closeTimeout = TDuration::Max()); + int Init(); + int Run(IInputStream&); + + private: + struct TSendMessageData { + TString Data = ""; + bool NeedSend = false; + bool ContinueSending = false; + }; + + int HandleEvent(NPersQueue::TWriteSessionEvent::TEvent&); + int HandleAcksEvent(const NPersQueue::TWriteSessionEvent::TAcksEvent&); + int HandleReadyToAcceptEvent(NPersQueue::TWriteSessionEvent::TReadyToAcceptEvent&); + int HandleSessionClosedEvent(const NPersQueue::TSessionClosedEvent&); + TTopicWriter::TSendMessageData EnterMessage(IInputStream&); // TODO(shmel1k@): make static or like a helper function + + std::shared_ptr<NPersQueue::IWriteSession> WriteSession_; + const TTopicWriterParams WriterParams_; + + TMaybe<NPersQueue::TContinuationToken> ContinuationToken_ = Nothing(); + + ui64 CurrentSeqNo_ = 0; + + friend class TTopicWriterTests; + }; +} // namespace NYdb::NConsoleClient
\ No newline at end of file diff --git a/ydb/public/lib/ydb_cli/topic/topic_write_ut.cpp b/ydb/public/lib/ydb_cli/topic/topic_write_ut.cpp new file mode 100644 index 0000000000..8540e64778 --- /dev/null +++ b/ydb/public/lib/ydb_cli/topic/topic_write_ut.cpp @@ -0,0 +1,287 @@ +#include "topic_write.h" +#include <library/cpp/testing/unittest/registar.h> + +namespace NYdb::NConsoleClient { + class TTopicWriterTests: public NUnitTest::TTestBase { + using TMessages = TVector<TTopicWriter::TSendMessageData>; + + public: + UNIT_TEST_SUITE(TTopicWriterTests); + UNIT_TEST(TestEnterMessage_EmptyInput); + UNIT_TEST(TestEnterMessage_OnlyDelimiters); + UNIT_TEST(TestEnterMessage_1KiB_No_Delimiter); + UNIT_TEST(TestEnterMessage_1KiB_Newline_Delimiter); + UNIT_TEST(TestEnterMessage_1KiB_Newline_Delimited_With_Two_Delimiters_In_A_Row); + UNIT_TEST(TestEnterMessage_SomeBinaryData); + UNIT_TEST(TestEnterMessage_ZeroSymbol_Delimited); + UNIT_TEST(TestEnterMessage_Custom_Delimiter_Delimited); + + UNIT_TEST(TestTopicWriterParams_Format_NewlineDelimited); + UNIT_TEST(TestTopicWriterParams_Format_Concatenated); + UNIT_TEST(TestTopicWriterParams_No_Delimiter); + UNIT_TEST(TestTopicWriterParams_InvalidDelimiter); + UNIT_TEST_SUITE_END(); + + void TestEnterMessage_EmptyInput() { + TStringStream str; + TTopicWriter wr; + TMessages got = EnterMessageHelper(wr, str); + AssertMessagesEqual({{ + .Data = "", + .NeedSend = true, + .ContinueSending = false, + }}, + got); + } + + void TestEnterMessage_OnlyDelimiters() { + TStringStream str = TString("\n") * 6; + TTopicWriter wr(nullptr, TTopicWriterParams(EOutputFormat::Default, "\n", 0, Nothing(), Nothing(), Nothing())); + TMessages got = EnterMessageHelper(wr, str); + AssertMessagesEqual({ + { + .Data = "", + .NeedSend = true, + .ContinueSending = true, + }, + { + .Data = "", + .NeedSend = true, + .ContinueSending = true, + }, + { + .Data = "", + .NeedSend = true, + .ContinueSending = true, + }, + { + .Data = "", + .NeedSend = true, + 
.ContinueSending = true, + }, + { + .Data = "", + .NeedSend = true, + .ContinueSending = true, + }, + { + .Data = "", + .NeedSend = true, + .ContinueSending = true, + }, + { + .Data = "", + .NeedSend = false, + .ContinueSending = false, + }, + }, + got); + } + + void TestEnterMessage_1KiB_No_Delimiter() { + /* + Only one message with a * 1024 size expected + */ + TStringStream str = TString("a") * 1_KB; + TTopicWriter wr; + TMessages got = EnterMessageHelper(wr, str); + AssertMessagesEqual({ + { + .Data = TString("a") * 1_KB, + .NeedSend = true, + .ContinueSending = false, + }, + }, + got); + } + + void TestEnterMessage_1KiB_Newline_Delimiter() { + TStringStream str = TString("a") * 512 + "\n" + TString("b") * 512; + TTopicWriter wr(nullptr, TTopicWriterParams(EOutputFormat::Default, "\n", 0, Nothing(), Nothing(), Nothing())); + TMessages got = EnterMessageHelper(wr, str); + AssertMessagesEqual({ + { + .Data = TString("a") * 512, + .NeedSend = true, + .ContinueSending = true, + }, + { + .Data = TString("b") * 512, + .NeedSend = true, + .ContinueSending = true, + }, + { + .Data = "", + .NeedSend = false, + .ContinueSending = false, + }, + }, + got); + } + + void TestEnterMessage_1KiB_Newline_Delimited_With_Two_Delimiters_In_A_Row() { + TStringStream str = TString("a") * 512 + "\n\n" + TString("b") * 512; + TTopicWriter wr(nullptr, TTopicWriterParams(EOutputFormat::NewlineDelimited, Nothing(), 0, Nothing(), Nothing(), Nothing())); + TMessages got = EnterMessageHelper(wr, str); + AssertMessagesEqual({ + { + .Data = TString("a") * 512, + .NeedSend = true, + .ContinueSending = true, + }, + { + .Data = "", + .NeedSend = true, + .ContinueSending = true, + }, + { + .Data = TString("b") * 512, + .NeedSend = true, + .ContinueSending = true, + }, + { + .Data = "", + .NeedSend = false, + .ContinueSending = false, + }, + }, + got); + } + + void TestEnterMessage_SomeBinaryData() { + TStringStream str = TString("\0\0\n\n\r\n\r\n"); + TTopicWriter wr; + TMessages got = 
EnterMessageHelper(wr, str); + AssertMessagesEqual({ + { + .Data = TString("\0\0\n\n\r\n\r\n"), + .NeedSend = true, + .ContinueSending = false, + }, + }, + got); + } + + void TestEnterMessage_ZeroSymbol_Delimited() { + auto& s = "\0\0\0\0\n\nprivet"; + TStringStream str = TString(std::begin(s), std::end(s)); + TTopicWriter wr(nullptr, TTopicWriterParams(EOutputFormat::Default, "\0", 0, Nothing(), Nothing(), Nothing())); + TMessages got = EnterMessageHelper(wr, str); + AssertMessagesEqual({ + { + .Data = "", + .NeedSend = true, + .ContinueSending = true, + }, + { + .Data = "", + .NeedSend = true, + .ContinueSending = true, + }, + { + .Data = "", + .NeedSend = true, + .ContinueSending = true, + }, + { + .Data = "", + .NeedSend = true, + .ContinueSending = true, + }, + { + .Data = "\n\nprivet", + .NeedSend = true, + .ContinueSending = true, + }, + { + .Data = "", + .NeedSend = false, + .ContinueSending = false, + }, + }, + got); + } + + void TestEnterMessage_Custom_Delimiter_Delimited() { + TStringStream str = TString("privet_vasya_kak_dela?"); + TTopicWriter wr(nullptr, TTopicWriterParams(EOutputFormat::Default, "_", 0, Nothing(), Nothing(), Nothing())); + TMessages got = EnterMessageHelper(wr, str); + AssertMessagesEqual({ + { + .Data = "privet", + .NeedSend = true, + .ContinueSending = true, + }, + { + .Data = "vasya", + .NeedSend = true, + .ContinueSending = true, + }, + { + .Data = "kak", + .NeedSend = true, + .ContinueSending = true, + }, + { + .Data = "dela?", + .NeedSend = true, + .ContinueSending = true, + }, + { + .Data = "", + .NeedSend = false, + .ContinueSending = false, + }, + }, + got); + } + + void TestTopicWriterParams_Format_NewlineDelimited() { + TTopicWriterParams p = TTopicWriterParams(EOutputFormat::NewlineDelimited, Nothing(), 0, Nothing(), Nothing(), Nothing()); + UNIT_ASSERT_VALUES_EQUAL(p.InputFormat(), EOutputFormat::NewlineDelimited); + UNIT_ASSERT_VALUES_EQUAL(p.Delimiter(), '\n'); + } + + void TestTopicWriterParams_Format_Concatenated() { 
+ TTopicWriterParams p = TTopicWriterParams(EOutputFormat::Concatenated, Nothing(), 0, Nothing(), Nothing(), Nothing()); + UNIT_ASSERT_VALUES_EQUAL(p.InputFormat(), EOutputFormat::Concatenated); + UNIT_ASSERT_VALUES_EQUAL(p.Delimiter(), '\n'); + } + + void TestTopicWriterParams_No_Delimiter() { + TTopicWriterParams p = TTopicWriterParams(EOutputFormat::Default, Nothing(), 0, Nothing(), Nothing(), Nothing()); + UNIT_ASSERT_VALUES_EQUAL(p.InputFormat(), EOutputFormat::Default); + UNIT_ASSERT_VALUES_EQUAL(p.Delimiter(), Nothing()); + } + + void TestTopicWriterParams_InvalidDelimiter() { + UNIT_ASSERT_EXCEPTION(TTopicWriterParams(EOutputFormat::Default, "invalid", 0, Nothing(), Nothing(), Nothing()), yexception); + } + + private: + TMessages EnterMessageHelper(TTopicWriter& wr, IInputStream& input) { + TMessages result; + while (true) { + TTopicWriter::TSendMessageData message = wr.EnterMessage(input); + result.emplace_back(message); + if (result.back().ContinueSending == false) { + break; + } + } + return result; + } + + void AssertMessagesEqual(TMessages want, TMessages& got) { + UNIT_ASSERT_VALUES_EQUAL(want.size(), got.size()); + for (size_t i = 0; i < want.size(); ++i) { + TTopicWriter::TSendMessageData w = want[i]; + TTopicWriter::TSendMessageData g = got[i]; + UNIT_ASSERT_VALUES_EQUAL(g.ContinueSending, w.ContinueSending); + UNIT_ASSERT_VALUES_EQUAL(g.Data, w.Data); + UNIT_ASSERT_VALUES_EQUAL(g.NeedSend, w.NeedSend); + } + } + }; + + UNIT_TEST_SUITE_REGISTRATION(TTopicWriterTests); +} // namespace NYdb::NConsoleClient diff --git a/ydb/public/lib/ydb_cli/topic/ut/CMakeLists.darwin.txt b/ydb/public/lib/ydb_cli/topic/ut/CMakeLists.darwin.txt new file mode 100644 index 0000000000..7d61324672 --- /dev/null +++ b/ydb/public/lib/ydb_cli/topic/ut/CMakeLists.darwin.txt @@ -0,0 +1,60 @@ + +# This file was gererated by the build system used internally in the Yandex monorepo. 
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_executable(ydb-public-lib-ydb_cli-topic-ut) +target_compile_options(ydb-public-lib-ydb_cli-topic-ut PRIVATE + -DACTORLIB_HUGE_PB_SIZE + -DUSE_CURRENT_UDF_ABI_VERSION +) +target_include_directories(ydb-public-lib-ydb_cli-topic-ut PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/public/lib/ydb_cli/topic +) +target_link_libraries(ydb-public-lib-ydb_cli-topic-ut PUBLIC + contrib-libs-cxxsupp + yutil + library-cpp-cpuid_check + cpp-testing-unittest_main + topic + cpp-histogram-hdr + cpp-threading-local_executor + yq-libs-private_client + cpp-client-ydb_persqueue_public + public-lib-experimental + clicommands + common + public-lib-yq + public-lib-yson_value + cpp-client-ydb_proto + ydb_persqueue_core-ut-ut_utils +) +target_link_options(ydb-public-lib-ydb_cli-topic-ut PRIVATE + -Wl,-no_deduplicate + -Wl,-sdk_version,10.15 + -fPIC + -fPIC + -framework + CoreFoundation +) +target_sources(ydb-public-lib-ydb_cli-topic-ut PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/public/lib/ydb_cli/topic/topic_write.cpp + ${CMAKE_SOURCE_DIR}/ydb/public/lib/ydb_cli/topic/topic_write_ut.cpp + ${CMAKE_SOURCE_DIR}/ydb/public/lib/ydb_cli/topic/topic_read_ut.cpp +) +add_test( + NAME + ydb-public-lib-ydb_cli-topic-ut + COMMAND + ydb-public-lib-ydb_cli-topic-ut + --print-before-suite + --print-before-test + --fork-tests + --print-times + --show-fails +) +vcs_info(ydb-public-lib-ydb_cli-topic-ut) diff --git a/ydb/public/lib/ydb_cli/topic/ut/CMakeLists.linux.txt b/ydb/public/lib/ydb_cli/topic/ut/CMakeLists.linux.txt new file mode 100644 index 0000000000..d23103b4dc --- /dev/null +++ b/ydb/public/lib/ydb_cli/topic/ut/CMakeLists.linux.txt @@ -0,0 +1,64 @@ + +# This file was gererated by the build 
system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_executable(ydb-public-lib-ydb_cli-topic-ut) +target_compile_options(ydb-public-lib-ydb_cli-topic-ut PRIVATE + -DACTORLIB_HUGE_PB_SIZE + -DUSE_CURRENT_UDF_ABI_VERSION +) +target_include_directories(ydb-public-lib-ydb_cli-topic-ut PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/public/lib/ydb_cli/topic +) +target_link_libraries(ydb-public-lib-ydb_cli-topic-ut PUBLIC + contrib-libs-cxxsupp + yutil + cpp-malloc-tcmalloc + libs-tcmalloc-no_percpu_cache + library-cpp-cpuid_check + cpp-testing-unittest_main + topic + cpp-histogram-hdr + cpp-threading-local_executor + yq-libs-private_client + cpp-client-ydb_persqueue_public + public-lib-experimental + clicommands + common + public-lib-yq + public-lib-yson_value + cpp-client-ydb_proto + ydb_persqueue_core-ut-ut_utils +) +target_link_options(ydb-public-lib-ydb_cli-topic-ut PRIVATE + -ldl + -lrt + -Wl,--no-as-needed + -fPIC + -fPIC + -lpthread + -lrt + -ldl +) +target_sources(ydb-public-lib-ydb_cli-topic-ut PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/public/lib/ydb_cli/topic/topic_write.cpp + ${CMAKE_SOURCE_DIR}/ydb/public/lib/ydb_cli/topic/topic_write_ut.cpp + ${CMAKE_SOURCE_DIR}/ydb/public/lib/ydb_cli/topic/topic_read_ut.cpp +) +add_test( + NAME + ydb-public-lib-ydb_cli-topic-ut + COMMAND + ydb-public-lib-ydb_cli-topic-ut + --print-before-suite + --print-before-test + --fork-tests + --print-times + --show-fails +) +vcs_info(ydb-public-lib-ydb_cli-topic-ut) diff --git a/ydb/public/lib/ydb_cli/topic/ut/CMakeLists.txt b/ydb/public/lib/ydb_cli/topic/ut/CMakeLists.txt new file mode 100644 index 0000000000..fc7b1ee73c --- /dev/null +++ 
b/ydb/public/lib/ydb_cli/topic/ut/CMakeLists.txt @@ -0,0 +1,13 @@ + +# This file was gererated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + +if (APPLE) + include(CMakeLists.darwin.txt) +elseif (UNIX AND NOT APPLE) + include(CMakeLists.linux.txt) +endif() diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/common.cpp b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/common.cpp index 5aa49006b8..17bc321c25 100644 --- a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/common.cpp +++ b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/common.cpp @@ -191,4 +191,9 @@ TThreadPoolExecutor::TThreadPoolExecutor(size_t threadsCount) ThreadsCount = threadsCount; } +IExecutor::TPtr CreateSyncExecutor() +{ + return MakeIntrusive<TSyncExecutor>(); +} + } diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.h b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.h index edb4a770ad..e5de6e3b64 100644 --- a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.h +++ b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.h @@ -168,8 +168,10 @@ private: }; template <bool UseMigrationProtocol> -class TDataDecompressionInfo { +class TDataDecompressionInfo : public std::enable_shared_from_this<TDataDecompressionInfo<UseMigrationProtocol>> { public: + using TPtr = std::shared_ptr<TDataDecompressionInfo<UseMigrationProtocol>>; + TDataDecompressionInfo(const TDataDecompressionInfo&) = default; TDataDecompressionInfo(TDataDecompressionInfo&&) = default; TDataDecompressionInfo( @@ -205,6 +207,10 @@ public: return ServerMessage; } + bool GetDoDecompress() const { + return DoDecompress; + } + 
TMaybe<std::pair<size_t, size_t>> GetReadyThreshold() const { size_t readyCount = 0; std::pair<size_t, size_t> ret; @@ -252,7 +258,7 @@ private: }; struct TDecompressionTask { - explicit TDecompressionTask(TDataDecompressionInfo* parent, TIntrusivePtr<TPartitionStreamImpl<UseMigrationProtocol>> partitionStream, TReadyMessageThreshold* ready); + TDecompressionTask(TDataDecompressionInfo::TPtr parent, TIntrusivePtr<TPartitionStreamImpl<UseMigrationProtocol>> partitionStream, TReadyMessageThreshold* ready); // Decompress and notify about memory consumption changes. void operator()(); @@ -267,7 +273,7 @@ private: } private: - TDataDecompressionInfo* Parent; + TDataDecompressionInfo::TPtr Parent; TIntrusivePtr<TPartitionStreamImpl<UseMigrationProtocol>> PartitionStream; i64 SourceDataSize = 0; i64 EstimatedDecompressedSize = 0; @@ -299,6 +305,33 @@ private: TAdaptiveLock DecompressionErrorsStructLock; std::vector<std::vector<std::exception_ptr>> DecompressionErrors; }; + +template <bool UseMigrationProtocol> +class TDataDecompressionEvent { +public: + TDataDecompressionEvent(size_t batch, size_t message, typename TDataDecompressionInfo<UseMigrationProtocol>::TPtr parent, std::atomic<bool> &ready) : + Batch{batch}, + Message{message}, + Parent{std::move(parent)}, + Ready{ready} + { + } + + bool IsReady() const { + return Ready; + } + + bool TakeData(TIntrusivePtr<TPartitionStreamImpl<UseMigrationProtocol>> partitionStream, + TVector<typename TAReadSessionEvent<UseMigrationProtocol>::TDataReceivedEvent::TMessage>* messages, + TVector<typename TAReadSessionEvent<UseMigrationProtocol>::TDataReceivedEvent::TCompressedMessage>* compressedMessages, + size_t* maxByteSize) const; + +private: + size_t Batch; + size_t Message; + typename TDataDecompressionInfo<UseMigrationProtocol>::TPtr Parent; + std::atomic<bool> &Ready; +}; template <bool UseMigrationProtocol> struct IUserRetrievedEventCallback { @@ -317,6 +350,7 @@ struct TReadSessionEventInfo { // Event with only partition 
stream ref. // Partition stream holds all its events. TIntrusivePtr<TPartitionStreamImpl<UseMigrationProtocol>> PartitionStream; + size_t DataCount = 0; TMaybe<TEvent> Event; std::weak_ptr<IUserRetrievedEventCallback<UseMigrationProtocol>> Session; @@ -333,34 +367,16 @@ struct TReadSessionEventInfo { // Data event. TReadSessionEventInfo(TIntrusivePtr<TPartitionStreamImpl<UseMigrationProtocol>> partitionStream, std::weak_ptr<IUserRetrievedEventCallback<UseMigrationProtocol>> session); - TReadSessionEventInfo(TIntrusivePtr<TPartitionStreamImpl<UseMigrationProtocol>> partitionStream, - std::weak_ptr<IUserRetrievedEventCallback<UseMigrationProtocol>> session, - TVector<TMessage> messages, - TVector<TCompressedMessage> compressedMessages); - bool IsEmpty() const; bool IsDataEvent() const; - // Takes data. Returns true if event has more unpacked data. - bool TakeData(TVector<TMessage>* messages, - TVector<TCompressedMessage>* comressedMessages, - size_t* maxByteSize); - TEvent& GetEvent() { Y_ASSERT(Event); return *Event; } - // Move event to partition stream queue. - void MoveToPartitionStream(); - - void ExtractFromPartitionStream(); - void OnUserRetrievedEvent(); - bool HasMoreData() const; // Has unread data. - bool HasReadyUnreadData() const; // Has ready unread data. 
- bool IsSessionClosedEvent() const { return Event && std::holds_alternative<TASessionClosedEvent<UseMigrationProtocol>>(*Event); } @@ -371,18 +387,20 @@ template <bool UseMigrationProtocol> struct TRawPartitionStreamEvent { using TEvent = typename TAReadSessionEvent<UseMigrationProtocol>::TEvent; - std::variant<TDataDecompressionInfo<UseMigrationProtocol>, TEvent> Event; - bool Signalled = false; + std::variant<TDataDecompressionEvent<UseMigrationProtocol>, TEvent> Event; TRawPartitionStreamEvent(const TRawPartitionStreamEvent&) = default; TRawPartitionStreamEvent(TRawPartitionStreamEvent&&) = default; - TRawPartitionStreamEvent( - TPartitionData<UseMigrationProtocol>&& msg, - std::weak_ptr<TSingleClusterReadSessionImpl<UseMigrationProtocol>> session, - bool doDecompress - ) - : Event(std::in_place_type_t<TDataDecompressionInfo<UseMigrationProtocol>>(), std::move(msg), std::move(session), doDecompress) + TRawPartitionStreamEvent(size_t batch, + size_t message, + typename TDataDecompressionInfo<UseMigrationProtocol>::TPtr parent, + std::atomic<bool> &ready) + : Event(std::in_place_type_t<TDataDecompressionEvent<UseMigrationProtocol>>(), + batch, + message, + std::move(parent), + ready) { } @@ -393,17 +411,12 @@ struct TRawPartitionStreamEvent { } bool IsDataEvent() const { - return std::holds_alternative<TDataDecompressionInfo<UseMigrationProtocol>>(Event); - } - - const TDataDecompressionInfo<UseMigrationProtocol>& GetData() const { - Y_ASSERT(IsDataEvent()); - return std::get<TDataDecompressionInfo<UseMigrationProtocol>>(Event); + return std::holds_alternative<TDataDecompressionEvent<UseMigrationProtocol>>(Event); } - TDataDecompressionInfo<UseMigrationProtocol>& GetData() { + const TDataDecompressionEvent<UseMigrationProtocol>& GetDataEvent() const { Y_ASSERT(IsDataEvent()); - return std::get<TDataDecompressionInfo<UseMigrationProtocol>>(Event); + return std::get<TDataDecompressionEvent<UseMigrationProtocol>>(Event); } TEvent& GetEvent() { @@ -417,13 +430,60 @@ 
struct TRawPartitionStreamEvent { } bool IsReady() const { - return !IsDataEvent() || GetData().IsReady(); - } + if (!IsDataEvent()) { + return true; + } - void Signal(TPartitionStreamImpl<UseMigrationProtocol>* partitionStream, TReadSessionEventsQueue<UseMigrationProtocol>* queue, TDeferredActions<UseMigrationProtocol>& deferred); + return std::get<TDataDecompressionEvent<UseMigrationProtocol>>(Event).IsReady(); + } }; +template <bool UseMigrationProtocol> +class TRawPartitionStreamEventQueue { +public: + TRawPartitionStreamEventQueue() = default; + + template <class... Ts> + TRawPartitionStreamEvent<UseMigrationProtocol>& emplace_back(Ts&&... event) + { + return NotReady.emplace_back(std::forward<Ts>(event)...); + } + + bool empty() const + { + return Ready.empty() && NotReady.empty(); + } + + TRawPartitionStreamEvent<UseMigrationProtocol>& front() + { + Y_VERIFY(!empty()); + + return (Ready.empty() ? NotReady : Ready).front(); + } + + void pop_front() + { + Y_VERIFY(!empty()); + (Ready.empty() ? NotReady : Ready).pop_front(); + } + + void pop_back() + { + Y_VERIFY(!empty()); + + (NotReady.empty() ? 
Ready : NotReady).pop_back(); + } + + void SignalReadyEvents(TPartitionStreamImpl<UseMigrationProtocol>& stream, + TReadSessionEventsQueue<UseMigrationProtocol>& queue, + TDeferredActions<UseMigrationProtocol>& deferred); + void DeleteNotReadyTail(); + +private: + std::deque<TRawPartitionStreamEvent<UseMigrationProtocol>> Ready; + std::deque<TRawPartitionStreamEvent<UseMigrationProtocol>> NotReady; +}; template <bool UseMigrationProtocol> class TPartitionStreamImpl : public TAPartitionStream<UseMigrationProtocol> { @@ -517,12 +577,13 @@ public: EventsQueue.emplace_back(std::forward<T>(event)); } - TDataDecompressionInfo<UseMigrationProtocol>& InsertDataEvent( - TPartitionData<UseMigrationProtocol>&& msg, - bool doDecompress - ) { + void InsertDataEvent(size_t batch, + size_t message, + typename TDataDecompressionInfo<UseMigrationProtocol>::TPtr parent, + std::atomic<bool> &ready) + { ++DataDecompressionEventsCount; - return EventsQueue.emplace_back(std::move(msg), Session, doDecompress).GetData(); + EventsQueue.emplace_back(batch, message, std::move(parent), ready); } bool IsWaitingForDataDecompression() const { @@ -537,10 +598,6 @@ public: return EventsQueue.front(); } - const TRawPartitionStreamEvent<UseMigrationProtocol>& TopEvent() const { - return EventsQueue.front(); - } - void PopEvent() { if (EventsQueue.front().IsDataEvent()) { --DataDecompressionEventsCount; @@ -615,6 +672,7 @@ public: return true; } + void DeleteNotReadyTail(); private: const TKey Key; @@ -622,7 +680,7 @@ private: ui64 FirstNotReadOffset; std::weak_ptr<TSingleClusterReadSessionImpl<UseMigrationProtocol>> Session; typename IErrorHandler<UseMigrationProtocol>::TPtr ErrorHandler; - std::deque<TRawPartitionStreamEvent<UseMigrationProtocol>> EventsQueue; + TRawPartitionStreamEventQueue<UseMigrationProtocol> EventsQueue; size_t DataDecompressionEventsCount = 0; ui64 MaxReadOffset = 0; ui64 MaxCommittedOffset = 0; @@ -646,50 +704,45 @@ class TReadSessionEventsQueue: public 
TBaseSessionEventsQueue<TAReadSessionSetti public: explicit TReadSessionEventsQueue(const TAReadSessionSettings<UseMigrationProtocol>& settings, std::weak_ptr<IUserRetrievedEventCallback<UseMigrationProtocol>> session); - TMaybe<TReadSessionEventInfo<UseMigrationProtocol>> GetDataEventImpl(TReadSessionEventInfo<UseMigrationProtocol>& srcDataEventInfo, size_t* maxByteSize); // Assumes that we're under lock. + typename TAReadSessionEvent<UseMigrationProtocol>::TDataReceivedEvent + GetDataEventImpl(TIntrusivePtr<TPartitionStreamImpl<UseMigrationProtocol>> stream, size_t* maxByteSize); // Assumes that we're under lock. - TMaybe<TReadSessionEventInfo<UseMigrationProtocol>> TryGetEventImpl(size_t* maxByteSize) { // Assumes that we're under lock. + TReadSessionEventInfo<UseMigrationProtocol> GetEventImpl(size_t* maxByteSize) { // Assumes that we're under lock. Y_ASSERT(TParent::HasEventsImpl()); - TVector<typename TAReadSessionEvent<UseMigrationProtocol>::TDataReceivedEvent::TMessage> messages; + if (!TParent::Events.empty()) { - TReadSessionEventInfo<UseMigrationProtocol> event = std::move(TParent::Events.front()); - TParent::Events.pop(); - TParent::RenewWaiterImpl(); - auto partitionStream = event.PartitionStream; + TReadSessionEventInfo<UseMigrationProtocol>& front = TParent::Events.front(); + auto partitionStream = front.PartitionStream; if (!partitionStream->HasEvents()) { Y_FAIL("can't be here - got events in global queue, but nothing in partition queue"); - return Nothing(); } + TMaybe<typename TAReadSessionEvent<UseMigrationProtocol>::TEvent> event; + if (partitionStream->TopEvent().IsDataEvent()) { - return GetDataEventImpl(event, maxByteSize); + event = GetDataEventImpl(partitionStream, maxByteSize); + } else { + event = std::move(partitionStream->TopEvent().GetEvent()); + partitionStream->PopEvent(); + + TParent::Events.pop(); } - event = TReadSessionEventInfo<UseMigrationProtocol>(partitionStream.Get(), event.Session, partitionStream->TopEvent().GetEvent()); 
- partitionStream->PopEvent(); - return event; + TParent::RenewWaiterImpl(); + + return {partitionStream, front.Session, std::move(*event)}; } Y_ASSERT(TParent::CloseEvent); - return TReadSessionEventInfo<UseMigrationProtocol>(*TParent::CloseEvent, Session); - } - TMaybe<TReadSessionEventInfo<UseMigrationProtocol>> GetEventImpl(size_t* maxByteSize) { // Assumes that we're under lock and that the event queue has events. - do { - TMaybe<TReadSessionEventInfo<UseMigrationProtocol>> result = TryGetEventImpl(maxByteSize); // We could have read all the data in current message previous time. - if (result) { - return result; - } - } while (TParent::HasEventsImpl()); - return Nothing(); + return {*TParent::CloseEvent, Session}; } TVector<typename TAReadSessionEvent<UseMigrationProtocol>::TEvent> GetEvents(bool block = false, TMaybe<size_t> maxEventsCount = Nothing(), size_t maxByteSize = std::numeric_limits<size_t>::max()) { TVector<TReadSessionEventInfo<UseMigrationProtocol>> eventInfos; const size_t maxCount = maxEventsCount ? *maxEventsCount : std::numeric_limits<size_t>::max(); TDeferredActions<UseMigrationProtocol> deferred; - std::vector<TIntrusivePtr<TPartitionStreamImpl<UseMigrationProtocol>>> partitionStreamsForSignalling; with_lock (TParent::Mutex) { eventInfos.reserve(Min(TParent::Events.size() + TParent::CloseEvent.Defined(), maxCount)); do { @@ -700,23 +753,14 @@ public: ApplyCallbacksToReadyEventsImpl(deferred); while (TParent::HasEventsImpl() && eventInfos.size() < maxCount && maxByteSize > 0) { - TMaybe<TReadSessionEventInfo<UseMigrationProtocol>> event = GetEventImpl(&maxByteSize); - if (event) { - const TIntrusivePtr<TPartitionStreamImpl<UseMigrationProtocol>> partitionStreamForSignalling = event->IsDataEvent() ? 
event->PartitionStream : nullptr; - eventInfos.emplace_back(std::move(*event)); - if (eventInfos.back().IsSessionClosedEvent()) { - break; - } - if (partitionStreamForSignalling) { - partitionStreamsForSignalling.emplace_back(std::move(partitionStreamForSignalling)); - } + TReadSessionEventInfo<UseMigrationProtocol> event = GetEventImpl(&maxByteSize); + eventInfos.emplace_back(std::move(event)); + if (eventInfos.back().IsSessionClosedEvent()) { + break; } } } while (block && (eventInfos.empty() || eventInfos.back().IsSessionClosedEvent())); ApplyCallbacksToReadyEventsImpl(deferred); - for (const auto& partitionStreamForSignalling : partitionStreamsForSignalling) { - SignalReadyEventsImpl(partitionStreamForSignalling.Get(), deferred); - } } TVector<typename TAReadSessionEvent<UseMigrationProtocol>::TEvent> result; @@ -732,7 +776,6 @@ public: TMaybe<TReadSessionEventInfo<UseMigrationProtocol>> eventInfo; TDeferredActions<UseMigrationProtocol> deferred; with_lock (TParent::Mutex) { - TIntrusivePtr<TPartitionStreamImpl<UseMigrationProtocol>> partitionStreamForSignalling; do { if (block) { TParent::WaitEventsImpl(); @@ -742,17 +785,11 @@ public: if (TParent::HasEventsImpl()) { eventInfo = GetEventImpl(&maxByteSize); - if (eventInfo && eventInfo->IsDataEvent()) { - partitionStreamForSignalling = eventInfo->PartitionStream; - } } else if (!appliedCallbacks) { return Nothing(); } } while (block && !eventInfo); ApplyCallbacksToReadyEventsImpl(deferred); - if (partitionStreamForSignalling) { - SignalReadyEventsImpl(partitionStreamForSignalling.Get(), deferred); - } } if (eventInfo) { eventInfo->OnUserRetrievedEvent(); @@ -780,12 +817,17 @@ public: bool ApplyCallbacksToReadyEventsImpl(TDeferredActions<UseMigrationProtocol>& deferred); // Push usual event. 
- void PushEvent(TReadSessionEventInfo<UseMigrationProtocol> eventInfo, TDeferredActions<UseMigrationProtocol>& deferred); + void PushEvent(TIntrusivePtr<TPartitionStreamImpl<UseMigrationProtocol>> partitionStream, + std::weak_ptr<IUserRetrievedEventCallback<UseMigrationProtocol>> session, + typename TAReadSessionEvent<UseMigrationProtocol>::TEvent event, + TDeferredActions<UseMigrationProtocol>& deferred); // Push data event. - TDataDecompressionInfo<UseMigrationProtocol>* - PushDataEvent(TIntrusivePtr<TPartitionStreamImpl<UseMigrationProtocol>> partitionStream, - TPartitionData<UseMigrationProtocol>&& msg); + void PushDataEvent(TIntrusivePtr<TPartitionStreamImpl<UseMigrationProtocol>> stream, + size_t batch, + size_t message, + typename TDataDecompressionInfo<UseMigrationProtocol>::TPtr parent, + std::atomic<bool> &ready); void SignalEventImpl(TIntrusivePtr<TPartitionStreamImpl<UseMigrationProtocol>> partitionStream, TDeferredActions<UseMigrationProtocol>& deferred); // Assumes that we're under lock. 
@@ -1088,13 +1130,14 @@ private: }; struct TDecompressionQueueItem { - TDecompressionQueueItem(TDataDecompressionInfo<UseMigrationProtocol>* batchInfo, TIntrusivePtr<TPartitionStreamImpl<UseMigrationProtocol>> partitionStream) - : BatchInfo(batchInfo) + TDecompressionQueueItem(typename TDataDecompressionInfo<UseMigrationProtocol>::TPtr batchInfo, + TIntrusivePtr<TPartitionStreamImpl<UseMigrationProtocol>> partitionStream) + : BatchInfo(std::move(batchInfo)) , PartitionStream(std::move(partitionStream)) { } - TDataDecompressionInfo<UseMigrationProtocol>* BatchInfo; + typename TDataDecompressionInfo<UseMigrationProtocol>::TPtr BatchInfo; TIntrusivePtr<TPartitionStreamImpl<UseMigrationProtocol>> PartitionStream; }; diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.ipp b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.ipp index 85ae2de2e9..208eeec8f1 100644 --- a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.ipp +++ b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.ipp @@ -118,15 +118,47 @@ void TPartitionStreamImpl<UseMigrationProtocol>::ResumeReading() { template<bool UseMigrationProtocol> void TPartitionStreamImpl<UseMigrationProtocol>::SignalReadyEvents(TReadSessionEventsQueue<UseMigrationProtocol>* queue, TDeferredActions<UseMigrationProtocol>& deferred) { - for (auto& event : EventsQueue) { - event.Signal(this, queue, deferred); + Y_VERIFY(queue); + EventsQueue.SignalReadyEvents(*this, *queue, deferred); +} + +template<bool UseMigrationProtocol> +void TPartitionStreamImpl<UseMigrationProtocol>::DeleteNotReadyTail() +{ + EventsQueue.DeleteNotReadyTail(); +} + +template<bool UseMigrationProtocol> +void TRawPartitionStreamEventQueue<UseMigrationProtocol>::SignalReadyEvents(TPartitionStreamImpl<UseMigrationProtocol>& stream, + TReadSessionEventsQueue<UseMigrationProtocol>& queue, + TDeferredActions<UseMigrationProtocol>& deferred) +{ + while (!NotReady.empty() && NotReady.front().IsReady()) { + 
auto& event = NotReady.front(); + + queue.SignalEventImpl(&stream, deferred); + Ready.push_back(std::move(event)); + NotReady.pop_front(); + } +} + +template<bool UseMigrationProtocol> +void TRawPartitionStreamEventQueue<UseMigrationProtocol>::DeleteNotReadyTail() +{ + std::deque<TRawPartitionStreamEvent<UseMigrationProtocol>> head; + + for (auto& event : NotReady) { if (!event.IsReady()) { break; } + + head.push_back(std::move(event)); } -} + swap(head, NotReady); +} + //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// // TSingleClusterReadSessionImpl @@ -511,20 +543,23 @@ void TSingleClusterReadSessionImpl<UseMigrationProtocol>::ConfirmPartitionStream return; } - using TClosedEvent = typename std::conditional_t<UseMigrationProtocol, NPersQueue::TReadSessionEvent::TPartitionStreamClosedEvent, - NTopic::TReadSessionEvent::TPartitionSessionClosedEvent>; + using TClosedEvent = std::conditional_t< + UseMigrationProtocol, + NPersQueue::TReadSessionEvent::TPartitionStreamClosedEvent, + NTopic::TReadSessionEvent::TPartitionSessionClosedEvent + >; CookieMapping.RemoveMapping(GetPartitionStreamId(partitionStream)); PartitionStreams.erase(partitionStream->GetAssignId()); if constexpr (UseMigrationProtocol) { - EventsQueue->PushEvent({partitionStream, TSingleClusterReadSessionImpl<UseMigrationProtocol>::weak_from_this(), - TClosedEvent(partitionStream, TClosedEvent::EReason::DestroyConfirmedByUser)}, - deferred); + EventsQueue->PushEvent(partitionStream, TSingleClusterReadSessionImpl<UseMigrationProtocol>::weak_from_this(), + TClosedEvent(partitionStream, TClosedEvent::EReason::DestroyConfirmedByUser), + deferred); } else { - EventsQueue->PushEvent({partitionStream, TSingleClusterReadSessionImpl<UseMigrationProtocol>::weak_from_this(), - TClosedEvent(partitionStream, TClosedEvent::EReason::StopConfirmedByUser)}, - deferred); + EventsQueue->PushEvent(partitionStream, 
TSingleClusterReadSessionImpl<UseMigrationProtocol>::weak_from_this(), + TClosedEvent(partitionStream, TClosedEvent::EReason::StopConfirmedByUser), + deferred); } TClientMessage<UseMigrationProtocol> req; @@ -859,8 +894,12 @@ inline void TSingleClusterReadSessionImpl<true>::OnReadDoneImpl( deferred); return; } - TDataDecompressionInfo<true>* decompressionInfo = EventsQueue->PushDataEvent(partitionStream, std::move(partitionData)); + + auto decompressionInfo = std::make_shared<TDataDecompressionInfo<true>>(std::move(partitionData), + shared_from_this(), + Settings.Decompress_); Y_VERIFY(decompressionInfo); + if (decompressionInfo) { DecompressionQueue.emplace_back(decompressionInfo, partitionStream); StartDecompressionTasksImpl(deferred); @@ -890,17 +929,17 @@ inline void TSingleClusterReadSessionImpl<true>::OnReadDoneImpl( if (currentPartitionStream) { CookieMapping.RemoveMapping(currentPartitionStream->GetPartitionStreamId()); EventsQueue->PushEvent( - {currentPartitionStream, weak_from_this(), + currentPartitionStream, weak_from_this(), TReadSessionEvent::TPartitionStreamClosedEvent( - currentPartitionStream, TReadSessionEvent::TPartitionStreamClosedEvent::EReason::Lost)}, + currentPartitionStream, TReadSessionEvent::TPartitionStreamClosedEvent::EReason::Lost), deferred); } currentPartitionStream = partitionStream; // Send event to user. 
EventsQueue->PushEvent( - {partitionStream, weak_from_this(), - TReadSessionEvent::TCreatePartitionStreamEvent(partitionStream, msg.read_offset(), msg.end_offset())}, + partitionStream, weak_from_this(), + TReadSessionEvent::TCreatePartitionStreamEvent(partitionStream, msg.read_offset(), msg.end_offset()), deferred); } @@ -917,14 +956,14 @@ inline void TSingleClusterReadSessionImpl<true>::OnReadDoneImpl( if (msg.forceful_release()) { PartitionStreams.erase(msg.assign_id()); CookieMapping.RemoveMapping(partitionStream->GetPartitionStreamId()); - EventsQueue->PushEvent({partitionStream, weak_from_this(), + EventsQueue->PushEvent(partitionStream, weak_from_this(), TReadSessionEvent::TPartitionStreamClosedEvent( - partitionStream, TReadSessionEvent::TPartitionStreamClosedEvent::EReason::Lost)}, + partitionStream, TReadSessionEvent::TPartitionStreamClosedEvent::EReason::Lost), deferred); } else { EventsQueue->PushEvent( - {partitionStream, weak_from_this(), - TReadSessionEvent::TDestroyPartitionStreamEvent(std::move(partitionStream), msg.commit_offset())}, + partitionStream, weak_from_this(), + TReadSessionEvent::TDestroyPartitionStreamEvent(std::move(partitionStream), msg.commit_offset()), deferred); } } @@ -947,8 +986,8 @@ inline void TSingleClusterReadSessionImpl<true>::OnReadDoneImpl( } for (auto& [id, partitionStream] : partitionStreams) { EventsQueue->PushEvent( - {partitionStream, weak_from_this(), - TReadSessionEvent::TCommitAcknowledgementEvent(partitionStream, partitionStream->GetMaxCommittedOffset())}, + partitionStream, weak_from_this(), + TReadSessionEvent::TCommitAcknowledgementEvent(partitionStream, partitionStream->GetMaxCommittedOffset()), deferred); } @@ -958,8 +997,8 @@ inline void TSingleClusterReadSessionImpl<true>::OnReadDoneImpl( auto partitionStream = partitionStreamIt->second; partitionStream->UpdateMaxCommittedOffset(rangeProto.end_offset()); EventsQueue->PushEvent( - {partitionStream, weak_from_this(), - 
TReadSessionEvent::TCommitAcknowledgementEvent(partitionStream, rangeProto.end_offset())}, + partitionStream, weak_from_this(), + TReadSessionEvent::TCommitAcknowledgementEvent(partitionStream, rangeProto.end_offset()), deferred); } } @@ -974,11 +1013,11 @@ inline void TSingleClusterReadSessionImpl<true>::OnReadDoneImpl( if (partitionStreamIt == PartitionStreams.end()) { return; } - EventsQueue->PushEvent({partitionStreamIt->second, weak_from_this(), + EventsQueue->PushEvent(partitionStreamIt->second, weak_from_this(), TReadSessionEvent::TPartitionStreamStatusEvent( partitionStreamIt->second, msg.committed_offset(), 0, // TODO: support read offset in status - msg.end_offset(), TInstant::MilliSeconds(msg.write_watermark_ms()))}, + msg.end_offset(), TInstant::MilliSeconds(msg.write_watermark_ms())), deferred); } @@ -1055,8 +1094,12 @@ inline void TSingleClusterReadSessionImpl<false>::OnReadDoneImpl( return; } partitionStream->SetFirstNotReadOffset(desiredOffset); - TDataDecompressionInfo<false>* decompressionInfo = EventsQueue->PushDataEvent(partitionStream, std::move(partitionData)); + + auto decompressionInfo = std::make_shared<TDataDecompressionInfo<false>>(std::move(partitionData), + shared_from_this(), + Settings.Decompress_); Y_VERIFY(decompressionInfo); + if (decompressionInfo) { DecompressionQueue.emplace_back(decompressionInfo, partitionStream); StartDecompressionTasksImpl(deferred); @@ -1082,17 +1125,17 @@ inline void TSingleClusterReadSessionImpl<false>::OnReadDoneImpl( TIntrusivePtr<TPartitionStreamImpl<false>>& currentPartitionStream = PartitionStreams[partitionStream->GetAssignId()]; if (currentPartitionStream) { EventsQueue->PushEvent( - {currentPartitionStream, weak_from_this(), + currentPartitionStream, weak_from_this(), NTopic::TReadSessionEvent::TPartitionSessionClosedEvent( - currentPartitionStream, NTopic::TReadSessionEvent::TPartitionSessionClosedEvent::EReason::Lost)}, + currentPartitionStream, 
NTopic::TReadSessionEvent::TPartitionSessionClosedEvent::EReason::Lost), deferred); } currentPartitionStream = partitionStream; // Send event to user. - EventsQueue->PushEvent({partitionStream, weak_from_this(), + EventsQueue->PushEvent(partitionStream, weak_from_this(), NTopic::TReadSessionEvent::TStartPartitionSessionEvent( - partitionStream, msg.committed_offset(), msg.partition_offsets().end())}, + partitionStream, msg.committed_offset(), msg.partition_offsets().end()), deferred); } @@ -1108,14 +1151,14 @@ inline void TSingleClusterReadSessionImpl<false>::OnReadDoneImpl( TIntrusivePtr<TPartitionStreamImpl<false>> partitionStream = partitionStreamIt->second; if (!msg.graceful()) { PartitionStreams.erase(msg.partition_session_id()); - EventsQueue->PushEvent({partitionStream, weak_from_this(), + EventsQueue->PushEvent(partitionStream, weak_from_this(), NTopic::TReadSessionEvent::TPartitionSessionClosedEvent( - partitionStream, NTopic::TReadSessionEvent::TPartitionSessionClosedEvent::EReason::Lost)}, + partitionStream, NTopic::TReadSessionEvent::TPartitionSessionClosedEvent::EReason::Lost), deferred); } else { EventsQueue->PushEvent( - {partitionStream, weak_from_this(), - NTopic::TReadSessionEvent::TStopPartitionSessionEvent(std::move(partitionStream), msg.committed_offset())}, + partitionStream, weak_from_this(), + NTopic::TReadSessionEvent::TStopPartitionSessionEvent(std::move(partitionStream), msg.committed_offset()), deferred); } } @@ -1133,9 +1176,9 @@ inline void TSingleClusterReadSessionImpl<false>::OnReadDoneImpl( if (partitionStreamIt != PartitionStreams.end()) { auto partitionStream = partitionStreamIt->second; partitionStream->UpdateMaxCommittedOffset(rangeProto.committed_offset()); - EventsQueue->PushEvent({partitionStream, weak_from_this(), + EventsQueue->PushEvent(partitionStream, weak_from_this(), NTopic::TReadSessionEvent::TCommitOffsetAcknowledgementEvent( - partitionStream, rangeProto.committed_offset())}, + partitionStream, 
rangeProto.committed_offset()), deferred); } } @@ -1150,13 +1193,13 @@ inline void TSingleClusterReadSessionImpl<false>::OnReadDoneImpl( if (partitionStreamIt == PartitionStreams.end()) { return; } - EventsQueue->PushEvent({partitionStreamIt->second, weak_from_this(), + EventsQueue->PushEvent(partitionStreamIt->second, weak_from_this(), NTopic::TReadSessionEvent::TPartitionSessionStatusEvent( partitionStreamIt->second, msg.committed_offset(), 0, // TODO: support read offset in status msg.partition_offsets().end(), TInstant::MilliSeconds(::google::protobuf::util::TimeUtil::TimestampToMilliseconds( - msg.write_time_high_watermark())))}, + msg.write_time_high_watermark()))), deferred); } @@ -1198,13 +1241,15 @@ void TSingleClusterReadSessionImpl<UseMigrationProtocol>::StartDecompressionTask template<bool UseMigrationProtocol> void TSingleClusterReadSessionImpl<UseMigrationProtocol>::DestroyAllPartitionStreamsImpl(TDeferredActions<UseMigrationProtocol>& deferred) { - using TClosedEvent = - typename std::conditional_t<UseMigrationProtocol, NPersQueue::TReadSessionEvent::TPartitionStreamClosedEvent, - NTopic::TReadSessionEvent::TPartitionSessionClosedEvent>; + using TClosedEvent = std::conditional_t< + UseMigrationProtocol, + NPersQueue::TReadSessionEvent::TPartitionStreamClosedEvent, + NTopic::TReadSessionEvent::TPartitionSessionClosedEvent + >; for (auto&& [key, partitionStream] : PartitionStreams) { - EventsQueue->PushEvent({partitionStream, TSingleClusterReadSessionImpl<UseMigrationProtocol>::weak_from_this(), - TClosedEvent(std::move(partitionStream), TClosedEvent::EReason::ConnectionLost)}, + EventsQueue->PushEvent(partitionStream, TSingleClusterReadSessionImpl<UseMigrationProtocol>::weak_from_this(), + TClosedEvent(std::move(partitionStream), TClosedEvent::EReason::ConnectionLost), deferred); } PartitionStreams.clear(); @@ -1482,42 +1527,11 @@ TReadSessionEventInfo<UseMigrationProtocol>::TReadSessionEventInfo(TIntrusivePtr template<bool UseMigrationProtocol> 
TReadSessionEventInfo<UseMigrationProtocol>::TReadSessionEventInfo(TIntrusivePtr<TPartitionStreamImpl<UseMigrationProtocol>> partitionStream, std::weak_ptr<IUserRetrievedEventCallback<UseMigrationProtocol>> session) : PartitionStream(std::move(partitionStream)) + , DataCount(1) , Session(std::move(session)) {} template<bool UseMigrationProtocol> -TReadSessionEventInfo<UseMigrationProtocol>::TReadSessionEventInfo(TIntrusivePtr<TPartitionStreamImpl<UseMigrationProtocol>> partitionStream, - std::weak_ptr<IUserRetrievedEventCallback<UseMigrationProtocol>> session, - TVector<TMessage> messages, - TVector<TCompressedMessage> compressedMessages) - : PartitionStream(std::move(partitionStream)) - , Event( - NMaybe::TInPlace(), - std::in_place_type_t<TDataReceivedEvent>(), - std::move(messages), - std::move(compressedMessages), - PartitionStream - ) - , Session(std::move(session)) -{ -} - -template<bool UseMigrationProtocol> -void TReadSessionEventInfo<UseMigrationProtocol>::MoveToPartitionStream() { - PartitionStream->InsertEvent(std::move(*Event)); - Event = Nothing(); - Y_ASSERT(PartitionStream->HasEvents()); -} - -template<bool UseMigrationProtocol> -void TReadSessionEventInfo<UseMigrationProtocol>::ExtractFromPartitionStream() { - if (!Event && !IsEmpty()) { - Event = std::move(PartitionStream->TopEvent().GetEvent()); - PartitionStream->PopEvent(); - } -} - -template<bool UseMigrationProtocol> bool TReadSessionEventInfo<UseMigrationProtocol>::IsEmpty() const { return !PartitionStream || !PartitionStream->HasEvents(); } @@ -1528,30 +1542,12 @@ bool TReadSessionEventInfo<UseMigrationProtocol>::IsDataEvent() const { } template<bool UseMigrationProtocol> -bool TReadSessionEventInfo<UseMigrationProtocol>::HasMoreData() const { - return PartitionStream->TopEvent().GetData().HasMoreData(); -} - -template<bool UseMigrationProtocol> -bool TReadSessionEventInfo<UseMigrationProtocol>::HasReadyUnreadData() const { - return PartitionStream->TopEvent().GetData().HasReadyUnreadData(); 
-} - -template<bool UseMigrationProtocol> void TReadSessionEventInfo<UseMigrationProtocol>::OnUserRetrievedEvent() { if (auto session = Session.lock()) { session->OnUserRetrievedEvent(*Event); } } -template<bool UseMigrationProtocol> -bool TReadSessionEventInfo<UseMigrationProtocol>::TakeData(TVector<TMessage>* messages, - TVector<TCompressedMessage>* compressedMessages, - size_t* maxByteSize) -{ - return PartitionStream->TopEvent().GetData().TakeData(PartitionStream, messages, compressedMessages, maxByteSize); -} - //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// // TReadSessionEventsQueue @@ -1585,16 +1581,30 @@ TReadSessionEventsQueue<UseMigrationProtocol>::TReadSessionEventsQueue( } template <bool UseMigrationProtocol> -void TReadSessionEventsQueue<UseMigrationProtocol>::PushEvent(TReadSessionEventInfo<UseMigrationProtocol> eventInfo, - TDeferredActions<UseMigrationProtocol>& deferred) { +void TReadSessionEventsQueue<UseMigrationProtocol>::PushEvent(TIntrusivePtr<TPartitionStreamImpl<UseMigrationProtocol>> stream, + std::weak_ptr<IUserRetrievedEventCallback<UseMigrationProtocol>> /*session*/, + typename TAReadSessionEvent<UseMigrationProtocol>::TEvent event, + TDeferredActions<UseMigrationProtocol>& deferred) +{ if (TParent::Closed) { return; } with_lock (TParent::Mutex) { - auto partitionStream = eventInfo.PartitionStream; - eventInfo.MoveToPartitionStream(); - SignalReadyEventsImpl(partitionStream.Get(), deferred); + using TClosedEvent = std::conditional_t< + UseMigrationProtocol, + NPersQueue::TReadSessionEvent::TPartitionStreamClosedEvent, + NTopic::TReadSessionEvent::TPartitionSessionClosedEvent + >; + + if (std::holds_alternative<TClosedEvent>(event)) { + stream->DeleteNotReadyTail(); + } + + stream->InsertEvent(std::move(event)); + Y_ASSERT(stream->HasEvents()); + + SignalReadyEventsImpl(stream.Get(), deferred); } } @@ -1605,53 +1615,67 @@ void 
TReadSessionEventsQueue<UseMigrationProtocol>::SignalEventImpl( if (TParent::Closed) { return; } + auto session = partitionStream->GetSession(); - TParent::Events.emplace(std::move(partitionStream), std::move(session)); + + if (TParent::Events.empty()) { + TParent::Events.emplace(std::move(partitionStream), std::move(session)); + } else { + auto& event = TParent::Events.back(); + if (event.IsDataEvent() && + (event.PartitionStream == partitionStream)) { + ++event.DataCount; + } else { + TParent::Events.emplace(std::move(partitionStream), std::move(session)); + } + } + SignalWaiterImpl(deferred); } template <bool UseMigrationProtocol> -TDataDecompressionInfo<UseMigrationProtocol>* TReadSessionEventsQueue<UseMigrationProtocol>::PushDataEvent( - TIntrusivePtr<TPartitionStreamImpl<UseMigrationProtocol>> partitionStream, - TPartitionData<UseMigrationProtocol>&& msg) { +void TReadSessionEventsQueue<UseMigrationProtocol>::PushDataEvent(TIntrusivePtr<TPartitionStreamImpl<UseMigrationProtocol>> partitionStream, + size_t batch, + size_t message, + typename TDataDecompressionInfo<UseMigrationProtocol>::TPtr parent, + std::atomic<bool>& ready) +{ if (this->Closed) { - return nullptr; + return; } with_lock (this->Mutex) { - return &partitionStream->InsertDataEvent(std::move(msg), this->Settings.Decompress_); + partitionStream->InsertDataEvent(batch, message, parent, ready); } } template <bool UseMigrationProtocol> -TMaybe<TReadSessionEventInfo<UseMigrationProtocol>> TReadSessionEventsQueue<UseMigrationProtocol>::GetDataEventImpl( - TReadSessionEventInfo<UseMigrationProtocol>& srcDataEventInfo, +typename TAReadSessionEvent<UseMigrationProtocol>::TDataReceivedEvent TReadSessionEventsQueue<UseMigrationProtocol>::GetDataEventImpl( + TIntrusivePtr<TPartitionStreamImpl<UseMigrationProtocol>> stream, size_t* maxByteSize) { // Assumes that we're under lock. 
TVector<typename TAReadSessionEvent<UseMigrationProtocol>::TDataReceivedEvent::TMessage> messages; TVector<typename TAReadSessionEvent<UseMigrationProtocol>::TDataReceivedEvent::TCompressedMessage> compressedMessages; - TIntrusivePtr<TPartitionStreamImpl<UseMigrationProtocol>> partitionStream = srcDataEventInfo.PartitionStream; - bool messageExtracted = false; - while (srcDataEventInfo.HasReadyUnreadData() && *maxByteSize > 0) { - const bool hasMoreUnpackedData = srcDataEventInfo.TakeData(&messages, &compressedMessages, maxByteSize); - if (!hasMoreUnpackedData) { - const bool messageIsFullyRead = !srcDataEventInfo.HasMoreData(); - if (messageIsFullyRead) { - partitionStream->PopEvent(); - messageExtracted = true; - break; - } - } - } - if (!messageExtracted) { - partitionStream->TopEvent().Signalled = false; + + Y_VERIFY(!TParent::Events.empty()); + + auto& event = TParent::Events.front(); + + Y_VERIFY(event.PartitionStream == stream); + Y_VERIFY(event.DataCount > 0); + + for (; (event.DataCount > 0) && (*maxByteSize > 0); --event.DataCount) { + stream->TopEvent().GetDataEvent().TakeData(stream, &messages, &compressedMessages, maxByteSize); + stream->PopEvent(); } - if (messages.empty() && compressedMessages.empty()) { - return Nothing(); + if (event.DataCount == 0) { + TParent::Events.pop(); } - return TReadSessionEventInfo<UseMigrationProtocol>(partitionStream, partitionStream->GetSession(), - std::move(messages), std::move(compressedMessages)); + + Y_VERIFY(!messages.empty() || !compressedMessages.empty()); + + return {std::move(messages), std::move(compressedMessages), stream}; } template <bool UseMigrationProtocol> @@ -1680,19 +1704,11 @@ bool TReadSessionEventsQueue<UseMigrationProtocol>::ApplyCallbacksToReadyEventsI bool applied = false; while (HasCallbackForNextEventImpl()) { size_t maxSize = std::numeric_limits<size_t>::max(); - TMaybe<TReadSessionEventInfo<UseMigrationProtocol>> eventInfo = GetEventImpl(&maxSize); - if (!eventInfo) { - break; - } - const 
TIntrusivePtr<TPartitionStreamImpl<UseMigrationProtocol>> partitionStreamForSignalling = - eventInfo->IsDataEvent() ? eventInfo->PartitionStream : nullptr; + TReadSessionEventInfo<UseMigrationProtocol> eventInfo = GetEventImpl(&maxSize); applied = true; - if (!ApplyHandler(*eventInfo, deferred)) { // Close session event. + if (!ApplyHandler(eventInfo, deferred)) { // Close session event. break; } - if (partitionStreamForSignalling) { - SignalReadyEventsImpl(partitionStreamForSignalling.Get(), deferred); - } } return applied; } @@ -1870,7 +1886,7 @@ i64 TDataDecompressionInfo<UseMigrationProtocol>::StartDecompressionTasks( std::shared_ptr<TSingleClusterReadSessionImpl<UseMigrationProtocol>> session = Session.lock(); Y_ASSERT(session); ReadyThresholds.emplace_back(); - TDecompressionTask task(this, partitionStream, &ReadyThresholds.back()); + TDecompressionTask task(TDataDecompressionInfo::shared_from_this(), partitionStream, &ReadyThresholds.back()); i64 used = 0; while (availableMemory > 0 && !AllDecompressionTasksStarted()) { const auto& batch = ServerMessage.batches(CurrentDecompressingMessage.first); @@ -1884,6 +1900,12 @@ i64 TDataDecompressionInfo<UseMigrationProtocol>::StartDecompressionTasks( Y_VERIFY(estimatedDecompressedSize >= 0); task.Add(CurrentDecompressingMessage.first, CurrentDecompressingMessage.second, size, estimatedDecompressedSize); + session->GetEventsQueue()->PushDataEvent(partitionStream, + CurrentDecompressingMessage.first, + CurrentDecompressingMessage.second, + TDataDecompressionInfo::shared_from_this(), + ReadyThresholds.back().Ready); + used += estimatedDecompressedSize; availableMemory -= estimatedDecompressedSize; } @@ -1896,7 +1918,7 @@ i64 TDataDecompressionInfo<UseMigrationProtocol>::StartDecompressionTasks( session->OnCreateNewDecompressionTask(); deferred.DeferStartExecutorTask(executor, std::move(task)); ReadyThresholds.emplace_back(); - task = TDecompressionTask(this, partitionStream, &ReadyThresholds.back()); + task = 
TDecompressionTask(TDataDecompressionInfo::shared_from_this(), partitionStream, &ReadyThresholds.back()); } } if (task.AddedMessagesCount() > 0) { @@ -2014,6 +2036,91 @@ bool TDataDecompressionInfo<UseMigrationProtocol>::TakeData(const TIntrusivePtr< << minOffset << "-" << maxOffset << ")"); return CurrentReadingMessage <= *readyThreshold; } + +template<bool UseMigrationProtocol> +bool TDataDecompressionEvent<UseMigrationProtocol>::TakeData(TIntrusivePtr<TPartitionStreamImpl<UseMigrationProtocol>> partitionStream, + TVector<typename TAReadSessionEvent<UseMigrationProtocol>::TDataReceivedEvent::TMessage>* messages, + TVector<typename TAReadSessionEvent<UseMigrationProtocol>::TDataReceivedEvent::TCompressedMessage>* compressedMessages, + size_t* maxByteSize) const +{ + auto& msg = Parent->GetServerMessage(); + i64 minOffset = Max<i64>(); + i64 maxOffset = 0; + auto& batch = *msg.mutable_batches(Batch); + const auto& meta = Parent->GetBatchMeta(Batch); + + const TInstant batchWriteTimestamp = [&batch](){ + if constexpr (UseMigrationProtocol) { + return TInstant::MilliSeconds(batch.write_timestamp_ms()); + } else { + return TInstant::MilliSeconds(::google::protobuf::util::TimeUtil::TimestampToMilliseconds(batch.written_at())); + } + }(); + auto& messageData = *batch.mutable_message_data(Message); + + minOffset = Min(minOffset, static_cast<i64>(messageData.offset())); + maxOffset = Max(maxOffset, static_cast<i64>(messageData.offset())); + + if constexpr (UseMigrationProtocol) { + TReadSessionEvent::TDataReceivedEvent::TMessageInformation messageInfo(messageData.offset(), + batch.source_id(), + messageData.seq_no(), + TInstant::MilliSeconds(messageData.create_timestamp_ms()), + batchWriteTimestamp, + batch.ip(), + meta, + messageData.uncompressed_size()); + + if (Parent->GetDoDecompress()) { + messages->emplace_back(messageData.data(), + Parent->GetDecompressionError(Batch, Message), + messageInfo, + partitionStream, + messageData.partition_key(), + 
messageData.explicit_hash()); + } else { + compressedMessages->emplace_back(static_cast<ECodec>(messageData.codec()), + messageData.data(), + TVector<TReadSessionEvent::TDataReceivedEvent::TMessageInformation>{messageInfo}, + partitionStream, + messageData.partition_key(), + messageData.explicit_hash()); + } + } else { + NTopic::TReadSessionEvent::TDataReceivedEvent::TMessageInformation messageInfo(messageData.offset(), + batch.producer_id(), + messageData.seq_no(), + TInstant::MilliSeconds(::google::protobuf::util::TimeUtil::TimestampToMilliseconds(messageData.created_at())), + batchWriteTimestamp, + meta, + messageData.uncompressed_size(), + messageData.message_group_id()); + + if (Parent->GetDoDecompress()) { + messages->emplace_back(messageData.data(), + Parent->GetDecompressionError(Batch, Message), + messageInfo, + partitionStream); + } else { + compressedMessages->emplace_back(static_cast<NTopic::ECodec>(batch.codec()), + messageData.data(), + messageInfo, + partitionStream); + } + } + + *maxByteSize -= Min(*maxByteSize, messageData.data().size()); + + // Clear data to free internal session's memory. + messageData.clear_data(); + + partitionStream->GetLog().Write(TLOG_DEBUG, TStringBuilder() + << "Take Data. Partition " << partitionStream->GetPartitionId() + << ". 
Read: {" << Batch << ", " << Message << "} (" + << minOffset << "-" << maxOffset << ")"); + + return false; +} template<bool UseMigrationProtocol> bool TDataDecompressionInfo<UseMigrationProtocol>::HasReadyUnreadData() const { @@ -2040,9 +2147,9 @@ void TDataDecompressionInfo<UseMigrationProtocol>::TDecompressionTask::Add(size_ template <bool UseMigrationProtocol> TDataDecompressionInfo<UseMigrationProtocol>::TDecompressionTask::TDecompressionTask( - TDataDecompressionInfo* parent, TIntrusivePtr<TPartitionStreamImpl<UseMigrationProtocol>> partitionStream, + TDataDecompressionInfo::TPtr parent, TIntrusivePtr<TPartitionStreamImpl<UseMigrationProtocol>> partitionStream, TReadyMessageThreshold* ready) - : Parent(parent) + : Parent(std::move(parent)) , PartitionStream(std::move(partitionStream)) , Ready(ready) { } @@ -2130,19 +2237,6 @@ void TDataDecompressionInfo<UseMigrationProtocol>::TDecompressionTask::operator( } //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// -// TRawPartitionStreamEvent - -template <bool UseMigrationProtocol> -void TRawPartitionStreamEvent<UseMigrationProtocol>::Signal(TPartitionStreamImpl<UseMigrationProtocol>* partitionStream, - TReadSessionEventsQueue<UseMigrationProtocol>* queue, - TDeferredActions<UseMigrationProtocol>& deferred) { - if (!Signalled) { - Signalled = true; - queue->SignalEventImpl(partitionStream, deferred); - } -} - -//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// // TDeferredActions template<bool UseMigrationProtocol> diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/write_session.cpp b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/write_session.cpp index 20e7318421..f069852bf8 100644 --- a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/write_session.cpp +++ b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/write_session.cpp @@ -261,7 +261,7 @@ void 
TWriteSession::OnCdsResponse( void TWriteSession::InitWriter() { // No Lock, very initial start - no race yet as well. CompressionExecutor = Settings.CompressionExecutor_; IExecutor::TPtr executor; - executor = new TSyncExecutor(); + executor = CreateSyncExecutor(); executor->Start(); Executor = std::move(executor); diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/persqueue.h b/ydb/public/sdk/cpp/client/ydb_persqueue_core/persqueue.h index aae392d7b6..5aa2e95386 100644 --- a/ydb/public/sdk/cpp/client/ydb_persqueue_core/persqueue.h +++ b/ydb/public/sdk/cpp/client/ydb_persqueue_core/persqueue.h @@ -948,9 +948,12 @@ private: bool Started = false; TAdaptiveLock StartLock; }; + IExecutor::TPtr CreateThreadPoolExecutorAdapter(std::shared_ptr<IThreadPool> threadPool); // Thread pool is expected to have been started. IExecutor::TPtr CreateThreadPoolExecutor(size_t threads); +IExecutor::TPtr CreateSyncExecutor(); + //! Events for write session. struct TWriteSessionEvent { diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/basic_usage_ut.cpp b/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/basic_usage_ut.cpp index 0619536fb6..4bfe965309 100644 --- a/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/basic_usage_ut.cpp +++ b/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/basic_usage_ut.cpp @@ -184,7 +184,7 @@ Y_UNIT_TEST_SUITE(BasicUsage) { auto setup = std::make_shared<TPersQueueYdbSdkTestSetup>(TEST_CASE_NAME); TWriteSessionSettings writeSettings; writeSettings.Path(setup->GetTestTopic()).MessageGroupId("src_id"); - IExecutor::TPtr executor = new TSyncExecutor(); + IExecutor::TPtr executor = CreateSyncExecutor(); writeSettings.CompressionExecutor(executor); // LOGBROKER-7189 //SimpleWriteAndValidateData(setup.get(), writeSettings, 100u, false); diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/read_session_ut.cpp b/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/read_session_ut.cpp index 10c5ec57f1..83b52aa63c 100644 --- 
a/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/read_session_ut.cpp +++ b/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/read_session_ut.cpp @@ -1180,13 +1180,14 @@ Y_UNIT_TEST_SUITE(ReadSessionImplTest) { .CompressMessage(i, TStringBuilder() << "message" << i)); // Callback will be called. } - for (ui64 i = 1; i <= 2; ++i) { + for (ui64 i = 1; i <= 2; ) { TMaybe<TReadSessionEvent::TEvent> event = setup.EventsQueue->GetEvent(true); UNIT_ASSERT(event); UNIT_ASSERT_EVENT_TYPE(*event, TReadSessionEvent::TDataReceivedEvent); auto& dataEvent = std::get<TReadSessionEvent::TDataReceivedEvent>(*event); - UNIT_ASSERT_VALUES_EQUAL(dataEvent.GetMessages().size(), 1); - UNIT_ASSERT_VALUES_EQUAL(dataEvent.GetMessages()[0].GetData(), TStringBuilder() << "message" << i); + for (ui32 j = 0; j < dataEvent.GetMessages().size(); ++j, ++i) { + UNIT_ASSERT_VALUES_EQUAL(dataEvent.GetMessages()[j].GetData(), TStringBuilder() << "message" << i); + } } setup.AssertNoEvents(); @@ -1588,13 +1589,15 @@ Y_UNIT_TEST_SUITE(ReadSessionImplTest) { } })); - for (int i = 0; i < 2; ++i) { + for (int i = 0; i < 2; ) { TMaybe<TReadSessionEvent::TEvent> event = setup.EventsQueue->GetEvent(true); UNIT_ASSERT(event); UNIT_ASSERT_EVENT_TYPE(*event, TReadSessionEvent::TDataReceivedEvent); TReadSessionEvent::TDataReceivedEvent& dataEvent = std::get<TReadSessionEvent::TDataReceivedEvent>(*event); Cerr << "got data event: " << dataEvent.DebugString() << "\n"; dataEvent.Commit(); + + i += dataEvent.GetMessagesCount(); } UNIT_ASSERT(has1); @@ -1661,11 +1664,13 @@ Y_UNIT_TEST_SUITE(ReadSessionImplTest) { auto calledPromise = NThreading::NewPromise<void>(); int time = 0; setup.Settings.EventHandlers_.DataReceivedHandler([&](TReadSessionEvent::TDataReceivedEvent& event) { - ++time; - UNIT_ASSERT_VALUES_EQUAL(event.GetMessages().size(), 1); - UNIT_ASSERT_VALUES_EQUAL(event.GetMessages()[0].GetData(), TStringBuilder() << "message" << time); - if (time == 2) { - calledPromise.SetValue(); + for (ui32 i = 0; i < 
event.GetMessages().size(); ++i) { + ++time; + UNIT_ASSERT_VALUES_EQUAL(event.GetMessages()[i].GetData(), TStringBuilder() << "message" << time); + + if (time == 2) { + calledPromise.SetValue(); + } } }); setup.SuccessfulInit(); @@ -1678,6 +1683,13 @@ Y_UNIT_TEST_SUITE(ReadSessionImplTest) { .PartitionData(2) .Batch("src_id") .CompressMessage(2, "message2")); + + // + // when the PartitionStreamClosed arrives the raw messages are deleted + // we give time to process the messages + // + Sleep(TDuration::Seconds(2)); + setup.MockProcessor->AddServerResponse(TMockReadSessionProcessor::TServerReadInfo() .ForcefulReleasePartitionStream()); TMaybe<TReadSessionEvent::TEvent> event = setup.EventsQueue->GetEvent(true); diff --git a/ydb/services/persqueue_v1/persqueue_ut.cpp b/ydb/services/persqueue_v1/persqueue_ut.cpp index 3855269347..91683d5490 100644 --- a/ydb/services/persqueue_v1/persqueue_ut.cpp +++ b/ydb/services/persqueue_v1/persqueue_ut.cpp @@ -4,6 +4,7 @@ #include <ydb/services/persqueue_v1/ut/rate_limiter_test_setup.h> #include <ydb/services/persqueue_v1/ut/test_utils.h> #include <ydb/services/persqueue_v1/ut/persqueue_test_fixture.h> +#include <ydb/services/persqueue_v1/ut/functions_executor_wrapper.h> #include <ydb/core/base/appdata.h> #include <ydb/core/mon/sync_http_mon.h> @@ -1805,18 +1806,114 @@ Y_UNIT_TEST_SUITE(TPersQueueTest) { } } - Y_UNIT_TEST(CheckKillBalancer) { + Y_UNIT_TEST(EventBatching) { NPersQueue::TTestServer server; server.EnableLogs({ NKikimrServices::PQ_WRITE_PROXY, NKikimrServices::PQ_READ_PROXY}); PrepareForGrpc(server); - TPQDataWriter writer("source1", server); + auto driver = server.AnnoyingClient->GetDriver(); + auto decompressor = CreateSyncExecutorWrapper(); + + NYdb::NPersQueue::TReadSessionSettings settings; + settings.ConsumerName("shared/user").AppendTopics(SHORT_TOPIC_NAME).ReadOriginal({"dc1"}); + settings.DecompressionExecutor(decompressor); + auto reader = CreateReader(*driver, settings); + + for (ui32 i = 0; i < 2; ++i) { + 
auto msg = reader->GetEvent(true, 1); + UNIT_ASSERT(msg); + + auto ev = std::get_if<NYdb::NPersQueue::TReadSessionEvent::TCreatePartitionStreamEvent>(&*msg); + UNIT_ASSERT(ev); + + ev->Confirm(); + } + + auto writeDataAndWaitForDecompressionTasks = [&](const TString &message, + const TString &sourceId, + ui32 partitionId, + size_t tasksCount) { + // + // write data + // + auto writer = CreateSimpleWriter(*driver, SHORT_TOPIC_NAME, sourceId, partitionId, "raw"); + writer->Write(message, 1); + + writer->Close(TDuration::Seconds(10)); + + // + // wait for decompression tasks + // + while (decompressor->GetFuncsCount() < tasksCount) { + Sleep(TDuration::Seconds(1)); + } + }; + + // + // stream #1: [0-, 2-] + // stream #2: [1-, 3-] + // session : [] + // + writeDataAndWaitForDecompressionTasks("111", "source_id_0", 1, 1); // 0 + writeDataAndWaitForDecompressionTasks("333", "source_id_1", 2, 2); // 1 + writeDataAndWaitForDecompressionTasks("222", "source_id_2", 1, 3); // 2 + writeDataAndWaitForDecompressionTasks("444", "source_id_3", 2, 4); // 3 + + // + // stream #1: [0+, 2+] + // stream #2: [1+, 3+] + // session : [(#1: 1), (#2: 1), (#1, 1)] + // + decompressor->StartFuncs({0, 3, 1, 2}); + + auto messages = reader->GetEvents(true); + UNIT_ASSERT_VALUES_EQUAL(messages.size(), 3); + + { + auto ev = std::get_if<NYdb::NPersQueue::TReadSessionEvent::TDataReceivedEvent>(&messages[0]); + UNIT_ASSERT(ev); + UNIT_ASSERT_VALUES_EQUAL(ev->GetMessages().size(), 1); + + UNIT_ASSERT_VALUES_EQUAL(ev->GetMessages()[0].GetData(), "111"); + } + + { + auto ev = std::get_if<NYdb::NPersQueue::TReadSessionEvent::TDataReceivedEvent>(&messages[1]); + UNIT_ASSERT(ev); + UNIT_ASSERT_VALUES_EQUAL(ev->GetMessages().size(), 2); + + UNIT_ASSERT_VALUES_EQUAL(ev->GetMessages()[0].GetData(), "333"); + UNIT_ASSERT_VALUES_EQUAL(ev->GetMessages()[1].GetData(), "444"); + } + + { + auto ev = std::get_if<NYdb::NPersQueue::TReadSessionEvent::TDataReceivedEvent>(&messages[2]); + UNIT_ASSERT(ev); + 
UNIT_ASSERT_VALUES_EQUAL(ev->GetMessages().size(), 1); + UNIT_ASSERT_VALUES_EQUAL(ev->GetMessages()[0].GetData(), "222"); + } + + // + // stream #1: [] + // stream #2: [] + // session : [] + // + auto msg = reader->GetEvent(false); + UNIT_ASSERT(!msg); + } + + Y_UNIT_TEST(CheckKillBalancer) { + NPersQueue::TTestServer server; + server.EnableLogs({ NKikimrServices::PQ_WRITE_PROXY, NKikimrServices::PQ_READ_PROXY}); + PrepareForGrpc(server); auto driver = server.AnnoyingClient->GetDriver(); + auto decompressor = CreateThreadPoolExecutorWrapper(2); NYdb::NPersQueue::TReadSessionSettings settings; settings.ConsumerName("shared/user").AppendTopics(SHORT_TOPIC_NAME).ReadOriginal({"dc1"}); + settings.DecompressionExecutor(decompressor); auto reader = CreateReader(*driver, settings); @@ -1834,18 +1931,31 @@ Y_UNIT_TEST_SUITE(TPersQueueTest) { } + for (ui32 i = 0; i < 10; ++i) { + auto writer = CreateSimpleWriter(*driver, SHORT_TOPIC_NAME, TStringBuilder() << "source" << i); + bool res = writer->Write("valuevaluevalue", 1); + UNIT_ASSERT(res); + res = writer->Close(TDuration::Seconds(10)); + UNIT_ASSERT(res); + } - server.AnnoyingClient->RestartBalancerTablet(server.CleverServer->GetRuntime(), "rt3.dc1--topic1"); - Cerr << "Balancer killed\n"; - ui32 createEv = 0, destroyEv = 0; - for (ui32 i = 0; i < 4; ++i) { + ui32 createEv = 0, destroyEv = 0, dataEv = 0; + std::vector<ui32> gotDestroy{0, 0}; + + auto doRead = [&]() { auto msg = reader->GetEvent(true, 1); UNIT_ASSERT(msg); Cerr << "Got message: " << NYdb::NPersQueue::DebugString(*msg) << "\n"; + + if (std::get_if<NYdb::NPersQueue::TReadSessionEvent::TDataReceivedEvent>(&*msg)) { + ++dataEv; + return; + } + auto ev1 = std::get_if<NYdb::NPersQueue::TReadSessionEvent::TPartitionStreamClosedEvent>(&*msg); auto ev2 = std::get_if<NYdb::NPersQueue::TReadSessionEvent::TCreatePartitionStreamEvent>(&*msg); @@ -1853,15 +1963,47 @@ Y_UNIT_TEST_SUITE(TPersQueueTest) { if (ev1) { ++destroyEv; + 
UNIT_ASSERT(ev1->GetPartitionStream()->GetPartitionId() < 2); + gotDestroy[ev1->GetPartitionStream()->GetPartitionId()]++; } if (ev2) { - ev2->Confirm(); + ev2->Confirm(ev2->GetEndOffset()); ++createEv; + UNIT_ASSERT(ev2->GetPartitionStream()->GetPartitionId() < 2); + UNIT_ASSERT_VALUES_EQUAL(gotDestroy[ev2->GetPartitionStream()->GetPartitionId()], 1); } + }; + + decompressor->StartFuncs({0, 1, 2, 3, 4}); + + for (ui32 i = 0; i < 5; ++i) { + doRead(); } - UNIT_ASSERT(createEv == 2); - UNIT_ASSERT(destroyEv == 2); + + UNIT_ASSERT_VALUES_EQUAL(dataEv, 5); + + server.AnnoyingClient->RestartBalancerTablet(server.CleverServer->GetRuntime(), "rt3.dc1--topic1"); + Cerr << "Balancer killed\n"; + + Sleep(TDuration::Seconds(5)); + + for (ui32 i = 0; i < 4; ++i) { + doRead(); + } + + UNIT_ASSERT_VALUES_EQUAL(createEv, 2); + UNIT_ASSERT_VALUES_EQUAL(destroyEv, 2); + + UNIT_ASSERT_VALUES_EQUAL(dataEv, 5); + + decompressor->StartFuncs({5, 6, 7, 8, 9}); + + Sleep(TDuration::Seconds(5)); + + auto msg = reader->GetEvent(false, 1); + + UNIT_ASSERT(!msg); UNIT_ASSERT(!reader->WaitEvent().Wait(TDuration::Seconds(1))); } diff --git a/ydb/services/persqueue_v1/ut/CMakeLists.darwin.txt b/ydb/services/persqueue_v1/ut/CMakeLists.darwin.txt index f647591235..96cac786f5 100644 --- a/ydb/services/persqueue_v1/ut/CMakeLists.darwin.txt +++ b/ydb/services/persqueue_v1/ut/CMakeLists.darwin.txt @@ -46,6 +46,7 @@ target_sources(ydb-services-persqueue_v1-ut PRIVATE ${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/persqueue_common_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/persqueue_compat_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/ut/rate_limiter_test_setup.cpp + ${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/ut/functions_executor_wrapper.cpp ) add_test( NAME diff --git a/ydb/services/persqueue_v1/ut/CMakeLists.linux.txt b/ydb/services/persqueue_v1/ut/CMakeLists.linux.txt index a1e317a525..bcbbc35240 100644 --- a/ydb/services/persqueue_v1/ut/CMakeLists.linux.txt +++ 
b/ydb/services/persqueue_v1/ut/CMakeLists.linux.txt @@ -50,6 +50,7 @@ target_sources(ydb-services-persqueue_v1-ut PRIVATE ${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/persqueue_common_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/persqueue_compat_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/ut/rate_limiter_test_setup.cpp + ${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/ut/functions_executor_wrapper.cpp ) add_test( NAME diff --git a/ydb/services/persqueue_v1/ut/functions_executor_wrapper.cpp b/ydb/services/persqueue_v1/ut/functions_executor_wrapper.cpp new file mode 100644 index 0000000000..f817751350 --- /dev/null +++ b/ydb/services/persqueue_v1/ut/functions_executor_wrapper.cpp @@ -0,0 +1,56 @@ +#include "functions_executor_wrapper.h" + +namespace NKikimr::NPersQueueTests { + +FunctionExecutorWrapper::FunctionExecutorWrapper(TExecutorPtr executor) : + Executor{std::move(executor)} +{ +} + +bool FunctionExecutorWrapper::IsAsync() const +{ + return Executor->IsAsync(); +} + +void FunctionExecutorWrapper::Post(TFunction &&f) +{ + with_lock (Mutex) { + Funcs.push_back(std::move(f)); + } +} + +void FunctionExecutorWrapper::DoStart() +{ + Executor->Start(); +} + +void FunctionExecutorWrapper::StartFuncs(const std::vector<size_t>& indicies) +{ + with_lock (Mutex) { + for (auto index : indicies) { + Y_VERIFY(index < Funcs.size()); + Y_VERIFY(Funcs[index]); + + Executor->Post(std::move(Funcs[index])); + } + } +} + +size_t FunctionExecutorWrapper::GetFuncsCount() const +{ + with_lock (Mutex) { + return Funcs.size(); + } +} + +TIntrusivePtr<FunctionExecutorWrapper> CreateThreadPoolExecutorWrapper(size_t threads) +{ + return MakeIntrusive<FunctionExecutorWrapper>(NYdb::NPersQueue::CreateThreadPoolExecutor(threads)); +} + +TIntrusivePtr<FunctionExecutorWrapper> CreateSyncExecutorWrapper() +{ + return MakeIntrusive<FunctionExecutorWrapper>(NYdb::NPersQueue::CreateSyncExecutor()); +} + +} diff --git a/ydb/services/persqueue_v1/ut/functions_executor_wrapper.h 
b/ydb/services/persqueue_v1/ut/functions_executor_wrapper.h new file mode 100644 index 0000000000..c895ee15d5 --- /dev/null +++ b/ydb/services/persqueue_v1/ut/functions_executor_wrapper.h @@ -0,0 +1,34 @@ +#pragma once + +#include <ydb/public/sdk/cpp/client/ydb_persqueue_public/persqueue.h> +#include <util/system/mutex.h> + +#include <vector> + +namespace NKikimr::NPersQueueTests { + +class FunctionExecutorWrapper : public NYdb::NPersQueue::IExecutor { +public: + using TExecutorPtr = NYdb::NPersQueue::IExecutor::TPtr; + + explicit FunctionExecutorWrapper(TExecutorPtr executor); + + bool IsAsync() const override; + void Post(TFunction&& f) override; + + void StartFuncs(const std::vector<size_t>& indicies); + + size_t GetFuncsCount() const; + +private: + void DoStart() override; + + TExecutorPtr Executor; + TMutex Mutex; + std::vector<TFunction> Funcs; +}; + +TIntrusivePtr<FunctionExecutorWrapper> CreateThreadPoolExecutorWrapper(size_t threads); +TIntrusivePtr<FunctionExecutorWrapper> CreateSyncExecutorWrapper(); + +} |