aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorabcdef <akotov@ydb.tech>2023-08-30 17:37:01 +0300
committerabcdef <akotov@ydb.tech>2023-08-30 18:06:43 +0300
commit4790a6241fcc411a4fbb7c8f1e6808cc8acb7a3a (patch)
tree480bfd530db027c9f5a67255ff7a57e10fbc412e
parent869faeec69e4aa549f3f73d3ec01ba5a5b6e423f (diff)
downloadydb-4790a6241fcc411a4fbb7c8f1e6808cc8acb7a3a.tar.gz
the `workload transfer topic-to-table run` command
команда `workload transfer topic-to-table run`
-rw-r--r--ydb/apps/ydb/ut/run_ydb.cpp12
-rw-r--r--ydb/apps/ydb/ut/run_ydb.h2
-rw-r--r--ydb/apps/ydb/ut/workload-topic.cpp7
-rw-r--r--ydb/apps/ydb/ut/workload-transfer-topic-to-table.cpp15
-rw-r--r--ydb/apps/ydb/ut/ya.make1
-rw-r--r--ydb/public/lib/ydb_cli/commands/topic_operations_scenario.cpp11
-rw-r--r--ydb/public/lib/ydb_cli/commands/topic_operations_scenario.h3
-rw-r--r--ydb/public/lib/ydb_cli/commands/topic_workload/CMakeLists.darwin-x86_64.txt1
-rw-r--r--ydb/public/lib/ydb_cli/commands/topic_workload/CMakeLists.linux-aarch64.txt1
-rw-r--r--ydb/public/lib/ydb_cli/commands/topic_workload/CMakeLists.linux-x86_64.txt1
-rw-r--r--ydb/public/lib/ydb_cli/commands/topic_workload/CMakeLists.windows-x86_64.txt1
-rw-r--r--ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_reader.cpp110
-rw-r--r--ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_reader.h25
-rw-r--r--ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_reader_transaction_support.cpp122
-rw-r--r--ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_reader_transaction_support.h34
-rw-r--r--ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_run_full.cpp2
-rw-r--r--ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_stats.cpp6
-rw-r--r--ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_stats.h5
-rw-r--r--ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_stats_collector.cpp23
-rw-r--r--ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_stats_collector.h5
-rw-r--r--ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_writer.h1
-rw-r--r--ydb/public/lib/ydb_cli/commands/topic_workload/ya.make3
-rw-r--r--ydb/public/lib/ydb_cli/commands/transfer_workload/CMakeLists.darwin-x86_64.txt1
-rw-r--r--ydb/public/lib/ydb_cli/commands/transfer_workload/CMakeLists.linux-aarch64.txt1
-rw-r--r--ydb/public/lib/ydb_cli/commands/transfer_workload/CMakeLists.linux-x86_64.txt1
-rw-r--r--ydb/public/lib/ydb_cli/commands/transfer_workload/CMakeLists.windows-x86_64.txt1
-rw-r--r--ydb/public/lib/ydb_cli/commands/transfer_workload/transfer_workload_defines.cpp2
-rw-r--r--ydb/public/lib/ydb_cli/commands/transfer_workload/transfer_workload_defines.h1
-rw-r--r--ydb/public/lib/ydb_cli/commands/transfer_workload/transfer_workload_topic_to_table.cpp4
-rw-r--r--ydb/public/lib/ydb_cli/commands/transfer_workload/transfer_workload_topic_to_table_init.cpp3
-rw-r--r--ydb/public/lib/ydb_cli/commands/transfer_workload/transfer_workload_topic_to_table_run.cpp108
-rw-r--r--ydb/public/lib/ydb_cli/commands/transfer_workload/transfer_workload_topic_to_table_run.h20
-rw-r--r--ydb/public/lib/ydb_cli/commands/transfer_workload/ya.make1
33 files changed, 509 insertions, 25 deletions
diff --git a/ydb/apps/ydb/ut/run_ydb.cpp b/ydb/apps/ydb/ut/run_ydb.cpp
index d1ef793821..61d8177f89 100644
--- a/ydb/apps/ydb/ut/run_ydb.cpp
+++ b/ydb/apps/ydb/ut/run_ydb.cpp
@@ -3,6 +3,8 @@
#include <util/generic/yexception.h>
#include <util/system/shellcommand.h>
#include <util/system/env.h>
+#include <util/string/cast.h>
+#include <util/string/split.h>
#include <library/cpp/testing/common/env.h>
@@ -43,3 +45,13 @@ TString RunYdb(const TList<TString>& args1, const TList<TString>& args2)
return command.GetOutput();
}
+
+ui64 GetFullTimeValue(const TString& output)
+{
+ TVector<TString> lines, columns;
+
+ Split(output, "\n", lines);
+ Split(lines.back(), "\t", columns);
+
+ return FromString<ui64>(columns.back());
+}
diff --git a/ydb/apps/ydb/ut/run_ydb.h b/ydb/apps/ydb/ut/run_ydb.h
index 974a79ff87..cd32581fe8 100644
--- a/ydb/apps/ydb/ut/run_ydb.h
+++ b/ydb/apps/ydb/ut/run_ydb.h
@@ -7,3 +7,5 @@ TString GetYdbEndpoint();
TString GetYdbDatabase();
TString RunYdb(const TList<TString>& args1, const TList<TString>& args2);
+
+ui64 GetFullTimeValue(const TString& output);
diff --git a/ydb/apps/ydb/ut/workload-topic.cpp b/ydb/apps/ydb/ut/workload-topic.cpp
index a9bd5c8d74..2ffb1dff54 100644
--- a/ydb/apps/ydb/ut/workload-topic.cpp
+++ b/ydb/apps/ydb/ut/workload-topic.cpp
@@ -43,17 +43,14 @@ void ExpectTopic(const TTopicConfigurationMatcher& matcher)
UNIT_ASSERT_VALUES_EQUAL(description.GetConsumers().size(), matcher.Consumers);
}
-Y_UNIT_TEST(RunFull) {
+Y_UNIT_TEST(Default_RunFull) {
ExecYdb({"init"});
auto output = ExecYdb({"run", "full", "-s", "10"});
ExecYdb({"clean"});
TVector<TString> lines, columns;
- Split(output, "\n", lines);
- Split(lines.back(), "\t", columns);
-
- auto fullTime = FromString<ui64>(columns.back());
+ ui64 fullTime = GetFullTimeValue(output);
UNIT_ASSERT_GE(fullTime, 0);
UNIT_ASSERT_LT(fullTime, 10'000);
diff --git a/ydb/apps/ydb/ut/workload-transfer-topic-to-table.cpp b/ydb/apps/ydb/ut/workload-transfer-topic-to-table.cpp
index 74fb027cc2..35509e3b44 100644
--- a/ydb/apps/ydb/ut/workload-transfer-topic-to-table.cpp
+++ b/ydb/apps/ydb/ut/workload-transfer-topic-to-table.cpp
@@ -79,7 +79,7 @@ TString ExecYdb(const TList<TString>& args)
//
// ydb -e grpc://${YDB_ENDPOINT} -d /${YDB_DATABASE} workload transfer topic-to-table ${args}
//
- return RunYdb({"workload", "transfer", "topic-to-table"}, args);
+ return RunYdb({"-v", "--user", "root", "--no-password", "workload", "transfer", "topic-to-table"}, args);
}
void RunYdb(const TList<TString>& args,
@@ -91,6 +91,19 @@ void RunYdb(const TList<TString>& args,
ExpectTable({.Name=table, .Partitions=tablePartitions});
}
+Y_UNIT_TEST(Default_Run) {
+ RunYdb({"-v", "yql", "-s", R"(ALTER USER root PASSWORD "")"}, TList<TString>());
+
+ ExecYdb({"init"});
+ auto output = ExecYdb({"run", "-s", "10"});
+ ExecYdb({"clean"});
+
+ ui64 fullTime = GetFullTimeValue(output);
+
+ UNIT_ASSERT_GE(fullTime, 0);
+ UNIT_ASSERT_LT(fullTime, 10'000);
+}
+
Y_UNIT_TEST(Default_Init_Clean)
{
const TString topic = "transfer-topic";
diff --git a/ydb/apps/ydb/ut/ya.make b/ydb/apps/ydb/ut/ya.make
index a7bc653394..85315398bb 100644
--- a/ydb/apps/ydb/ut/ya.make
+++ b/ydb/apps/ydb/ut/ya.make
@@ -8,6 +8,7 @@ DEPENDS(
)
ENV(YDB_CLI_BINARY="ydb/apps/ydb/ydb")
+ENV(YDB_FEATURE_FLAGS="enable_topic_service_tx")
SRCS(
workload-topic.cpp
diff --git a/ydb/public/lib/ydb_cli/commands/topic_operations_scenario.cpp b/ydb/public/lib/ydb_cli/commands/topic_operations_scenario.cpp
index 4f424d70c1..10410f2640 100644
--- a/ydb/public/lib/ydb_cli/commands/topic_operations_scenario.cpp
+++ b/ydb/public/lib/ydb_cli/commands/topic_operations_scenario.cpp
@@ -175,12 +175,16 @@ void TTopicOperationsScenario::StartConsumerThreads(std::vector<std::future<void
.StartedCount = count,
.Database = database,
.TopicName = TopicName,
+ .TableName = TableName,
.ConsumerIdx = consumerIdx,
.ConsumerPrefix = ConsumerPrefix,
- .ReaderIdx = readerIdx
+ .ReaderIdx = readerIdx,
+ .UseTransactions = UseTransactions,
+ .UseTopicApiCommit = UseTopicApiCommit,
+ .CommitPeriod = CommitPeriod
};
- threads.push_back(std::async([readerParams = std::move(readerParams)]() mutable { TTopicWorkloadReader::ReaderLoop(readerParams); }));
+ threads.push_back(std::async([readerParams = std::move(readerParams)]() mutable { TTopicWorkloadReader::RetryableReaderLoop(readerParams); }));
}
}
@@ -213,7 +217,8 @@ void TTopicOperationsScenario::StartProducerThreads(std::vector<std::future<void
.ProducerId = TGUID::CreateTimebased().AsGuidString(),
.PartitionId = (partitionSeed + writerIdx) % partitionCount,
.Direct = Direct,
- .Codec = Codec
+ .Codec = Codec,
+ .UseTransactions = UseTransactions
};
threads.push_back(std::async([writerParams = std::move(writerParams)]() mutable { TTopicWorkloadWriterWorker::WriterLoop(writerParams); }));
diff --git a/ydb/public/lib/ydb_cli/commands/topic_operations_scenario.h b/ydb/public/lib/ydb_cli/commands/topic_operations_scenario.h
index b448701c61..d08847a035 100644
--- a/ydb/public/lib/ydb_cli/commands/topic_operations_scenario.h
+++ b/ydb/public/lib/ydb_cli/commands/topic_operations_scenario.h
@@ -60,6 +60,9 @@ public:
ui32 Codec;
TString TableName;
ui32 TablePartitionCount = 1;
+ bool UseTransactions = false;
+ size_t CommitPeriod = 15;
+ bool UseTopicApiCommit = false;
protected:
void CreateTopic(const TString& database,
diff --git a/ydb/public/lib/ydb_cli/commands/topic_workload/CMakeLists.darwin-x86_64.txt b/ydb/public/lib/ydb_cli/commands/topic_workload/CMakeLists.darwin-x86_64.txt
index be9991b8c9..fe429be436 100644
--- a/ydb/public/lib/ydb_cli/commands/topic_workload/CMakeLists.darwin-x86_64.txt
+++ b/ydb/public/lib/ydb_cli/commands/topic_workload/CMakeLists.darwin-x86_64.txt
@@ -38,5 +38,6 @@ target_sources(topic_workload PRIVATE
${CMAKE_SOURCE_DIR}/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_stats_collector.cpp
${CMAKE_SOURCE_DIR}/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_writer.cpp
${CMAKE_SOURCE_DIR}/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_reader.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_reader_transaction_support.cpp
${CMAKE_SOURCE_DIR}/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload.cpp
)
diff --git a/ydb/public/lib/ydb_cli/commands/topic_workload/CMakeLists.linux-aarch64.txt b/ydb/public/lib/ydb_cli/commands/topic_workload/CMakeLists.linux-aarch64.txt
index da8be69159..f00c9d397f 100644
--- a/ydb/public/lib/ydb_cli/commands/topic_workload/CMakeLists.linux-aarch64.txt
+++ b/ydb/public/lib/ydb_cli/commands/topic_workload/CMakeLists.linux-aarch64.txt
@@ -39,5 +39,6 @@ target_sources(topic_workload PRIVATE
${CMAKE_SOURCE_DIR}/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_stats_collector.cpp
${CMAKE_SOURCE_DIR}/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_writer.cpp
${CMAKE_SOURCE_DIR}/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_reader.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_reader_transaction_support.cpp
${CMAKE_SOURCE_DIR}/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload.cpp
)
diff --git a/ydb/public/lib/ydb_cli/commands/topic_workload/CMakeLists.linux-x86_64.txt b/ydb/public/lib/ydb_cli/commands/topic_workload/CMakeLists.linux-x86_64.txt
index da8be69159..f00c9d397f 100644
--- a/ydb/public/lib/ydb_cli/commands/topic_workload/CMakeLists.linux-x86_64.txt
+++ b/ydb/public/lib/ydb_cli/commands/topic_workload/CMakeLists.linux-x86_64.txt
@@ -39,5 +39,6 @@ target_sources(topic_workload PRIVATE
${CMAKE_SOURCE_DIR}/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_stats_collector.cpp
${CMAKE_SOURCE_DIR}/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_writer.cpp
${CMAKE_SOURCE_DIR}/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_reader.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_reader_transaction_support.cpp
${CMAKE_SOURCE_DIR}/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload.cpp
)
diff --git a/ydb/public/lib/ydb_cli/commands/topic_workload/CMakeLists.windows-x86_64.txt b/ydb/public/lib/ydb_cli/commands/topic_workload/CMakeLists.windows-x86_64.txt
index be9991b8c9..fe429be436 100644
--- a/ydb/public/lib/ydb_cli/commands/topic_workload/CMakeLists.windows-x86_64.txt
+++ b/ydb/public/lib/ydb_cli/commands/topic_workload/CMakeLists.windows-x86_64.txt
@@ -38,5 +38,6 @@ target_sources(topic_workload PRIVATE
${CMAKE_SOURCE_DIR}/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_stats_collector.cpp
${CMAKE_SOURCE_DIR}/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_writer.cpp
${CMAKE_SOURCE_DIR}/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_reader.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_reader_transaction_support.cpp
${CMAKE_SOURCE_DIR}/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload.cpp
)
diff --git a/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_reader.cpp b/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_reader.cpp
index e6dae133cc..2c9620ffb1 100644
--- a/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_reader.cpp
+++ b/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_reader.cpp
@@ -1,4 +1,5 @@
#include "topic_workload_reader.h"
+#include "topic_workload_reader_transaction_support.h"
#include "topic_workload_describe.h"
@@ -7,17 +8,36 @@
using namespace NYdb::NConsoleClient;
-void TTopicWorkloadReader::ReaderLoop(TTopicWorkloadReaderParams& params) {
+void TTopicWorkloadReader::RetryableReaderLoop(TTopicWorkloadReaderParams& params) {
+ const TInstant endTime = Now() + TDuration::Seconds(params.TotalSec + 3);
+
+ while (!*params.ErrorFlag && Now() < endTime) {
+ try {
+ ReaderLoop(params, endTime);
+ } catch (const yexception& ex) {
+ WRITE_LOG(params.Log, ELogPriority::TLOG_WARNING, TStringBuilder() << ex);
+ }
+ }
+}
+
+void TTopicWorkloadReader::ReaderLoop(TTopicWorkloadReaderParams& params, TInstant endTime) {
auto topicClient = std::make_unique<NYdb::NTopic::TTopicClient>(params.Driver);
+ std::optional<TTransactionSupport> txSupport;
auto consumerName = TCommandWorkloadTopicDescribe::GenerateConsumerName(params.ConsumerPrefix, params.ConsumerIdx);
auto describeTopicResult = TCommandWorkloadTopicDescribe::DescribeTopic(params.Database, params.TopicName, params.Driver);
auto consumers = describeTopicResult.GetConsumers();
+
if (!std::any_of(consumers.begin(), consumers.end(), [consumerName](const auto& consumer) { return consumer.GetConsumerName() == consumerName; }))
{
WRITE_LOG(params.Log, ELogPriority::TLOG_EMERG, TStringBuilder() << "Topic '" << params.TopicName << "' doesn't have a consumer '" << consumerName << "'. Run command 'workload init' with parameter '--consumers'.");
exit(EXIT_FAILURE);
}
+
+ if (params.UseTransactions) {
+ txSupport.emplace(params.Driver, params.TableName);
+ }
+
NYdb::NTopic::TReadSessionSettings settings;
settings.ConsumerName(consumerName).AppendTopics(params.TopicName);
@@ -34,7 +54,9 @@ void TTopicWorkloadReader::ReaderLoop(TTopicWorkloadReaderParams& params) {
(*params.StartedCount)++;
- const TInstant endTime = Now() + TDuration::Seconds(params.TotalSec + 3);
+ TInstant commitTime = Now() + TDuration::Seconds(params.CommitPeriod);
+
+ TVector<NYdb::NTopic::TReadSessionEvent::TStopPartitionSessionEvent> stopPartitionSessionEvents;
while (Now() < endTime && !*params.ErrorFlag) {
TInstant st = TInstant::Now();
@@ -48,7 +70,7 @@ void TTopicWorkloadReader::ReaderLoop(TTopicWorkloadReaderParams& params) {
}
readSession->WaitEvent().Wait(TDuration::Seconds(1));
- auto events = readSession->GetEvents(false);
+ TVector<NYdb::NTopic::TReadSessionEvent::TEvent> events = GetEvents(*readSession, params, txSupport);
auto now = TInstant::Now();
for (auto& event : events) {
@@ -59,12 +81,19 @@ void TTopicWorkloadReader::ReaderLoop(TTopicWorkloadReaderParams& params) {
ui64 fullTime = (now - message.GetCreateTime()).MilliSeconds();
params.StatsCollector->AddReaderEvent(params.ReaderIdx, {message.GetData().Size(), fullTime});
- WRITE_LOG(params.Log, ELogPriority::TLOG_DEBUG, TStringBuilder() << "Got message: " << message.GetMessageGroupId()
+ if (txSupport) {
+ txSupport->AppendRow(message.GetData());
+ }
+
+ WRITE_LOG(params.Log, ELogPriority::TLOG_DEBUG, TStringBuilder() << "Got message: " << message.GetMessageGroupId()
<< " topic " << message.GetPartitionSession()->GetTopicPath() << " partition " << message.GetPartitionSession()->GetPartitionId()
<< " offset " << message.GetOffset() << " seqNo " << message.GetSeqNo()
<< " createTime " << message.GetCreateTime() << " fullTimeMs " << fullTime);
}
- dataEvent->Commit();
+
+ if (!txSupport || params.UseTopicApiCommit) {
+ dataEvent->Commit();
+ }
} else if (auto* createPartitionStreamEvent = std::get_if<NYdb::NTopic::TReadSessionEvent::TStartPartitionSessionEvent>(&event)) {
auto stream = createPartitionStreamEvent->GetPartitionSession();
ui64 startOffset = streamState[std::make_pair(stream->GetTopicPath(), stream->GetPartitionId())].StartOffset;
@@ -74,7 +103,13 @@ void TTopicWorkloadReader::ReaderLoop(TTopicWorkloadReaderParams& params) {
} else if (auto* destroyPartitionStreamEvent = std::get_if<NYdb::NTopic::TReadSessionEvent::TStopPartitionSessionEvent>(&event)) {
auto stream = destroyPartitionStreamEvent->GetPartitionSession();
streamState[std::make_pair(stream->GetTopicPath(), stream->GetPartitionId())].Stream = nullptr;
- destroyPartitionStreamEvent->Confirm();
+
+ if (txSupport) {
+ // gracefull shutdown. we will send confirmations later
+ stopPartitionSessionEvents.push_back(std::move(*destroyPartitionStreamEvent));
+ } else {
+ destroyPartitionStreamEvent->Confirm();
+ }
} else if (auto* closeSessionEvent = std::get_if<NYdb::NTopic::TSessionClosedEvent>(&event)) {
WRITE_LOG(params.Log, ELogPriority::TLOG_ERR, TStringBuilder() << "Read session closed: " << closeSessionEvent->DebugString());
*params.ErrorFlag = 1;
@@ -90,5 +125,68 @@ void TTopicWorkloadReader::ReaderLoop(TTopicWorkloadReaderParams& params) {
WRITE_LOG(params.Log, ELogPriority::TLOG_DEBUG, TStringBuilder() << ackEvent->DebugString());
}
}
+
+ if (txSupport) {
+ TryCommitTx(params, txSupport, commitTime, stopPartitionSessionEvents);
+ }
+ }
+}
+
+TVector<NYdb::NTopic::TReadSessionEvent::TEvent> TTopicWorkloadReader::GetEvents(NYdb::NTopic::IReadSession& readSession,
+ TTopicWorkloadReaderParams& params,
+ std::optional<TTransactionSupport>& txSupport)
+{
+ TVector<NYdb::NTopic::TReadSessionEvent::TEvent> events;
+ NTopic::TReadSessionGetEventSettings settings;
+
+ settings.Block(false);
+
+ if (txSupport && !params.UseTopicApiCommit) {
+ if (!txSupport->Transaction) {
+ txSupport->BeginTx();
+ }
+
+ settings.Tx(*txSupport->Transaction);
+ }
+
+ return readSession.GetEvents(settings);
+}
+
+void TTopicWorkloadReader::TryCommitTx(TTopicWorkloadReaderParams& params,
+ std::optional<TTransactionSupport>& txSupport,
+ TInstant& commitTime,
+ TVector<NYdb::NTopic::TReadSessionEvent::TStopPartitionSessionEvent>& stopPartitionSessionEvents)
+{
+ Y_VERIFY(txSupport);
+
+ if (commitTime > Now()) {
+ return;
+ }
+
+ TryCommitTableChanges(params, txSupport);
+ GracefullShutdown(stopPartitionSessionEvents);
+
+ commitTime += TDuration::Seconds(params.CommitPeriod);
+}
+
+void TTopicWorkloadReader::TryCommitTableChanges(TTopicWorkloadReaderParams& params,
+ std::optional<TTransactionSupport>& txSupport)
+{
+ if (txSupport->Rows.empty()) {
+ return;
+ }
+
+ auto begin = TInstant::Now();
+ txSupport->CommitTx();
+ ui64 duration = (TInstant::Now() - begin).MilliSeconds();
+
+ params.StatsCollector->AddCommitTxEvent(params.ReaderIdx, {duration});
+}
+
+void TTopicWorkloadReader::GracefullShutdown(TVector<NYdb::NTopic::TReadSessionEvent::TStopPartitionSessionEvent>& stopPartitionSessionEvents)
+{
+ for (auto& event : stopPartitionSessionEvents) {
+ event.Confirm();
}
+ stopPartitionSessionEvents.clear();
}
diff --git a/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_reader.h b/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_reader.h
index 0d3406312b..7a74f131e0 100644
--- a/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_reader.h
+++ b/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_reader.h
@@ -3,6 +3,8 @@
#include "topic_workload_defines.h"
#include "topic_workload_stats_collector.h"
+#include <ydb/public/sdk/cpp/client/ydb_topic/topic.h>
+
#include <library/cpp/logger/log.h>
#include <util/system/types.h>
#include <util/string/type.h>
@@ -19,14 +21,35 @@ namespace NYdb {
std::shared_ptr<std::atomic_uint> StartedCount;
TString Database;
TString TopicName;
+ TString TableName;
ui32 ConsumerIdx;
TString ConsumerPrefix;
ui64 ReaderIdx;
+ bool UseTransactions = false;
+ bool UseTopicApiCommit = false;
+ size_t CommitPeriod = 15;
};
+ class TTransactionSupport;
+
class TTopicWorkloadReader {
public:
- static void ReaderLoop(TTopicWorkloadReaderParams& params);
+ static void RetryableReaderLoop(TTopicWorkloadReaderParams& params);
+
+ private:
+ static void ReaderLoop(TTopicWorkloadReaderParams& params, TInstant endTime);
+
+ static TVector<NYdb::NTopic::TReadSessionEvent::TEvent> GetEvents(NYdb::NTopic::IReadSession& readSession,
+ TTopicWorkloadReaderParams& params,
+ std::optional<TTransactionSupport>& txSupport);
+
+ static void TryCommitTx(TTopicWorkloadReaderParams& params,
+ std::optional<TTransactionSupport>& txSupport,
+ TInstant& commitTime,
+ TVector<NYdb::NTopic::TReadSessionEvent::TStopPartitionSessionEvent>& stopPartitionSessionEvents);
+ static void TryCommitTableChanges(TTopicWorkloadReaderParams& params,
+ std::optional<TTransactionSupport>& txSupport);
+ static void GracefullShutdown(TVector<NYdb::NTopic::TReadSessionEvent::TStopPartitionSessionEvent>& stopPartitionSessionEvents);
};
}
}
diff --git a/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_reader_transaction_support.cpp b/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_reader_transaction_support.cpp
new file mode 100644
index 0000000000..2669d95a67
--- /dev/null
+++ b/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_reader_transaction_support.cpp
@@ -0,0 +1,122 @@
+#include "topic_workload_reader_transaction_support.h"
+
+#include <util/random/random.h>
+
+namespace NYdb::NConsoleClient {
+
+static void EnsureSuccess(const NYdb::TStatus& status, std::string_view name)
+{
+ Y_VERIFY(!status.IsTransportError());
+
+ if (!status.IsSuccess()) {
+ ythrow yexception() << "error on " << name << ": " << status;
+ }
+}
+
+TTransactionSupport::TTransactionSupport(const NYdb::TDriver& driver,
+ const TString& tableName) :
+ TableClient(driver),
+ TableName(tableName)
+{
+}
+
+void TTransactionSupport::BeginTx()
+{
+ Y_VERIFY(!Transaction);
+
+ if (!Session) {
+ CreateSession();
+ }
+
+ Y_VERIFY(Session);
+
+ auto settings = NYdb::NTable::TTxSettings::SerializableRW();
+ auto result = Session->BeginTransaction(settings).GetValueSync();
+ EnsureSuccess(result, "BeginTransaction");
+
+ Transaction = result.GetTransaction();
+}
+
+void TTransactionSupport::CommitTx()
+{
+ Y_VERIFY(Transaction);
+
+ UpsertIntoTable();
+ Commit();
+
+ Rows.clear();
+ Transaction = std::nullopt;
+}
+
+void TTransactionSupport::AppendRow(const TString& m)
+{
+ TRow row;
+ row.Key = RandomNumber<ui64>();
+ row.Value = m;
+
+ Rows.push_back(std::move(row));
+}
+
+void TTransactionSupport::CreateSession()
+{
+ Y_VERIFY(!Session);
+
+ auto result = TableClient.GetSession(NYdb::NTable::TCreateSessionSettings()).GetValueSync();
+ EnsureSuccess(result, "GetSession");
+
+ Session = result.GetSession();
+}
+
+void TTransactionSupport::UpsertIntoTable()
+{
+ Y_VERIFY(Transaction);
+
+ TString query = R"(
+ DECLARE $rows AS List<Struct<
+ id: Uint64,
+ value: String
+ >>;
+
+ UPSERT INTO `)" + TableName + R"(` (SELECT id, value FROM AS_TABLE($rows));
+ )";
+
+ NYdb::TParamsBuilder builder;
+
+ auto& rows = builder.AddParam("$rows");
+ rows.BeginList();
+ for (auto& row : Rows) {
+ rows.AddListItem()
+ .BeginStruct()
+ .AddMember("id").Uint64(row.Key)
+ .AddMember("value").String(row.Value)
+ .EndStruct();
+ }
+ rows.EndList();
+ rows.Build();
+
+ auto params = builder.Build();
+
+ NYdb::NTable::TExecDataQuerySettings settings;
+ settings.KeepInQueryCache(true);
+
+ auto runQuery = [this, &query, &params, &settings](NYdb::NTable::TSession) -> NYdb::TStatus {
+ return Transaction->GetSession().ExecuteDataQuery(query,
+ NYdb::NTable::TTxControl::Tx(*Transaction),
+ params,
+ settings).GetValueSync();
+ };
+
+ auto result = TableClient.RetryOperationSync(runQuery);
+ EnsureSuccess(result, "UPSERT");
+}
+
+void TTransactionSupport::Commit()
+{
+ Y_VERIFY(Transaction);
+
+ auto settings = NYdb::NTable::TCommitTxSettings();
+ auto result = Transaction->Commit(settings).GetValueSync();
+ EnsureSuccess(result, "COMMIT");
+}
+
+}
diff --git a/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_reader_transaction_support.h b/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_reader_transaction_support.h
new file mode 100644
index 0000000000..80f74fa2a2
--- /dev/null
+++ b/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_reader_transaction_support.h
@@ -0,0 +1,34 @@
+#pragma once
+
+#include <ydb/public/sdk/cpp/client/ydb_table/table.h>
+
+namespace NYdb::NConsoleClient {
+
+class TTransactionSupport {
+public:
+ TTransactionSupport(const NYdb::TDriver& driver,
+ const TString& tableName);
+
+ void BeginTx();
+ void CommitTx();
+ void AppendRow(const TString& data);
+
+ struct TRow {
+ ui64 Key;
+ TString Value;
+ };
+
+ std::optional<NYdb::NTable::TTransaction> Transaction;
+ TVector<TRow> Rows;
+
+private:
+ void CreateSession();
+ void UpsertIntoTable();
+ void Commit();
+
+ NYdb::NTable::TTableClient TableClient;
+ TString TableName;
+ std::optional<NYdb::NTable::TSession> Session;
+};
+
+}
diff --git a/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_run_full.cpp b/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_run_full.cpp
index 04d0377f47..9da4379246 100644
--- a/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_run_full.cpp
+++ b/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_run_full.cpp
@@ -83,5 +83,7 @@ void TCommandWorkloadTopicRunFull::Parse(TConfig& config)
int TCommandWorkloadTopicRunFull::Run(TConfig& config)
{
+ Scenario.UseTransactions = false;
+
return Scenario.Run(config);
}
diff --git a/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_stats.cpp b/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_stats.cpp
index d9eb975093..871078a8a6 100644
--- a/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_stats.cpp
+++ b/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_stats.cpp
@@ -12,6 +12,7 @@ TTopicWorkloadStats::TTopicWorkloadStats()
, ReadBytes(0)
, ReadMessages(0)
, FullTimeHist(HighestTrackableTime, 5)
+ , CommitTxTimeHist(HighestTrackableTime, 2)
{
}
@@ -35,3 +36,8 @@ void TTopicWorkloadStats::AddEvent(const LagEvent& event)
LagMessagesHist.RecordValue(Min(event.LagMessages, HighestTrackableMessageCount));
LagTimeHist.RecordValue(Min(event.LagTime, HighestTrackableTime));
}
+
+void TTopicWorkloadStats::AddEvent(const CommitTxEvent& event)
+{
+ CommitTxTimeHist.RecordValue(Min(event.Time, HighestTrackableTime));
+}
diff --git a/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_stats.h b/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_stats.h
index d7c85fbffb..c1a1026485 100644
--- a/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_stats.h
+++ b/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_stats.h
@@ -19,12 +19,16 @@ namespace NYdb {
ui64 LagMessages;
ui64 LagTime;
};
+ struct CommitTxEvent {
+ ui64 Time;
+ };
TTopicWorkloadStats();
void AddEvent(const WriterEvent& event);
void AddEvent(const ReaderEvent& event);
void AddEvent(const LagEvent& event);
+ void AddEvent(const CommitTxEvent& event);
ui64 WriteBytes;
ui64 WriteMessages;
@@ -35,6 +39,7 @@ namespace NYdb {
ui64 ReadBytes;
ui64 ReadMessages;
NHdr::THistogram FullTimeHist;
+ NHdr::THistogram CommitTxTimeHist;
private:
constexpr static ui64 HighestTrackableTime = 100000;
diff --git a/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_stats_collector.cpp b/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_stats_collector.cpp
index c5c30c4602..b1f57e45a8 100644
--- a/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_stats_collector.cpp
+++ b/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_stats_collector.cpp
@@ -24,15 +24,13 @@ TTopicWorkloadStatsCollector::TTopicWorkloadStatsCollector(
, WindowStats(MakeHolder<TTopicWorkloadStats>())
{
for (size_t writerIdx = 0; writerIdx < writerCount; writerIdx++) {
- auto writerQueue = MakeHolder<TAutoLockFreeQueue<TTopicWorkloadStats::WriterEvent>>();
- WriterEventQueues.emplace_back(std::move(writerQueue));
+ AddQueue(WriterEventQueues);
}
for (size_t readerIdx = 0; readerIdx < readerCount; readerIdx++) {
- auto readerQueue = MakeHolder<TAutoLockFreeQueue<TTopicWorkloadStats::ReaderEvent>>();
- ReaderEventQueues.emplace_back(std::move(readerQueue));
- auto lagQueue = MakeHolder<TAutoLockFreeQueue<TTopicWorkloadStats::LagEvent>>();
- LagEventQueues.emplace_back(std::move(lagQueue));
+ AddQueue(ReaderEventQueues);
+ AddQueue(LagEventQueues);
+ AddQueue(CommitTxEventQueues);
}
}
@@ -127,6 +125,7 @@ void TTopicWorkloadStatsCollector::CollectThreadEvents()
CollectThreadEvents(WriterEventQueues);
CollectThreadEvents(ReaderEventQueues);
CollectThreadEvents(LagEventQueues);
+ CollectThreadEvents(CommitTxEventQueues);
}
template<class T>
@@ -141,6 +140,13 @@ void TTopicWorkloadStatsCollector::CollectThreadEvents(TEventQueues<T>& queues)
}
}
+template<class T>
+void TTopicWorkloadStatsCollector::AddQueue(TEventQueues<T>& queues)
+{
+ auto queue = MakeHolder<TAutoLockFreeQueue<T>>();
+ queues.emplace_back(std::move(queue));
+}
+
ui64 TTopicWorkloadStatsCollector::GetTotalReadMessages() const {
return TotalStats.ReadMessages;
}
@@ -164,6 +170,11 @@ void TTopicWorkloadStatsCollector::AddLagEvent(size_t readerIdx, const TTopicWor
AddEvent(readerIdx, LagEventQueues, event);
}
+void TTopicWorkloadStatsCollector::AddCommitTxEvent(size_t readerIdx, const TTopicWorkloadStats::CommitTxEvent& event)
+{
+ AddEvent(readerIdx, CommitTxEventQueues, event);
+}
+
template<class T>
void TTopicWorkloadStatsCollector::AddEvent(size_t index, TEventQueues<T>& queues, const T& event)
{
diff --git a/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_stats_collector.h b/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_stats_collector.h
index eaa391f0ea..b6d856a6ab 100644
--- a/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_stats_collector.h
+++ b/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_stats_collector.h
@@ -25,6 +25,7 @@ namespace NYdb {
void AddWriterEvent(size_t writerIdx, const TTopicWorkloadStats::WriterEvent& event);
void AddReaderEvent(size_t readerIdx, const TTopicWorkloadStats::ReaderEvent& event);
void AddLagEvent(size_t readerIdx, const TTopicWorkloadStats::LagEvent& event);
+ void AddCommitTxEvent(size_t readerIdx, const TTopicWorkloadStats::CommitTxEvent& event);
ui64 GetTotalReadMessages() const;
ui64 GetTotalWriteMessages() const;
@@ -38,6 +39,9 @@ namespace NYdb {
void CollectThreadEvents(TEventQueues<T>& queues);
template<class T>
+ static void AddQueue(TEventQueues<T>& queues);
+
+ template<class T>
void AddEvent(size_t index, TEventQueues<T>& queues, const T& event);
void PrintWindowStats(ui32 windowIt);
@@ -49,6 +53,7 @@ namespace NYdb {
TEventQueues<TTopicWorkloadStats::WriterEvent> WriterEventQueues;
TEventQueues<TTopicWorkloadStats::ReaderEvent> ReaderEventQueues;
TEventQueues<TTopicWorkloadStats::LagEvent> LagEventQueues;
+ TEventQueues<TTopicWorkloadStats::CommitTxEvent> CommitTxEventQueues;
bool Quiet;
bool PrintTimestamp;
diff --git a/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_writer.h b/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_writer.h
index 2d42f31e90..8f2ba5c7c0 100644
--- a/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_writer.h
+++ b/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_writer.h
@@ -29,6 +29,7 @@ namespace NYdb {
ui32 PartitionId;
bool Direct;
ui32 Codec = 0;
+ bool UseTransactions = false;
};
class TTopicWorkloadWriterWorker {
diff --git a/ydb/public/lib/ydb_cli/commands/topic_workload/ya.make b/ydb/public/lib/ydb_cli/commands/topic_workload/ya.make
index 235b627878..1246090a32 100644
--- a/ydb/public/lib/ydb_cli/commands/topic_workload/ya.make
+++ b/ydb/public/lib/ydb_cli/commands/topic_workload/ya.make
@@ -12,6 +12,7 @@ SRCS(
topic_workload_stats_collector.cpp
topic_workload_writer.cpp
topic_workload_reader.cpp
+ topic_workload_reader_transaction_support.cpp
topic_workload.cpp
)
@@ -36,4 +37,4 @@ END()
RECURSE_FOR_TESTS(
ut
-) \ No newline at end of file
+)
diff --git a/ydb/public/lib/ydb_cli/commands/transfer_workload/CMakeLists.darwin-x86_64.txt b/ydb/public/lib/ydb_cli/commands/transfer_workload/CMakeLists.darwin-x86_64.txt
index dc7f515b84..4a4b5327b7 100644
--- a/ydb/public/lib/ydb_cli/commands/transfer_workload/CMakeLists.darwin-x86_64.txt
+++ b/ydb/public/lib/ydb_cli/commands/transfer_workload/CMakeLists.darwin-x86_64.txt
@@ -31,5 +31,6 @@ target_sources(transfer_workload PRIVATE
${CMAKE_SOURCE_DIR}/ydb/public/lib/ydb_cli/commands/transfer_workload/transfer_workload_topic_to_table.cpp
${CMAKE_SOURCE_DIR}/ydb/public/lib/ydb_cli/commands/transfer_workload/transfer_workload_topic_to_table_init.cpp
${CMAKE_SOURCE_DIR}/ydb/public/lib/ydb_cli/commands/transfer_workload/transfer_workload_topic_to_table_clean.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/public/lib/ydb_cli/commands/transfer_workload/transfer_workload_topic_to_table_run.cpp
${CMAKE_SOURCE_DIR}/ydb/public/lib/ydb_cli/commands/transfer_workload/transfer_workload_defines.cpp
)
diff --git a/ydb/public/lib/ydb_cli/commands/transfer_workload/CMakeLists.linux-aarch64.txt b/ydb/public/lib/ydb_cli/commands/transfer_workload/CMakeLists.linux-aarch64.txt
index 3bbdc78d3d..8419632422 100644
--- a/ydb/public/lib/ydb_cli/commands/transfer_workload/CMakeLists.linux-aarch64.txt
+++ b/ydb/public/lib/ydb_cli/commands/transfer_workload/CMakeLists.linux-aarch64.txt
@@ -32,5 +32,6 @@ target_sources(transfer_workload PRIVATE
${CMAKE_SOURCE_DIR}/ydb/public/lib/ydb_cli/commands/transfer_workload/transfer_workload_topic_to_table.cpp
${CMAKE_SOURCE_DIR}/ydb/public/lib/ydb_cli/commands/transfer_workload/transfer_workload_topic_to_table_init.cpp
${CMAKE_SOURCE_DIR}/ydb/public/lib/ydb_cli/commands/transfer_workload/transfer_workload_topic_to_table_clean.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/public/lib/ydb_cli/commands/transfer_workload/transfer_workload_topic_to_table_run.cpp
${CMAKE_SOURCE_DIR}/ydb/public/lib/ydb_cli/commands/transfer_workload/transfer_workload_defines.cpp
)
diff --git a/ydb/public/lib/ydb_cli/commands/transfer_workload/CMakeLists.linux-x86_64.txt b/ydb/public/lib/ydb_cli/commands/transfer_workload/CMakeLists.linux-x86_64.txt
index 3bbdc78d3d..8419632422 100644
--- a/ydb/public/lib/ydb_cli/commands/transfer_workload/CMakeLists.linux-x86_64.txt
+++ b/ydb/public/lib/ydb_cli/commands/transfer_workload/CMakeLists.linux-x86_64.txt
@@ -32,5 +32,6 @@ target_sources(transfer_workload PRIVATE
${CMAKE_SOURCE_DIR}/ydb/public/lib/ydb_cli/commands/transfer_workload/transfer_workload_topic_to_table.cpp
${CMAKE_SOURCE_DIR}/ydb/public/lib/ydb_cli/commands/transfer_workload/transfer_workload_topic_to_table_init.cpp
${CMAKE_SOURCE_DIR}/ydb/public/lib/ydb_cli/commands/transfer_workload/transfer_workload_topic_to_table_clean.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/public/lib/ydb_cli/commands/transfer_workload/transfer_workload_topic_to_table_run.cpp
${CMAKE_SOURCE_DIR}/ydb/public/lib/ydb_cli/commands/transfer_workload/transfer_workload_defines.cpp
)
diff --git a/ydb/public/lib/ydb_cli/commands/transfer_workload/CMakeLists.windows-x86_64.txt b/ydb/public/lib/ydb_cli/commands/transfer_workload/CMakeLists.windows-x86_64.txt
index dc7f515b84..4a4b5327b7 100644
--- a/ydb/public/lib/ydb_cli/commands/transfer_workload/CMakeLists.windows-x86_64.txt
+++ b/ydb/public/lib/ydb_cli/commands/transfer_workload/CMakeLists.windows-x86_64.txt
@@ -31,5 +31,6 @@ target_sources(transfer_workload PRIVATE
${CMAKE_SOURCE_DIR}/ydb/public/lib/ydb_cli/commands/transfer_workload/transfer_workload_topic_to_table.cpp
${CMAKE_SOURCE_DIR}/ydb/public/lib/ydb_cli/commands/transfer_workload/transfer_workload_topic_to_table_init.cpp
${CMAKE_SOURCE_DIR}/ydb/public/lib/ydb_cli/commands/transfer_workload/transfer_workload_topic_to_table_clean.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/public/lib/ydb_cli/commands/transfer_workload/transfer_workload_topic_to_table_run.cpp
${CMAKE_SOURCE_DIR}/ydb/public/lib/ydb_cli/commands/transfer_workload/transfer_workload_defines.cpp
)
diff --git a/ydb/public/lib/ydb_cli/commands/transfer_workload/transfer_workload_defines.cpp b/ydb/public/lib/ydb_cli/commands/transfer_workload/transfer_workload_defines.cpp
index 02a7d19536..e5e971d031 100644
--- a/ydb/public/lib/ydb_cli/commands/transfer_workload/transfer_workload_defines.cpp
+++ b/ydb/public/lib/ydb_cli/commands/transfer_workload/transfer_workload_defines.cpp
@@ -1,8 +1,10 @@
#include "transfer_workload_defines.h"
+#include <ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_defines.h>
namespace NYdb::NConsoleClient::NWorkloadTransfer {
const TString TOPIC = "transfer-topic";
const TString TABLE = "transfer-table";
+const TString CONSUMER_PREFIX = NConsoleClient::CONSUMER_PREFIX;
}
diff --git a/ydb/public/lib/ydb_cli/commands/transfer_workload/transfer_workload_defines.h b/ydb/public/lib/ydb_cli/commands/transfer_workload/transfer_workload_defines.h
index c5df4ab60a..be6cd055e1 100644
--- a/ydb/public/lib/ydb_cli/commands/transfer_workload/transfer_workload_defines.h
+++ b/ydb/public/lib/ydb_cli/commands/transfer_workload/transfer_workload_defines.h
@@ -5,6 +5,7 @@
namespace NYdb::NConsoleClient::NWorkloadTransfer {
extern const TString TOPIC;
+extern const TString CONSUMER_PREFIX;
extern const TString TABLE;
}
diff --git a/ydb/public/lib/ydb_cli/commands/transfer_workload/transfer_workload_topic_to_table.cpp b/ydb/public/lib/ydb_cli/commands/transfer_workload/transfer_workload_topic_to_table.cpp
index 108696ba5e..c217f9eb13 100644
--- a/ydb/public/lib/ydb_cli/commands/transfer_workload/transfer_workload_topic_to_table.cpp
+++ b/ydb/public/lib/ydb_cli/commands/transfer_workload/transfer_workload_topic_to_table.cpp
@@ -1,12 +1,11 @@
#include "transfer_workload_topic_to_table.h"
#include "transfer_workload_topic_to_table_init.h"
#include "transfer_workload_topic_to_table_clean.h"
+#include "transfer_workload_topic_to_table_run.h"
#include "transfer_workload_defines.h"
#include <ydb/public/lib/ydb_cli/commands/ydb_common.h>
-#include <util/string/printf.h>
-
using namespace NYdb::NConsoleClient;
TCommandWorkloadTransferTopicToTable::TCommandWorkloadTransferTopicToTable() :
@@ -14,4 +13,5 @@ TCommandWorkloadTransferTopicToTable::TCommandWorkloadTransferTopicToTable() :
{
AddCommand(std::make_unique<TCommandWorkloadTransferTopicToTableInit>());
AddCommand(std::make_unique<TCommandWorkloadTransferTopicToTableClean>());
+ AddCommand(std::make_unique<TCommandWorkloadTransferTopicToTableRun>());
}
diff --git a/ydb/public/lib/ydb_cli/commands/transfer_workload/transfer_workload_topic_to_table_init.cpp b/ydb/public/lib/ydb_cli/commands/transfer_workload/transfer_workload_topic_to_table_init.cpp
index 35c52164ad..d8ee6ca626 100644
--- a/ydb/public/lib/ydb_cli/commands/transfer_workload/transfer_workload_topic_to_table_init.cpp
+++ b/ydb/public/lib/ydb_cli/commands/transfer_workload/transfer_workload_topic_to_table_init.cpp
@@ -42,6 +42,9 @@ void TCommandWorkloadTransferTopicToTableInit::Config(TConfig& config)
config.Opts->AddLongOption("topic", "Topic name.")
.DefaultValue(NWorkloadTransfer::TOPIC)
.StoreResult(&Scenario.TopicName);
+ config.Opts->AddLongOption("consumer-prefix", "Use consumers with names '<consumer-prefix>-0' ... '<consumer-prefix>-<n-1>' where n is set in the '--consumers' option.")
+ .DefaultValue(NWorkloadTransfer::CONSUMER_PREFIX)
+ .StoreResult(&Scenario.ConsumerPrefix);
config.Opts->AddLongOption("table", "Table name.")
.DefaultValue(NWorkloadTransfer::TABLE)
.StoreResult(&Scenario.TableName);
diff --git a/ydb/public/lib/ydb_cli/commands/transfer_workload/transfer_workload_topic_to_table_run.cpp b/ydb/public/lib/ydb_cli/commands/transfer_workload/transfer_workload_topic_to_table_run.cpp
new file mode 100644
index 0000000000..01757f7841
--- /dev/null
+++ b/ydb/public/lib/ydb_cli/commands/transfer_workload/transfer_workload_topic_to_table_run.cpp
@@ -0,0 +1,108 @@
+#include "transfer_workload_topic_to_table_run.h"
+#include "transfer_workload_defines.h"
+
+#include <ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_describe.h>
+#include <ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_params.h>
+#include <ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_reader.h>
+#include <ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_stats_collector.h>
+#include <ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_writer.h>
+#include <ydb/public/lib/ydb_cli/commands/ydb_command.h>
+#include <ydb/public/lib/ydb_cli/commands/ydb_common.h>
+#include <ydb/public/lib/ydb_cli/commands/ydb_service_topic.h>
+
+#define INCLUDE_YDB_INTERNAL_H
+#include <ydb/public/sdk/cpp/client/impl/ydb_internal/logger/log.h>
+#undef INCLUDE_YDB_INTERNAL_H
+
+#include <util/generic/guid.h>
+
+#include <future>
+#include <thread>
+
+using namespace NYdb::NConsoleClient;
+
+TCommandWorkloadTransferTopicToTableRun::TCommandWorkloadTransferTopicToTableRun() :
+ TWorkloadCommand("run", {}, "Run workload")
+{
+}
+
+void TCommandWorkloadTransferTopicToTableRun::Config(TConfig& config)
+{
+ TYdbCommand::Config(config);
+
+ config.SetFreeArgsNum(0);
+
+ config.Opts->AddLongOption('s', "seconds", "Seconds to run workload.")
+ .DefaultValue(10)
+ .StoreResult(&Scenario.TotalSec);
+ config.Opts->AddLongOption('w', "window", "Output window duration in seconds.")
+ .DefaultValue(1)
+ .StoreResult(&Scenario.WindowSec);
+ config.Opts->AddLongOption('q', "quiet", "Quiet mode. Doesn't print statistics each second.")
+ .StoreTrue(&Scenario.Quiet);
+ config.Opts->AddLongOption("print-timestamp", "Print timestamp each second with statistics.")
+ .StoreTrue(&Scenario.PrintTimestamp);
+ config.Opts->AddLongOption("percentile", "Percentile for output statistics.")
+ .DefaultValue(50)
+ .StoreResult(&Scenario.Percentile);
+ config.Opts->AddLongOption("warmup", "Warm-up time in seconds.")
+ .DefaultValue(1)
+ .StoreResult(&Scenario.WarmupSec);
+ config.Opts->AddLongOption("topic", "Topic name.")
+ .DefaultValue(NWorkloadTransfer::TOPIC)
+ .StoreResult(&Scenario.TopicName);
+ config.Opts->AddLongOption("consumer-prefix", "Use consumers with names '<consumer-prefix>-0' ... '<consumer-prefix>-<n-1>' where n is set in the '--consumers' option.")
+ .DefaultValue(CONSUMER_PREFIX)
+ .StoreResult(&Scenario.ConsumerPrefix);
+ config.Opts->AddLongOption("table", "Table name.")
+ .DefaultValue(NWorkloadTransfer::TABLE)
+ .StoreResult(&Scenario.TableName);
+
+ config.Opts->AddLongOption('p', "producer-threads", "Number of producer threads.")
+ .DefaultValue(1)
+ .StoreResult(&Scenario.ProducerThreadCount);
+ config.Opts->AddLongOption('t', "consumer-threads", "Number of consumer threads.")
+ .DefaultValue(1)
+ .StoreResult(&Scenario.ConsumerThreadCount);
+ config.Opts->AddLongOption('c', "consumers", "Number of consumers in a topic.")
+ .DefaultValue(1)
+ .StoreResult(&Scenario.ConsumerCount);
+ config.Opts->AddLongOption('m', "message-size", "Message size.")
+ .DefaultValue(10_KB)
+ .StoreMappedResultT<TString>(&Scenario.MessageSize, &TCommandWorkloadTopicParams::StrToBytes);
+ config.Opts->AddLongOption("message-rate", "Total message rate for all producer threads (messages per second). Exclusive with --byte-rate.")
+ .DefaultValue(0)
+ .StoreResult(&Scenario.MessageRate);
+ config.Opts->AddLongOption("byte-rate", "Total message rate for all producer threads (bytes per second). Exclusive with --message-rate.")
+ .DefaultValue(0)
+ .StoreMappedResultT<TString>(&Scenario.ByteRate, &TCommandWorkloadTopicParams::StrToBytes);
+ config.Opts->AddLongOption("codec", PrepareAllowedCodecsDescription("Client-side compression algorithm. When read, data will be uncompressed transparently with a codec used on write", InitAllowedCodecs()))
+ .Optional()
+ .DefaultValue((TStringBuilder() << NTopic::ECodec::RAW))
+ .StoreMappedResultT<TString>(&Scenario.Codec, &TCommandWorkloadTopicParams::StrToCodec);
+ config.Opts->AddLongOption("commit-period", "Waiting time between commit.")
+ .DefaultValue(1)
+ .StoreResult(&Scenario.CommitPeriod);
+ config.Opts->AddLongOption("use-topic-commit", "Use TopicAPI commit.")
+ .DefaultValue(false)
+ .StoreTrue(&Scenario.UseTopicApiCommit);
+
+ config.Opts->MutuallyExclusive("message-rate", "byte-rate");
+
+ config.IsNetworkIntensive = true;
+}
+
+void TCommandWorkloadTransferTopicToTableRun::Parse(TConfig& config)
+{
+ TClientCommand::Parse(config);
+
+ Scenario.EnsurePercentileIsValid();
+ Scenario.EnsureWarmupSecIsValid();
+}
+
+int TCommandWorkloadTransferTopicToTableRun::Run(TConfig& config)
+{
+ Scenario.UseTransactions = true;
+
+ return Scenario.Run(config);
+}
diff --git a/ydb/public/lib/ydb_cli/commands/transfer_workload/transfer_workload_topic_to_table_run.h b/ydb/public/lib/ydb_cli/commands/transfer_workload/transfer_workload_topic_to_table_run.h
new file mode 100644
index 0000000000..b1aa8ff745
--- /dev/null
+++ b/ydb/public/lib/ydb_cli/commands/transfer_workload/transfer_workload_topic_to_table_run.h
@@ -0,0 +1,20 @@
+#pragma once
+
+#include <ydb/public/lib/ydb_cli/commands/ydb_workload.h>
+#include <ydb/public/lib/ydb_cli/commands/topic_readwrite_scenario.h>
+
+namespace NYdb::NConsoleClient {
+
+class TCommandWorkloadTransferTopicToTableRun : public TWorkloadCommand {
+public:
+ TCommandWorkloadTransferTopicToTableRun();
+
+ void Config(TConfig& config) override;
+ void Parse(TConfig& config) override;
+ int Run(TConfig& config) override;
+
+private:
+ TTopicReadWriteScenario Scenario;
+};
+
+}
diff --git a/ydb/public/lib/ydb_cli/commands/transfer_workload/ya.make b/ydb/public/lib/ydb_cli/commands/transfer_workload/ya.make
index 6fb31b9d6f..16b5e9d136 100644
--- a/ydb/public/lib/ydb_cli/commands/transfer_workload/ya.make
+++ b/ydb/public/lib/ydb_cli/commands/transfer_workload/ya.make
@@ -5,6 +5,7 @@ SRCS(
transfer_workload_topic_to_table.cpp
transfer_workload_topic_to_table_init.cpp
transfer_workload_topic_to_table_clean.cpp
+ transfer_workload_topic_to_table_run.cpp
transfer_workload_defines.cpp
)