aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorshmel1k <shmel1k@ydb.tech>2022-08-11 21:05:04 +0300
committershmel1k <shmel1k@ydb.tech>2022-08-11 21:05:04 +0300
commitd6afcd79c70c70a1c877508d75547fd4c36fc168 (patch)
tree0408f5c01c64fc8ee7739ebd2dc781bd3c866d21
parent58e18112b989706d6bb7a46a25b6ec02400cc60e (diff)
downloadydb-d6afcd79c70c70a1c877508d75547fd4c36fc168.tar.gz
[] fixed kikimr import
-rw-r--r--CMakeLists.darwin.txt8
-rw-r--r--CMakeLists.linux.txt8
-rw-r--r--ydb/library/yql/providers/dq/task_runner/tasks_runner_pipe.cpp10
-rw-r--r--ydb/library/yql/sql/pg/pg_sql.cpp2
-rw-r--r--ydb/public/lib/ydb_cli/commands/CMakeLists.txt1
-rw-r--r--ydb/public/lib/ydb_cli/commands/ydb_service_topic.cpp423
-rw-r--r--ydb/public/lib/ydb_cli/commands/ydb_service_topic.h142
-rw-r--r--ydb/public/lib/ydb_cli/topic/CMakeLists.txt28
-rw-r--r--ydb/public/lib/ydb_cli/topic/topic_metadata_fields.h20
-rw-r--r--ydb/public/lib/ydb_cli/topic/topic_read.cpp278
-rw-r--r--ydb/public/lib/ydb_cli/topic/topic_read.h92
-rw-r--r--ydb/public/lib/ydb_cli/topic/topic_read_ut.cpp160
-rw-r--r--ydb/public/lib/ydb_cli/topic/topic_util.h41
-rw-r--r--ydb/public/lib/ydb_cli/topic/topic_write.cpp207
-rw-r--r--ydb/public/lib/ydb_cli/topic/topic_write.h79
-rw-r--r--ydb/public/lib/ydb_cli/topic/topic_write_ut.cpp287
-rw-r--r--ydb/public/lib/ydb_cli/topic/ut/CMakeLists.darwin.txt60
-rw-r--r--ydb/public/lib/ydb_cli/topic/ut/CMakeLists.linux.txt64
-rw-r--r--ydb/public/lib/ydb_cli/topic/ut/CMakeLists.txt13
-rw-r--r--ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/common.cpp5
-rw-r--r--ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.h243
-rw-r--r--ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.ipp400
-rw-r--r--ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/write_session.cpp2
-rw-r--r--ydb/public/sdk/cpp/client/ydb_persqueue_core/persqueue.h3
-rw-r--r--ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/basic_usage_ut.cpp2
-rw-r--r--ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/read_session_ut.cpp30
-rw-r--r--ydb/services/persqueue_v1/persqueue_ut.cpp160
-rw-r--r--ydb/services/persqueue_v1/ut/CMakeLists.darwin.txt1
-rw-r--r--ydb/services/persqueue_v1/ut/CMakeLists.linux.txt1
-rw-r--r--ydb/services/persqueue_v1/ut/functions_executor_wrapper.cpp56
-rw-r--r--ydb/services/persqueue_v1/ut/functions_executor_wrapper.h34
31 files changed, 2575 insertions, 285 deletions
diff --git a/CMakeLists.darwin.txt b/CMakeLists.darwin.txt
index 11de0310a3..c6292a4cc1 100644
--- a/CMakeLists.darwin.txt
+++ b/CMakeLists.darwin.txt
@@ -984,12 +984,13 @@ add_subdirectory(library/cpp/bucket_quoter)
add_subdirectory(ydb/public/lib/ydb_cli/dump/util)
add_subdirectory(ydb/public/lib/ydb_cli/dump)
add_subdirectory(ydb/public/lib/ydb_cli/import)
-add_subdirectory(ydb/public/sdk/cpp/client/ydb_export)
-add_subdirectory(ydb/public/sdk/cpp/client/ydb_import)
-add_subdirectory(ydb/public/sdk/cpp/client/ydb_operation)
+add_subdirectory(ydb/public/lib/ydb_cli/topic)
add_subdirectory(ydb/public/sdk/cpp/client/ydb_persqueue_public)
add_subdirectory(ydb/public/sdk/cpp/client/ydb_topic)
add_subdirectory(ydb/public/sdk/cpp/client/ydb_topic/impl)
+add_subdirectory(ydb/public/sdk/cpp/client/ydb_export)
+add_subdirectory(ydb/public/sdk/cpp/client/ydb_import)
+add_subdirectory(ydb/public/sdk/cpp/client/ydb_operation)
add_subdirectory(ydb/core/testlib)
add_subdirectory(library/cpp/testing/gmock_in_unittest)
add_subdirectory(contrib/restricted/googletest/googlemock)
@@ -1334,6 +1335,7 @@ add_subdirectory(ydb/library/yql/public/issue/ut)
add_subdirectory(ydb/library/yql/public/udf/ut)
add_subdirectory(ydb/public/lib/deprecated/kicli/ut)
add_subdirectory(ydb/public/lib/ydb_cli/common/ut)
+add_subdirectory(ydb/public/lib/ydb_cli/topic/ut)
add_subdirectory(ydb/public/sdk/cpp/client/ydb_topic/ut)
add_subdirectory(ydb/public/sdk/cpp/client/extensions/solomon_stats)
add_subdirectory(ydb/public/sdk/cpp/client/extensions/discovery_mutator)
diff --git a/CMakeLists.linux.txt b/CMakeLists.linux.txt
index 9f4cc8e0ca..4d6c28d694 100644
--- a/CMakeLists.linux.txt
+++ b/CMakeLists.linux.txt
@@ -991,12 +991,13 @@ add_subdirectory(library/cpp/bucket_quoter)
add_subdirectory(ydb/public/lib/ydb_cli/dump/util)
add_subdirectory(ydb/public/lib/ydb_cli/dump)
add_subdirectory(ydb/public/lib/ydb_cli/import)
-add_subdirectory(ydb/public/sdk/cpp/client/ydb_export)
-add_subdirectory(ydb/public/sdk/cpp/client/ydb_import)
-add_subdirectory(ydb/public/sdk/cpp/client/ydb_operation)
+add_subdirectory(ydb/public/lib/ydb_cli/topic)
add_subdirectory(ydb/public/sdk/cpp/client/ydb_persqueue_public)
add_subdirectory(ydb/public/sdk/cpp/client/ydb_topic)
add_subdirectory(ydb/public/sdk/cpp/client/ydb_topic/impl)
+add_subdirectory(ydb/public/sdk/cpp/client/ydb_export)
+add_subdirectory(ydb/public/sdk/cpp/client/ydb_import)
+add_subdirectory(ydb/public/sdk/cpp/client/ydb_operation)
add_subdirectory(ydb/core/testlib)
add_subdirectory(library/cpp/testing/gmock_in_unittest)
add_subdirectory(contrib/restricted/googletest/googlemock)
@@ -1341,6 +1342,7 @@ add_subdirectory(ydb/library/yql/public/issue/ut)
add_subdirectory(ydb/library/yql/public/udf/ut)
add_subdirectory(ydb/public/lib/deprecated/kicli/ut)
add_subdirectory(ydb/public/lib/ydb_cli/common/ut)
+add_subdirectory(ydb/public/lib/ydb_cli/topic/ut)
add_subdirectory(ydb/public/sdk/cpp/client/ydb_topic/ut)
add_subdirectory(ydb/public/sdk/cpp/client/extensions/solomon_stats)
add_subdirectory(ydb/public/sdk/cpp/client/extensions/discovery_mutator)
diff --git a/ydb/library/yql/providers/dq/task_runner/tasks_runner_pipe.cpp b/ydb/library/yql/providers/dq/task_runner/tasks_runner_pipe.cpp
index c493d2190c..5040850ad7 100644
--- a/ydb/library/yql/providers/dq/task_runner/tasks_runner_pipe.cpp
+++ b/ydb/library/yql/providers/dq/task_runner/tasks_runner_pipe.cpp
@@ -186,13 +186,13 @@ protected:
virtual void PrepareForExec() {
ExecArgs.resize(Args.size() + 1, nullptr);
- for (uint i = 0; i < Args.size(); ++i) {
+ for (size_t i = 0; i < Args.size(); ++i) {
ExecArgs[i] = const_cast<char*>(Args[i].c_str());
}
ExecEnv.resize(Env.size() + 1, nullptr);
EnvElems.reserve(Env.size());
- uint i = 0;
+ size_t i = 0;
for (const auto& [k, v] : Env) {
EnvElems.push_back(k + "=" + v);
ExecEnv[i++] = const_cast<char*>(EnvElems.back().c_str());
@@ -200,7 +200,7 @@ protected:
}
virtual void Exec() {
- for (uint i = 3; i < 32768; ++i) {
+ for (int i = 3; i < 32768; ++i) {
close(i);
}
@@ -410,13 +410,13 @@ private:
"anon_limit=" + ToString(MemoryLimit)
};
ExecArgs.resize(ArgsElems.size() + 1, nullptr);
- for (uint i = 0; i < ArgsElems.size(); ++i) {
+ for (size_t i = 0; i < ArgsElems.size(); ++i) {
ExecArgs[i] = const_cast<char*>(ArgsElems[i].c_str());
}
}
void Exec() override {
- for (uint i = 3; i < 32768; ++i) {
+ for (int i = 3; i < 32768; ++i) {
close(i);
}
diff --git a/ydb/library/yql/sql/pg/pg_sql.cpp b/ydb/library/yql/sql/pg/pg_sql.cpp
index 7cfc7c3df0..b6a33eb5ff 100644
--- a/ydb/library/yql/sql/pg/pg_sql.cpp
+++ b/ydb/library/yql/sql/pg/pg_sql.cpp
@@ -1153,6 +1153,8 @@ public:
if (!ParseAlias(value->alias, alias, colnames)) {
return {};
}
+ } else {
+ alias = value->relname;
}
if (view) {
diff --git a/ydb/public/lib/ydb_cli/commands/CMakeLists.txt b/ydb/public/lib/ydb_cli/commands/CMakeLists.txt
index 268d9718d7..5892a1d144 100644
--- a/ydb/public/lib/ydb_cli/commands/CMakeLists.txt
+++ b/ydb/public/lib/ydb_cli/commands/CMakeLists.txt
@@ -21,6 +21,7 @@ target_link_libraries(clicommands PUBLIC
common
lib-ydb_cli-dump
lib-ydb_cli-import
+ topic
cpp-client-draft
cpp-client-ydb_discovery
cpp-client-ydb_export
diff --git a/ydb/public/lib/ydb_cli/commands/ydb_service_topic.cpp b/ydb/public/lib/ydb_cli/commands/ydb_service_topic.cpp
index ec47a843f9..c3a9004321 100644
--- a/ydb/public/lib/ydb_cli/commands/ydb_service_topic.cpp
+++ b/ydb/public/lib/ydb_cli/commands/ydb_service_topic.cpp
@@ -1,8 +1,15 @@
+#include <contrib/libs/openssl/include/openssl/sha.h>
+
#include "ydb_service_topic.h"
#include <ydb/public/lib/ydb_cli/commands/ydb_command.h>
#include <ydb/public/lib/ydb_cli/common/command.h>
+#include <ydb/public/lib/ydb_cli/topic/topic_read.h>
+#include <ydb/public/lib/ydb_cli/topic/topic_util.h>
+#include <ydb/public/lib/ydb_cli/topic/topic_write.h>
+#include <util/generic/set.h>
#include <util/stream/str.h>
+#include <util/string/hex.h>
#include <util/string/vector.h>
namespace NYdb::NConsoleClient {
@@ -20,9 +27,47 @@ namespace NYdb::NConsoleClient {
std::pair<TString, NYdb::NTopic::ECodec>("lzop", NYdb::NTopic::ECodec::LZOP),
std::pair<TString, NYdb::NTopic::ECodec>("zstd", NYdb::NTopic::ECodec::ZSTD),
};
+
+ // TODO(shmel1k@): improve docs
+ THashMap<EStreamMetadataField, TString> StreamMetadataFieldsDescriptions = {
+ {EStreamMetadataField::Body, "message content"},
+ {EStreamMetadataField::WriteTime, "message write time"},
+ {EStreamMetadataField::CreateTime, "message creation time"},
+ {EStreamMetadataField::MessageGroupID, "message group id"},
+ {EStreamMetadataField::Offset, "offset"},
+ {EStreamMetadataField::SeqNo, "seqno"},
+ {EStreamMetadataField::Meta, "meta"},
+ };
+
+ const TVector<EStreamMetadataField> AllStreamMetadataFields = {
+ EStreamMetadataField::Body,
+ EStreamMetadataField::WriteTime,
+ EStreamMetadataField::CreateTime,
+ EStreamMetadataField::MessageGroupID,
+ EStreamMetadataField::Offset,
+ EStreamMetadataField::SeqNo,
+ EStreamMetadataField::Meta,
+ };
+
+ const THashMap<TString, EStreamMetadataField> StreamMetadataFieldsMap = {
+ {"body", EStreamMetadataField::Body},
+ {"write_time", EStreamMetadataField::WriteTime},
+ {"create_time", EStreamMetadataField::CreateTime},
+ {"message_group_id", EStreamMetadataField::MessageGroupID},
+ {"offset", EStreamMetadataField::Offset},
+ {"seq_no", EStreamMetadataField::SeqNo},
+ {"meta", EStreamMetadataField::Meta},
+ };
+
+ THashMap<ETransformBody, TString> TransformBodyDescriptions = {
+ {ETransformBody::None, "do not transform body to any format"},
+ {ETransformBody::Base64, "transform body to base64 format"},
+ };
+
+ constexpr TDuration DefaultIdleTimeout = TDuration::Seconds(1);
} // namespace
- void TCommandWithSupportedCodecs::AddAllowedCodecs(TClientCommand::TConfig& config, const TVector<NTopic::ECodec>& supportedCodecs) {
+ void TCommandWithSupportedCodecs::AddAllowedCodecs(TClientCommand::TConfig& config, const TVector<NYdb::NTopic::ECodec>& supportedCodecs) {
TStringStream description;
description << "Comma-separated list of supported codecs. Available codecs: ";
NColorizer::TColors colors = NColorizer::AutoColors(Cout);
@@ -70,6 +115,7 @@ namespace NYdb::NConsoleClient {
AddCommand(std::make_unique<TCommandTopicAlter>());
AddCommand(std::make_unique<TCommandTopicDrop>());
AddCommand(std::make_unique<TCommandTopicConsumer>());
+ AddCommand(std::make_unique<TCommandTopicRead>());
}
TCommandTopicCreate::TCommandTopicCreate()
@@ -283,4 +329,379 @@ namespace NYdb::NConsoleClient {
ThrowOnError(status);
return EXIT_SUCCESS;
}
+
+ TCommandTopicInternal::TCommandTopicInternal()
+ : TClientCommandTree("topic", {}, "Experimental topic operations") {
+ AddCommand(std::make_unique<TCommandTopicWrite>());
+ }
+
+ TCommandTopicRead::TCommandTopicRead()
+ : TYdbCommand("read", {}, "Read from topic command") {
+ }
+
+ void TCommandTopicRead::AddAllowedMetadataFields(TConfig& config) {
+ TStringStream description;
+ description << "Comma-separated list of message fields to print. Available fields: ";
+ NColorizer::TColors colors = NColorizer::AutoColors(Cout);
+ for (const auto& iter : StreamMetadataFieldsDescriptions) {
+ description << "\n " << colors.BoldColor() << iter.first << colors.OldColor() << "\n " << iter.second;
+ }
+
+ config.Opts->AddLongOption("with-metadata-fields", description.Str())
+ .Optional()
+ .StoreResult(&WithMetadataFields_);
+ }
+
+ void TCommandTopicRead::AddAllowedTransformFormats(TConfig& config) {
+ TStringStream description;
+ description << "Format to transform received body: Available \"transform\" values: ";
+ NColorizer::TColors colors = NColorizer::AutoColors(Cout);
+ for (const auto& iter : TransformBodyDescriptions) {
+ description << "\n " << colors.BoldColor() << iter.first << colors.OldColor() << "\n " << iter.second;
+ }
+
+ config.Opts->AddLongOption("transform", description.Str())
+ .Optional()
+ .StoreResult(&TransformStr_);
+ }
+
+ void TCommandTopicRead::Config(TConfig& config) {
+ TYdbCommand::Config(config);
+ config.Opts->SetFreeArgsNum(1);
+ SetFreeArgTitle(0, "<stream-path>", "Stream to read data");
+
+ AddFormats(config, {
+ EOutputFormat::Pretty,
+ EOutputFormat::NewlineDelimited,
+ EOutputFormat::Concatenated,
+ });
+
+ // TODO(shmel1k@): improve help.
+ config.Opts->AddLongOption('c', "consumer", "Consumer name")
+ .Required()
+ .StoreResult(&Consumer_);
+ config.Opts->AddLongOption("offset", "Offset to start read from")
+ .Optional()
+ .StoreResult(&Offset_);
+ config.Opts->AddLongOption("partition", "Partition to read from")
+ .Optional()
+ .StoreResult(&Partition_);
+ config.Opts->AddLongOption('f', "file", "File to write data to")
+ .Optional()
+ .StoreResult(&File_);
+ config.Opts->AddLongOption("flush-duration", "Duration for message flushing")
+ .Optional()
+ .StoreResult(&FlushDuration_);
+ config.Opts->AddLongOption("flush-size", "Maximum flush size") // TODO(shmel1k@): improve
+ .Optional()
+ .StoreResult(&FlushSize_);
+ config.Opts->AddLongOption("flush-messages-count", "") // TODO(shmel1k@): improve
+ .Optional()
+ .StoreResult(&FlushMessagesCount_);
+ config.Opts->AddLongOption("idle-timeout", "Max wait duration for new messages")
+ .Optional()
+ .DefaultValue(DefaultIdleTimeout)
+ .StoreResult(&IdleTimeout_);
+ config.Opts->AddLongOption("commit", "Commit messages after successful read")
+ .Optional()
+ .DefaultValue(true)
+ .StoreResult(&Commit_);
+ config.Opts->AddLongOption("message-size-limit", "Maximum message size")
+ .Optional()
+ .StoreResult(&MessageSizeLimit_);
+ config.Opts->AddLongOption("discard-above-limits", "Do not print messages with size more than defined in 'message-size-limit' option")
+ .Optional()
+ .StoreResult(&DiscardAboveLimits_);
+ config.Opts->AddLongOption("limit", "Messages count to read")
+ .Optional()
+ .StoreResult(&Limit_);
+ config.Opts->AddLongOption('w', "wait", "Wait for infinite time for first message received")
+ .Optional()
+ .NoArgument()
+ .StoreValue(&Wait_, true);
+ config.Opts->AddLongOption("timestamp", "Timestamp from which messages will be read")
+ .Optional()
+ .StoreResult(&Timestamp_);
+
+ AddAllowedMetadataFields(config);
+ AddAllowedTransformFormats(config);
+ }
+
+ void TCommandTopicRead::ParseMetadataFields() {
+ MetadataFields_ = AllStreamMetadataFields;
+
+ // TODO(shmel1k@): discuss: disable all fields?
+ if (WithMetadataFields_ == "all") {
+ return;
+ }
+
+ TVector<TString> split = SplitString(WithMetadataFields_, ",");
+ if (split.empty()) {
+ return;
+ }
+
+ TSet<EStreamMetadataField> set;
+ for (const auto& field : split) {
+ auto f = StreamMetadataFieldsMap.find(field);
+ if (f == StreamMetadataFieldsMap.end()) {
+ throw TMisuseException() << "Field " << field << " not found in available fields"; // TODO(shmel1k@): improve message.
+ }
+ set.insert(f->second);
+ }
+
+ TVector<EStreamMetadataField> result;
+ result.reserve(set.size());
+ // NOTE(shmel1k@): preserving the order from AllMetadataFields
+ for (const auto metadataField : set) {
+ auto f = std::find(AllStreamMetadataFields.begin(), AllStreamMetadataFields.end(), metadataField);
+ if (f == AllStreamMetadataFields.end()) {
+ continue;
+ }
+ result.push_back(metadataField);
+ }
+
+ MetadataFields_ = result;
+ }
+
+ void TCommandTopicRead::ParseTransformFormat() {
+ if (!TransformStr_.Defined()) {
+ return;
+ }
+
+ TString val = *TransformStr_;
+ if (val == (TStringBuilder() << ETransformBody::None)) {
+ return;
+ }
+ if (val == (TStringBuilder() << ETransformBody::Base64)) {
+ Transform_ = ETransformBody::Base64;
+ return;
+ }
+
+ throw TMisuseException() << "Transform " << *TransformStr_ << " not found in available \"transform\" values";
+ }
+
+ void TCommandTopicRead::Parse(TConfig& config) {
+ TYdbCommand::Parse(config);
+ ParseTopicName(config, 0);
+ ParseFormats();
+ ParseMetadataFields();
+ ParseTransformFormat();
+ }
+
+ NTopic::TReadSessionSettings TCommandTopicRead::PrepareReadSessionSettings() {
+ NTopic::TReadSessionSettings settings;
+ settings.ConsumerName(Consumer_);
+ // settings.ReadAll(); // TODO(shmel1k@): change to read only original?
+ if (Timestamp_.Defined()) {
+ settings.ReadFromTimestamp(TInstant::Seconds(*(Timestamp_.Get())));
+ }
+
+ // TODO(shmel1k@): partition can be added here.
+ NTopic::TTopicReadSettings readSettings;
+ readSettings.Path(TopicName);
+ settings.AppendTopics(std::move(readSettings));
+ return settings;
+ }
+
+ void TCommandTopicRead::ValidateConfig() {
+ // TODO(shmel1k@): add more formats.
+ if (OutputFormat != EOutputFormat::Default && (Limit_.Defined() && (Limit_ < 0 || Limit_ > 500))) {
+ throw TMisuseException() << "OutputFormat " << OutputFormat << " is not compatible with "
+ << "limit equal '0' or more than '500': '" << *Limit_ << "' was given";
+ }
+ }
+
+ int TCommandTopicRead::Run(TConfig& config) {
+ ValidateConfig();
+
+ auto driver = std::make_unique<TDriver>(CreateDriver(config));
+ NTopic::TTopicClient persQueueClient(*driver);
+ auto readSession = persQueueClient.CreateReadSession(PrepareReadSessionSettings());
+
+ TTopicInitializationChecker checker = TTopicInitializationChecker(persQueueClient);
+ checker.CheckTopicExistence(TopicName, Consumer_);
+
+ {
+ TTopicReader reader = TTopicReader(std::move(readSession), TTopicReaderSettings(
+ Limit_,
+ Commit_,
+ Wait_,
+ OutputFormat,
+ MetadataFields_,
+ Transform_,
+ IdleTimeout_));
+
+ reader.Init();
+
+ int status = 0;
+ if (File_.Defined()) {
+ TFileOutput out(*File_);
+ status = reader.Run(out);
+ reader.Close(out);
+ } else {
+ status = reader.Run(Cout);
+ reader.Close(Cout);
+ }
+ if (status) {
+ return status;
+ }
+ }
+
+ driver->Stop(true);
+
+ return EXIT_SUCCESS;
+ }
+
+ TCommandTopicWrite::TCommandTopicWrite()
+ : TYdbCommand("write", {}, "Write to topic command") {
+ }
+
+ // Registers CLI options for `topic write`: one free arg (topic path), the allowed
+ // input formats and codecs, plus delimiter/file/batching knobs.
+ void TCommandTopicWrite::Config(TConfig& config) {
+ TYdbCommand::Config(config);
+ config.Opts->SetFreeArgsNum(1);
+ SetFreeArgTitle(0, "<topic-path>", "Topic to write data");
+
+ AddInputFormats(config, {
+ EOutputFormat::NewlineDelimited,
+ EOutputFormat::SingleMessage, // FIX: was listed twice (duplicate entry after the commented-out JSON formats)
+ // EOutputFormat::JsonRawStreamConcat,
+ // EOutputFormat::JsonRawArray,
+ });
+ AddAllowedCodecs(config);
+
+ // TODO(shmel1k@): improve help.
+ config.Opts->AddLongOption('d', "delimiter", "Delimiter to split messages")
+ .Optional()
+ .StoreResult(&Delimiter_);
+ config.Opts->AddLongOption('f', "file", "File to read data from")
+ .Optional()
+ .StoreResult(&File_);
+ config.Opts->AddLongOption("message-size-limit", "Size limit for a single message")
+ .Optional()
+ .StoreResult(&MessageSizeLimitStr_);
+ config.Opts->AddLongOption("batch-duration", "Duration for message batching")
+ .Optional()
+ .StoreResult(&BatchDuration_);
+ config.Opts->AddLongOption("batch-size", "Maximum batch size") // TODO(shmel1k@): improve
+ .Optional()
+ .StoreResult(&BatchSize_);
+ config.Opts->AddLongOption("batch-messages-count", "") // TODO(shmel1k@): improve
+ .Optional()
+ .StoreResult(&BatchMessagesCount_);
+ config.Opts->AddLongOption("message-group-id", "Message Group ID") // TODO(shmel1k@): improve
+ .Optional()
+ .StoreResult(&MessageGroupId_);
+ }
+
+ void TCommandTopicWrite::Parse(TConfig& config) {
+ TYdbCommand::Parse(config);
+ ParseTopicName(config, 0);
+ ParseFormats();
+ ParseCodec();
+
+ if (Delimiter_.Defined() && InputFormat != EOutputFormat::Default) {
+ throw TMisuseException() << "Both mutually exclusive options \"delimiter\"(\"--delimiter\", \"-d\" "
+ << "and \"input format\"(\"--input-format\") were provided.";
+ }
+ }
+
+ NPersQueue::TWriteSessionSettings TCommandTopicWrite::PrepareWriteSessionSettings() {
+ NPersQueue::TWriteSessionSettings settings;
+ settings.Codec(GetCodec()); // TODO(shmel1k@): codecs?
+ settings.Path(TopicName);
+ //settings.BatchFlushInterval(BatchDuration_);
+ //settings.BatchFlushSizeBytes(BatchSize_);
+ settings.ClusterDiscoveryMode(NPersQueue::EClusterDiscoveryMode::Auto);
+
+ if (!MessageGroupId_.Defined()) {
+ const TString rnd = ToString(TInstant::Now().NanoSeconds());
+ SHA_CTX ctx;
+ SHA1_Init(&ctx);
+ SHA1_Update(&ctx, rnd.data(), rnd.size());
+ unsigned char sha1[SHA_DIGEST_LENGTH];
+ SHA1_Final(sha1, &ctx);
+
+ TString hex = HexEncode(TString(reinterpret_cast<const char*>(sha1), SHA_DIGEST_LENGTH));
+ hex.to_lower();
+ MessageGroupId_ = TString(hex.begin(), hex.begin() + 6);
+ }
+
+ settings.MessageGroupId(*MessageGroupId_);
+
+ return settings;
+ }
+
+ void TCommandTopicWrite::CheckOptions(NPersQueue::TPersQueueClient& persQueueClient) {
+ NPersQueue::TAsyncDescribeTopicResult descriptionFuture = persQueueClient.DescribeTopic(TopicName);
+ descriptionFuture.Wait(TDuration::Seconds(1));
+ NPersQueue::TDescribeTopicResult description = descriptionFuture.GetValueSync();
+ ThrowOnError(description);
+
+ NPersQueue::ECodec codec = GetCodec();
+ bool exists = false;
+ for (const auto c : description.TopicSettings().SupportedCodecs()) {
+ if (c == codec) {
+ exists = true;
+ break;
+ }
+ }
+ if (exists) {
+ return;
+ }
+ TStringStream errorMessage;
+ errorMessage << "Codec \"" << (TStringBuilder() << codec) << "\" is not available for topic. Available codecs:\n";
+ for (const auto c : description.TopicSettings().SupportedCodecs()) {
+ errorMessage << " " << (TStringBuilder() << c) << "\n";
+ }
+ throw TMisuseException() << errorMessage.Str();
+ }
+
+ int TCommandTopicWrite::Run(TConfig& config) {
+ SetInterruptHandlers();
+
+ auto driver = std::make_unique<TDriver>(CreateDriver(config));
+ NPersQueue::TPersQueueClient persQueueClient(*driver);
+
+ CheckOptions(persQueueClient);
+
+ // TODO(shmel1k@): return back after IWriteSession in TopicService SDK
+ // TTopicInitializationChecker checker = TTopicInitializationChecker(persQueueClient);
+ // checker.CheckTopicExistence(TopicName);
+
+ {
+ auto writeSession = NPersQueue::TPersQueueClient(*driver).CreateWriteSession(std::move(PrepareWriteSessionSettings()));
+ auto writer = TTopicWriter(writeSession, std::move(TTopicWriterParams(
+ InputFormat,
+ Delimiter_,
+ MessageSizeLimit_,
+ BatchDuration_,
+ BatchSize_,
+ BatchMessagesCount_)));
+
+ if (int status = writer.Init(); status) {
+ return status;
+ }
+
+ int status = 0;
+ if (File_.Defined()) {
+ TFileInput input(*File_);
+ status = writer.Run(input);
+ } else {
+ status = writer.Run(Cin);
+ }
+ if (status) {
+ return status;
+ }
+
+ if (!writer.Close()) {
+ Cerr << "Failed to close session" << Endl;
+ return EXIT_FAILURE;
+ }
+ }
+
+ driver->Stop(true);
+ return EXIT_SUCCESS;
+ }
+
} // namespace NYdb::NConsoleClient
diff --git a/ydb/public/lib/ydb_cli/commands/ydb_service_topic.h b/ydb/public/lib/ydb_cli/commands/ydb_service_topic.h
index 555af2f99e..95a3f97f11 100644
--- a/ydb/public/lib/ydb_cli/commands/ydb_service_topic.h
+++ b/ydb/public/lib/ydb_cli/commands/ydb_service_topic.h
@@ -3,8 +3,12 @@
#include "ydb_command.h"
#include "ydb_common.h"
+#include <ydb/public/lib/ydb_cli/common/interruptible.h>
+#include <ydb/public/lib/ydb_cli/topic/topic_read.h>
#include <ydb/public/sdk/cpp/client/ydb_topic/topic.h>
+#include <ydb/public/sdk/cpp/client/ydb_persqueue_public/persqueue.h>
+
namespace NYdb::NConsoleClient {
TVector<NYdb::NTopic::ECodec> InitAllowedCodecs();
const TVector<NYdb::NTopic::ECodec> AllowedCodecs = InitAllowedCodecs();
@@ -87,4 +91,142 @@ namespace NYdb::NConsoleClient {
private:
TString ConsumerName_;
};
+
+ class TCommandTopicInternal: public TClientCommandTree {
+ public:
+ TCommandTopicInternal();
+ };
+
+ class TCommandTopicRead: public TYdbCommand,
+ public TCommandWithFormat,
+ public TInterruptibleCommand,
+ public TCommandWithTopicName {
+ public:
+ TCommandTopicRead();
+ void Config(TConfig& config) override;
+ void Parse(TConfig& config) override;
+ int Run(TConfig& config) override;
+
+ private:
+ TString Consumer_ = "";
+ TMaybe<uint32_t> Offset_;
+ TMaybe<uint32_t> Partition_;
+ TMaybe<ui64> Timestamp_;
+ TMaybe<TString> File_;
+ TMaybe<TString> TransformStr_;
+
+ TMaybe<TDuration> FlushDuration_;
+ TMaybe<int> FlushSize_;
+ TMaybe<int> FlushMessagesCount_;
+ TDuration IdleTimeout_;
+
+ TString WithMetadataFields_ = "all"; // TODO(shmel1k@): improve.
+ TVector<EStreamMetadataField> MetadataFields_;
+
+ TMaybe<ui64> MessageSizeLimit_;
+ TMaybe<i64> Limit_ = Nothing();
+ ETransformBody Transform_ = ETransformBody::None;
+
+ bool Commit_ = false;
+ bool DiscardAboveLimits_ = false;
+ bool Wait_ = false;
+
+ private:
+ void ValidateConfig();
+ void AddAllowedMetadataFields(TConfig& config);
+ void ParseMetadataFields();
+ void AddAllowedTransformFormats(TConfig& config);
+ void ParseTransformFormat();
+ NTopic::TReadSessionSettings PrepareReadSessionSettings();
+ };
+
+ namespace {
+ const THashMap<NYdb::NPersQueue::ECodec, TString> CodecsDescriptionsMigration = {
+ {NYdb::NPersQueue::ECodec::RAW, "Raw codec. No data compression(default)"},
+ {NYdb::NPersQueue::ECodec::GZIP, "GZIP codec. Data is compressed with GZIP compression algorithm"},
+ {NYdb::NPersQueue::ECodec::LZOP, "LZOP codec. Data is compressed with LZOP compression algorithm"},
+ {NYdb::NPersQueue::ECodec::ZSTD, "ZSTD codec. Data is compressed with ZSTD compression algorithm"},
+ };
+ }
+
+ const TVector<NYdb::NPersQueue::ECodec> AllowedCodecsMigration = {
+ NPersQueue::ECodec::RAW,
+ NPersQueue::ECodec::GZIP,
+ NPersQueue::ECodec::ZSTD,
+ };
+
+ class TCommandWithCodecMigration {
+ // TODO(shmel1k@): remove after TopicService C++ SDK supports IWriteSession
+ protected:
+ void AddAllowedCodecs(TClientCommand::TConfig& config) {
+ TStringStream description;
+ description << "Codec used for writing. Available codecs: ";
+ NColorizer::TColors colors = NColorizer::AutoColors(Cout);
+ for (const auto& codec : AllowedCodecsMigration) {
+ auto findResult = CodecsDescriptionsMigration.find(codec);
+ Y_VERIFY(findResult != CodecsDescriptionsMigration.end(),
+ "Couldn't find description for %s codec", (TStringBuilder() << codec).c_str());
+ description << "\n " << colors.BoldColor() << codec << colors.OldColor()
+ << "\n " << findResult->second;
+ }
+ config.Opts->AddLongOption("codec", description.Str())
+ .RequiredArgument("STRING")
+ .StoreResult(&CodecStr_);
+ }
+
+ // Maps the user-supplied --codec string (case-insensitive) onto NPersQueue::ECodec.
+ // Unrecognized values intentionally leave Codec_ unset; GetCodec() then falls back
+ // to DefaultCodec_ only when CodecStr_ is empty — NOTE(review): a *wrong* non-empty
+ // value currently yields an uninitialized Codec_; consider throwing TMisuseException.
+ void ParseCodec() {
+ TString codec = to_lower(CodecStr_);
+ if (codec == "raw") {
+ Codec_ = NPersQueue::ECodec::RAW;
+ } else if (codec == "gzip") {
+ Codec_ = NPersQueue::ECodec::GZIP;
+ } else if (codec == "zstd") { // BUG FIX: was `if (codec = "zstd")` — assignment, not comparison, so the branch was always taken and every codec parsed as ZSTD
+ Codec_ = NPersQueue::ECodec::ZSTD;
+ }
+ }
+
+ NPersQueue::ECodec GetCodec() const {
+ if (CodecStr_ == "") {
+ return DefaultCodec_;
+ }
+ return Codec_;
+ }
+
+ private:
+ NPersQueue::ECodec DefaultCodec_ = NPersQueue::ECodec::RAW;
+ TString CodecStr_;
+ NPersQueue::ECodec Codec_;
+ };
+
+ class TCommandTopicWrite: public TYdbCommand,
+ public TCommandWithFormat,
+ public TInterruptibleCommand,
+ public TCommandWithTopicName,
+ public TCommandWithCodecMigration {
+ public:
+ TCommandTopicWrite();
+ void Config(TConfig& config) override;
+ void Parse(TConfig& config) override;
+ int Run(TConfig& config) override;
+
+ private:
+ TMaybe<TString> File_;
+ TMaybe<TString> Delimiter_;
+ TMaybe<TString> MessageSizeLimitStr_; // TODO(shmel1k@): think how to parse
+
+ // TODO(shmel1k@): move to 'TWithBatchingCommand' or something like that.
+ TMaybe<TDuration> BatchDuration_;
+ TMaybe<ui64> BatchSize_;
+ TMaybe<ui64> BatchMessagesCount_;
+ TMaybe<TString> MessageGroupId_;
+
+ ui64 MessageSizeLimit_ = 0;
+ void ParseMessageSizeLimit();
+ void CheckOptions(NPersQueue::TPersQueueClient& persQueueClient);
+
+ private:
+ NPersQueue::TWriteSessionSettings PrepareWriteSessionSettings();
+ };
} // namespace NYdb::NConsoleClient
diff --git a/ydb/public/lib/ydb_cli/topic/CMakeLists.txt b/ydb/public/lib/ydb_cli/topic/CMakeLists.txt
new file mode 100644
index 0000000000..84b0725ec1
--- /dev/null
+++ b/ydb/public/lib/ydb_cli/topic/CMakeLists.txt
@@ -0,0 +1,28 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(topic)
+target_link_libraries(topic PUBLIC
+ contrib-libs-cxxsupp
+ yutil
+ common
+ cpp-client-ydb_proto
+ cpp-client-ydb_persqueue_public
+ cpp-client-ydb_topic
+ tools-enum_parser-enum_serialization_runtime
+)
+target_sources(topic PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/public/lib/ydb_cli/topic/topic_read.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/public/lib/ydb_cli/topic/topic_write.cpp
+)
+generate_enum_serilization(topic
+ ${CMAKE_SOURCE_DIR}/ydb/public/lib/ydb_cli/topic/topic_metadata_fields.h
+ INCLUDE_HEADERS
+ ydb/public/lib/ydb_cli/topic/topic_metadata_fields.h
+)
diff --git a/ydb/public/lib/ydb_cli/topic/topic_metadata_fields.h b/ydb/public/lib/ydb_cli/topic/topic_metadata_fields.h
new file mode 100644
index 0000000000..7ecc1bbda0
--- /dev/null
+++ b/ydb/public/lib/ydb_cli/topic/topic_metadata_fields.h
@@ -0,0 +1,20 @@
+#pragma once
+
+#include <util/string/vector.h>
+
+namespace NYdb::NConsoleClient {
+ enum class EStreamMetadataField {
+ Body /* "body" */,
+ WriteTime /* "write_time" */,
+ CreateTime /* "create_time" */,
+ MessageGroupID /* "message_group_id" */,
+ Offset /* "offset" */,
+ SeqNo /* "seq_no" */,
+ Meta /* "meta" */,
+ };
+
+ enum class ETransformBody {
+ None = 0 /* "none" */,
+ Base64 = 1 /* "base64" */,
+ };
+} // namespace NYdb::NConsoleClient \ No newline at end of file
diff --git a/ydb/public/lib/ydb_cli/topic/topic_read.cpp b/ydb/public/lib/ydb_cli/topic/topic_read.cpp
new file mode 100644
index 0000000000..34a213b6f9
--- /dev/null
+++ b/ydb/public/lib/ydb_cli/topic/topic_read.cpp
@@ -0,0 +1,278 @@
+#include "topic_metadata_fields.h"
+#include "topic_read.h"
+#include "topic_util.h"
+#include "topic_write.h"
+#include <library/cpp/json/json_reader.h>
+#include <library/cpp/string_utils/base64/base64.h>
+#include <util/generic/set.h>
+
+namespace NYdb::NConsoleClient {
+ namespace {
+ constexpr i64 MessagesLimitUnlimited = -1;
+ constexpr i64 MessagesLimitDefaultPrettyFormat = 10;
+ constexpr i64 MessagesLimitDefaultJsonArrayFormat = 500;
+
+ bool IsStreamingFormat(EOutputFormat format) {
+ return format == EOutputFormat::NewlineBase64 || format == EOutputFormat::NewlineDelimited || format == EOutputFormat::Concatenated;
+ }
+ }
+
+ TTopicReaderSettings::TTopicReaderSettings() {
+ }
+
+ TTopicReaderSettings::TTopicReaderSettings(
+ TMaybe<i64> limit,
+ bool commit,
+ bool wait,
+ EOutputFormat format,
+ TVector<EStreamMetadataField> metadataFields,
+ ETransformBody transform,
+ TDuration idleTimeout)
+ : MetadataFields_(metadataFields)
+ , IdleTimeout_(idleTimeout)
+ , OutputFormat_(format)
+ , Transform_(transform)
+ , Limit_(limit)
+ , Commit_(commit)
+ , Wait_(wait) {
+ }
+
+ TTopicReader::TTopicReader(
+ std::shared_ptr<NTopic::IReadSession> readSession,
+ TTopicReaderSettings params)
+ : ReadSession_(readSession)
+ , ReaderParams_(params) {
+ }
+
+ void TTopicReader::Init() {
+ TVector<TString> tableFields;
+ for (const auto& f : ReaderParams_.MetadataFields()) {
+ tableFields.emplace_back(TStringBuilder() << f);
+ }
+ TPrettyTable table(tableFields);
+ OutputTable_ = std::make_unique<TPrettyTable>(table);
+
+ if (!ReaderParams_.Limit().Defined()) {
+ if (IsStreamingFormat(ReaderParams_.OutputFormat())) {
+ MessagesLeft_ = MessagesLimitUnlimited;
+ }
+ if (ReaderParams_.OutputFormat() == EOutputFormat::Pretty) {
+ MessagesLeft_ = MessagesLimitDefaultPrettyFormat;
+ }
+ if (ReaderParams_.OutputFormat() == EOutputFormat::JsonRawArray) {
+ MessagesLeft_ = MessagesLimitDefaultJsonArrayFormat;
+ }
+ return;
+ }
+
+ i64 limit = *(ReaderParams_.Limit());
+ if (IsStreamingFormat(ReaderParams_.OutputFormat()) && limit == 0) {
+ limit = -1;
+ }
+ MessagesLeft_ = limit;
+ }
+
+ namespace {
+ const TString FormatBody(const TString& body, ETransformBody transform) {
+ if (transform == ETransformBody::Base64) {
+ return Base64Encode(body);
+ }
+
+ return body;
+ }
+
+ using TReceivedMessage = NTopic::TReadSessionEvent::TDataReceivedEvent::TMessage;
+
+ void AddMetadataFieldToRow(TPrettyTable::TRow& row, const TReceivedMessage& message, EOutputFormat format, ETransformBody transform, EStreamMetadataField f, size_t idx) {
+ switch (f) {
+ case EStreamMetadataField::Body:
+ if (format == EOutputFormat::PrettyBase64) {
+ row.Column(idx, FormatBody(message.GetData(), transform));
+ }
+ if (format == EOutputFormat::Pretty || format == EOutputFormat::PrettyRaw) {
+ row.Column(idx, FormatBody(message.GetData(), transform));
+ }
+ if (format == EOutputFormat::PrettyUnicode) {
+ row.Column(idx, message.GetData());
+ }
+ break;
+
+ case EStreamMetadataField::CreateTime:
+ row.Column(idx, message.GetCreateTime());
+ break;
+ case EStreamMetadataField::MessageGroupID:
+ row.Column(idx, message.GetMessageGroupId());
+ break;
+ case EStreamMetadataField::Offset:
+ row.Column(idx, message.GetOffset());
+ break;
+ case EStreamMetadataField::WriteTime:
+ row.Column(idx, message.GetWriteTime()); // improve for pretty
+ break;
+ case EStreamMetadataField::SeqNo:
+ row.Column(idx, message.GetSeqNo());
+ break;
+ case EStreamMetadataField::Meta:
+ NJson::TJsonValue json;
+ for (auto const& [k, v] : message.GetMeta()->Fields) {
+ json[k] = v;
+ }
+ row.Column(idx, json);
+ break;
+ }
+ }
+ } // namespace
+
+ void TTopicReader::PrintMessagesInPrettyFormat(IOutputStream& output) {
+ for (const auto& message : ReceivedMessages_) {
+ TPrettyTable::TRow& row = OutputTable_->AddRow();
+ for (size_t i = 0; i < ReaderParams_.MetadataFields().size(); ++i) {
+ EStreamMetadataField f = ReaderParams_.MetadataFields()[i];
+ AddMetadataFieldToRow(row, message, ReaderParams_.OutputFormat(), ReaderParams_.Transform(), f, i);
+ }
+ }
+
+ OutputTable_->Print(output);
+ }
+
+ void TTopicReader::PrintMessagesInJsonArrayFormat(IOutputStream& output) {
+ // TODO(shmel1k@): not implemented yet.
+ Y_UNUSED(output);
+ }
+
+ void TTopicReader::Close(IOutputStream& output, TDuration closeTimeout) {
+ if (ReaderParams_.OutputFormat() == EOutputFormat::Pretty ||
+ ReaderParams_.OutputFormat() == EOutputFormat::PrettyBase64 ||
+ ReaderParams_.OutputFormat() == EOutputFormat::PrettyRaw) {
+ PrintMessagesInPrettyFormat(output);
+ }
+ if (ReaderParams_.OutputFormat() == EOutputFormat::JsonRawArray ||
+ ReaderParams_.OutputFormat() == EOutputFormat::JsonBase64Array ||
+ ReaderParams_.OutputFormat() == EOutputFormat::JsonUnicodeArray) {
+ PrintMessagesInJsonArrayFormat(output);
+ }
+ output.Flush();
+ bool success = ReadSession_->Close(closeTimeout);
+ if (!success) {
+ throw yexception() << "Failed to close read session\n";
+ }
+ }
+
+ void TTopicReader::HandleReceivedMessage(const TReceivedMessage& message, IOutputStream& output) {
+ EOutputFormat outputFormat = ReaderParams_.OutputFormat();
+ if (outputFormat == EOutputFormat::Default || outputFormat == EOutputFormat::Concatenated) {
+ output << FormatBody(message.GetData(), ReaderParams_.Transform());
+ output.Flush();
+ return;
+ }
+ if (outputFormat == EOutputFormat::NewlineDelimited) {
+ output << FormatBody(message.GetData(), ReaderParams_.Transform());
+ output << "\n";
+ output.Flush();
+ return;
+ }
+ if (outputFormat == EOutputFormat::Default) {
+ output << FormatBody(message.GetData(), ReaderParams_.Transform());
+ return;
+ }
+
+ ReceivedMessages_.push_back(message);
+ }
+
+ int TTopicReader::HandleDataReceivedEvent(NYdb::NTopic::TReadSessionEvent::TDataReceivedEvent* event, IOutputStream& output) {
+ HasFirstMessage_ = true;
+
+ NTopic::TDeferredCommit defCommit;
+ for (const auto& message : event->GetMessages()) {
+ HandleReceivedMessage(message, output);
+ if (ReaderParams_.Commit()) {
+ defCommit.Add(message);
+ }
+
+ if (MessagesLeft_ == MessagesLimitUnlimited) {
+ continue;
+ }
+
+ --MessagesLeft_;
+ if (MessagesLeft_ == 0) {
+ break;
+ }
+ }
+
+ if (ReaderParams_.Commit()) {
+ defCommit.Commit();
+ }
+ LastMessageReceivedTs_ = Now();
+ return EXIT_SUCCESS;
+ }
+
+ int TTopicReader::HandleStartPartitionSessionEvent(NYdb::NTopic::TReadSessionEvent::TStartPartitionSessionEvent* event) {
+ event->Confirm();
+
+ return EXIT_SUCCESS;
+ }
+
+ int TTopicReader::HandleCommitOffsetAcknowledgementEvent(NTopic::TReadSessionEvent::TCommitOffsetAcknowledgementEvent* event) {
+ Y_UNUSED(event);
+
+ return EXIT_SUCCESS;
+ }
+
+ int TTopicReader::HandleEvent(TMaybe<NYdb::NTopic::TReadSessionEvent::TEvent>& event, IOutputStream& output) {
+ if (!event) {
+ return 0;
+ }
+
+ if (auto* dataEvent = std::get_if<NTopic::TReadSessionEvent::TDataReceivedEvent>(&*event)) {
+ return HandleDataReceivedEvent(dataEvent, output);
+ } else if (auto* createPartitionStreamEvent = std::get_if<NTopic::TReadSessionEvent::TStartPartitionSessionEvent>(&*event)) {
+ return HandleStartPartitionSessionEvent(createPartitionStreamEvent);
+ } else if (auto* commitEvent = std::get_if<NTopic::TReadSessionEvent::TCommitOffsetAcknowledgementEvent>(&*event)) {
+ return HandleCommitOffsetAcknowledgementEvent(commitEvent);
+ } else if (auto* sessionClosedEvent = std::get_if<NTopic::TSessionClosedEvent>(&*event)) {
+ ThrowOnError(*sessionClosedEvent);
+ return 1;
+ }
+ return 0;
+ }
+
+ int TTopicReader::Run(IOutputStream& output) {
+ // TODO(shmel1k@): improve behavior according to documentation
+ constexpr TDuration MaxWaitTime = TDuration::Seconds(1); // TODO(shmel1k@): to consts
+
+ LastMessageReceivedTs_ = TInstant::Now();
+
+ bool waitForever = ReaderParams_.Wait() && (ReaderParams_.OutputFormat() == EOutputFormat::NewlineDelimited || ReaderParams_.OutputFormat() == EOutputFormat::Concatenated);
+
+ while ((MessagesLeft_ > 0 || MessagesLeft_ == -1) && !IsInterrupted()) {
+ TInstant messageReceiveDeadline = LastMessageReceivedTs_ + MaxWaitTime;
+ NThreading::TFuture<void> future = ReadSession_->WaitEvent();
+ future.Wait(messageReceiveDeadline);
+ if (!future.HasValue()) {
+ if (ReaderParams_.Wait() && !HasFirstMessage_) {
+ LastMessageReceivedTs_ = Now();
+ continue;
+ }
+
+ if (waitForever) {
+ continue;
+ }
+
+ return EXIT_SUCCESS;
+ }
+
+ // TODO(shmel1k@): throttling?
+ // TODO(shmel1k@): think about limiting size of events
+ TMaybe<NYdb::NTopic::TReadSessionEvent::TEvent> event = ReadSession_->GetEvent(true);
+ if (!event) {
+ // TODO(shmel1k@): does it work properly?
+ continue;
+ }
+
+ if (int status = HandleEvent(event, output); status) {
+ return status;
+ }
+ }
+ return EXIT_SUCCESS;
+ }
+} // namespace NYdb::NConsoleClient \ No newline at end of file
diff --git a/ydb/public/lib/ydb_cli/topic/topic_read.h b/ydb/public/lib/ydb_cli/topic/topic_read.h
new file mode 100644
index 0000000000..f3297007bd
--- /dev/null
+++ b/ydb/public/lib/ydb_cli/topic/topic_read.h
@@ -0,0 +1,92 @@
+#pragma once
+
+#include "topic_metadata_fields.h"
+#include "ydb/public/lib/ydb_cli/commands/ydb_command.h"
+#include <util/stream/null.h>
+#include <ydb/public/lib/ydb_cli/common/format.h>
+#include <ydb/public/lib/ydb_cli/common/interruptible.h>
+#include <ydb/public/lib/ydb_cli/common/pretty_table.h>
+#include <ydb/public/sdk/cpp/client/ydb_topic/topic.h>
+
+namespace NYdb::NConsoleClient {
+#define GETTER(TYPE, NAME) \
+ TYPE NAME() const { \
+ return NAME##_; \
+ }
+
+ class TTopicReaderSettings {
+ public:
+ TTopicReaderSettings(
+ TMaybe<i64> limit,
+ bool commit,
+ bool wait,
+ EOutputFormat format,
+ TVector<EStreamMetadataField> metadataFields,
+ ETransformBody transform,
+ TDuration idleTimeout);
+
+ TTopicReaderSettings();
+ TTopicReaderSettings(const TTopicReaderSettings&) = default;
+ TTopicReaderSettings(TTopicReaderSettings&&) = default;
+
+ GETTER(TVector<EStreamMetadataField>, MetadataFields);
+ GETTER(bool, Commit);
+ GETTER(TMaybe<i64>, Limit);
+ GETTER(bool, Wait);
+ GETTER(EOutputFormat, OutputFormat);
+ GETTER(ETransformBody, Transform);
+ GETTER(TDuration, IdleTimeout);
+ // TODO(shmel1k@): add batching settings.
+
+ private:
+ TVector<EStreamMetadataField> MetadataFields_;
+ TMaybe<TDuration> FlushDuration_;
+ TMaybe<int> FlushSize_;
+ TMaybe<int> FlushMessagesCount_;
+ TDuration IdleTimeout_;
+
+ EOutputFormat OutputFormat_ = EOutputFormat::Default;
+ ETransformBody Transform_ = ETransformBody::None;
+ TMaybe<i64> Limit_ = Nothing();
+ bool Commit_ = false;
+ bool Wait_ = false;
+ };
+
+ class TTopicReaderTests;
+
+ // TODO(shmel1k@): think about interruption here.
+ class TTopicReader: public TInterruptibleCommand {
+ using TReceivedMessage = NTopic::TReadSessionEvent::TDataReceivedEvent::TMessage;
+
+ public:
+ TTopicReader(std::shared_ptr<NTopic::IReadSession>, TTopicReaderSettings);
+
+ void Close(IOutputStream& output, TDuration closeTimeout = TDuration::Max());
+ void Init();
+ int Run(IOutputStream&);
+
+ void HandleReceivedMessage(const TReceivedMessage& message, IOutputStream& output);
+
+ int HandleStartPartitionSessionEvent(NTopic::TReadSessionEvent::TStartPartitionSessionEvent*);
+ int HandleDataReceivedEvent(NTopic::TReadSessionEvent::TDataReceivedEvent*, IOutputStream&);
+ int HandleCommitOffsetAcknowledgementEvent(NTopic::TReadSessionEvent::TCommitOffsetAcknowledgementEvent*);
+ int HandleEvent(TMaybe<NTopic::TReadSessionEvent::TEvent>&, IOutputStream&);
+
+ private:
+ void PrintMessagesInPrettyFormat(IOutputStream& output);
+ void PrintMessagesInJsonArrayFormat(IOutputStream& output);
+
+ private:
+ std::shared_ptr<NTopic::IReadSession> ReadSession_;
+ const TTopicReaderSettings ReaderParams_;
+
+ i64 MessagesLeft_ = 1; // Messages left to read. -1 means 'unlimited'
+ bool HasFirstMessage_ = false;
+ TInstant LastMessageReceivedTs_;
+
+ std::unique_ptr<TPrettyTable> OutputTable_;
+ TVector<TReceivedMessage> ReceivedMessages_;
+
+ friend class TTopicReaderTests;
+ };
+} // namespace NYdb::NConsoleClient \ No newline at end of file
diff --git a/ydb/public/lib/ydb_cli/topic/topic_read_ut.cpp b/ydb/public/lib/ydb_cli/topic/topic_read_ut.cpp
new file mode 100644
index 0000000000..d18bc53c4d
--- /dev/null
+++ b/ydb/public/lib/ydb_cli/topic/topic_read_ut.cpp
@@ -0,0 +1,160 @@
+#include "topic_read.h"
+#include <library/cpp/testing/unittest/registar.h>
+#include <ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/ut_utils/data_plane_helpers.h>
+#include <ydb/services/persqueue_v1/ut/persqueue_test_fixture.h>
+
+namespace NYdb::NConsoleClient {
+ class TTopicReaderTests: public NUnitTest::TTestBase {
+ using IReadSession = std::shared_ptr<NYdb::NTopic::IReadSession>;
+
+ public:
+ UNIT_TEST_SUITE(TTopicReaderTests)
+ UNIT_TEST(TestRun_ReadOneMessage);
+ UNIT_TEST(TestRun_ReadTwoMessages_With_Limit_1);
+ UNIT_TEST(TestRun_ReadMoreMessagesThanLimit_Without_Wait_NewlineDelimited);
+ UNIT_TEST(TestRun_ReadMoreMessagesThanLimit_Without_Wait_NoDelimiter);
+ UNIT_TEST(TestRun_ReadMessages_Output_Base64);
+ UNIT_TEST(TestRun_Read_Less_Messages_Than_Sent);
+ UNIT_TEST_SUITE_END();
+
+ void TestRun_ReadOneMessage() {
+ RunTest({
+ "some simple message",
+ },
+ {
+ "some simple message",
+ },
+ "", TTopicReaderSettings(Nothing(), false, false, EOutputFormat::Default, {}, ETransformBody::None, TDuration::Seconds(1)));
+ }
+
+ void TestRun_ReadTwoMessages_With_Limit_1() {
+ RunTest({
+ "message1",
+ "message2",
+ },
+ {
+ "message1",
+ },
+ "", TTopicReaderSettings(1, false, false, EOutputFormat::Default, {}, ETransformBody::None, TDuration::Seconds(1)));
+ }
+
+ void TestRun_ReadMoreMessagesThanLimit_Without_Wait_NewlineDelimited() {
+ ui64 limit = 4;
+ RunTest(
+ {
+ "message1",
+ "message2",
+ "message3",
+ },
+ {
+ "message1",
+ "message2",
+ "message3",
+ },
+ "\n", TTopicReaderSettings(limit, false, false, EOutputFormat::NewlineDelimited, {}, ETransformBody::None, TDuration::Seconds(1)));
+ }
+
+ void TestRun_ReadMoreMessagesThanLimit_Without_Wait_NoDelimiter() {
+ ui64 limit = 5;
+ RunTest(
+ {"message1",
+ "message2",
+ "message3",
+ "message4"},
+ {
+ "message1message2message3message4",
+ },
+ "", TTopicReaderSettings(limit, false, false, EOutputFormat::Default, {}, ETransformBody::None, TDuration::Seconds(1)));
+ }
+
+ void TestRun_ReadMessages_Output_Base64() {
+ ui64 limit = 3;
+ RunTest(
+ {
+ "message1",
+ "message2",
+ "message3",
+ },
+ {
+ "bWVzc2FnZTE=",
+ "bWVzc2FnZTI=",
+ "bWVzc2FnZTM=",
+ },
+ "\n", TTopicReaderSettings(limit, false, false, EOutputFormat::NewlineDelimited, {}, ETransformBody::Base64, TDuration::Seconds(1)));
+ }
+
+ void TestRun_Read_Less_Messages_Than_Sent() {
+ ui64 limit = 2;
+ RunTest(
+ {
+ "message1",
+ "message2",
+ "message3",
+ },
+ {
+ "message1message2",
+ },
+ "", TTopicReaderSettings(limit, false, false, EOutputFormat::Default, {}, ETransformBody::None, TDuration::Seconds(1)));
+ }
+
+ private:
+ void WriteTestData(NYdb::TDriver* driver, const TString& topicPath, const TVector<TString>& data) {
+ auto writer = CreateSimpleWriter(*driver, topicPath, "source1", {}, TString("raw"));
+ for (size_t i = 1; i <= data.size(); ++i) {
+ UNIT_ASSERT(writer->Write(data[i - 1], i));
+ }
+ writer->Close();
+ }
+
+ void RunTest(
+ const TVector<TString>& dataToWrite,
+ const TVector<TString>& expected,
+ const TString& delimiter,
+ TTopicReaderSettings&& settings) {
+ Cerr << "=== Starting PQ server\n";
+ // NOTE(shmel1k@): old PQ configuration. Waiting for topicservice.
+ TPersQueueV1TestServer server;
+ Cerr << "=== Started PQ server\n";
+
+ SET_LOCALS;
+
+ const TString topicPath = server.GetTopic();
+ auto driver = server.Server->AnnoyingClient->GetDriver();
+ server.Server->AnnoyingClient->CreateConsumer("cli");
+ NPersQueue::TPersQueueClient persQueueClient(*driver);
+
+ WriteTestData(driver, topicPath, dataToWrite);
+ NTopic::TReadSessionSettings readSessionSettings = PrepareReadSessionSettings(topicPath);
+ IReadSession sess = CreateTopicReader(*driver, readSessionSettings);
+ TTopicReader reader(sess, settings);
+ reader.Init();
+
+ TStringStream output;
+ int status = reader.Run(output);
+ UNIT_ASSERT_EQUAL(status, 0);
+
+ TVector<TString> split;
+ Split(output.Str(), delimiter, split);
+
+ UNIT_ASSERT_VALUES_EQUAL(split.size(), expected.size());
+ for (size_t i = 0; i < split.size(); ++i) {
+ UNIT_ASSERT_VALUES_EQUAL(split[i], expected[i]);
+ }
+ }
+
+ NTopic::TReadSessionSettings PrepareReadSessionSettings(const TString& topicPath) {
+ NTopic::TReadSessionSettings settings;
+ settings.ConsumerName("cli");
+ settings.AppendTopics(topicPath);
+
+ return settings;
+ }
+
+ private:
+ IReadSession CreateTopicReader(NYdb::TDriver& driver, NYdb::NTopic::TReadSessionSettings& settings) {
+ return NTopic::TTopicClient(driver).CreateReadSession(settings);
+ }
+ };
+
+ UNIT_TEST_SUITE_REGISTRATION(TTopicReaderTests);
+} // namespace NYdb::NConsoleClient
diff --git a/ydb/public/lib/ydb_cli/topic/topic_util.h b/ydb/public/lib/ydb_cli/topic/topic_util.h
new file mode 100644
index 0000000000..54fd9ee7de
--- /dev/null
+++ b/ydb/public/lib/ydb_cli/topic/topic_util.h
@@ -0,0 +1,41 @@
+#include <ydb/public/lib/ydb_cli/commands/ydb_common.h>
+#include <ydb/public/sdk/cpp/client/ydb_topic/topic.h>
+
+namespace NYdb::NConsoleClient {
+ namespace {
+ constexpr TDuration DefaultCheckTopicExistenceTimeout = TDuration::Seconds(1);
+ }
+
+ class TTopicInitializationChecker {
+ public:
+ TTopicInitializationChecker(NTopic::TTopicClient& topicClient)
+ : TopicClient_(topicClient) {
+ }
+
+ void CheckTopicExistence(TString& topicPath, TString consumer = "", TDuration timeout = DefaultCheckTopicExistenceTimeout) {
+ NTopic::TAsyncDescribeTopicResult descriptionFuture = TopicClient_.DescribeTopic(topicPath);
+ descriptionFuture.Wait(timeout);
+ NTopic::TDescribeTopicResult description = descriptionFuture.GetValueSync();
+ ThrowOnError(description);
+
+ if (consumer == "") {
+ return;
+ }
+
+ bool hasConsumer = false;
+ for (const auto& rr : description.GetTopicDescription().GetConsumers()) {
+ if (rr.GetConsumerName() == consumer) {
+ hasConsumer = true;
+ break;
+ }
+ }
+
+ if (!hasConsumer) {
+ throw yexception() << "No consumer \"" << consumer << "\" found";
+ }
+ }
+
+ private:
+ NTopic::TTopicClient TopicClient_;
+ };
+} // namespace NYdb::NConsoleClient \ No newline at end of file
diff --git a/ydb/public/lib/ydb_cli/topic/topic_write.cpp b/ydb/public/lib/ydb_cli/topic/topic_write.cpp
new file mode 100644
index 0000000000..93d8c09eb6
--- /dev/null
+++ b/ydb/public/lib/ydb_cli/topic/topic_write.cpp
@@ -0,0 +1,207 @@
+#include "topic_util.h"
+#include "topic_write.h"
+#include <contrib/libs/openssl/include/openssl/sha.h>
+#include <util/generic/overloaded.h>
+#include <util/stream/tokenizer.h>
+#include <util/string/hex.h>
+
+namespace NYdb::NConsoleClient {
+ namespace {
+ constexpr TDuration DefaultMessagesWaitTimeout = TDuration::Seconds(1);
+ }
+
+ TTopicWriterParams::TTopicWriterParams() {
+ }
+
+ TTopicWriterParams::TTopicWriterParams(EOutputFormat inputFormat, TMaybe<TString> delimiter,
+ ui64 messageSizeLimit, TMaybe<TDuration> batchDuration,
+ TMaybe<ui64> batchSize, TMaybe<ui64> batchMessagesCount)
+ : InputFormat_(inputFormat)
+ , BatchDuration_(batchDuration)
+ , BatchSize_(batchSize)
+ , BatchMessagesCount_(batchMessagesCount)
+ , MessageSizeLimit_(messageSizeLimit) {
+ if (inputFormat == EOutputFormat::NewlineDelimited || inputFormat == EOutputFormat::Concatenated) {
+ Delimiter_ = TMaybe<char>('\n');
+ }
+ if (delimiter.Defined()) {
+ // TODO(shmel1k@): remove when string delimiter is supported.
+ // TODO(shmel1k@): think about better way.
+ if (delimiter == "\\n") {
+ Delimiter_ = TMaybe<char>('\n');
+ return;
+ }
+ if (delimiter == "\\t") {
+ Delimiter_ = TMaybe<char>('\t');
+ return;
+ }
+ if (delimiter == "\r") {
+ Delimiter_ = TMaybe<char>('\r');
+ return;
+ }
+ if (delimiter == "\0") {
+ Delimiter_ = TMaybe<char>('\0');
+ return;
+ }
+ if (delimiter == "") {
+ Delimiter_ = Nothing();
+ return;
+ }
+
+ Y_ENSURE(delimiter->Size() == 1, "Invalid delimiter size, should be <= 1");
+ Delimiter_ = TMaybe<char>(delimiter->at(0));
+ }
+ }
+
+ TTopicWriter::TTopicWriter() {
+ }
+
+ TTopicWriter::TTopicWriter(std::shared_ptr<NYdb::NPersQueue::IWriteSession> writeSession,
+ TTopicWriterParams params)
+ : WriteSession_(writeSession)
+ , WriterParams_(params) {
+ }
+
+ int TTopicWriter::Init() {
+ TInstant endPreparationTime = Now() + DefaultMessagesWaitTimeout;
+ NThreading::TFuture<ui64> initSeqNo = WriteSession_->GetInitSeqNo();
+
+ while (Now() < endPreparationTime) {
+ // TODO(shmel1k@): handle situation if seqNo already exists but with exception.
+ if (!initSeqNo.HasValue() && !initSeqNo.Wait(TDuration::Seconds(1))) {
+ // TODO(shmel1k@): change logs
+ Cerr << "no init seqno yet" << Endl;
+ continue;
+ }
+ break;
+ }
+
+ if (!initSeqNo.HasValue()) {
+ // TODO(shmel1k@): logging
+ return EXIT_FAILURE;
+ }
+
+ CurrentSeqNo_ = initSeqNo.GetValue() + 1;
+ return EXIT_SUCCESS;
+ }
+
+ int TTopicWriter::HandleAcksEvent(const NPersQueue::TWriteSessionEvent::TAcksEvent& event) {
+ Y_UNUSED(event);
+ return EXIT_SUCCESS;
+ }
+
+ int TTopicWriter::HandleReadyToAcceptEvent(NPersQueue::TWriteSessionEvent::TReadyToAcceptEvent& event) {
+ ContinuationToken_ = std::move(event.ContinuationToken);
+ return EXIT_SUCCESS;
+ }
+
+ int TTopicWriter::HandleSessionClosedEvent(const NPersQueue::TSessionClosedEvent& event) {
+ ThrowOnError(event);
+ return EXIT_FAILURE;
+ }
+
+ int TTopicWriter::HandleEvent(NPersQueue::TWriteSessionEvent::TEvent& event) {
+ return std::visit(TOverloaded{
+ [&](const NPersQueue::TWriteSessionEvent::TAcksEvent& event) {
+ return HandleAcksEvent(event);
+ },
+ [&](NPersQueue::TWriteSessionEvent::TReadyToAcceptEvent& event) {
+ return HandleReadyToAcceptEvent(event);
+ },
+ [&](const NPersQueue::TSessionClosedEvent& event) {
+ return HandleSessionClosedEvent(event);
+ },
+ },
+ event);
+ }
+
+ TTopicWriter::TSendMessageData TTopicWriter::EnterMessage(IInputStream& input) {
+ // TODO(shmel1k@): add interruption here.
+ // TODO(shmel1k@): add JSONStreamReader & etc interfaces.
+ // TODO(shmel1k@): add stream parsing here & improve performance.
+ if (!WriterParams_.Delimiter().Defined()) {
+ // TODO(shmel1k@): interruption?
+ return TSendMessageData{
+ .Data = input.ReadAll(),
+ .NeedSend = true,
+ .ContinueSending = false,
+ };
+ }
+
+ TString buffer;
+ char delimiter = *(WriterParams_.Delimiter());
+ size_t read = input.ReadTo(buffer, delimiter);
+ if (read == 0) {
+ return TSendMessageData{
+ .Data = "",
+ .NeedSend = false,
+ .ContinueSending = false,
+ };
+ }
+ return TSendMessageData{
+ .Data = buffer,
+ .NeedSend = true,
+ .ContinueSending = true,
+ };
+ }
+
+ int TTopicWriter::Run(IInputStream& input) {
+ // TODO(shmel1k@): add notificator about failures.
+ bool continueSending = true;
+ while (continueSending) {
+ while (!ContinuationToken_.Defined()) {
+ TMaybe<NPersQueue::TWriteSessionEvent::TEvent> event = WriteSession_->GetEvent(true);
+ if (event.Empty()) {
+ continue;
+ }
+ if (int status = HandleEvent(*event); status) {
+ return status;
+ }
+ }
+ TTopicWriter::TSendMessageData message = EnterMessage(input);
+ continueSending = message.ContinueSending;
+ if (!message.NeedSend) {
+ continue;
+ }
+
+ WriteSession_->Write(std::move(*ContinuationToken_), std::move(message.Data), CurrentSeqNo_++);
+ ContinuationToken_ = Nothing();
+ }
+
+ return EXIT_SUCCESS;
+ }
+
+ bool TTopicWriter::Close(TDuration closeTimeout) {
+ Y_UNUSED(closeTimeout);
+ if (WriteSession_->Close(TDuration::Hours(12))) {
+ return true;
+ }
+ TVector<NPersQueue::TWriteSessionEvent::TEvent> events = WriteSession_->GetEvents(true);
+ if (events.empty()) {
+ return false;
+ }
+ for (auto& evt : events) {
+ bool hasFailure = false;
+ std::visit(TOverloaded{
+ [&](const NPersQueue::TWriteSessionEvent::TAcksEvent& event) {
+ Y_UNUSED(event);
+ },
+ [&](NPersQueue::TWriteSessionEvent::TReadyToAcceptEvent& event) {
+ Y_UNUSED(event);
+ },
+ [&](const NPersQueue::TSessionClosedEvent& event) {
+ int result = HandleSessionClosedEvent(event);
+ if (result == EXIT_FAILURE) {
+ hasFailure = true;
+ }
+ Y_UNUSED(result);
+ },
+ },
+ evt);
+ if (hasFailure) {
+ return false;
+ }
+ }
+ return true;
+ }
+} // namespace NYdb::NConsoleClient
diff --git a/ydb/public/lib/ydb_cli/topic/topic_write.h b/ydb/public/lib/ydb_cli/topic/topic_write.h
new file mode 100644
index 0000000000..445e3c9016
--- /dev/null
+++ b/ydb/public/lib/ydb_cli/topic/topic_write.h
@@ -0,0 +1,79 @@
+#pragma once
+
+#include "ydb/public/lib/ydb_cli/commands/ydb_command.h"
+#include <ydb/public/sdk/cpp/client/ydb_persqueue_public/persqueue.h>
+#include <ydb/public/lib/ydb_cli/common/format.h>
+#include <ydb/public/lib/ydb_cli/common/interruptible.h>
+
+namespace NYdb::NConsoleClient {
+#define GETTER(TYPE, NAME) \
+ TYPE NAME() const { \
+ return NAME##_; \
+ }
+
+ class TTopicWriterParams {
+ public:
+ TTopicWriterParams();
+ TTopicWriterParams(EOutputFormat inputFormat, TMaybe<TString> delimiter,
+ ui64 messageSizeLimit, TMaybe<TDuration> batchDuration,
+ TMaybe<ui64> batchSize, TMaybe<ui64> batchMessagesCount);
+ TTopicWriterParams(const TTopicWriterParams&) = default;
+ TTopicWriterParams(TTopicWriterParams&&) = default;
+
+ GETTER(TMaybe<char>, Delimiter);
+ GETTER(TMaybe<TDuration>, BatchDuration);
+ GETTER(TMaybe<ui64>, BatchSize);
+ GETTER(TMaybe<ui64>, BatchMessagesCount);
+ GETTER(ui64, MessageSizeLimit);
+ GETTER(EOutputFormat, InputFormat);
+
+ private:
+ TMaybe<TString> File_;
+ TMaybe<char> Delimiter_;
+ EOutputFormat InputFormat_ = EOutputFormat::Default;
+
+ // TODO(shmel1k@): move to 'TWithBatchingCommand' or something like that.
+ TMaybe<TDuration> BatchDuration_;
+ TMaybe<ui64> BatchSize_;
+ TMaybe<ui64> BatchMessagesCount_;
+
+ ui64 MessageSizeLimit_ = 0;
+ };
+
+ class TTopicWriterTests;
+
+ // TODO(shmel1k@): think about interruption here.
+ class TTopicWriter {
+ public:
+ TTopicWriter();
+ TTopicWriter(const TTopicWriter&) = default;
+ TTopicWriter(TTopicWriter&&) = default;
+ TTopicWriter(std::shared_ptr<NPersQueue::IWriteSession>, TTopicWriterParams);
+
+ bool Close(TDuration closeTimeout = TDuration::Max());
+ int Init();
+ int Run(IInputStream&);
+
+ private:
+ struct TSendMessageData {
+ TString Data = "";
+ bool NeedSend = false;
+ bool ContinueSending = false;
+ };
+
+ int HandleEvent(NPersQueue::TWriteSessionEvent::TEvent&);
+ int HandleAcksEvent(const NPersQueue::TWriteSessionEvent::TAcksEvent&);
+ int HandleReadyToAcceptEvent(NPersQueue::TWriteSessionEvent::TReadyToAcceptEvent&);
+ int HandleSessionClosedEvent(const NPersQueue::TSessionClosedEvent&);
+ TTopicWriter::TSendMessageData EnterMessage(IInputStream&); // TODO(shmel1k@): make static or like a helper function
+
+ std::shared_ptr<NPersQueue::IWriteSession> WriteSession_;
+ const TTopicWriterParams WriterParams_;
+
+ TMaybe<NPersQueue::TContinuationToken> ContinuationToken_ = Nothing();
+
+ ui64 CurrentSeqNo_ = 0;
+
+ friend class TTopicWriterTests;
+ };
+} // namespace NYdb::NConsoleClient \ No newline at end of file
diff --git a/ydb/public/lib/ydb_cli/topic/topic_write_ut.cpp b/ydb/public/lib/ydb_cli/topic/topic_write_ut.cpp
new file mode 100644
index 0000000000..8540e64778
--- /dev/null
+++ b/ydb/public/lib/ydb_cli/topic/topic_write_ut.cpp
@@ -0,0 +1,287 @@
+#include "topic_write.h"
+#include <library/cpp/testing/unittest/registar.h>
+
+namespace NYdb::NConsoleClient {
+ class TTopicWriterTests: public NUnitTest::TTestBase {
+ using TMessages = TVector<TTopicWriter::TSendMessageData>;
+
+ public:
+ UNIT_TEST_SUITE(TTopicWriterTests);
+ UNIT_TEST(TestEnterMessage_EmptyInput);
+ UNIT_TEST(TestEnterMessage_OnlyDelimiters);
+ UNIT_TEST(TestEnterMessage_1KiB_No_Delimiter);
+ UNIT_TEST(TestEnterMessage_1KiB_Newline_Delimiter);
+ UNIT_TEST(TestEnterMessage_1KiB_Newline_Delimited_With_Two_Delimiters_In_A_Row);
+ UNIT_TEST(TestEnterMessage_SomeBinaryData);
+ UNIT_TEST(TestEnterMessage_ZeroSymbol_Delimited);
+ UNIT_TEST(TestEnterMessage_Custom_Delimiter_Delimited);
+
+ UNIT_TEST(TestTopicWriterParams_Format_NewlineDelimited);
+ UNIT_TEST(TestTopicWriterParams_Format_Concatenated);
+ UNIT_TEST(TestTopicWriterParams_No_Delimiter);
+ UNIT_TEST(TestTopicWriterParams_InvalidDelimiter);
+ UNIT_TEST_SUITE_END();
+
+ void TestEnterMessage_EmptyInput() {
+ TStringStream str;
+ TTopicWriter wr;
+ TMessages got = EnterMessageHelper(wr, str);
+ AssertMessagesEqual({{
+ .Data = "",
+ .NeedSend = true,
+ .ContinueSending = false,
+ }},
+ got);
+ }
+
+ void TestEnterMessage_OnlyDelimiters() {
+ TStringStream str = TString("\n") * 6;
+ TTopicWriter wr(nullptr, TTopicWriterParams(EOutputFormat::Default, "\n", 0, Nothing(), Nothing(), Nothing()));
+ TMessages got = EnterMessageHelper(wr, str);
+ AssertMessagesEqual({
+ {
+ .Data = "",
+ .NeedSend = true,
+ .ContinueSending = true,
+ },
+ {
+ .Data = "",
+ .NeedSend = true,
+ .ContinueSending = true,
+ },
+ {
+ .Data = "",
+ .NeedSend = true,
+ .ContinueSending = true,
+ },
+ {
+ .Data = "",
+ .NeedSend = true,
+ .ContinueSending = true,
+ },
+ {
+ .Data = "",
+ .NeedSend = true,
+ .ContinueSending = true,
+ },
+ {
+ .Data = "",
+ .NeedSend = true,
+ .ContinueSending = true,
+ },
+ {
+ .Data = "",
+ .NeedSend = false,
+ .ContinueSending = false,
+ },
+ },
+ got);
+ }
+
+ void TestEnterMessage_1KiB_No_Delimiter() {
+ /*
+ Only one message with a * 1024 size expected
+ */
+ TStringStream str = TString("a") * 1_KB;
+ TTopicWriter wr;
+ TMessages got = EnterMessageHelper(wr, str);
+ AssertMessagesEqual({
+ {
+ .Data = TString("a") * 1_KB,
+ .NeedSend = true,
+ .ContinueSending = false,
+ },
+ },
+ got);
+ }
+
+ void TestEnterMessage_1KiB_Newline_Delimiter() {
+ TStringStream str = TString("a") * 512 + "\n" + TString("b") * 512;
+ TTopicWriter wr(nullptr, TTopicWriterParams(EOutputFormat::Default, "\n", 0, Nothing(), Nothing(), Nothing()));
+ TMessages got = EnterMessageHelper(wr, str);
+ AssertMessagesEqual({
+ {
+ .Data = TString("a") * 512,
+ .NeedSend = true,
+ .ContinueSending = true,
+ },
+ {
+ .Data = TString("b") * 512,
+ .NeedSend = true,
+ .ContinueSending = true,
+ },
+ {
+ .Data = "",
+ .NeedSend = false,
+ .ContinueSending = false,
+ },
+ },
+ got);
+ }
+
+ void TestEnterMessage_1KiB_Newline_Delimited_With_Two_Delimiters_In_A_Row() {
+ TStringStream str = TString("a") * 512 + "\n\n" + TString("b") * 512;
+ TTopicWriter wr(nullptr, TTopicWriterParams(EOutputFormat::NewlineDelimited, Nothing(), 0, Nothing(), Nothing(), Nothing()));
+ TMessages got = EnterMessageHelper(wr, str);
+ AssertMessagesEqual({
+ {
+ .Data = TString("a") * 512,
+ .NeedSend = true,
+ .ContinueSending = true,
+ },
+ {
+ .Data = "",
+ .NeedSend = true,
+ .ContinueSending = true,
+ },
+ {
+ .Data = TString("b") * 512,
+ .NeedSend = true,
+ .ContinueSending = true,
+ },
+ {
+ .Data = "",
+ .NeedSend = false,
+ .ContinueSending = false,
+ },
+ },
+ got);
+ }
+
+ void TestEnterMessage_SomeBinaryData() {
+ TStringStream str = TString("\0\0\n\n\r\n\r\n");
+ TTopicWriter wr;
+ TMessages got = EnterMessageHelper(wr, str);
+ AssertMessagesEqual({
+ {
+ .Data = TString("\0\0\n\n\r\n\r\n"),
+ .NeedSend = true,
+ .ContinueSending = false,
+ },
+ },
+ got);
+ }
+
+ void TestEnterMessage_ZeroSymbol_Delimited() {
+ auto& s = "\0\0\0\0\n\nprivet";
+ TStringStream str = TString(std::begin(s), std::end(s));
+ TTopicWriter wr(nullptr, TTopicWriterParams(EOutputFormat::Default, "\0", 0, Nothing(), Nothing(), Nothing()));
+ TMessages got = EnterMessageHelper(wr, str);
+ AssertMessagesEqual({
+ {
+ .Data = "",
+ .NeedSend = true,
+ .ContinueSending = true,
+ },
+ {
+ .Data = "",
+ .NeedSend = true,
+ .ContinueSending = true,
+ },
+ {
+ .Data = "",
+ .NeedSend = true,
+ .ContinueSending = true,
+ },
+ {
+ .Data = "",
+ .NeedSend = true,
+ .ContinueSending = true,
+ },
+ {
+ .Data = "\n\nprivet",
+ .NeedSend = true,
+ .ContinueSending = true,
+ },
+ {
+ .Data = "",
+ .NeedSend = false,
+ .ContinueSending = false,
+ },
+ },
+ got);
+ }
+
+ void TestEnterMessage_Custom_Delimiter_Delimited() {
+ TStringStream str = TString("privet_vasya_kak_dela?");
+ TTopicWriter wr(nullptr, TTopicWriterParams(EOutputFormat::Default, "_", 0, Nothing(), Nothing(), Nothing()));
+ TMessages got = EnterMessageHelper(wr, str);
+ AssertMessagesEqual({
+ {
+ .Data = "privet",
+ .NeedSend = true,
+ .ContinueSending = true,
+ },
+ {
+ .Data = "vasya",
+ .NeedSend = true,
+ .ContinueSending = true,
+ },
+ {
+ .Data = "kak",
+ .NeedSend = true,
+ .ContinueSending = true,
+ },
+ {
+ .Data = "dela?",
+ .NeedSend = true,
+ .ContinueSending = true,
+ },
+ {
+ .Data = "",
+ .NeedSend = false,
+ .ContinueSending = false,
+ },
+ },
+ got);
+ }
+
+ void TestTopicWriterParams_Format_NewlineDelimited() {
+ TTopicWriterParams p = TTopicWriterParams(EOutputFormat::NewlineDelimited, Nothing(), 0, Nothing(), Nothing(), Nothing());
+ UNIT_ASSERT_VALUES_EQUAL(p.InputFormat(), EOutputFormat::NewlineDelimited);
+ UNIT_ASSERT_VALUES_EQUAL(p.Delimiter(), '\n');
+ }
+
+ void TestTopicWriterParams_Format_Concatenated() {
+ TTopicWriterParams p = TTopicWriterParams(EOutputFormat::Concatenated, Nothing(), 0, Nothing(), Nothing(), Nothing());
+ UNIT_ASSERT_VALUES_EQUAL(p.InputFormat(), EOutputFormat::Concatenated);
+ UNIT_ASSERT_VALUES_EQUAL(p.Delimiter(), '\n');
+ }
+
+ void TestTopicWriterParams_No_Delimiter() {
+ TTopicWriterParams p = TTopicWriterParams(EOutputFormat::Default, Nothing(), 0, Nothing(), Nothing(), Nothing());
+ UNIT_ASSERT_VALUES_EQUAL(p.InputFormat(), EOutputFormat::Default);
+ UNIT_ASSERT_VALUES_EQUAL(p.Delimiter(), Nothing());
+ }
+
+ void TestTopicWriterParams_InvalidDelimiter() {
+ UNIT_ASSERT_EXCEPTION(TTopicWriterParams(EOutputFormat::Default, "invalid", 0, Nothing(), Nothing(), Nothing()), yexception);
+ }
+
+ private:
+ TMessages EnterMessageHelper(TTopicWriter& wr, IInputStream& input) {
+ TMessages result;
+ while (true) {
+ TTopicWriter::TSendMessageData message = wr.EnterMessage(input);
+ result.emplace_back(message);
+ if (result.back().ContinueSending == false) {
+ break;
+ }
+ }
+ return result;
+ }
+
+ void AssertMessagesEqual(TMessages want, TMessages& got) {
+ UNIT_ASSERT_VALUES_EQUAL(want.size(), got.size());
+ for (size_t i = 0; i < want.size(); ++i) {
+ TTopicWriter::TSendMessageData w = want[i];
+ TTopicWriter::TSendMessageData g = got[i];
+ UNIT_ASSERT_VALUES_EQUAL(g.ContinueSending, w.ContinueSending);
+ UNIT_ASSERT_VALUES_EQUAL(g.Data, w.Data);
+ UNIT_ASSERT_VALUES_EQUAL(g.NeedSend, w.NeedSend);
+ }
+ }
+ };
+
+ UNIT_TEST_SUITE_REGISTRATION(TTopicWriterTests);
+} // namespace NYdb::NConsoleClient
diff --git a/ydb/public/lib/ydb_cli/topic/ut/CMakeLists.darwin.txt b/ydb/public/lib/ydb_cli/topic/ut/CMakeLists.darwin.txt
new file mode 100644
index 0000000000..7d61324672
--- /dev/null
+++ b/ydb/public/lib/ydb_cli/topic/ut/CMakeLists.darwin.txt
@@ -0,0 +1,60 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_executable(ydb-public-lib-ydb_cli-topic-ut)
+target_compile_options(ydb-public-lib-ydb_cli-topic-ut PRIVATE
+ -DACTORLIB_HUGE_PB_SIZE
+ -DUSE_CURRENT_UDF_ABI_VERSION
+)
+target_include_directories(ydb-public-lib-ydb_cli-topic-ut PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/public/lib/ydb_cli/topic
+)
+target_link_libraries(ydb-public-lib-ydb_cli-topic-ut PUBLIC
+ contrib-libs-cxxsupp
+ yutil
+ library-cpp-cpuid_check
+ cpp-testing-unittest_main
+ topic
+ cpp-histogram-hdr
+ cpp-threading-local_executor
+ yq-libs-private_client
+ cpp-client-ydb_persqueue_public
+ public-lib-experimental
+ clicommands
+ common
+ public-lib-yq
+ public-lib-yson_value
+ cpp-client-ydb_proto
+ ydb_persqueue_core-ut-ut_utils
+)
+target_link_options(ydb-public-lib-ydb_cli-topic-ut PRIVATE
+ -Wl,-no_deduplicate
+ -Wl,-sdk_version,10.15
+ -fPIC
+ -fPIC
+ -framework
+ CoreFoundation
+)
+target_sources(ydb-public-lib-ydb_cli-topic-ut PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/public/lib/ydb_cli/topic/topic_write.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/public/lib/ydb_cli/topic/topic_write_ut.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/public/lib/ydb_cli/topic/topic_read_ut.cpp
+)
+add_test(
+ NAME
+ ydb-public-lib-ydb_cli-topic-ut
+ COMMAND
+ ydb-public-lib-ydb_cli-topic-ut
+ --print-before-suite
+ --print-before-test
+ --fork-tests
+ --print-times
+ --show-fails
+)
+vcs_info(ydb-public-lib-ydb_cli-topic-ut)
diff --git a/ydb/public/lib/ydb_cli/topic/ut/CMakeLists.linux.txt b/ydb/public/lib/ydb_cli/topic/ut/CMakeLists.linux.txt
new file mode 100644
index 0000000000..d23103b4dc
--- /dev/null
+++ b/ydb/public/lib/ydb_cli/topic/ut/CMakeLists.linux.txt
@@ -0,0 +1,64 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_executable(ydb-public-lib-ydb_cli-topic-ut)
+target_compile_options(ydb-public-lib-ydb_cli-topic-ut PRIVATE
+ -DACTORLIB_HUGE_PB_SIZE
+ -DUSE_CURRENT_UDF_ABI_VERSION
+)
+target_include_directories(ydb-public-lib-ydb_cli-topic-ut PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/public/lib/ydb_cli/topic
+)
+target_link_libraries(ydb-public-lib-ydb_cli-topic-ut PUBLIC
+ contrib-libs-cxxsupp
+ yutil
+ cpp-malloc-tcmalloc
+ libs-tcmalloc-no_percpu_cache
+ library-cpp-cpuid_check
+ cpp-testing-unittest_main
+ topic
+ cpp-histogram-hdr
+ cpp-threading-local_executor
+ yq-libs-private_client
+ cpp-client-ydb_persqueue_public
+ public-lib-experimental
+ clicommands
+ common
+ public-lib-yq
+ public-lib-yson_value
+ cpp-client-ydb_proto
+ ydb_persqueue_core-ut-ut_utils
+)
+target_link_options(ydb-public-lib-ydb_cli-topic-ut PRIVATE
+ -ldl
+ -lrt
+ -Wl,--no-as-needed
+ -fPIC
+ -fPIC
+ -lpthread
+ -lrt
+ -ldl
+)
+target_sources(ydb-public-lib-ydb_cli-topic-ut PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/public/lib/ydb_cli/topic/topic_write.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/public/lib/ydb_cli/topic/topic_write_ut.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/public/lib/ydb_cli/topic/topic_read_ut.cpp
+)
+add_test(
+ NAME
+ ydb-public-lib-ydb_cli-topic-ut
+ COMMAND
+ ydb-public-lib-ydb_cli-topic-ut
+ --print-before-suite
+ --print-before-test
+ --fork-tests
+ --print-times
+ --show-fails
+)
+vcs_info(ydb-public-lib-ydb_cli-topic-ut)
diff --git a/ydb/public/lib/ydb_cli/topic/ut/CMakeLists.txt b/ydb/public/lib/ydb_cli/topic/ut/CMakeLists.txt
new file mode 100644
index 0000000000..fc7b1ee73c
--- /dev/null
+++ b/ydb/public/lib/ydb_cli/topic/ut/CMakeLists.txt
@@ -0,0 +1,13 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+if (APPLE)
+ include(CMakeLists.darwin.txt)
+elseif (UNIX AND NOT APPLE)
+ include(CMakeLists.linux.txt)
+endif()
diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/common.cpp b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/common.cpp
index 5aa49006b8..17bc321c25 100644
--- a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/common.cpp
+++ b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/common.cpp
@@ -191,4 +191,9 @@ TThreadPoolExecutor::TThreadPoolExecutor(size_t threadsCount)
ThreadsCount = threadsCount;
}
+IExecutor::TPtr CreateSyncExecutor()
+{
+ return MakeIntrusive<TSyncExecutor>();
+}
+
}
diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.h b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.h
index edb4a770ad..e5de6e3b64 100644
--- a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.h
+++ b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.h
@@ -168,8 +168,10 @@ private:
};
template <bool UseMigrationProtocol>
-class TDataDecompressionInfo {
+class TDataDecompressionInfo : public std::enable_shared_from_this<TDataDecompressionInfo<UseMigrationProtocol>> {
public:
+ using TPtr = std::shared_ptr<TDataDecompressionInfo<UseMigrationProtocol>>;
+
TDataDecompressionInfo(const TDataDecompressionInfo&) = default;
TDataDecompressionInfo(TDataDecompressionInfo&&) = default;
TDataDecompressionInfo(
@@ -205,6 +207,10 @@ public:
return ServerMessage;
}
+ bool GetDoDecompress() const {
+ return DoDecompress;
+ }
+
TMaybe<std::pair<size_t, size_t>> GetReadyThreshold() const {
size_t readyCount = 0;
std::pair<size_t, size_t> ret;
@@ -252,7 +258,7 @@ private:
};
struct TDecompressionTask {
- explicit TDecompressionTask(TDataDecompressionInfo* parent, TIntrusivePtr<TPartitionStreamImpl<UseMigrationProtocol>> partitionStream, TReadyMessageThreshold* ready);
+ TDecompressionTask(TDataDecompressionInfo::TPtr parent, TIntrusivePtr<TPartitionStreamImpl<UseMigrationProtocol>> partitionStream, TReadyMessageThreshold* ready);
// Decompress and notify about memory consumption changes.
void operator()();
@@ -267,7 +273,7 @@ private:
}
private:
- TDataDecompressionInfo* Parent;
+ TDataDecompressionInfo::TPtr Parent;
TIntrusivePtr<TPartitionStreamImpl<UseMigrationProtocol>> PartitionStream;
i64 SourceDataSize = 0;
i64 EstimatedDecompressedSize = 0;
@@ -299,6 +305,33 @@ private:
TAdaptiveLock DecompressionErrorsStructLock;
std::vector<std::vector<std::exception_ptr>> DecompressionErrors;
};
+
+template <bool UseMigrationProtocol>
+class TDataDecompressionEvent {
+public:
+ TDataDecompressionEvent(size_t batch, size_t message, typename TDataDecompressionInfo<UseMigrationProtocol>::TPtr parent, std::atomic<bool> &ready) :
+ Batch{batch},
+ Message{message},
+ Parent{std::move(parent)},
+ Ready{ready}
+ {
+ }
+
+ bool IsReady() const {
+ return Ready;
+ }
+
+ bool TakeData(TIntrusivePtr<TPartitionStreamImpl<UseMigrationProtocol>> partitionStream,
+ TVector<typename TAReadSessionEvent<UseMigrationProtocol>::TDataReceivedEvent::TMessage>* messages,
+ TVector<typename TAReadSessionEvent<UseMigrationProtocol>::TDataReceivedEvent::TCompressedMessage>* compressedMessages,
+ size_t* maxByteSize) const;
+
+private:
+ size_t Batch;
+ size_t Message;
+ typename TDataDecompressionInfo<UseMigrationProtocol>::TPtr Parent;
+ std::atomic<bool> &Ready;
+};
template <bool UseMigrationProtocol>
struct IUserRetrievedEventCallback {
@@ -317,6 +350,7 @@ struct TReadSessionEventInfo {
// Event with only partition stream ref.
// Partition stream holds all its events.
TIntrusivePtr<TPartitionStreamImpl<UseMigrationProtocol>> PartitionStream;
+ size_t DataCount = 0;
TMaybe<TEvent> Event;
std::weak_ptr<IUserRetrievedEventCallback<UseMigrationProtocol>> Session;
@@ -333,34 +367,16 @@ struct TReadSessionEventInfo {
// Data event.
TReadSessionEventInfo(TIntrusivePtr<TPartitionStreamImpl<UseMigrationProtocol>> partitionStream, std::weak_ptr<IUserRetrievedEventCallback<UseMigrationProtocol>> session);
- TReadSessionEventInfo(TIntrusivePtr<TPartitionStreamImpl<UseMigrationProtocol>> partitionStream,
- std::weak_ptr<IUserRetrievedEventCallback<UseMigrationProtocol>> session,
- TVector<TMessage> messages,
- TVector<TCompressedMessage> compressedMessages);
-
bool IsEmpty() const;
bool IsDataEvent() const;
- // Takes data. Returns true if event has more unpacked data.
- bool TakeData(TVector<TMessage>* messages,
- TVector<TCompressedMessage>* comressedMessages,
- size_t* maxByteSize);
-
TEvent& GetEvent() {
Y_ASSERT(Event);
return *Event;
}
- // Move event to partition stream queue.
- void MoveToPartitionStream();
-
- void ExtractFromPartitionStream();
-
void OnUserRetrievedEvent();
- bool HasMoreData() const; // Has unread data.
- bool HasReadyUnreadData() const; // Has ready unread data.
-
bool IsSessionClosedEvent() const {
return Event && std::holds_alternative<TASessionClosedEvent<UseMigrationProtocol>>(*Event);
}
@@ -371,18 +387,20 @@ template <bool UseMigrationProtocol>
struct TRawPartitionStreamEvent {
using TEvent = typename TAReadSessionEvent<UseMigrationProtocol>::TEvent;
- std::variant<TDataDecompressionInfo<UseMigrationProtocol>, TEvent> Event;
- bool Signalled = false;
+ std::variant<TDataDecompressionEvent<UseMigrationProtocol>, TEvent> Event;
TRawPartitionStreamEvent(const TRawPartitionStreamEvent&) = default;
TRawPartitionStreamEvent(TRawPartitionStreamEvent&&) = default;
- TRawPartitionStreamEvent(
- TPartitionData<UseMigrationProtocol>&& msg,
- std::weak_ptr<TSingleClusterReadSessionImpl<UseMigrationProtocol>> session,
- bool doDecompress
- )
- : Event(std::in_place_type_t<TDataDecompressionInfo<UseMigrationProtocol>>(), std::move(msg), std::move(session), doDecompress)
+ TRawPartitionStreamEvent(size_t batch,
+ size_t message,
+ typename TDataDecompressionInfo<UseMigrationProtocol>::TPtr parent,
+ std::atomic<bool> &ready)
+ : Event(std::in_place_type_t<TDataDecompressionEvent<UseMigrationProtocol>>(),
+ batch,
+ message,
+ std::move(parent),
+ ready)
{
}
@@ -393,17 +411,12 @@ struct TRawPartitionStreamEvent {
}
bool IsDataEvent() const {
- return std::holds_alternative<TDataDecompressionInfo<UseMigrationProtocol>>(Event);
- }
-
- const TDataDecompressionInfo<UseMigrationProtocol>& GetData() const {
- Y_ASSERT(IsDataEvent());
- return std::get<TDataDecompressionInfo<UseMigrationProtocol>>(Event);
+ return std::holds_alternative<TDataDecompressionEvent<UseMigrationProtocol>>(Event);
}
- TDataDecompressionInfo<UseMigrationProtocol>& GetData() {
+ const TDataDecompressionEvent<UseMigrationProtocol>& GetDataEvent() const {
Y_ASSERT(IsDataEvent());
- return std::get<TDataDecompressionInfo<UseMigrationProtocol>>(Event);
+ return std::get<TDataDecompressionEvent<UseMigrationProtocol>>(Event);
}
TEvent& GetEvent() {
@@ -417,13 +430,60 @@ struct TRawPartitionStreamEvent {
}
bool IsReady() const {
- return !IsDataEvent() || GetData().IsReady();
- }
+ if (!IsDataEvent()) {
+ return true;
+ }
- void Signal(TPartitionStreamImpl<UseMigrationProtocol>* partitionStream, TReadSessionEventsQueue<UseMigrationProtocol>* queue, TDeferredActions<UseMigrationProtocol>& deferred);
+ return std::get<TDataDecompressionEvent<UseMigrationProtocol>>(Event).IsReady();
+ }
};
+template <bool UseMigrationProtocol>
+class TRawPartitionStreamEventQueue {
+public:
+ TRawPartitionStreamEventQueue() = default;
+
+ template <class... Ts>
+ TRawPartitionStreamEvent<UseMigrationProtocol>& emplace_back(Ts&&... event)
+ {
+ return NotReady.emplace_back(std::forward<Ts>(event)...);
+ }
+
+ bool empty() const
+ {
+ return Ready.empty() && NotReady.empty();
+ }
+
+ TRawPartitionStreamEvent<UseMigrationProtocol>& front()
+ {
+ Y_VERIFY(!empty());
+
+ return (Ready.empty() ? NotReady : Ready).front();
+ }
+
+ void pop_front()
+ {
+ Y_VERIFY(!empty());
+ (Ready.empty() ? NotReady : Ready).pop_front();
+ }
+
+ void pop_back()
+ {
+ Y_VERIFY(!empty());
+
+ (NotReady.empty() ? Ready : NotReady).pop_back();
+ }
+
+ void SignalReadyEvents(TPartitionStreamImpl<UseMigrationProtocol>& stream,
+ TReadSessionEventsQueue<UseMigrationProtocol>& queue,
+ TDeferredActions<UseMigrationProtocol>& deferred);
+ void DeleteNotReadyTail();
+
+private:
+ std::deque<TRawPartitionStreamEvent<UseMigrationProtocol>> Ready;
+ std::deque<TRawPartitionStreamEvent<UseMigrationProtocol>> NotReady;
+};
template <bool UseMigrationProtocol>
class TPartitionStreamImpl : public TAPartitionStream<UseMigrationProtocol> {
@@ -517,12 +577,13 @@ public:
EventsQueue.emplace_back(std::forward<T>(event));
}
- TDataDecompressionInfo<UseMigrationProtocol>& InsertDataEvent(
- TPartitionData<UseMigrationProtocol>&& msg,
- bool doDecompress
- ) {
+ void InsertDataEvent(size_t batch,
+ size_t message,
+ typename TDataDecompressionInfo<UseMigrationProtocol>::TPtr parent,
+ std::atomic<bool> &ready)
+ {
++DataDecompressionEventsCount;
- return EventsQueue.emplace_back(std::move(msg), Session, doDecompress).GetData();
+ EventsQueue.emplace_back(batch, message, std::move(parent), ready);
}
bool IsWaitingForDataDecompression() const {
@@ -537,10 +598,6 @@ public:
return EventsQueue.front();
}
- const TRawPartitionStreamEvent<UseMigrationProtocol>& TopEvent() const {
- return EventsQueue.front();
- }
-
void PopEvent() {
if (EventsQueue.front().IsDataEvent()) {
--DataDecompressionEventsCount;
@@ -615,6 +672,7 @@ public:
return true;
}
+ void DeleteNotReadyTail();
private:
const TKey Key;
@@ -622,7 +680,7 @@ private:
ui64 FirstNotReadOffset;
std::weak_ptr<TSingleClusterReadSessionImpl<UseMigrationProtocol>> Session;
typename IErrorHandler<UseMigrationProtocol>::TPtr ErrorHandler;
- std::deque<TRawPartitionStreamEvent<UseMigrationProtocol>> EventsQueue;
+ TRawPartitionStreamEventQueue<UseMigrationProtocol> EventsQueue;
size_t DataDecompressionEventsCount = 0;
ui64 MaxReadOffset = 0;
ui64 MaxCommittedOffset = 0;
@@ -646,50 +704,45 @@ class TReadSessionEventsQueue: public TBaseSessionEventsQueue<TAReadSessionSetti
public:
explicit TReadSessionEventsQueue(const TAReadSessionSettings<UseMigrationProtocol>& settings, std::weak_ptr<IUserRetrievedEventCallback<UseMigrationProtocol>> session);
- TMaybe<TReadSessionEventInfo<UseMigrationProtocol>> GetDataEventImpl(TReadSessionEventInfo<UseMigrationProtocol>& srcDataEventInfo, size_t* maxByteSize); // Assumes that we're under lock.
+ typename TAReadSessionEvent<UseMigrationProtocol>::TDataReceivedEvent
+ GetDataEventImpl(TIntrusivePtr<TPartitionStreamImpl<UseMigrationProtocol>> stream, size_t* maxByteSize); // Assumes that we're under lock.
- TMaybe<TReadSessionEventInfo<UseMigrationProtocol>> TryGetEventImpl(size_t* maxByteSize) { // Assumes that we're under lock.
+ TReadSessionEventInfo<UseMigrationProtocol> GetEventImpl(size_t* maxByteSize) { // Assumes that we're under lock.
Y_ASSERT(TParent::HasEventsImpl());
- TVector<typename TAReadSessionEvent<UseMigrationProtocol>::TDataReceivedEvent::TMessage> messages;
+
if (!TParent::Events.empty()) {
- TReadSessionEventInfo<UseMigrationProtocol> event = std::move(TParent::Events.front());
- TParent::Events.pop();
- TParent::RenewWaiterImpl();
- auto partitionStream = event.PartitionStream;
+ TReadSessionEventInfo<UseMigrationProtocol>& front = TParent::Events.front();
+ auto partitionStream = front.PartitionStream;
if (!partitionStream->HasEvents()) {
Y_FAIL("can't be here - got events in global queue, but nothing in partition queue");
- return Nothing();
}
+ TMaybe<typename TAReadSessionEvent<UseMigrationProtocol>::TEvent> event;
+
if (partitionStream->TopEvent().IsDataEvent()) {
- return GetDataEventImpl(event, maxByteSize);
+ event = GetDataEventImpl(partitionStream, maxByteSize);
+ } else {
+ event = std::move(partitionStream->TopEvent().GetEvent());
+ partitionStream->PopEvent();
+
+ TParent::Events.pop();
}
- event = TReadSessionEventInfo<UseMigrationProtocol>(partitionStream.Get(), event.Session, partitionStream->TopEvent().GetEvent());
- partitionStream->PopEvent();
- return event;
+ TParent::RenewWaiterImpl();
+
+ return {partitionStream, front.Session, std::move(*event)};
}
Y_ASSERT(TParent::CloseEvent);
- return TReadSessionEventInfo<UseMigrationProtocol>(*TParent::CloseEvent, Session);
- }
- TMaybe<TReadSessionEventInfo<UseMigrationProtocol>> GetEventImpl(size_t* maxByteSize) { // Assumes that we're under lock and that the event queue has events.
- do {
- TMaybe<TReadSessionEventInfo<UseMigrationProtocol>> result = TryGetEventImpl(maxByteSize); // We could have read all the data in current message previous time.
- if (result) {
- return result;
- }
- } while (TParent::HasEventsImpl());
- return Nothing();
+ return {*TParent::CloseEvent, Session};
}
TVector<typename TAReadSessionEvent<UseMigrationProtocol>::TEvent> GetEvents(bool block = false, TMaybe<size_t> maxEventsCount = Nothing(), size_t maxByteSize = std::numeric_limits<size_t>::max()) {
TVector<TReadSessionEventInfo<UseMigrationProtocol>> eventInfos;
const size_t maxCount = maxEventsCount ? *maxEventsCount : std::numeric_limits<size_t>::max();
TDeferredActions<UseMigrationProtocol> deferred;
- std::vector<TIntrusivePtr<TPartitionStreamImpl<UseMigrationProtocol>>> partitionStreamsForSignalling;
with_lock (TParent::Mutex) {
eventInfos.reserve(Min(TParent::Events.size() + TParent::CloseEvent.Defined(), maxCount));
do {
@@ -700,23 +753,14 @@ public:
ApplyCallbacksToReadyEventsImpl(deferred);
while (TParent::HasEventsImpl() && eventInfos.size() < maxCount && maxByteSize > 0) {
- TMaybe<TReadSessionEventInfo<UseMigrationProtocol>> event = GetEventImpl(&maxByteSize);
- if (event) {
- const TIntrusivePtr<TPartitionStreamImpl<UseMigrationProtocol>> partitionStreamForSignalling = event->IsDataEvent() ? event->PartitionStream : nullptr;
- eventInfos.emplace_back(std::move(*event));
- if (eventInfos.back().IsSessionClosedEvent()) {
- break;
- }
- if (partitionStreamForSignalling) {
- partitionStreamsForSignalling.emplace_back(std::move(partitionStreamForSignalling));
- }
+ TReadSessionEventInfo<UseMigrationProtocol> event = GetEventImpl(&maxByteSize);
+ eventInfos.emplace_back(std::move(event));
+ if (eventInfos.back().IsSessionClosedEvent()) {
+ break;
}
}
} while (block && (eventInfos.empty() || eventInfos.back().IsSessionClosedEvent()));
ApplyCallbacksToReadyEventsImpl(deferred);
- for (const auto& partitionStreamForSignalling : partitionStreamsForSignalling) {
- SignalReadyEventsImpl(partitionStreamForSignalling.Get(), deferred);
- }
}
TVector<typename TAReadSessionEvent<UseMigrationProtocol>::TEvent> result;
@@ -732,7 +776,6 @@ public:
TMaybe<TReadSessionEventInfo<UseMigrationProtocol>> eventInfo;
TDeferredActions<UseMigrationProtocol> deferred;
with_lock (TParent::Mutex) {
- TIntrusivePtr<TPartitionStreamImpl<UseMigrationProtocol>> partitionStreamForSignalling;
do {
if (block) {
TParent::WaitEventsImpl();
@@ -742,17 +785,11 @@ public:
if (TParent::HasEventsImpl()) {
eventInfo = GetEventImpl(&maxByteSize);
- if (eventInfo && eventInfo->IsDataEvent()) {
- partitionStreamForSignalling = eventInfo->PartitionStream;
- }
} else if (!appliedCallbacks) {
return Nothing();
}
} while (block && !eventInfo);
ApplyCallbacksToReadyEventsImpl(deferred);
- if (partitionStreamForSignalling) {
- SignalReadyEventsImpl(partitionStreamForSignalling.Get(), deferred);
- }
}
if (eventInfo) {
eventInfo->OnUserRetrievedEvent();
@@ -780,12 +817,17 @@ public:
bool ApplyCallbacksToReadyEventsImpl(TDeferredActions<UseMigrationProtocol>& deferred);
// Push usual event.
- void PushEvent(TReadSessionEventInfo<UseMigrationProtocol> eventInfo, TDeferredActions<UseMigrationProtocol>& deferred);
+ void PushEvent(TIntrusivePtr<TPartitionStreamImpl<UseMigrationProtocol>> partitionStream,
+ std::weak_ptr<IUserRetrievedEventCallback<UseMigrationProtocol>> session,
+ typename TAReadSessionEvent<UseMigrationProtocol>::TEvent event,
+ TDeferredActions<UseMigrationProtocol>& deferred);
// Push data event.
- TDataDecompressionInfo<UseMigrationProtocol>*
- PushDataEvent(TIntrusivePtr<TPartitionStreamImpl<UseMigrationProtocol>> partitionStream,
- TPartitionData<UseMigrationProtocol>&& msg);
+ void PushDataEvent(TIntrusivePtr<TPartitionStreamImpl<UseMigrationProtocol>> stream,
+ size_t batch,
+ size_t message,
+ typename TDataDecompressionInfo<UseMigrationProtocol>::TPtr parent,
+ std::atomic<bool> &ready);
void SignalEventImpl(TIntrusivePtr<TPartitionStreamImpl<UseMigrationProtocol>> partitionStream, TDeferredActions<UseMigrationProtocol>& deferred); // Assumes that we're under lock.
@@ -1088,13 +1130,14 @@ private:
};
struct TDecompressionQueueItem {
- TDecompressionQueueItem(TDataDecompressionInfo<UseMigrationProtocol>* batchInfo, TIntrusivePtr<TPartitionStreamImpl<UseMigrationProtocol>> partitionStream)
- : BatchInfo(batchInfo)
+ TDecompressionQueueItem(typename TDataDecompressionInfo<UseMigrationProtocol>::TPtr batchInfo,
+ TIntrusivePtr<TPartitionStreamImpl<UseMigrationProtocol>> partitionStream)
+ : BatchInfo(std::move(batchInfo))
, PartitionStream(std::move(partitionStream))
{
}
- TDataDecompressionInfo<UseMigrationProtocol>* BatchInfo;
+ typename TDataDecompressionInfo<UseMigrationProtocol>::TPtr BatchInfo;
TIntrusivePtr<TPartitionStreamImpl<UseMigrationProtocol>> PartitionStream;
};
diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.ipp b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.ipp
index 85ae2de2e9..208eeec8f1 100644
--- a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.ipp
+++ b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.ipp
@@ -118,15 +118,47 @@ void TPartitionStreamImpl<UseMigrationProtocol>::ResumeReading() {
template<bool UseMigrationProtocol>
void TPartitionStreamImpl<UseMigrationProtocol>::SignalReadyEvents(TReadSessionEventsQueue<UseMigrationProtocol>* queue, TDeferredActions<UseMigrationProtocol>& deferred) {
- for (auto& event : EventsQueue) {
- event.Signal(this, queue, deferred);
+ Y_VERIFY(queue);
+ EventsQueue.SignalReadyEvents(*this, *queue, deferred);
+}
+
+template<bool UseMigrationProtocol>
+void TPartitionStreamImpl<UseMigrationProtocol>::DeleteNotReadyTail()
+{
+ EventsQueue.DeleteNotReadyTail();
+}
+
+template<bool UseMigrationProtocol>
+void TRawPartitionStreamEventQueue<UseMigrationProtocol>::SignalReadyEvents(TPartitionStreamImpl<UseMigrationProtocol>& stream,
+ TReadSessionEventsQueue<UseMigrationProtocol>& queue,
+ TDeferredActions<UseMigrationProtocol>& deferred)
+{
+ while (!NotReady.empty() && NotReady.front().IsReady()) {
+ auto& event = NotReady.front();
+
+ queue.SignalEventImpl(&stream, deferred);
+ Ready.push_back(std::move(event));
+ NotReady.pop_front();
+ }
+}
+
+template<bool UseMigrationProtocol>
+void TRawPartitionStreamEventQueue<UseMigrationProtocol>::DeleteNotReadyTail()
+{
+ std::deque<TRawPartitionStreamEvent<UseMigrationProtocol>> head;
+
+ for (auto& event : NotReady) {
if (!event.IsReady()) {
break;
}
+
+ head.push_back(std::move(event));
}
-}
+ swap(head, NotReady);
+}
+
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// TSingleClusterReadSessionImpl
@@ -511,20 +543,23 @@ void TSingleClusterReadSessionImpl<UseMigrationProtocol>::ConfirmPartitionStream
return;
}
- using TClosedEvent = typename std::conditional_t<UseMigrationProtocol, NPersQueue::TReadSessionEvent::TPartitionStreamClosedEvent,
- NTopic::TReadSessionEvent::TPartitionSessionClosedEvent>;
+ using TClosedEvent = std::conditional_t<
+ UseMigrationProtocol,
+ NPersQueue::TReadSessionEvent::TPartitionStreamClosedEvent,
+ NTopic::TReadSessionEvent::TPartitionSessionClosedEvent
+ >;
CookieMapping.RemoveMapping(GetPartitionStreamId(partitionStream));
PartitionStreams.erase(partitionStream->GetAssignId());
if constexpr (UseMigrationProtocol) {
- EventsQueue->PushEvent({partitionStream, TSingleClusterReadSessionImpl<UseMigrationProtocol>::weak_from_this(),
- TClosedEvent(partitionStream, TClosedEvent::EReason::DestroyConfirmedByUser)},
- deferred);
+ EventsQueue->PushEvent(partitionStream, TSingleClusterReadSessionImpl<UseMigrationProtocol>::weak_from_this(),
+ TClosedEvent(partitionStream, TClosedEvent::EReason::DestroyConfirmedByUser),
+ deferred);
} else {
- EventsQueue->PushEvent({partitionStream, TSingleClusterReadSessionImpl<UseMigrationProtocol>::weak_from_this(),
- TClosedEvent(partitionStream, TClosedEvent::EReason::StopConfirmedByUser)},
- deferred);
+ EventsQueue->PushEvent(partitionStream, TSingleClusterReadSessionImpl<UseMigrationProtocol>::weak_from_this(),
+ TClosedEvent(partitionStream, TClosedEvent::EReason::StopConfirmedByUser),
+ deferred);
}
TClientMessage<UseMigrationProtocol> req;
@@ -859,8 +894,12 @@ inline void TSingleClusterReadSessionImpl<true>::OnReadDoneImpl(
deferred);
return;
}
- TDataDecompressionInfo<true>* decompressionInfo = EventsQueue->PushDataEvent(partitionStream, std::move(partitionData));
+
+ auto decompressionInfo = std::make_shared<TDataDecompressionInfo<true>>(std::move(partitionData),
+ shared_from_this(),
+ Settings.Decompress_);
Y_VERIFY(decompressionInfo);
+
if (decompressionInfo) {
DecompressionQueue.emplace_back(decompressionInfo, partitionStream);
StartDecompressionTasksImpl(deferred);
@@ -890,17 +929,17 @@ inline void TSingleClusterReadSessionImpl<true>::OnReadDoneImpl(
if (currentPartitionStream) {
CookieMapping.RemoveMapping(currentPartitionStream->GetPartitionStreamId());
EventsQueue->PushEvent(
- {currentPartitionStream, weak_from_this(),
+ currentPartitionStream, weak_from_this(),
TReadSessionEvent::TPartitionStreamClosedEvent(
- currentPartitionStream, TReadSessionEvent::TPartitionStreamClosedEvent::EReason::Lost)},
+ currentPartitionStream, TReadSessionEvent::TPartitionStreamClosedEvent::EReason::Lost),
deferred);
}
currentPartitionStream = partitionStream;
// Send event to user.
EventsQueue->PushEvent(
- {partitionStream, weak_from_this(),
- TReadSessionEvent::TCreatePartitionStreamEvent(partitionStream, msg.read_offset(), msg.end_offset())},
+ partitionStream, weak_from_this(),
+ TReadSessionEvent::TCreatePartitionStreamEvent(partitionStream, msg.read_offset(), msg.end_offset()),
deferred);
}
@@ -917,14 +956,14 @@ inline void TSingleClusterReadSessionImpl<true>::OnReadDoneImpl(
if (msg.forceful_release()) {
PartitionStreams.erase(msg.assign_id());
CookieMapping.RemoveMapping(partitionStream->GetPartitionStreamId());
- EventsQueue->PushEvent({partitionStream, weak_from_this(),
+ EventsQueue->PushEvent(partitionStream, weak_from_this(),
TReadSessionEvent::TPartitionStreamClosedEvent(
- partitionStream, TReadSessionEvent::TPartitionStreamClosedEvent::EReason::Lost)},
+ partitionStream, TReadSessionEvent::TPartitionStreamClosedEvent::EReason::Lost),
deferred);
} else {
EventsQueue->PushEvent(
- {partitionStream, weak_from_this(),
- TReadSessionEvent::TDestroyPartitionStreamEvent(std::move(partitionStream), msg.commit_offset())},
+ partitionStream, weak_from_this(),
+ TReadSessionEvent::TDestroyPartitionStreamEvent(std::move(partitionStream), msg.commit_offset()),
deferred);
}
}
@@ -947,8 +986,8 @@ inline void TSingleClusterReadSessionImpl<true>::OnReadDoneImpl(
}
for (auto& [id, partitionStream] : partitionStreams) {
EventsQueue->PushEvent(
- {partitionStream, weak_from_this(),
- TReadSessionEvent::TCommitAcknowledgementEvent(partitionStream, partitionStream->GetMaxCommittedOffset())},
+ partitionStream, weak_from_this(),
+ TReadSessionEvent::TCommitAcknowledgementEvent(partitionStream, partitionStream->GetMaxCommittedOffset()),
deferred);
}
@@ -958,8 +997,8 @@ inline void TSingleClusterReadSessionImpl<true>::OnReadDoneImpl(
auto partitionStream = partitionStreamIt->second;
partitionStream->UpdateMaxCommittedOffset(rangeProto.end_offset());
EventsQueue->PushEvent(
- {partitionStream, weak_from_this(),
- TReadSessionEvent::TCommitAcknowledgementEvent(partitionStream, rangeProto.end_offset())},
+ partitionStream, weak_from_this(),
+ TReadSessionEvent::TCommitAcknowledgementEvent(partitionStream, rangeProto.end_offset()),
deferred);
}
}
@@ -974,11 +1013,11 @@ inline void TSingleClusterReadSessionImpl<true>::OnReadDoneImpl(
if (partitionStreamIt == PartitionStreams.end()) {
return;
}
- EventsQueue->PushEvent({partitionStreamIt->second, weak_from_this(),
+ EventsQueue->PushEvent(partitionStreamIt->second, weak_from_this(),
TReadSessionEvent::TPartitionStreamStatusEvent(
partitionStreamIt->second, msg.committed_offset(),
0, // TODO: support read offset in status
- msg.end_offset(), TInstant::MilliSeconds(msg.write_watermark_ms()))},
+ msg.end_offset(), TInstant::MilliSeconds(msg.write_watermark_ms())),
deferred);
}
@@ -1055,8 +1094,12 @@ inline void TSingleClusterReadSessionImpl<false>::OnReadDoneImpl(
return;
}
partitionStream->SetFirstNotReadOffset(desiredOffset);
- TDataDecompressionInfo<false>* decompressionInfo = EventsQueue->PushDataEvent(partitionStream, std::move(partitionData));
+
+ auto decompressionInfo = std::make_shared<TDataDecompressionInfo<false>>(std::move(partitionData),
+ shared_from_this(),
+ Settings.Decompress_);
Y_VERIFY(decompressionInfo);
+
if (decompressionInfo) {
DecompressionQueue.emplace_back(decompressionInfo, partitionStream);
StartDecompressionTasksImpl(deferred);
@@ -1082,17 +1125,17 @@ inline void TSingleClusterReadSessionImpl<false>::OnReadDoneImpl(
TIntrusivePtr<TPartitionStreamImpl<false>>& currentPartitionStream = PartitionStreams[partitionStream->GetAssignId()];
if (currentPartitionStream) {
EventsQueue->PushEvent(
- {currentPartitionStream, weak_from_this(),
+ currentPartitionStream, weak_from_this(),
NTopic::TReadSessionEvent::TPartitionSessionClosedEvent(
- currentPartitionStream, NTopic::TReadSessionEvent::TPartitionSessionClosedEvent::EReason::Lost)},
+ currentPartitionStream, NTopic::TReadSessionEvent::TPartitionSessionClosedEvent::EReason::Lost),
deferred);
}
currentPartitionStream = partitionStream;
// Send event to user.
- EventsQueue->PushEvent({partitionStream, weak_from_this(),
+ EventsQueue->PushEvent(partitionStream, weak_from_this(),
NTopic::TReadSessionEvent::TStartPartitionSessionEvent(
- partitionStream, msg.committed_offset(), msg.partition_offsets().end())},
+ partitionStream, msg.committed_offset(), msg.partition_offsets().end()),
deferred);
}
@@ -1108,14 +1151,14 @@ inline void TSingleClusterReadSessionImpl<false>::OnReadDoneImpl(
TIntrusivePtr<TPartitionStreamImpl<false>> partitionStream = partitionStreamIt->second;
if (!msg.graceful()) {
PartitionStreams.erase(msg.partition_session_id());
- EventsQueue->PushEvent({partitionStream, weak_from_this(),
+ EventsQueue->PushEvent(partitionStream, weak_from_this(),
NTopic::TReadSessionEvent::TPartitionSessionClosedEvent(
- partitionStream, NTopic::TReadSessionEvent::TPartitionSessionClosedEvent::EReason::Lost)},
+ partitionStream, NTopic::TReadSessionEvent::TPartitionSessionClosedEvent::EReason::Lost),
deferred);
} else {
EventsQueue->PushEvent(
- {partitionStream, weak_from_this(),
- NTopic::TReadSessionEvent::TStopPartitionSessionEvent(std::move(partitionStream), msg.committed_offset())},
+ partitionStream, weak_from_this(),
+ NTopic::TReadSessionEvent::TStopPartitionSessionEvent(std::move(partitionStream), msg.committed_offset()),
deferred);
}
}
@@ -1133,9 +1176,9 @@ inline void TSingleClusterReadSessionImpl<false>::OnReadDoneImpl(
if (partitionStreamIt != PartitionStreams.end()) {
auto partitionStream = partitionStreamIt->second;
partitionStream->UpdateMaxCommittedOffset(rangeProto.committed_offset());
- EventsQueue->PushEvent({partitionStream, weak_from_this(),
+ EventsQueue->PushEvent(partitionStream, weak_from_this(),
NTopic::TReadSessionEvent::TCommitOffsetAcknowledgementEvent(
- partitionStream, rangeProto.committed_offset())},
+ partitionStream, rangeProto.committed_offset()),
deferred);
}
}
@@ -1150,13 +1193,13 @@ inline void TSingleClusterReadSessionImpl<false>::OnReadDoneImpl(
if (partitionStreamIt == PartitionStreams.end()) {
return;
}
- EventsQueue->PushEvent({partitionStreamIt->second, weak_from_this(),
+ EventsQueue->PushEvent(partitionStreamIt->second, weak_from_this(),
NTopic::TReadSessionEvent::TPartitionSessionStatusEvent(
partitionStreamIt->second, msg.committed_offset(),
0, // TODO: support read offset in status
msg.partition_offsets().end(),
TInstant::MilliSeconds(::google::protobuf::util::TimeUtil::TimestampToMilliseconds(
- msg.write_time_high_watermark())))},
+ msg.write_time_high_watermark()))),
deferred);
}
@@ -1198,13 +1241,15 @@ void TSingleClusterReadSessionImpl<UseMigrationProtocol>::StartDecompressionTask
template<bool UseMigrationProtocol>
void TSingleClusterReadSessionImpl<UseMigrationProtocol>::DestroyAllPartitionStreamsImpl(TDeferredActions<UseMigrationProtocol>& deferred) {
- using TClosedEvent =
- typename std::conditional_t<UseMigrationProtocol, NPersQueue::TReadSessionEvent::TPartitionStreamClosedEvent,
- NTopic::TReadSessionEvent::TPartitionSessionClosedEvent>;
+ using TClosedEvent = std::conditional_t<
+ UseMigrationProtocol,
+ NPersQueue::TReadSessionEvent::TPartitionStreamClosedEvent,
+ NTopic::TReadSessionEvent::TPartitionSessionClosedEvent
+ >;
for (auto&& [key, partitionStream] : PartitionStreams) {
- EventsQueue->PushEvent({partitionStream, TSingleClusterReadSessionImpl<UseMigrationProtocol>::weak_from_this(),
- TClosedEvent(std::move(partitionStream), TClosedEvent::EReason::ConnectionLost)},
+ EventsQueue->PushEvent(partitionStream, TSingleClusterReadSessionImpl<UseMigrationProtocol>::weak_from_this(),
+ TClosedEvent(std::move(partitionStream), TClosedEvent::EReason::ConnectionLost),
deferred);
}
PartitionStreams.clear();
@@ -1482,42 +1527,11 @@ TReadSessionEventInfo<UseMigrationProtocol>::TReadSessionEventInfo(TIntrusivePtr
template<bool UseMigrationProtocol>
TReadSessionEventInfo<UseMigrationProtocol>::TReadSessionEventInfo(TIntrusivePtr<TPartitionStreamImpl<UseMigrationProtocol>> partitionStream, std::weak_ptr<IUserRetrievedEventCallback<UseMigrationProtocol>> session)
: PartitionStream(std::move(partitionStream))
+ , DataCount(1)
, Session(std::move(session))
{}
template<bool UseMigrationProtocol>
-TReadSessionEventInfo<UseMigrationProtocol>::TReadSessionEventInfo(TIntrusivePtr<TPartitionStreamImpl<UseMigrationProtocol>> partitionStream,
- std::weak_ptr<IUserRetrievedEventCallback<UseMigrationProtocol>> session,
- TVector<TMessage> messages,
- TVector<TCompressedMessage> compressedMessages)
- : PartitionStream(std::move(partitionStream))
- , Event(
- NMaybe::TInPlace(),
- std::in_place_type_t<TDataReceivedEvent>(),
- std::move(messages),
- std::move(compressedMessages),
- PartitionStream
- )
- , Session(std::move(session))
-{
-}
-
-template<bool UseMigrationProtocol>
-void TReadSessionEventInfo<UseMigrationProtocol>::MoveToPartitionStream() {
- PartitionStream->InsertEvent(std::move(*Event));
- Event = Nothing();
- Y_ASSERT(PartitionStream->HasEvents());
-}
-
-template<bool UseMigrationProtocol>
-void TReadSessionEventInfo<UseMigrationProtocol>::ExtractFromPartitionStream() {
- if (!Event && !IsEmpty()) {
- Event = std::move(PartitionStream->TopEvent().GetEvent());
- PartitionStream->PopEvent();
- }
-}
-
-template<bool UseMigrationProtocol>
bool TReadSessionEventInfo<UseMigrationProtocol>::IsEmpty() const {
return !PartitionStream || !PartitionStream->HasEvents();
}
@@ -1528,30 +1542,12 @@ bool TReadSessionEventInfo<UseMigrationProtocol>::IsDataEvent() const {
}
template<bool UseMigrationProtocol>
-bool TReadSessionEventInfo<UseMigrationProtocol>::HasMoreData() const {
- return PartitionStream->TopEvent().GetData().HasMoreData();
-}
-
-template<bool UseMigrationProtocol>
-bool TReadSessionEventInfo<UseMigrationProtocol>::HasReadyUnreadData() const {
- return PartitionStream->TopEvent().GetData().HasReadyUnreadData();
-}
-
-template<bool UseMigrationProtocol>
void TReadSessionEventInfo<UseMigrationProtocol>::OnUserRetrievedEvent() {
if (auto session = Session.lock()) {
session->OnUserRetrievedEvent(*Event);
}
}
-template<bool UseMigrationProtocol>
-bool TReadSessionEventInfo<UseMigrationProtocol>::TakeData(TVector<TMessage>* messages,
- TVector<TCompressedMessage>* compressedMessages,
- size_t* maxByteSize)
-{
- return PartitionStream->TopEvent().GetData().TakeData(PartitionStream, messages, compressedMessages, maxByteSize);
-}
-
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// TReadSessionEventsQueue
@@ -1585,16 +1581,30 @@ TReadSessionEventsQueue<UseMigrationProtocol>::TReadSessionEventsQueue(
}
template <bool UseMigrationProtocol>
-void TReadSessionEventsQueue<UseMigrationProtocol>::PushEvent(TReadSessionEventInfo<UseMigrationProtocol> eventInfo,
- TDeferredActions<UseMigrationProtocol>& deferred) {
+void TReadSessionEventsQueue<UseMigrationProtocol>::PushEvent(TIntrusivePtr<TPartitionStreamImpl<UseMigrationProtocol>> stream,
+ std::weak_ptr<IUserRetrievedEventCallback<UseMigrationProtocol>> /*session*/,
+ typename TAReadSessionEvent<UseMigrationProtocol>::TEvent event,
+ TDeferredActions<UseMigrationProtocol>& deferred)
+{
if (TParent::Closed) {
return;
}
with_lock (TParent::Mutex) {
- auto partitionStream = eventInfo.PartitionStream;
- eventInfo.MoveToPartitionStream();
- SignalReadyEventsImpl(partitionStream.Get(), deferred);
+ using TClosedEvent = std::conditional_t<
+ UseMigrationProtocol,
+ NPersQueue::TReadSessionEvent::TPartitionStreamClosedEvent,
+ NTopic::TReadSessionEvent::TPartitionSessionClosedEvent
+ >;
+
+ if (std::holds_alternative<TClosedEvent>(event)) {
+ stream->DeleteNotReadyTail();
+ }
+
+ stream->InsertEvent(std::move(event));
+ Y_ASSERT(stream->HasEvents());
+
+ SignalReadyEventsImpl(stream.Get(), deferred);
}
}
@@ -1605,53 +1615,67 @@ void TReadSessionEventsQueue<UseMigrationProtocol>::SignalEventImpl(
if (TParent::Closed) {
return;
}
+
auto session = partitionStream->GetSession();
- TParent::Events.emplace(std::move(partitionStream), std::move(session));
+
+ if (TParent::Events.empty()) {
+ TParent::Events.emplace(std::move(partitionStream), std::move(session));
+ } else {
+ auto& event = TParent::Events.back();
+ if (event.IsDataEvent() &&
+ (event.PartitionStream == partitionStream)) {
+ ++event.DataCount;
+ } else {
+ TParent::Events.emplace(std::move(partitionStream), std::move(session));
+ }
+ }
+
SignalWaiterImpl(deferred);
}
template <bool UseMigrationProtocol>
-TDataDecompressionInfo<UseMigrationProtocol>* TReadSessionEventsQueue<UseMigrationProtocol>::PushDataEvent(
- TIntrusivePtr<TPartitionStreamImpl<UseMigrationProtocol>> partitionStream,
- TPartitionData<UseMigrationProtocol>&& msg) {
+void TReadSessionEventsQueue<UseMigrationProtocol>::PushDataEvent(TIntrusivePtr<TPartitionStreamImpl<UseMigrationProtocol>> partitionStream,
+ size_t batch,
+ size_t message,
+ typename TDataDecompressionInfo<UseMigrationProtocol>::TPtr parent,
+ std::atomic<bool>& ready)
+{
if (this->Closed) {
- return nullptr;
+ return;
}
with_lock (this->Mutex) {
- return &partitionStream->InsertDataEvent(std::move(msg), this->Settings.Decompress_);
+ partitionStream->InsertDataEvent(batch, message, parent, ready);
}
}
template <bool UseMigrationProtocol>
-TMaybe<TReadSessionEventInfo<UseMigrationProtocol>> TReadSessionEventsQueue<UseMigrationProtocol>::GetDataEventImpl(
- TReadSessionEventInfo<UseMigrationProtocol>& srcDataEventInfo,
+typename TAReadSessionEvent<UseMigrationProtocol>::TDataReceivedEvent TReadSessionEventsQueue<UseMigrationProtocol>::GetDataEventImpl(
+ TIntrusivePtr<TPartitionStreamImpl<UseMigrationProtocol>> stream,
size_t* maxByteSize) { // Assumes that we're under lock.
TVector<typename TAReadSessionEvent<UseMigrationProtocol>::TDataReceivedEvent::TMessage> messages;
TVector<typename TAReadSessionEvent<UseMigrationProtocol>::TDataReceivedEvent::TCompressedMessage> compressedMessages;
- TIntrusivePtr<TPartitionStreamImpl<UseMigrationProtocol>> partitionStream = srcDataEventInfo.PartitionStream;
- bool messageExtracted = false;
- while (srcDataEventInfo.HasReadyUnreadData() && *maxByteSize > 0) {
- const bool hasMoreUnpackedData = srcDataEventInfo.TakeData(&messages, &compressedMessages, maxByteSize);
- if (!hasMoreUnpackedData) {
- const bool messageIsFullyRead = !srcDataEventInfo.HasMoreData();
- if (messageIsFullyRead) {
- partitionStream->PopEvent();
- messageExtracted = true;
- break;
- }
- }
- }
- if (!messageExtracted) {
- partitionStream->TopEvent().Signalled = false;
+
+ Y_VERIFY(!TParent::Events.empty());
+
+ auto& event = TParent::Events.front();
+
+ Y_VERIFY(event.PartitionStream == stream);
+ Y_VERIFY(event.DataCount > 0);
+
+ for (; (event.DataCount > 0) && (*maxByteSize > 0); --event.DataCount) {
+ stream->TopEvent().GetDataEvent().TakeData(stream, &messages, &compressedMessages, maxByteSize);
+ stream->PopEvent();
}
- if (messages.empty() && compressedMessages.empty()) {
- return Nothing();
+ if (event.DataCount == 0) {
+ TParent::Events.pop();
}
- return TReadSessionEventInfo<UseMigrationProtocol>(partitionStream, partitionStream->GetSession(),
- std::move(messages), std::move(compressedMessages));
+
+ Y_VERIFY(!messages.empty() || !compressedMessages.empty());
+
+ return {std::move(messages), std::move(compressedMessages), stream};
}
template <bool UseMigrationProtocol>
@@ -1680,19 +1704,11 @@ bool TReadSessionEventsQueue<UseMigrationProtocol>::ApplyCallbacksToReadyEventsI
bool applied = false;
while (HasCallbackForNextEventImpl()) {
size_t maxSize = std::numeric_limits<size_t>::max();
- TMaybe<TReadSessionEventInfo<UseMigrationProtocol>> eventInfo = GetEventImpl(&maxSize);
- if (!eventInfo) {
- break;
- }
- const TIntrusivePtr<TPartitionStreamImpl<UseMigrationProtocol>> partitionStreamForSignalling =
- eventInfo->IsDataEvent() ? eventInfo->PartitionStream : nullptr;
+ TReadSessionEventInfo<UseMigrationProtocol> eventInfo = GetEventImpl(&maxSize);
applied = true;
- if (!ApplyHandler(*eventInfo, deferred)) { // Close session event.
+ if (!ApplyHandler(eventInfo, deferred)) { // Close session event.
break;
}
- if (partitionStreamForSignalling) {
- SignalReadyEventsImpl(partitionStreamForSignalling.Get(), deferred);
- }
}
return applied;
}
@@ -1870,7 +1886,7 @@ i64 TDataDecompressionInfo<UseMigrationProtocol>::StartDecompressionTasks(
std::shared_ptr<TSingleClusterReadSessionImpl<UseMigrationProtocol>> session = Session.lock();
Y_ASSERT(session);
ReadyThresholds.emplace_back();
- TDecompressionTask task(this, partitionStream, &ReadyThresholds.back());
+ TDecompressionTask task(TDataDecompressionInfo::shared_from_this(), partitionStream, &ReadyThresholds.back());
i64 used = 0;
while (availableMemory > 0 && !AllDecompressionTasksStarted()) {
const auto& batch = ServerMessage.batches(CurrentDecompressingMessage.first);
@@ -1884,6 +1900,12 @@ i64 TDataDecompressionInfo<UseMigrationProtocol>::StartDecompressionTasks(
Y_VERIFY(estimatedDecompressedSize >= 0);
task.Add(CurrentDecompressingMessage.first, CurrentDecompressingMessage.second, size, estimatedDecompressedSize);
+ session->GetEventsQueue()->PushDataEvent(partitionStream,
+ CurrentDecompressingMessage.first,
+ CurrentDecompressingMessage.second,
+ TDataDecompressionInfo::shared_from_this(),
+ ReadyThresholds.back().Ready);
+
used += estimatedDecompressedSize;
availableMemory -= estimatedDecompressedSize;
}
@@ -1896,7 +1918,7 @@ i64 TDataDecompressionInfo<UseMigrationProtocol>::StartDecompressionTasks(
session->OnCreateNewDecompressionTask();
deferred.DeferStartExecutorTask(executor, std::move(task));
ReadyThresholds.emplace_back();
- task = TDecompressionTask(this, partitionStream, &ReadyThresholds.back());
+ task = TDecompressionTask(TDataDecompressionInfo::shared_from_this(), partitionStream, &ReadyThresholds.back());
}
}
if (task.AddedMessagesCount() > 0) {
@@ -2014,6 +2036,91 @@ bool TDataDecompressionInfo<UseMigrationProtocol>::TakeData(const TIntrusivePtr<
<< minOffset << "-" << maxOffset << ")");
return CurrentReadingMessage <= *readyThreshold;
}
+
+template<bool UseMigrationProtocol>
+bool TDataDecompressionEvent<UseMigrationProtocol>::TakeData(TIntrusivePtr<TPartitionStreamImpl<UseMigrationProtocol>> partitionStream,
+ TVector<typename TAReadSessionEvent<UseMigrationProtocol>::TDataReceivedEvent::TMessage>* messages,
+ TVector<typename TAReadSessionEvent<UseMigrationProtocol>::TDataReceivedEvent::TCompressedMessage>* compressedMessages,
+ size_t* maxByteSize) const
+{
+ auto& msg = Parent->GetServerMessage();
+ i64 minOffset = Max<i64>();
+ i64 maxOffset = 0;
+ auto& batch = *msg.mutable_batches(Batch);
+ const auto& meta = Parent->GetBatchMeta(Batch);
+
+ const TInstant batchWriteTimestamp = [&batch](){
+ if constexpr (UseMigrationProtocol) {
+ return TInstant::MilliSeconds(batch.write_timestamp_ms());
+ } else {
+ return TInstant::MilliSeconds(::google::protobuf::util::TimeUtil::TimestampToMilliseconds(batch.written_at()));
+ }
+ }();
+ auto& messageData = *batch.mutable_message_data(Message);
+
+ minOffset = Min(minOffset, static_cast<i64>(messageData.offset()));
+ maxOffset = Max(maxOffset, static_cast<i64>(messageData.offset()));
+
+ if constexpr (UseMigrationProtocol) {
+ TReadSessionEvent::TDataReceivedEvent::TMessageInformation messageInfo(messageData.offset(),
+ batch.source_id(),
+ messageData.seq_no(),
+ TInstant::MilliSeconds(messageData.create_timestamp_ms()),
+ batchWriteTimestamp,
+ batch.ip(),
+ meta,
+ messageData.uncompressed_size());
+
+ if (Parent->GetDoDecompress()) {
+ messages->emplace_back(messageData.data(),
+ Parent->GetDecompressionError(Batch, Message),
+ messageInfo,
+ partitionStream,
+ messageData.partition_key(),
+ messageData.explicit_hash());
+ } else {
+ compressedMessages->emplace_back(static_cast<ECodec>(messageData.codec()),
+ messageData.data(),
+ TVector<TReadSessionEvent::TDataReceivedEvent::TMessageInformation>{messageInfo},
+ partitionStream,
+ messageData.partition_key(),
+ messageData.explicit_hash());
+ }
+ } else {
+ NTopic::TReadSessionEvent::TDataReceivedEvent::TMessageInformation messageInfo(messageData.offset(),
+ batch.producer_id(),
+ messageData.seq_no(),
+ TInstant::MilliSeconds(::google::protobuf::util::TimeUtil::TimestampToMilliseconds(messageData.created_at())),
+ batchWriteTimestamp,
+ meta,
+ messageData.uncompressed_size(),
+ messageData.message_group_id());
+
+ if (Parent->GetDoDecompress()) {
+ messages->emplace_back(messageData.data(),
+ Parent->GetDecompressionError(Batch, Message),
+ messageInfo,
+ partitionStream);
+ } else {
+ compressedMessages->emplace_back(static_cast<NTopic::ECodec>(batch.codec()),
+ messageData.data(),
+ messageInfo,
+ partitionStream);
+ }
+ }
+
+ *maxByteSize -= Min(*maxByteSize, messageData.data().size());
+
+ // Clear data to free internal session's memory.
+ messageData.clear_data();
+
+ partitionStream->GetLog().Write(TLOG_DEBUG, TStringBuilder()
+ << "Take Data. Partition " << partitionStream->GetPartitionId()
+ << ". Read: {" << Batch << ", " << Message << "} ("
+ << minOffset << "-" << maxOffset << ")");
+
+ return false;
+}
template<bool UseMigrationProtocol>
bool TDataDecompressionInfo<UseMigrationProtocol>::HasReadyUnreadData() const {
@@ -2040,9 +2147,9 @@ void TDataDecompressionInfo<UseMigrationProtocol>::TDecompressionTask::Add(size_
template <bool UseMigrationProtocol>
TDataDecompressionInfo<UseMigrationProtocol>::TDecompressionTask::TDecompressionTask(
- TDataDecompressionInfo* parent, TIntrusivePtr<TPartitionStreamImpl<UseMigrationProtocol>> partitionStream,
+ TDataDecompressionInfo::TPtr parent, TIntrusivePtr<TPartitionStreamImpl<UseMigrationProtocol>> partitionStream,
TReadyMessageThreshold* ready)
- : Parent(parent)
+ : Parent(std::move(parent))
, PartitionStream(std::move(partitionStream))
, Ready(ready) {
}
@@ -2130,19 +2237,6 @@ void TDataDecompressionInfo<UseMigrationProtocol>::TDecompressionTask::operator(
}
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
-// TRawPartitionStreamEvent
-
-template <bool UseMigrationProtocol>
-void TRawPartitionStreamEvent<UseMigrationProtocol>::Signal(TPartitionStreamImpl<UseMigrationProtocol>* partitionStream,
- TReadSessionEventsQueue<UseMigrationProtocol>* queue,
- TDeferredActions<UseMigrationProtocol>& deferred) {
- if (!Signalled) {
- Signalled = true;
- queue->SignalEventImpl(partitionStream, deferred);
- }
-}
-
-////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// TDeferredActions
template<bool UseMigrationProtocol>
diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/write_session.cpp b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/write_session.cpp
index 20e7318421..f069852bf8 100644
--- a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/write_session.cpp
+++ b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/write_session.cpp
@@ -261,7 +261,7 @@ void TWriteSession::OnCdsResponse(
void TWriteSession::InitWriter() { // No Lock, very initial start - no race yet as well.
CompressionExecutor = Settings.CompressionExecutor_;
IExecutor::TPtr executor;
- executor = new TSyncExecutor();
+ executor = CreateSyncExecutor();
executor->Start();
Executor = std::move(executor);
diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/persqueue.h b/ydb/public/sdk/cpp/client/ydb_persqueue_core/persqueue.h
index aae392d7b6..5aa2e95386 100644
--- a/ydb/public/sdk/cpp/client/ydb_persqueue_core/persqueue.h
+++ b/ydb/public/sdk/cpp/client/ydb_persqueue_core/persqueue.h
@@ -948,9 +948,12 @@ private:
bool Started = false;
TAdaptiveLock StartLock;
};
+
IExecutor::TPtr CreateThreadPoolExecutorAdapter(std::shared_ptr<IThreadPool> threadPool); // Thread pool is expected to have been started.
IExecutor::TPtr CreateThreadPoolExecutor(size_t threads);
+IExecutor::TPtr CreateSyncExecutor();
+
//! Events for write session.
struct TWriteSessionEvent {
diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/basic_usage_ut.cpp b/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/basic_usage_ut.cpp
index 0619536fb6..4bfe965309 100644
--- a/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/basic_usage_ut.cpp
+++ b/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/basic_usage_ut.cpp
@@ -184,7 +184,7 @@ Y_UNIT_TEST_SUITE(BasicUsage) {
auto setup = std::make_shared<TPersQueueYdbSdkTestSetup>(TEST_CASE_NAME);
TWriteSessionSettings writeSettings;
writeSettings.Path(setup->GetTestTopic()).MessageGroupId("src_id");
- IExecutor::TPtr executor = new TSyncExecutor();
+ IExecutor::TPtr executor = CreateSyncExecutor();
writeSettings.CompressionExecutor(executor);
// LOGBROKER-7189
//SimpleWriteAndValidateData(setup.get(), writeSettings, 100u, false);
diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/read_session_ut.cpp b/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/read_session_ut.cpp
index 10c5ec57f1..83b52aa63c 100644
--- a/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/read_session_ut.cpp
+++ b/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/read_session_ut.cpp
@@ -1180,13 +1180,14 @@ Y_UNIT_TEST_SUITE(ReadSessionImplTest) {
.CompressMessage(i, TStringBuilder() << "message" << i)); // Callback will be called.
}
- for (ui64 i = 1; i <= 2; ++i) {
+ for (ui64 i = 1; i <= 2; ) {
TMaybe<TReadSessionEvent::TEvent> event = setup.EventsQueue->GetEvent(true);
UNIT_ASSERT(event);
UNIT_ASSERT_EVENT_TYPE(*event, TReadSessionEvent::TDataReceivedEvent);
auto& dataEvent = std::get<TReadSessionEvent::TDataReceivedEvent>(*event);
- UNIT_ASSERT_VALUES_EQUAL(dataEvent.GetMessages().size(), 1);
- UNIT_ASSERT_VALUES_EQUAL(dataEvent.GetMessages()[0].GetData(), TStringBuilder() << "message" << i);
+ for (ui32 j = 0; j < dataEvent.GetMessages().size(); ++j, ++i) {
+ UNIT_ASSERT_VALUES_EQUAL(dataEvent.GetMessages()[j].GetData(), TStringBuilder() << "message" << i);
+ }
}
setup.AssertNoEvents();
@@ -1588,13 +1589,15 @@ Y_UNIT_TEST_SUITE(ReadSessionImplTest) {
}
}));
- for (int i = 0; i < 2; ++i) {
+ for (int i = 0; i < 2; ) {
TMaybe<TReadSessionEvent::TEvent> event = setup.EventsQueue->GetEvent(true);
UNIT_ASSERT(event);
UNIT_ASSERT_EVENT_TYPE(*event, TReadSessionEvent::TDataReceivedEvent);
TReadSessionEvent::TDataReceivedEvent& dataEvent = std::get<TReadSessionEvent::TDataReceivedEvent>(*event);
Cerr << "got data event: " << dataEvent.DebugString() << "\n";
dataEvent.Commit();
+
+ i += dataEvent.GetMessagesCount();
}
UNIT_ASSERT(has1);
@@ -1661,11 +1664,13 @@ Y_UNIT_TEST_SUITE(ReadSessionImplTest) {
auto calledPromise = NThreading::NewPromise<void>();
int time = 0;
setup.Settings.EventHandlers_.DataReceivedHandler([&](TReadSessionEvent::TDataReceivedEvent& event) {
- ++time;
- UNIT_ASSERT_VALUES_EQUAL(event.GetMessages().size(), 1);
- UNIT_ASSERT_VALUES_EQUAL(event.GetMessages()[0].GetData(), TStringBuilder() << "message" << time);
- if (time == 2) {
- calledPromise.SetValue();
+ for (ui32 i = 0; i < event.GetMessages().size(); ++i) {
+ ++time;
+ UNIT_ASSERT_VALUES_EQUAL(event.GetMessages()[i].GetData(), TStringBuilder() << "message" << time);
+
+ if (time == 2) {
+ calledPromise.SetValue();
+ }
}
});
setup.SuccessfulInit();
@@ -1678,6 +1683,13 @@ Y_UNIT_TEST_SUITE(ReadSessionImplTest) {
.PartitionData(2)
.Batch("src_id")
.CompressMessage(2, "message2"));
+
+ //
+    // When the PartitionStreamClosed event arrives, the not-yet-ready raw messages are deleted,
+    // so we give the session time to process the pending messages first.
+ //
+ Sleep(TDuration::Seconds(2));
+
setup.MockProcessor->AddServerResponse(TMockReadSessionProcessor::TServerReadInfo()
.ForcefulReleasePartitionStream());
TMaybe<TReadSessionEvent::TEvent> event = setup.EventsQueue->GetEvent(true);
diff --git a/ydb/services/persqueue_v1/persqueue_ut.cpp b/ydb/services/persqueue_v1/persqueue_ut.cpp
index 3855269347..91683d5490 100644
--- a/ydb/services/persqueue_v1/persqueue_ut.cpp
+++ b/ydb/services/persqueue_v1/persqueue_ut.cpp
@@ -4,6 +4,7 @@
#include <ydb/services/persqueue_v1/ut/rate_limiter_test_setup.h>
#include <ydb/services/persqueue_v1/ut/test_utils.h>
#include <ydb/services/persqueue_v1/ut/persqueue_test_fixture.h>
+#include <ydb/services/persqueue_v1/ut/functions_executor_wrapper.h>
#include <ydb/core/base/appdata.h>
#include <ydb/core/mon/sync_http_mon.h>
@@ -1805,18 +1806,114 @@ Y_UNIT_TEST_SUITE(TPersQueueTest) {
}
}
- Y_UNIT_TEST(CheckKillBalancer) {
+ Y_UNIT_TEST(EventBatching) {
NPersQueue::TTestServer server;
server.EnableLogs({ NKikimrServices::PQ_WRITE_PROXY, NKikimrServices::PQ_READ_PROXY});
PrepareForGrpc(server);
- TPQDataWriter writer("source1", server);
+ auto driver = server.AnnoyingClient->GetDriver();
+ auto decompressor = CreateSyncExecutorWrapper();
+
+ NYdb::NPersQueue::TReadSessionSettings settings;
+ settings.ConsumerName("shared/user").AppendTopics(SHORT_TOPIC_NAME).ReadOriginal({"dc1"});
+ settings.DecompressionExecutor(decompressor);
+ auto reader = CreateReader(*driver, settings);
+
+ for (ui32 i = 0; i < 2; ++i) {
+ auto msg = reader->GetEvent(true, 1);
+ UNIT_ASSERT(msg);
+
+ auto ev = std::get_if<NYdb::NPersQueue::TReadSessionEvent::TCreatePartitionStreamEvent>(&*msg);
+ UNIT_ASSERT(ev);
+
+ ev->Confirm();
+ }
+
+ auto writeDataAndWaitForDecompressionTasks = [&](const TString &message,
+ const TString &sourceId,
+ ui32 partitionId,
+ size_t tasksCount) {
+ //
+ // write data
+ //
+ auto writer = CreateSimpleWriter(*driver, SHORT_TOPIC_NAME, sourceId, partitionId, "raw");
+ writer->Write(message, 1);
+
+ writer->Close(TDuration::Seconds(10));
+
+ //
+ // wait for decompression tasks
+ //
+ while (decompressor->GetFuncsCount() < tasksCount) {
+ Sleep(TDuration::Seconds(1));
+ }
+ };
+
+ //
+ // stream #1: [0-, 2-]
+ // stream #2: [1-, 3-]
+ // session : []
+ //
+ writeDataAndWaitForDecompressionTasks("111", "source_id_0", 1, 1); // 0
+ writeDataAndWaitForDecompressionTasks("333", "source_id_1", 2, 2); // 1
+ writeDataAndWaitForDecompressionTasks("222", "source_id_2", 1, 3); // 2
+ writeDataAndWaitForDecompressionTasks("444", "source_id_3", 2, 4); // 3
+
+ //
+ // stream #1: [0+, 2+]
+ // stream #2: [1+, 3+]
+ // session : [(#1: 1), (#2: 1), (#1, 1)]
+ //
+ decompressor->StartFuncs({0, 3, 1, 2});
+
+ auto messages = reader->GetEvents(true);
+ UNIT_ASSERT_VALUES_EQUAL(messages.size(), 3);
+
+ {
+ auto ev = std::get_if<NYdb::NPersQueue::TReadSessionEvent::TDataReceivedEvent>(&messages[0]);
+ UNIT_ASSERT(ev);
+ UNIT_ASSERT_VALUES_EQUAL(ev->GetMessages().size(), 1);
+
+ UNIT_ASSERT_VALUES_EQUAL(ev->GetMessages()[0].GetData(), "111");
+ }
+
+ {
+ auto ev = std::get_if<NYdb::NPersQueue::TReadSessionEvent::TDataReceivedEvent>(&messages[1]);
+ UNIT_ASSERT(ev);
+ UNIT_ASSERT_VALUES_EQUAL(ev->GetMessages().size(), 2);
+
+ UNIT_ASSERT_VALUES_EQUAL(ev->GetMessages()[0].GetData(), "333");
+ UNIT_ASSERT_VALUES_EQUAL(ev->GetMessages()[1].GetData(), "444");
+ }
+
+ {
+ auto ev = std::get_if<NYdb::NPersQueue::TReadSessionEvent::TDataReceivedEvent>(&messages[2]);
+ UNIT_ASSERT(ev);
+ UNIT_ASSERT_VALUES_EQUAL(ev->GetMessages().size(), 1);
+ UNIT_ASSERT_VALUES_EQUAL(ev->GetMessages()[0].GetData(), "222");
+ }
+
+ //
+ // stream #1: []
+ // stream #2: []
+ // session : []
+ //
+ auto msg = reader->GetEvent(false);
+ UNIT_ASSERT(!msg);
+ }
+
+ Y_UNIT_TEST(CheckKillBalancer) {
+ NPersQueue::TTestServer server;
+ server.EnableLogs({ NKikimrServices::PQ_WRITE_PROXY, NKikimrServices::PQ_READ_PROXY});
+ PrepareForGrpc(server);
auto driver = server.AnnoyingClient->GetDriver();
+ auto decompressor = CreateThreadPoolExecutorWrapper(2);
NYdb::NPersQueue::TReadSessionSettings settings;
settings.ConsumerName("shared/user").AppendTopics(SHORT_TOPIC_NAME).ReadOriginal({"dc1"});
+ settings.DecompressionExecutor(decompressor);
auto reader = CreateReader(*driver, settings);
@@ -1834,18 +1931,31 @@ Y_UNIT_TEST_SUITE(TPersQueueTest) {
}
+ for (ui32 i = 0; i < 10; ++i) {
+ auto writer = CreateSimpleWriter(*driver, SHORT_TOPIC_NAME, TStringBuilder() << "source" << i);
+ bool res = writer->Write("valuevaluevalue", 1);
+ UNIT_ASSERT(res);
+ res = writer->Close(TDuration::Seconds(10));
+ UNIT_ASSERT(res);
+ }
- server.AnnoyingClient->RestartBalancerTablet(server.CleverServer->GetRuntime(), "rt3.dc1--topic1");
- Cerr << "Balancer killed\n";
- ui32 createEv = 0, destroyEv = 0;
- for (ui32 i = 0; i < 4; ++i) {
+ ui32 createEv = 0, destroyEv = 0, dataEv = 0;
+ std::vector<ui32> gotDestroy{0, 0};
+
+ auto doRead = [&]() {
auto msg = reader->GetEvent(true, 1);
UNIT_ASSERT(msg);
Cerr << "Got message: " << NYdb::NPersQueue::DebugString(*msg) << "\n";
+
+ if (std::get_if<NYdb::NPersQueue::TReadSessionEvent::TDataReceivedEvent>(&*msg)) {
+ ++dataEv;
+ return;
+ }
+
auto ev1 = std::get_if<NYdb::NPersQueue::TReadSessionEvent::TPartitionStreamClosedEvent>(&*msg);
auto ev2 = std::get_if<NYdb::NPersQueue::TReadSessionEvent::TCreatePartitionStreamEvent>(&*msg);
@@ -1853,15 +1963,47 @@ Y_UNIT_TEST_SUITE(TPersQueueTest) {
if (ev1) {
++destroyEv;
+ UNIT_ASSERT(ev1->GetPartitionStream()->GetPartitionId() < 2);
+ gotDestroy[ev1->GetPartitionStream()->GetPartitionId()]++;
}
if (ev2) {
- ev2->Confirm();
+ ev2->Confirm(ev2->GetEndOffset());
++createEv;
+ UNIT_ASSERT(ev2->GetPartitionStream()->GetPartitionId() < 2);
+ UNIT_ASSERT_VALUES_EQUAL(gotDestroy[ev2->GetPartitionStream()->GetPartitionId()], 1);
}
+ };
+
+ decompressor->StartFuncs({0, 1, 2, 3, 4});
+
+ for (ui32 i = 0; i < 5; ++i) {
+ doRead();
}
- UNIT_ASSERT(createEv == 2);
- UNIT_ASSERT(destroyEv == 2);
+
+ UNIT_ASSERT_VALUES_EQUAL(dataEv, 5);
+
+ server.AnnoyingClient->RestartBalancerTablet(server.CleverServer->GetRuntime(), "rt3.dc1--topic1");
+ Cerr << "Balancer killed\n";
+
+ Sleep(TDuration::Seconds(5));
+
+ for (ui32 i = 0; i < 4; ++i) {
+ doRead();
+ }
+
+ UNIT_ASSERT_VALUES_EQUAL(createEv, 2);
+ UNIT_ASSERT_VALUES_EQUAL(destroyEv, 2);
+
+ UNIT_ASSERT_VALUES_EQUAL(dataEv, 5);
+
+ decompressor->StartFuncs({5, 6, 7, 8, 9});
+
+ Sleep(TDuration::Seconds(5));
+
+ auto msg = reader->GetEvent(false, 1);
+
+ UNIT_ASSERT(!msg);
UNIT_ASSERT(!reader->WaitEvent().Wait(TDuration::Seconds(1)));
}
diff --git a/ydb/services/persqueue_v1/ut/CMakeLists.darwin.txt b/ydb/services/persqueue_v1/ut/CMakeLists.darwin.txt
index f647591235..96cac786f5 100644
--- a/ydb/services/persqueue_v1/ut/CMakeLists.darwin.txt
+++ b/ydb/services/persqueue_v1/ut/CMakeLists.darwin.txt
@@ -46,6 +46,7 @@ target_sources(ydb-services-persqueue_v1-ut PRIVATE
${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/persqueue_common_ut.cpp
${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/persqueue_compat_ut.cpp
${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/ut/rate_limiter_test_setup.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/ut/functions_executor_wrapper.cpp
)
add_test(
NAME
diff --git a/ydb/services/persqueue_v1/ut/CMakeLists.linux.txt b/ydb/services/persqueue_v1/ut/CMakeLists.linux.txt
index a1e317a525..bcbbc35240 100644
--- a/ydb/services/persqueue_v1/ut/CMakeLists.linux.txt
+++ b/ydb/services/persqueue_v1/ut/CMakeLists.linux.txt
@@ -50,6 +50,7 @@ target_sources(ydb-services-persqueue_v1-ut PRIVATE
${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/persqueue_common_ut.cpp
${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/persqueue_compat_ut.cpp
${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/ut/rate_limiter_test_setup.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/ut/functions_executor_wrapper.cpp
)
add_test(
NAME
diff --git a/ydb/services/persqueue_v1/ut/functions_executor_wrapper.cpp b/ydb/services/persqueue_v1/ut/functions_executor_wrapper.cpp
new file mode 100644
index 0000000000..f817751350
--- /dev/null
+++ b/ydb/services/persqueue_v1/ut/functions_executor_wrapper.cpp
@@ -0,0 +1,56 @@
+#include "functions_executor_wrapper.h"
+
+namespace NKikimr::NPersQueueTests {
+
+FunctionExecutorWrapper::FunctionExecutorWrapper(TExecutorPtr executor) :
+ Executor{std::move(executor)}
+{
+}
+
+bool FunctionExecutorWrapper::IsAsync() const
+{
+ return Executor->IsAsync();
+}
+
+void FunctionExecutorWrapper::Post(TFunction &&f)
+{
+ with_lock (Mutex) {
+ Funcs.push_back(std::move(f));
+ }
+}
+
+void FunctionExecutorWrapper::DoStart()
+{
+ Executor->Start();
+}
+
+void FunctionExecutorWrapper::StartFuncs(const std::vector<size_t>& indicies)
+{
+ with_lock (Mutex) {
+ for (auto index : indicies) {
+ Y_VERIFY(index < Funcs.size());
+ Y_VERIFY(Funcs[index]);
+
+ Executor->Post(std::move(Funcs[index]));
+ }
+ }
+}
+
+size_t FunctionExecutorWrapper::GetFuncsCount() const
+{
+ with_lock (Mutex) {
+ return Funcs.size();
+ }
+}
+
+TIntrusivePtr<FunctionExecutorWrapper> CreateThreadPoolExecutorWrapper(size_t threads)
+{
+ return MakeIntrusive<FunctionExecutorWrapper>(NYdb::NPersQueue::CreateThreadPoolExecutor(threads));
+}
+
+TIntrusivePtr<FunctionExecutorWrapper> CreateSyncExecutorWrapper()
+{
+ return MakeIntrusive<FunctionExecutorWrapper>(NYdb::NPersQueue::CreateSyncExecutor());
+}
+
+}
diff --git a/ydb/services/persqueue_v1/ut/functions_executor_wrapper.h b/ydb/services/persqueue_v1/ut/functions_executor_wrapper.h
new file mode 100644
index 0000000000..c895ee15d5
--- /dev/null
+++ b/ydb/services/persqueue_v1/ut/functions_executor_wrapper.h
@@ -0,0 +1,34 @@
+#pragma once
+
+#include <ydb/public/sdk/cpp/client/ydb_persqueue_public/persqueue.h>
+#include <util/system/mutex.h>
+
+#include <vector>
+
+namespace NKikimr::NPersQueueTests {
+
+class FunctionExecutorWrapper : public NYdb::NPersQueue::IExecutor {
+public:
+ using TExecutorPtr = NYdb::NPersQueue::IExecutor::TPtr;
+
+ explicit FunctionExecutorWrapper(TExecutorPtr executor);
+
+ bool IsAsync() const override;
+ void Post(TFunction&& f) override;
+
+ void StartFuncs(const std::vector<size_t>& indicies);
+
+ size_t GetFuncsCount() const;
+
+private:
+ void DoStart() override;
+
+ TExecutorPtr Executor;
+ TMutex Mutex;
+ std::vector<TFunction> Funcs;
+};
+
+TIntrusivePtr<FunctionExecutorWrapper> CreateThreadPoolExecutorWrapper(size_t threads);
+TIntrusivePtr<FunctionExecutorWrapper> CreateSyncExecutorWrapper();
+
+}