diff options
author | abcdef <akotov@ydb.tech> | 2023-08-30 17:37:01 +0300 |
---|---|---|
committer | abcdef <akotov@ydb.tech> | 2023-08-30 18:06:43 +0300 |
commit | 4790a6241fcc411a4fbb7c8f1e6808cc8acb7a3a (patch) | |
tree | 480bfd530db027c9f5a67255ff7a57e10fbc412e | |
parent | 869faeec69e4aa549f3f73d3ec01ba5a5b6e423f (diff) | |
download | ydb-4790a6241fcc411a4fbb7c8f1e6808cc8acb7a3a.tar.gz |
the `workload transfer topic-to-table run` command
команда `workload transfer topic-to-table run`
33 files changed, 509 insertions, 25 deletions
diff --git a/ydb/apps/ydb/ut/run_ydb.cpp b/ydb/apps/ydb/ut/run_ydb.cpp index d1ef793821..61d8177f89 100644 --- a/ydb/apps/ydb/ut/run_ydb.cpp +++ b/ydb/apps/ydb/ut/run_ydb.cpp @@ -3,6 +3,8 @@ #include <util/generic/yexception.h> #include <util/system/shellcommand.h> #include <util/system/env.h> +#include <util/string/cast.h> +#include <util/string/split.h> #include <library/cpp/testing/common/env.h> @@ -43,3 +45,13 @@ TString RunYdb(const TList<TString>& args1, const TList<TString>& args2) return command.GetOutput(); } + +ui64 GetFullTimeValue(const TString& output) +{ + TVector<TString> lines, columns; + + Split(output, "\n", lines); + Split(lines.back(), "\t", columns); + + return FromString<ui64>(columns.back()); +} diff --git a/ydb/apps/ydb/ut/run_ydb.h b/ydb/apps/ydb/ut/run_ydb.h index 974a79ff87..cd32581fe8 100644 --- a/ydb/apps/ydb/ut/run_ydb.h +++ b/ydb/apps/ydb/ut/run_ydb.h @@ -7,3 +7,5 @@ TString GetYdbEndpoint(); TString GetYdbDatabase(); TString RunYdb(const TList<TString>& args1, const TList<TString>& args2); + +ui64 GetFullTimeValue(const TString& output); diff --git a/ydb/apps/ydb/ut/workload-topic.cpp b/ydb/apps/ydb/ut/workload-topic.cpp index a9bd5c8d74..2ffb1dff54 100644 --- a/ydb/apps/ydb/ut/workload-topic.cpp +++ b/ydb/apps/ydb/ut/workload-topic.cpp @@ -43,17 +43,14 @@ void ExpectTopic(const TTopicConfigurationMatcher& matcher) UNIT_ASSERT_VALUES_EQUAL(description.GetConsumers().size(), matcher.Consumers); } -Y_UNIT_TEST(RunFull) { +Y_UNIT_TEST(Default_RunFull) { ExecYdb({"init"}); auto output = ExecYdb({"run", "full", "-s", "10"}); ExecYdb({"clean"}); TVector<TString> lines, columns; - Split(output, "\n", lines); - Split(lines.back(), "\t", columns); - - auto fullTime = FromString<ui64>(columns.back()); + ui64 fullTime = GetFullTimeValue(output); UNIT_ASSERT_GE(fullTime, 0); UNIT_ASSERT_LT(fullTime, 10'000); diff --git a/ydb/apps/ydb/ut/workload-transfer-topic-to-table.cpp b/ydb/apps/ydb/ut/workload-transfer-topic-to-table.cpp index 74fb027cc2..35509e3b44 100644 --- a/ydb/apps/ydb/ut/workload-transfer-topic-to-table.cpp +++ b/ydb/apps/ydb/ut/workload-transfer-topic-to-table.cpp @@ -79,7 +79,7 @@ TString ExecYdb(const TList<TString>& args) // // ydb -e grpc://${YDB_ENDPOINT} -d /${YDB_DATABASE} workload transfer topic-to-table ${args} // - return RunYdb({"workload", "transfer", "topic-to-table"}, args); + return RunYdb({"-v", "--user", "root", "--no-password", "workload", "transfer", "topic-to-table"}, args); } void RunYdb(const TList<TString>& args, @@ -91,6 +91,19 @@ void RunYdb(const TList<TString>& args, ExpectTable({.Name=table, .Partitions=tablePartitions}); } +Y_UNIT_TEST(Default_Run) { + RunYdb({"-v", "yql", "-s", R"(ALTER USER root PASSWORD "")"}, TList<TString>()); + + ExecYdb({"init"}); + auto output = ExecYdb({"run", "-s", "10"}); + ExecYdb({"clean"}); + + ui64 fullTime = GetFullTimeValue(output); + + UNIT_ASSERT_GE(fullTime, 0); + UNIT_ASSERT_LT(fullTime, 10'000); +} + Y_UNIT_TEST(Default_Init_Clean) { const TString topic = "transfer-topic"; diff --git a/ydb/apps/ydb/ut/ya.make b/ydb/apps/ydb/ut/ya.make index a7bc653394..85315398bb 100644 --- a/ydb/apps/ydb/ut/ya.make +++ b/ydb/apps/ydb/ut/ya.make @@ -8,6 +8,7 @@ DEPENDS( ) ENV(YDB_CLI_BINARY="ydb/apps/ydb/ydb") +ENV(YDB_FEATURE_FLAGS="enable_topic_service_tx") SRCS( workload-topic.cpp diff --git a/ydb/public/lib/ydb_cli/commands/topic_operations_scenario.cpp b/ydb/public/lib/ydb_cli/commands/topic_operations_scenario.cpp index 4f424d70c1..10410f2640 100644 --- a/ydb/public/lib/ydb_cli/commands/topic_operations_scenario.cpp +++ b/ydb/public/lib/ydb_cli/commands/topic_operations_scenario.cpp @@ -175,12 +175,16 @@ void TTopicOperationsScenario::StartConsumerThreads(std::vector<std::future<void .StartedCount = count, .Database = database, .TopicName = TopicName, + .TableName = TableName, .ConsumerIdx = consumerIdx, .ConsumerPrefix = ConsumerPrefix, - .ReaderIdx = readerIdx + .ReaderIdx = readerIdx, + .UseTransactions = UseTransactions, + .UseTopicApiCommit = UseTopicApiCommit, + .CommitPeriod = CommitPeriod }; - threads.push_back(std::async([readerParams = std::move(readerParams)]() mutable { TTopicWorkloadReader::ReaderLoop(readerParams); })); + threads.push_back(std::async([readerParams = std::move(readerParams)]() mutable { TTopicWorkloadReader::RetryableReaderLoop(readerParams); })); } } @@ -213,7 +217,8 @@ void TTopicOperationsScenario::StartProducerThreads(std::vector<std::future<void .ProducerId = TGUID::CreateTimebased().AsGuidString(), .PartitionId = (partitionSeed + writerIdx) % partitionCount, .Direct = Direct, - .Codec = Codec + .Codec = Codec, + .UseTransactions = UseTransactions }; threads.push_back(std::async([writerParams = std::move(writerParams)]() mutable { TTopicWorkloadWriterWorker::WriterLoop(writerParams); })); diff --git a/ydb/public/lib/ydb_cli/commands/topic_operations_scenario.h b/ydb/public/lib/ydb_cli/commands/topic_operations_scenario.h index b448701c61..d08847a035 100644 --- a/ydb/public/lib/ydb_cli/commands/topic_operations_scenario.h +++ b/ydb/public/lib/ydb_cli/commands/topic_operations_scenario.h @@ -60,6 +60,9 @@ public: ui32 Codec; TString TableName; ui32 TablePartitionCount = 1; + bool UseTransactions = false; + size_t CommitPeriod = 15; + bool UseTopicApiCommit = false; protected: void CreateTopic(const TString& database, diff --git a/ydb/public/lib/ydb_cli/commands/topic_workload/CMakeLists.darwin-x86_64.txt b/ydb/public/lib/ydb_cli/commands/topic_workload/CMakeLists.darwin-x86_64.txt index be9991b8c9..fe429be436 100644 --- a/ydb/public/lib/ydb_cli/commands/topic_workload/CMakeLists.darwin-x86_64.txt +++ b/ydb/public/lib/ydb_cli/commands/topic_workload/CMakeLists.darwin-x86_64.txt @@ -38,5 +38,6 @@ target_sources(topic_workload PRIVATE ${CMAKE_SOURCE_DIR}/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_stats_collector.cpp ${CMAKE_SOURCE_DIR}/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_writer.cpp ${CMAKE_SOURCE_DIR}/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_reader.cpp + ${CMAKE_SOURCE_DIR}/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_reader_transaction_support.cpp ${CMAKE_SOURCE_DIR}/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload.cpp ) diff --git a/ydb/public/lib/ydb_cli/commands/topic_workload/CMakeLists.linux-aarch64.txt b/ydb/public/lib/ydb_cli/commands/topic_workload/CMakeLists.linux-aarch64.txt index da8be69159..f00c9d397f 100644 --- a/ydb/public/lib/ydb_cli/commands/topic_workload/CMakeLists.linux-aarch64.txt +++ b/ydb/public/lib/ydb_cli/commands/topic_workload/CMakeLists.linux-aarch64.txt @@ -39,5 +39,6 @@ target_sources(topic_workload PRIVATE ${CMAKE_SOURCE_DIR}/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_stats_collector.cpp ${CMAKE_SOURCE_DIR}/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_writer.cpp ${CMAKE_SOURCE_DIR}/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_reader.cpp + ${CMAKE_SOURCE_DIR}/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_reader_transaction_support.cpp ${CMAKE_SOURCE_DIR}/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload.cpp ) diff --git a/ydb/public/lib/ydb_cli/commands/topic_workload/CMakeLists.linux-x86_64.txt b/ydb/public/lib/ydb_cli/commands/topic_workload/CMakeLists.linux-x86_64.txt index da8be69159..f00c9d397f 100644 --- a/ydb/public/lib/ydb_cli/commands/topic_workload/CMakeLists.linux-x86_64.txt +++ b/ydb/public/lib/ydb_cli/commands/topic_workload/CMakeLists.linux-x86_64.txt @@ -39,5 +39,6 @@ target_sources(topic_workload PRIVATE ${CMAKE_SOURCE_DIR}/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_stats_collector.cpp ${CMAKE_SOURCE_DIR}/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_writer.cpp ${CMAKE_SOURCE_DIR}/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_reader.cpp + ${CMAKE_SOURCE_DIR}/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_reader_transaction_support.cpp ${CMAKE_SOURCE_DIR}/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload.cpp ) diff --git a/ydb/public/lib/ydb_cli/commands/topic_workload/CMakeLists.windows-x86_64.txt b/ydb/public/lib/ydb_cli/commands/topic_workload/CMakeLists.windows-x86_64.txt index be9991b8c9..fe429be436 100644 --- a/ydb/public/lib/ydb_cli/commands/topic_workload/CMakeLists.windows-x86_64.txt +++ b/ydb/public/lib/ydb_cli/commands/topic_workload/CMakeLists.windows-x86_64.txt @@ -38,5 +38,6 @@ target_sources(topic_workload PRIVATE ${CMAKE_SOURCE_DIR}/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_stats_collector.cpp ${CMAKE_SOURCE_DIR}/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_writer.cpp ${CMAKE_SOURCE_DIR}/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_reader.cpp + ${CMAKE_SOURCE_DIR}/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_reader_transaction_support.cpp ${CMAKE_SOURCE_DIR}/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload.cpp ) diff --git a/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_reader.cpp b/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_reader.cpp index e6dae133cc..2c9620ffb1 100644 --- a/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_reader.cpp +++ b/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_reader.cpp @@ -1,4 +1,5 @@ #include "topic_workload_reader.h" +#include "topic_workload_reader_transaction_support.h" #include "topic_workload_describe.h" @@ -7,17 +8,36 @@ using namespace NYdb::NConsoleClient; -void TTopicWorkloadReader::ReaderLoop(TTopicWorkloadReaderParams& params) { +void TTopicWorkloadReader::RetryableReaderLoop(TTopicWorkloadReaderParams& params) { + const TInstant endTime = Now() + TDuration::Seconds(params.TotalSec + 3); + + while (!*params.ErrorFlag && Now() < endTime) { + try { + ReaderLoop(params, endTime); + } catch (const yexception& ex) { + WRITE_LOG(params.Log, ELogPriority::TLOG_WARNING, TStringBuilder() << ex); + } + } +} + +void TTopicWorkloadReader::ReaderLoop(TTopicWorkloadReaderParams& params, TInstant endTime) { auto topicClient = std::make_unique<NYdb::NTopic::TTopicClient>(params.Driver); + std::optional<TTransactionSupport> txSupport; auto consumerName = TCommandWorkloadTopicDescribe::GenerateConsumerName(params.ConsumerPrefix, params.ConsumerIdx); auto describeTopicResult = TCommandWorkloadTopicDescribe::DescribeTopic(params.Database, params.TopicName, params.Driver); auto consumers = describeTopicResult.GetConsumers(); + if (!std::any_of(consumers.begin(), consumers.end(), [consumerName](const auto& consumer) { return consumer.GetConsumerName() == consumerName; })) { WRITE_LOG(params.Log, ELogPriority::TLOG_EMERG, TStringBuilder() << "Topic '" << params.TopicName << "' doesn't have a consumer '" << consumerName << "'. Run command 'workload init' with parameter '--consumers'."); exit(EXIT_FAILURE); } + + if (params.UseTransactions) { + txSupport.emplace(params.Driver, params.TableName); + } + NYdb::NTopic::TReadSessionSettings settings; settings.ConsumerName(consumerName).AppendTopics(params.TopicName); @@ -34,7 +54,9 @@ void TTopicWorkloadReader::ReaderLoop(TTopicWorkloadReaderParams& params) { (*params.StartedCount)++; - const TInstant endTime = Now() + TDuration::Seconds(params.TotalSec + 3); + TInstant commitTime = Now() + TDuration::Seconds(params.CommitPeriod); + + TVector<NYdb::NTopic::TReadSessionEvent::TStopPartitionSessionEvent> stopPartitionSessionEvents; while (Now() < endTime && !*params.ErrorFlag) { TInstant st = TInstant::Now(); @@ -48,7 +70,7 @@ void TTopicWorkloadReader::ReaderLoop(TTopicWorkloadReaderParams& params) { } readSession->WaitEvent().Wait(TDuration::Seconds(1)); - auto events = readSession->GetEvents(false); + TVector<NYdb::NTopic::TReadSessionEvent::TEvent> events = GetEvents(*readSession, params, txSupport); auto now = TInstant::Now(); for (auto& event : events) { @@ -59,12 +81,19 @@ void TTopicWorkloadReader::ReaderLoop(TTopicWorkloadReaderParams& params) { ui64 fullTime = (now - message.GetCreateTime()).MilliSeconds(); params.StatsCollector->AddReaderEvent(params.ReaderIdx, {message.GetData().Size(), fullTime}); - WRITE_LOG(params.Log, ELogPriority::TLOG_DEBUG, TStringBuilder() << "Got message: " << message.GetMessageGroupId() + if (txSupport) { + txSupport->AppendRow(message.GetData()); + } + + WRITE_LOG(params.Log, ELogPriority::TLOG_DEBUG, TStringBuilder() << "Got message: " << message.GetMessageGroupId() << " topic " << message.GetPartitionSession()->GetTopicPath() << " partition " << message.GetPartitionSession()->GetPartitionId() << " offset " << message.GetOffset() << " seqNo " << message.GetSeqNo() << " createTime " << message.GetCreateTime() << " fullTimeMs " << fullTime); } - dataEvent->Commit(); + + if (!txSupport || params.UseTopicApiCommit) { + dataEvent->Commit(); + } } else if (auto* createPartitionStreamEvent = std::get_if<NYdb::NTopic::TReadSessionEvent::TStartPartitionSessionEvent>(&event)) { auto stream = createPartitionStreamEvent->GetPartitionSession(); ui64 startOffset = streamState[std::make_pair(stream->GetTopicPath(), stream->GetPartitionId())].StartOffset; @@ -74,7 +103,13 @@ void TTopicWorkloadReader::ReaderLoop(TTopicWorkloadReaderParams& params) { } else if (auto* destroyPartitionStreamEvent = std::get_if<NYdb::NTopic::TReadSessionEvent::TStopPartitionSessionEvent>(&event)) { auto stream = destroyPartitionStreamEvent->GetPartitionSession(); streamState[std::make_pair(stream->GetTopicPath(), stream->GetPartitionId())].Stream = nullptr; - destroyPartitionStreamEvent->Confirm(); + + if (txSupport) { + // gracefull shutdown. we will send confirmations later + stopPartitionSessionEvents.push_back(std::move(*destroyPartitionStreamEvent)); + } else { + destroyPartitionStreamEvent->Confirm(); + } } else if (auto* closeSessionEvent = std::get_if<NYdb::NTopic::TSessionClosedEvent>(&event)) { WRITE_LOG(params.Log, ELogPriority::TLOG_ERR, TStringBuilder() << "Read session closed: " << closeSessionEvent->DebugString()); *params.ErrorFlag = 1; @@ -90,5 +125,68 @@ void TTopicWorkloadReader::ReaderLoop(TTopicWorkloadReaderParams& params) { WRITE_LOG(params.Log, ELogPriority::TLOG_DEBUG, TStringBuilder() << ackEvent->DebugString()); } } + + if (txSupport) { + TryCommitTx(params, txSupport, commitTime, stopPartitionSessionEvents); + } + } +} + +TVector<NYdb::NTopic::TReadSessionEvent::TEvent> TTopicWorkloadReader::GetEvents(NYdb::NTopic::IReadSession& readSession, + TTopicWorkloadReaderParams& params, + std::optional<TTransactionSupport>& txSupport) +{ + TVector<NYdb::NTopic::TReadSessionEvent::TEvent> events; + NTopic::TReadSessionGetEventSettings settings; + + settings.Block(false); + + if (txSupport && !params.UseTopicApiCommit) { + if (!txSupport->Transaction) { + txSupport->BeginTx(); + } + + settings.Tx(*txSupport->Transaction); + } + + return readSession.GetEvents(settings); +} + +void TTopicWorkloadReader::TryCommitTx(TTopicWorkloadReaderParams& params, + std::optional<TTransactionSupport>& txSupport, + TInstant& commitTime, + TVector<NYdb::NTopic::TReadSessionEvent::TStopPartitionSessionEvent>& stopPartitionSessionEvents) +{ + Y_VERIFY(txSupport); + + if (commitTime > Now()) { + return; + } + + TryCommitTableChanges(params, txSupport); + GracefullShutdown(stopPartitionSessionEvents); + + commitTime += TDuration::Seconds(params.CommitPeriod); +} + +void TTopicWorkloadReader::TryCommitTableChanges(TTopicWorkloadReaderParams& params, + std::optional<TTransactionSupport>& txSupport) +{ + if (txSupport->Rows.empty()) { + return; + } + + auto begin = TInstant::Now(); + txSupport->CommitTx(); + ui64 duration = (TInstant::Now() - begin).MilliSeconds(); + + params.StatsCollector->AddCommitTxEvent(params.ReaderIdx, {duration}); +} + +void TTopicWorkloadReader::GracefullShutdown(TVector<NYdb::NTopic::TReadSessionEvent::TStopPartitionSessionEvent>& stopPartitionSessionEvents) +{ + for (auto& event : stopPartitionSessionEvents) { + event.Confirm(); } + stopPartitionSessionEvents.clear(); } diff --git a/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_reader.h b/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_reader.h index 0d3406312b..7a74f131e0 100644 --- a/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_reader.h +++ b/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_reader.h @@ -3,6 +3,8 @@ #include "topic_workload_defines.h" #include "topic_workload_stats_collector.h" +#include <ydb/public/sdk/cpp/client/ydb_topic/topic.h> + #include <library/cpp/logger/log.h> #include <util/system/types.h> #include <util/string/type.h> @@ -19,14 +21,35 @@ namespace NYdb { std::shared_ptr<std::atomic_uint> StartedCount; TString Database; TString TopicName; + TString TableName; ui32 ConsumerIdx; TString ConsumerPrefix; ui64 ReaderIdx; + bool UseTransactions = false; + bool UseTopicApiCommit = false; + size_t CommitPeriod = 15; }; + class TTransactionSupport; + class TTopicWorkloadReader { public: - static void ReaderLoop(TTopicWorkloadReaderParams& params); + static void RetryableReaderLoop(TTopicWorkloadReaderParams& params); + + private: + static void ReaderLoop(TTopicWorkloadReaderParams& params, TInstant endTime); + + static TVector<NYdb::NTopic::TReadSessionEvent::TEvent> GetEvents(NYdb::NTopic::IReadSession& readSession, + TTopicWorkloadReaderParams& params, + std::optional<TTransactionSupport>& txSupport); + + static void TryCommitTx(TTopicWorkloadReaderParams& params, + std::optional<TTransactionSupport>& txSupport, + TInstant& commitTime, + TVector<NYdb::NTopic::TReadSessionEvent::TStopPartitionSessionEvent>& stopPartitionSessionEvents); + static void TryCommitTableChanges(TTopicWorkloadReaderParams& params, + std::optional<TTransactionSupport>& txSupport); + static void GracefullShutdown(TVector<NYdb::NTopic::TReadSessionEvent::TStopPartitionSessionEvent>& stopPartitionSessionEvents); }; } } diff --git a/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_reader_transaction_support.cpp b/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_reader_transaction_support.cpp new file mode 100644 index 0000000000..2669d95a67 --- /dev/null +++ b/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_reader_transaction_support.cpp @@ -0,0 +1,122 @@ +#include "topic_workload_reader_transaction_support.h" + +#include <util/random/random.h> + +namespace NYdb::NConsoleClient { + +static void EnsureSuccess(const NYdb::TStatus& status, std::string_view name) +{ + Y_VERIFY(!status.IsTransportError()); + + if (!status.IsSuccess()) { + ythrow yexception() << "error on " << name << ": " << status; + } +} + +TTransactionSupport::TTransactionSupport(const NYdb::TDriver& driver, + const TString& tableName) : + TableClient(driver), + TableName(tableName) +{ +} + +void TTransactionSupport::BeginTx() +{ + Y_VERIFY(!Transaction); + + if (!Session) { + CreateSession(); + } + + Y_VERIFY(Session); + + auto settings = NYdb::NTable::TTxSettings::SerializableRW(); + auto result = Session->BeginTransaction(settings).GetValueSync(); + EnsureSuccess(result, "BeginTransaction"); + + Transaction = result.GetTransaction(); +} + +void TTransactionSupport::CommitTx() +{ + Y_VERIFY(Transaction); + + UpsertIntoTable(); + Commit(); + + Rows.clear(); + Transaction = std::nullopt; +} + +void TTransactionSupport::AppendRow(const TString& m) +{ + TRow row; + row.Key = RandomNumber<ui64>(); + row.Value = m; + + Rows.push_back(std::move(row)); +} + +void TTransactionSupport::CreateSession() +{ + Y_VERIFY(!Session); + + auto result = TableClient.GetSession(NYdb::NTable::TCreateSessionSettings()).GetValueSync(); + EnsureSuccess(result, "GetSession"); + + Session = result.GetSession(); +} + +void TTransactionSupport::UpsertIntoTable() +{ + Y_VERIFY(Transaction); + + TString query = R"( + DECLARE $rows AS List<Struct< + id: Uint64, + value: String + >>; + + UPSERT INTO `)" + TableName + R"(` (SELECT id, value FROM AS_TABLE($rows)); + )"; + + NYdb::TParamsBuilder builder; + + auto& rows = builder.AddParam("$rows"); + rows.BeginList(); + for (auto& row : Rows) { + rows.AddListItem() + .BeginStruct() + .AddMember("id").Uint64(row.Key) + .AddMember("value").String(row.Value) + .EndStruct(); + } + rows.EndList(); + rows.Build(); + + auto params = builder.Build(); + + NYdb::NTable::TExecDataQuerySettings settings; + settings.KeepInQueryCache(true); + + auto runQuery = [this, &query, ¶ms, &settings](NYdb::NTable::TSession) -> NYdb::TStatus { + return Transaction->GetSession().ExecuteDataQuery(query, + NYdb::NTable::TTxControl::Tx(*Transaction), + params, + settings).GetValueSync(); + }; + + auto result = TableClient.RetryOperationSync(runQuery); + EnsureSuccess(result, "UPSERT"); +} + +void TTransactionSupport::Commit() +{ + Y_VERIFY(Transaction); + + auto settings = NYdb::NTable::TCommitTxSettings(); + auto result = Transaction->Commit(settings).GetValueSync(); + EnsureSuccess(result, "COMMIT"); +} + +} diff --git a/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_reader_transaction_support.h b/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_reader_transaction_support.h new file mode 100644 index 0000000000..80f74fa2a2 --- /dev/null +++ b/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_reader_transaction_support.h @@ -0,0 +1,34 @@ +#pragma once + +#include <ydb/public/sdk/cpp/client/ydb_table/table.h> + +namespace NYdb::NConsoleClient { + +class TTransactionSupport { +public: + TTransactionSupport(const NYdb::TDriver& driver, + const TString& tableName); + + void BeginTx(); + void CommitTx(); + void AppendRow(const TString& data); + + struct TRow { + ui64 Key; + TString Value; + }; + + std::optional<NYdb::NTable::TTransaction> Transaction; + TVector<TRow> Rows; + +private: + void CreateSession(); + void UpsertIntoTable(); + void Commit(); + + NYdb::NTable::TTableClient TableClient; + TString TableName; + std::optional<NYdb::NTable::TSession> Session; +}; + +} diff --git a/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_run_full.cpp b/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_run_full.cpp index 04d0377f47..9da4379246 100644 --- a/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_run_full.cpp +++ b/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_run_full.cpp @@ -83,5 +83,7 @@ void TCommandWorkloadTopicRunFull::Parse(TConfig& config) int TCommandWorkloadTopicRunFull::Run(TConfig& config) { + Scenario.UseTransactions = false; + return Scenario.Run(config); } diff --git a/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_stats.cpp b/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_stats.cpp index d9eb975093..871078a8a6 100644 --- a/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_stats.cpp +++ b/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_stats.cpp @@ -12,6 +12,7 @@ TTopicWorkloadStats::TTopicWorkloadStats() , ReadBytes(0) , ReadMessages(0) , FullTimeHist(HighestTrackableTime, 5) + , CommitTxTimeHist(HighestTrackableTime, 2) { } @@ -35,3 +36,8 @@ void TTopicWorkloadStats::AddEvent(const LagEvent& event) LagMessagesHist.RecordValue(Min(event.LagMessages, HighestTrackableMessageCount)); LagTimeHist.RecordValue(Min(event.LagTime, HighestTrackableTime)); } + +void TTopicWorkloadStats::AddEvent(const CommitTxEvent& event) +{ + CommitTxTimeHist.RecordValue(Min(event.Time, HighestTrackableTime)); +} diff --git a/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_stats.h b/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_stats.h index d7c85fbffb..c1a1026485 100644 --- a/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_stats.h +++ b/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_stats.h @@ -19,12 +19,16 @@ namespace NYdb { ui64 LagMessages; ui64 LagTime; }; + struct CommitTxEvent { + ui64 Time; + }; TTopicWorkloadStats(); void AddEvent(const WriterEvent& event); void AddEvent(const ReaderEvent& event); void AddEvent(const LagEvent& event); + void AddEvent(const CommitTxEvent& event); ui64 WriteBytes; ui64 WriteMessages; @@ -35,6 +39,7 @@ namespace NYdb { ui64 ReadBytes; ui64 ReadMessages; NHdr::THistogram FullTimeHist; + NHdr::THistogram CommitTxTimeHist; private: constexpr static ui64 HighestTrackableTime = 100000; diff --git a/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_stats_collector.cpp b/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_stats_collector.cpp index c5c30c4602..b1f57e45a8 100644 --- a/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_stats_collector.cpp +++ b/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_stats_collector.cpp @@ -24,15 +24,13 @@ TTopicWorkloadStatsCollector::TTopicWorkloadStatsCollector( , WindowStats(MakeHolder<TTopicWorkloadStats>()) { for (size_t writerIdx = 0; writerIdx < writerCount; writerIdx++) { - auto writerQueue = MakeHolder<TAutoLockFreeQueue<TTopicWorkloadStats::WriterEvent>>(); - WriterEventQueues.emplace_back(std::move(writerQueue)); + AddQueue(WriterEventQueues); } for (size_t readerIdx = 0; readerIdx < readerCount; readerIdx++) { - auto readerQueue = MakeHolder<TAutoLockFreeQueue<TTopicWorkloadStats::ReaderEvent>>(); - ReaderEventQueues.emplace_back(std::move(readerQueue)); - auto lagQueue = MakeHolder<TAutoLockFreeQueue<TTopicWorkloadStats::LagEvent>>(); - LagEventQueues.emplace_back(std::move(lagQueue)); + AddQueue(ReaderEventQueues); + AddQueue(LagEventQueues); + AddQueue(CommitTxEventQueues); } } @@ -127,6 +125,7 @@ void TTopicWorkloadStatsCollector::CollectThreadEvents() CollectThreadEvents(WriterEventQueues); CollectThreadEvents(ReaderEventQueues); CollectThreadEvents(LagEventQueues); + CollectThreadEvents(CommitTxEventQueues); } template<class T> @@ -141,6 +140,13 @@ void TTopicWorkloadStatsCollector::CollectThreadEvents(TEventQueues<T>& queues) } } +template<class T> +void TTopicWorkloadStatsCollector::AddQueue(TEventQueues<T>& queues) +{ + auto queue = MakeHolder<TAutoLockFreeQueue<T>>(); + queues.emplace_back(std::move(queue)); +} + ui64 TTopicWorkloadStatsCollector::GetTotalReadMessages() const { return TotalStats.ReadMessages; } @@ -164,6 +170,11 @@ void TTopicWorkloadStatsCollector::AddLagEvent(size_t readerIdx, const TTopicWor AddEvent(readerIdx, LagEventQueues, event); } +void TTopicWorkloadStatsCollector::AddCommitTxEvent(size_t readerIdx, const TTopicWorkloadStats::CommitTxEvent& event) +{ + AddEvent(readerIdx, CommitTxEventQueues, event); +} + template<class T> void TTopicWorkloadStatsCollector::AddEvent(size_t index, TEventQueues<T>& queues, const T& event) { diff --git a/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_stats_collector.h b/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_stats_collector.h index eaa391f0ea..b6d856a6ab 100644 --- a/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_stats_collector.h +++ b/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_stats_collector.h @@ -25,6 +25,7 @@ namespace NYdb { void AddWriterEvent(size_t writerIdx, const TTopicWorkloadStats::WriterEvent& event); void AddReaderEvent(size_t readerIdx, const TTopicWorkloadStats::ReaderEvent& event); void AddLagEvent(size_t readerIdx, const TTopicWorkloadStats::LagEvent& event); + void AddCommitTxEvent(size_t readerIdx, const TTopicWorkloadStats::CommitTxEvent& event); ui64 GetTotalReadMessages() const; ui64 GetTotalWriteMessages() const; @@ -38,6 +39,9 @@ namespace NYdb { void CollectThreadEvents(TEventQueues<T>& queues); template<class T> + static void AddQueue(TEventQueues<T>& queues); + + template<class T> void AddEvent(size_t index, TEventQueues<T>& queues, const T& event); void PrintWindowStats(ui32 windowIt); @@ -49,6 +53,7 @@ namespace NYdb { TEventQueues<TTopicWorkloadStats::WriterEvent> WriterEventQueues; TEventQueues<TTopicWorkloadStats::ReaderEvent> ReaderEventQueues; TEventQueues<TTopicWorkloadStats::LagEvent> LagEventQueues; + TEventQueues<TTopicWorkloadStats::CommitTxEvent> CommitTxEventQueues; bool Quiet; bool PrintTimestamp; diff --git a/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_writer.h b/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_writer.h index 2d42f31e90..8f2ba5c7c0 100644 --- a/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_writer.h +++ b/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_writer.h @@ -29,6 +29,7 @@ namespace NYdb { ui32 PartitionId; bool Direct; ui32 Codec = 0; + bool UseTransactions = false; }; class TTopicWorkloadWriterWorker { diff --git a/ydb/public/lib/ydb_cli/commands/topic_workload/ya.make b/ydb/public/lib/ydb_cli/commands/topic_workload/ya.make index 235b627878..1246090a32 100644 --- a/ydb/public/lib/ydb_cli/commands/topic_workload/ya.make +++ b/ydb/public/lib/ydb_cli/commands/topic_workload/ya.make @@ -12,6 +12,7 @@ SRCS( topic_workload_stats_collector.cpp topic_workload_writer.cpp topic_workload_reader.cpp + topic_workload_reader_transaction_support.cpp topic_workload.cpp ) @@ -36,4 +37,4 @@ END() RECURSE_FOR_TESTS( ut -)
\ No newline at end of file +) diff --git a/ydb/public/lib/ydb_cli/commands/transfer_workload/CMakeLists.darwin-x86_64.txt b/ydb/public/lib/ydb_cli/commands/transfer_workload/CMakeLists.darwin-x86_64.txt index dc7f515b84..4a4b5327b7 100644 --- a/ydb/public/lib/ydb_cli/commands/transfer_workload/CMakeLists.darwin-x86_64.txt +++ b/ydb/public/lib/ydb_cli/commands/transfer_workload/CMakeLists.darwin-x86_64.txt @@ -31,5 +31,6 @@ target_sources(transfer_workload PRIVATE ${CMAKE_SOURCE_DIR}/ydb/public/lib/ydb_cli/commands/transfer_workload/transfer_workload_topic_to_table.cpp ${CMAKE_SOURCE_DIR}/ydb/public/lib/ydb_cli/commands/transfer_workload/transfer_workload_topic_to_table_init.cpp ${CMAKE_SOURCE_DIR}/ydb/public/lib/ydb_cli/commands/transfer_workload/transfer_workload_topic_to_table_clean.cpp + ${CMAKE_SOURCE_DIR}/ydb/public/lib/ydb_cli/commands/transfer_workload/transfer_workload_topic_to_table_run.cpp ${CMAKE_SOURCE_DIR}/ydb/public/lib/ydb_cli/commands/transfer_workload/transfer_workload_defines.cpp ) diff --git a/ydb/public/lib/ydb_cli/commands/transfer_workload/CMakeLists.linux-aarch64.txt b/ydb/public/lib/ydb_cli/commands/transfer_workload/CMakeLists.linux-aarch64.txt index 3bbdc78d3d..8419632422 100644 --- a/ydb/public/lib/ydb_cli/commands/transfer_workload/CMakeLists.linux-aarch64.txt +++ b/ydb/public/lib/ydb_cli/commands/transfer_workload/CMakeLists.linux-aarch64.txt @@ -32,5 +32,6 @@ target_sources(transfer_workload PRIVATE ${CMAKE_SOURCE_DIR}/ydb/public/lib/ydb_cli/commands/transfer_workload/transfer_workload_topic_to_table.cpp ${CMAKE_SOURCE_DIR}/ydb/public/lib/ydb_cli/commands/transfer_workload/transfer_workload_topic_to_table_init.cpp ${CMAKE_SOURCE_DIR}/ydb/public/lib/ydb_cli/commands/transfer_workload/transfer_workload_topic_to_table_clean.cpp + ${CMAKE_SOURCE_DIR}/ydb/public/lib/ydb_cli/commands/transfer_workload/transfer_workload_topic_to_table_run.cpp ${CMAKE_SOURCE_DIR}/ydb/public/lib/ydb_cli/commands/transfer_workload/transfer_workload_defines.cpp ) diff --git a/ydb/public/lib/ydb_cli/commands/transfer_workload/CMakeLists.linux-x86_64.txt b/ydb/public/lib/ydb_cli/commands/transfer_workload/CMakeLists.linux-x86_64.txt index 3bbdc78d3d..8419632422 100644 --- a/ydb/public/lib/ydb_cli/commands/transfer_workload/CMakeLists.linux-x86_64.txt +++ b/ydb/public/lib/ydb_cli/commands/transfer_workload/CMakeLists.linux-x86_64.txt @@ -32,5 +32,6 @@ target_sources(transfer_workload PRIVATE ${CMAKE_SOURCE_DIR}/ydb/public/lib/ydb_cli/commands/transfer_workload/transfer_workload_topic_to_table.cpp ${CMAKE_SOURCE_DIR}/ydb/public/lib/ydb_cli/commands/transfer_workload/transfer_workload_topic_to_table_init.cpp ${CMAKE_SOURCE_DIR}/ydb/public/lib/ydb_cli/commands/transfer_workload/transfer_workload_topic_to_table_clean.cpp + ${CMAKE_SOURCE_DIR}/ydb/public/lib/ydb_cli/commands/transfer_workload/transfer_workload_topic_to_table_run.cpp ${CMAKE_SOURCE_DIR}/ydb/public/lib/ydb_cli/commands/transfer_workload/transfer_workload_defines.cpp ) diff --git a/ydb/public/lib/ydb_cli/commands/transfer_workload/CMakeLists.windows-x86_64.txt b/ydb/public/lib/ydb_cli/commands/transfer_workload/CMakeLists.windows-x86_64.txt index dc7f515b84..4a4b5327b7 100644 --- a/ydb/public/lib/ydb_cli/commands/transfer_workload/CMakeLists.windows-x86_64.txt +++ b/ydb/public/lib/ydb_cli/commands/transfer_workload/CMakeLists.windows-x86_64.txt @@ -31,5 +31,6 @@ target_sources(transfer_workload PRIVATE ${CMAKE_SOURCE_DIR}/ydb/public/lib/ydb_cli/commands/transfer_workload/transfer_workload_topic_to_table.cpp ${CMAKE_SOURCE_DIR}/ydb/public/lib/ydb_cli/commands/transfer_workload/transfer_workload_topic_to_table_init.cpp ${CMAKE_SOURCE_DIR}/ydb/public/lib/ydb_cli/commands/transfer_workload/transfer_workload_topic_to_table_clean.cpp + ${CMAKE_SOURCE_DIR}/ydb/public/lib/ydb_cli/commands/transfer_workload/transfer_workload_topic_to_table_run.cpp ${CMAKE_SOURCE_DIR}/ydb/public/lib/ydb_cli/commands/transfer_workload/transfer_workload_defines.cpp ) diff --git a/ydb/public/lib/ydb_cli/commands/transfer_workload/transfer_workload_defines.cpp b/ydb/public/lib/ydb_cli/commands/transfer_workload/transfer_workload_defines.cpp index 02a7d19536..e5e971d031 100644 --- a/ydb/public/lib/ydb_cli/commands/transfer_workload/transfer_workload_defines.cpp +++ b/ydb/public/lib/ydb_cli/commands/transfer_workload/transfer_workload_defines.cpp @@ -1,8 +1,10 @@ #include "transfer_workload_defines.h" +#include <ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_defines.h> namespace NYdb::NConsoleClient::NWorkloadTransfer { const TString TOPIC = "transfer-topic"; const TString TABLE = "transfer-table"; +const TString CONSUMER_PREFIX = NConsoleClient::CONSUMER_PREFIX; } diff --git a/ydb/public/lib/ydb_cli/commands/transfer_workload/transfer_workload_defines.h b/ydb/public/lib/ydb_cli/commands/transfer_workload/transfer_workload_defines.h index c5df4ab60a..be6cd055e1 100644 --- a/ydb/public/lib/ydb_cli/commands/transfer_workload/transfer_workload_defines.h +++ b/ydb/public/lib/ydb_cli/commands/transfer_workload/transfer_workload_defines.h @@ -5,6 +5,7 @@ namespace NYdb::NConsoleClient::NWorkloadTransfer { extern const TString TOPIC; +extern const TString CONSUMER_PREFIX; extern const TString TABLE; } diff --git a/ydb/public/lib/ydb_cli/commands/transfer_workload/transfer_workload_topic_to_table.cpp b/ydb/public/lib/ydb_cli/commands/transfer_workload/transfer_workload_topic_to_table.cpp index 108696ba5e..c217f9eb13 100644 --- a/ydb/public/lib/ydb_cli/commands/transfer_workload/transfer_workload_topic_to_table.cpp +++ b/ydb/public/lib/ydb_cli/commands/transfer_workload/transfer_workload_topic_to_table.cpp @@ -1,12 +1,11 @@ #include "transfer_workload_topic_to_table.h" #include "transfer_workload_topic_to_table_init.h" #include "transfer_workload_topic_to_table_clean.h" +#include "transfer_workload_topic_to_table_run.h" #include "transfer_workload_defines.h" #include <ydb/public/lib/ydb_cli/commands/ydb_common.h> -#include <util/string/printf.h> - using namespace NYdb::NConsoleClient; TCommandWorkloadTransferTopicToTable::TCommandWorkloadTransferTopicToTable() : @@ -14,4 +13,5 @@ TCommandWorkloadTransferTopicToTable::TCommandWorkloadTransferTopicToTable() : { AddCommand(std::make_unique<TCommandWorkloadTransferTopicToTableInit>()); AddCommand(std::make_unique<TCommandWorkloadTransferTopicToTableClean>()); + AddCommand(std::make_unique<TCommandWorkloadTransferTopicToTableRun>()); } diff --git a/ydb/public/lib/ydb_cli/commands/transfer_workload/transfer_workload_topic_to_table_init.cpp b/ydb/public/lib/ydb_cli/commands/transfer_workload/transfer_workload_topic_to_table_init.cpp index 35c52164ad..d8ee6ca626 100644 --- a/ydb/public/lib/ydb_cli/commands/transfer_workload/transfer_workload_topic_to_table_init.cpp +++ b/ydb/public/lib/ydb_cli/commands/transfer_workload/transfer_workload_topic_to_table_init.cpp @@ -42,6 +42,9 @@ void TCommandWorkloadTransferTopicToTableInit::Config(TConfig& config) config.Opts->AddLongOption("topic", "Topic name.") .DefaultValue(NWorkloadTransfer::TOPIC) .StoreResult(&Scenario.TopicName); + config.Opts->AddLongOption("consumer-prefix", "Use consumers with names '<consumer-prefix>-0' ... '<consumer-prefix>-<n-1>' where n is set in the '--consumers' option.") + .DefaultValue(NWorkloadTransfer::CONSUMER_PREFIX) + .StoreResult(&Scenario.ConsumerPrefix); config.Opts->AddLongOption("table", "Table name.") .DefaultValue(NWorkloadTransfer::TABLE) .StoreResult(&Scenario.TableName); diff --git a/ydb/public/lib/ydb_cli/commands/transfer_workload/transfer_workload_topic_to_table_run.cpp b/ydb/public/lib/ydb_cli/commands/transfer_workload/transfer_workload_topic_to_table_run.cpp new file mode 100644 index 0000000000..01757f7841 --- /dev/null +++ b/ydb/public/lib/ydb_cli/commands/transfer_workload/transfer_workload_topic_to_table_run.cpp @@ -0,0 +1,108 @@ +#include "transfer_workload_topic_to_table_run.h" +#include "transfer_workload_defines.h" + +#include <ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_describe.h> +#include <ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_params.h> +#include <ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_reader.h> +#include <ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_stats_collector.h> +#include <ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_writer.h> +#include <ydb/public/lib/ydb_cli/commands/ydb_command.h> +#include <ydb/public/lib/ydb_cli/commands/ydb_common.h> +#include <ydb/public/lib/ydb_cli/commands/ydb_service_topic.h> + +#define INCLUDE_YDB_INTERNAL_H +#include <ydb/public/sdk/cpp/client/impl/ydb_internal/logger/log.h> +#undef INCLUDE_YDB_INTERNAL_H + +#include <util/generic/guid.h> + +#include <future> +#include <thread> + +using namespace NYdb::NConsoleClient; + +TCommandWorkloadTransferTopicToTableRun::TCommandWorkloadTransferTopicToTableRun() : + TWorkloadCommand("run", {}, "Run workload") +{ +} + +void TCommandWorkloadTransferTopicToTableRun::Config(TConfig& config) +{ + TYdbCommand::Config(config); + + config.SetFreeArgsNum(0); + + config.Opts->AddLongOption('s', "seconds", "Seconds to run workload.") + .DefaultValue(10) + .StoreResult(&Scenario.TotalSec); + config.Opts->AddLongOption('w', "window", "Output window duration in seconds.") + .DefaultValue(1) + .StoreResult(&Scenario.WindowSec); + config.Opts->AddLongOption('q', "quiet", "Quiet mode. Doesn't print statistics each second.") + .StoreTrue(&Scenario.Quiet); + config.Opts->AddLongOption("print-timestamp", "Print timestamp each second with statistics.") + .StoreTrue(&Scenario.PrintTimestamp); + config.Opts->AddLongOption("percentile", "Percentile for output statistics.") + .DefaultValue(50) + .StoreResult(&Scenario.Percentile); + config.Opts->AddLongOption("warmup", "Warm-up time in seconds.") + .DefaultValue(1) + .StoreResult(&Scenario.WarmupSec); + config.Opts->AddLongOption("topic", "Topic name.") + .DefaultValue(NWorkloadTransfer::TOPIC) + .StoreResult(&Scenario.TopicName); + config.Opts->AddLongOption("consumer-prefix", "Use consumers with names '<consumer-prefix>-0' ... '<consumer-prefix>-<n-1>' where n is set in the '--consumers' option.") + .DefaultValue(CONSUMER_PREFIX) + .StoreResult(&Scenario.ConsumerPrefix); + config.Opts->AddLongOption("table", "Table name.") + .DefaultValue(NWorkloadTransfer::TABLE) + .StoreResult(&Scenario.TableName); + + config.Opts->AddLongOption('p', "producer-threads", "Number of producer threads.") + .DefaultValue(1) + .StoreResult(&Scenario.ProducerThreadCount); + config.Opts->AddLongOption('t', "consumer-threads", "Number of consumer threads.") + .DefaultValue(1) + .StoreResult(&Scenario.ConsumerThreadCount); + config.Opts->AddLongOption('c', "consumers", "Number of consumers in a topic.") + .DefaultValue(1) + .StoreResult(&Scenario.ConsumerCount); + config.Opts->AddLongOption('m', "message-size", "Message size.") + .DefaultValue(10_KB) + .StoreMappedResultT<TString>(&Scenario.MessageSize, &TCommandWorkloadTopicParams::StrToBytes); + config.Opts->AddLongOption("message-rate", "Total message rate for all producer threads (messages per second). Exclusive with --byte-rate.") + .DefaultValue(0) + .StoreResult(&Scenario.MessageRate); + config.Opts->AddLongOption("byte-rate", "Total message rate for all producer threads (bytes per second). Exclusive with --message-rate.") + .DefaultValue(0) + .StoreMappedResultT<TString>(&Scenario.ByteRate, &TCommandWorkloadTopicParams::StrToBytes); + config.Opts->AddLongOption("codec", PrepareAllowedCodecsDescription("Client-side compression algorithm. When read, data will be uncompressed transparently with a codec used on write", InitAllowedCodecs())) + .Optional() + .DefaultValue((TStringBuilder() << NTopic::ECodec::RAW)) + .StoreMappedResultT<TString>(&Scenario.Codec, &TCommandWorkloadTopicParams::StrToCodec); + config.Opts->AddLongOption("commit-period", "Waiting time between commit.") + .DefaultValue(1) + .StoreResult(&Scenario.CommitPeriod); + config.Opts->AddLongOption("use-topic-commit", "Use TopicAPI commit.") + .DefaultValue(false) + .StoreTrue(&Scenario.UseTopicApiCommit); + + config.Opts->MutuallyExclusive("message-rate", "byte-rate"); + + config.IsNetworkIntensive = true; +} + +void TCommandWorkloadTransferTopicToTableRun::Parse(TConfig& config) +{ + TClientCommand::Parse(config); + + Scenario.EnsurePercentileIsValid(); + Scenario.EnsureWarmupSecIsValid(); +} + +int TCommandWorkloadTransferTopicToTableRun::Run(TConfig& config) +{ + Scenario.UseTransactions = true; + + return Scenario.Run(config); +} diff --git a/ydb/public/lib/ydb_cli/commands/transfer_workload/transfer_workload_topic_to_table_run.h b/ydb/public/lib/ydb_cli/commands/transfer_workload/transfer_workload_topic_to_table_run.h new file mode 100644 index 0000000000..b1aa8ff745 --- /dev/null +++ b/ydb/public/lib/ydb_cli/commands/transfer_workload/transfer_workload_topic_to_table_run.h @@ -0,0 +1,20 @@ +#pragma once + +#include <ydb/public/lib/ydb_cli/commands/ydb_workload.h> +#include <ydb/public/lib/ydb_cli/commands/topic_readwrite_scenario.h> + +namespace NYdb::NConsoleClient { + +class TCommandWorkloadTransferTopicToTableRun : public TWorkloadCommand { +public: + TCommandWorkloadTransferTopicToTableRun(); + + void Config(TConfig& config) override; + void Parse(TConfig& config) override; + int Run(TConfig& config) override; + +private: + TTopicReadWriteScenario Scenario; +}; + +} diff --git a/ydb/public/lib/ydb_cli/commands/transfer_workload/ya.make b/ydb/public/lib/ydb_cli/commands/transfer_workload/ya.make index 6fb31b9d6f..16b5e9d136 100644 --- a/ydb/public/lib/ydb_cli/commands/transfer_workload/ya.make +++ b/ydb/public/lib/ydb_cli/commands/transfer_workload/ya.make @@ -5,6 +5,7 @@ SRCS( transfer_workload_topic_to_table.cpp transfer_workload_topic_to_table_init.cpp transfer_workload_topic_to_table_clean.cpp + transfer_workload_topic_to_table_run.cpp transfer_workload_defines.cpp ) |