diff options
author | abcdef <akotov@ydb.tech> | 2023-09-29 09:29:51 +0300 |
---|---|---|
committer | abcdef <akotov@ydb.tech> | 2023-09-29 09:49:13 +0300 |
commit | ceda18005f87ee1bc87fbc1d516b8b9e39028df5 (patch) | |
tree | 703163946f24f566787d6c9dd5200ce9172282e7 | |
parent | 82c27d07b8cca223d3ec075911da0541c782b7b7 (diff) | |
download | ydb-ceda18005f87ee1bc87fbc1d516b8b9e39028df5.tar.gz |
- добавил опцию `--commit-messages`
- добавил чтение из таблицы в сценарии с транзакциями
- добавил опции `--only-table-in-tx` и `--only-topic-in-tx`
- поменял вывод в таблицу статистики
14 files changed, 305 insertions, 53 deletions
diff --git a/ydb/public/lib/ydb_cli/commands/topic_operations_scenario.cpp b/ydb/public/lib/ydb_cli/commands/topic_operations_scenario.cpp index 10410f2640..c4f7923350 100644 --- a/ydb/public/lib/ydb_cli/commands/topic_operations_scenario.cpp +++ b/ydb/public/lib/ydb_cli/commands/topic_operations_scenario.cpp @@ -44,6 +44,16 @@ void TTopicOperationsScenario::EnsureWarmupSecIsValid() const } } +TString TTopicOperationsScenario::GetReadOnlyTableName() const +{ + return TableName + "-ro"; +} + +TString TTopicOperationsScenario::GetWriteOnlyTableName() const +{ + return TableName; +} + THolder<TLogBackend> TTopicOperationsScenario::MakeLogBackend(TConfig::EVerbosityLevel level) { return CreateLogBackend("cerr", @@ -74,7 +84,8 @@ void TTopicOperationsScenario::InitStatsCollector() TotalSec.Seconds(), WarmupSec.Seconds(), Percentile, - ErrorFlag); + ErrorFlag, + UseTransactions); } void TTopicOperationsScenario::CreateTopic(const TString& database, @@ -118,6 +129,17 @@ void TTopicOperationsScenario::ExecSchemeQuery(const TString& query) ThrowOnError(result); } +void TTopicOperationsScenario::ExecDataQuery(const TString& query, + const NYdb::TParams& params) +{ + NTable::TTableClient client(*Driver); + auto session = GetSession(client); + auto result = session.ExecuteDataQuery(query, + NTable::TTxControl::BeginTx(NTable::TTxSettings::SerializableRW()).CommitTx(), + params).ExtractValueSync(); + ThrowOnError(result); +} + void TTopicOperationsScenario::EnsureTopicNotExist(const TString& topic) { Y_VERIFY(Driver); @@ -175,13 +197,17 @@ void TTopicOperationsScenario::StartConsumerThreads(std::vector<std::future<void .StartedCount = count, .Database = database, .TopicName = TopicName, - .TableName = TableName, + .TableName = GetWriteOnlyTableName(), + .ReadOnlyTableName = GetReadOnlyTableName(), .ConsumerIdx = consumerIdx, .ConsumerPrefix = ConsumerPrefix, .ReaderIdx = readerIdx, .UseTransactions = UseTransactions, - .UseTopicApiCommit = UseTopicApiCommit, - .CommitPeriod = CommitPeriod + .UseTopicCommit = OnlyTableInTx, + .UseTableSelect = UseTableSelect && !OnlyTopicInTx, + .UseTableUpsert = !OnlyTopicInTx, + .CommitPeriod = CommitPeriod, + .CommitMessages = CommitMessages }; threads.push_back(std::async([readerParams = std::move(readerParams)]() mutable { TTopicWorkloadReader::RetryableReaderLoop(readerParams); })); diff --git a/ydb/public/lib/ydb_cli/commands/topic_operations_scenario.h b/ydb/public/lib/ydb_cli/commands/topic_operations_scenario.h index d08847a035..dc702ed79c 100644 --- a/ydb/public/lib/ydb_cli/commands/topic_operations_scenario.h +++ b/ydb/public/lib/ydb_cli/commands/topic_operations_scenario.h @@ -18,6 +18,7 @@ class TLog; namespace NYdb { class TDriver; +class TParams; } @@ -41,6 +42,9 @@ public: void EnsurePercentileIsValid() const; void EnsureWarmupSecIsValid() const; + TString GetReadOnlyTableName() const; + TString GetWriteOnlyTableName() const; + TDuration TotalSec; TDuration WindowSec; TDuration WarmupSec; @@ -61,8 +65,11 @@ public: TString TableName; ui32 TablePartitionCount = 1; bool UseTransactions = false; - size_t CommitPeriod = 15; - bool UseTopicApiCommit = false; + size_t CommitPeriod = 10; + size_t CommitMessages = 1'000'000; + bool OnlyTopicInTx = false; + bool OnlyTableInTx = false; + bool UseTableSelect = true; protected: void CreateTopic(const TString& database, @@ -75,6 +82,7 @@ protected: void DropTable(const TString& database, const TString& table); void ExecSchemeQuery(const TString& query); + void ExecDataQuery(const TString& query, const NYdb::TParams& params); void StartConsumerThreads(std::vector<std::future<void>>& threads, const TString& database); diff --git a/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_reader.cpp b/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_reader.cpp index aeca9f0004..d14840a6f1 100644 --- a/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_reader.cpp +++ b/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_reader.cpp @@ -35,7 +35,7 @@ void TTopicWorkloadReader::ReaderLoop(TTopicWorkloadReaderParams& params, TInsta } if (params.UseTransactions) { - txSupport.emplace(params.Driver, params.TableName); + txSupport.emplace(params.Driver, params.ReadOnlyTableName, params.TableName); } NYdb::NTopic::TReadSessionSettings settings; @@ -93,7 +93,7 @@ void TTopicWorkloadReader::ReaderLoop(TTopicWorkloadReaderParams& params, TInsta << " createTime " << message.GetCreateTime() << " fullTimeMs " << fullTime); } - if (!txSupport || params.UseTopicApiCommit) { + if (!txSupport || params.UseTopicCommit) { dataEvent->Commit(); } } else if (auto* createPartitionStreamEvent = std::get_if<NYdb::NTopic::TReadSessionEvent::TStartPartitionSessionEvent>(&event)) { @@ -143,12 +143,14 @@ TVector<NYdb::NTopic::TReadSessionEvent::TEvent> TTopicWorkloadReader::GetEvents settings.Block(false); - if (txSupport && !params.UseTopicApiCommit) { + if (txSupport) { if (!txSupport->Transaction) { txSupport->BeginTx(); } - settings.Tx(*txSupport->Transaction); + if (!params.UseTopicCommit) { + settings.Tx(*txSupport->Transaction); + } } return readSession.GetEvents(settings); @@ -161,7 +163,7 @@ void TTopicWorkloadReader::TryCommitTx(TTopicWorkloadReaderParams& params, { Y_VERIFY(txSupport); - if (commitTime > Now()) { + if ((commitTime > Now()) && (params.CommitMessages > txSupport->Rows.size())) { return; } @@ -178,11 +180,11 @@ void TTopicWorkloadReader::TryCommitTableChanges(TTopicWorkloadReaderParams& par return; } - auto begin = TInstant::Now(); - txSupport->CommitTx(); - ui64 duration = (TInstant::Now() - begin).MilliSeconds(); + auto execTimes = txSupport->CommitTx(params.UseTableSelect, params.UseTableUpsert); - params.StatsCollector->AddCommitTxEvent(params.ReaderIdx, {duration}); + params.StatsCollector->AddSelectEvent(params.ReaderIdx, {execTimes.SelectTime.MilliSeconds()}); + params.StatsCollector->AddUpsertEvent(params.ReaderIdx, {execTimes.UpsertTime.MilliSeconds()}); + params.StatsCollector->AddCommitTxEvent(params.ReaderIdx, {execTimes.CommitTime.MilliSeconds()}); } void TTopicWorkloadReader::GracefullShutdown(TVector<NYdb::NTopic::TReadSessionEvent::TStopPartitionSessionEvent>& stopPartitionSessionEvents) diff --git a/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_reader.h b/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_reader.h index 7a74f131e0..09ced96f17 100644 --- a/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_reader.h +++ b/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_reader.h @@ -22,12 +22,16 @@ namespace NYdb { TString Database; TString TopicName; TString TableName; + TString ReadOnlyTableName; ui32 ConsumerIdx; TString ConsumerPrefix; ui64 ReaderIdx; bool UseTransactions = false; - bool UseTopicApiCommit = false; + bool UseTopicCommit = false; + bool UseTableSelect = true; + bool UseTableUpsert = true; size_t CommitPeriod = 15; + size_t CommitMessages = 1'000'000; }; class TTransactionSupport; diff --git a/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_reader_transaction_support.cpp b/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_reader_transaction_support.cpp index 2669d95a67..93720fdc2e 100644 --- a/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_reader_transaction_support.cpp +++ b/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_reader_transaction_support.cpp @@ -6,17 +6,23 @@ namespace NYdb::NConsoleClient { static void EnsureSuccess(const NYdb::TStatus& status, std::string_view name) { - Y_VERIFY(!status.IsTransportError()); + if (status.IsTransportError()) { + Cerr << "transport error on " << name << ": " << status << Endl; + ythrow yexception() << "transport error on " << name << ": " << status; + } if (!status.IsSuccess()) { + Cerr << "error on " << name << ": " << status << Endl; ythrow yexception() << "error on " << name << ": " << status; } } TTransactionSupport::TTransactionSupport(const NYdb::TDriver& driver, - const TString& tableName) : + const TString& readOnlyTableName, + const TString& writeOnlyTableName) : TableClient(driver), - TableName(tableName) + ReadOnlyTableName(readOnlyTableName), + WriteOnlyTableName(writeOnlyTableName) { } @@ -37,15 +43,20 @@ void TTransactionSupport::BeginTx() Transaction = result.GetTransaction(); } -void TTransactionSupport::CommitTx() +auto TTransactionSupport::CommitTx(bool useTableSelect, bool useTableUpsert) -> TExecutionTimes { Y_VERIFY(Transaction); - UpsertIntoTable(); - Commit(); + TExecutionTimes result; + + result.SelectTime = useTableSelect ? SelectFromTable() : TDuration::Seconds(0); + result.UpsertTime = useTableUpsert ? UpsertIntoTable() : TDuration::Seconds(0); + result.CommitTime = Commit(); Rows.clear(); Transaction = std::nullopt; + + return result; } void TTransactionSupport::AppendRow(const TString& m) @@ -67,18 +78,62 @@ void TTransactionSupport::CreateSession() Session = result.GetSession(); } -void TTransactionSupport::UpsertIntoTable() +TDuration TTransactionSupport::SelectFromTable() { Y_VERIFY(Transaction); - TString query = R"( - DECLARE $rows AS List<Struct< - id: Uint64, - value: String - >>; + ui64 left = RandomNumber<ui64>(); + ui64 right = RandomNumber<ui64>(); + + if (right < left) { + std::swap(left, right); + } + + TString query = " \ + DECLARE $left AS Uint64; \ + DECLARE $right AS Uint64; \ + \ + SELECT COUNT(id) FROM `" + ReadOnlyTableName + "` WHERE ($left <= id) AND (id < $right); \ + "; + + NYdb::TParamsBuilder builder; + + builder.AddParam("$left").Uint64(left).Build(); + builder.AddParam("$right").Uint64(right).Build(); + + auto params = builder.Build(); + + NYdb::NTable::TExecDataQuerySettings settings; + settings.KeepInQueryCache(true); + + auto runQuery = [this, &query, ¶ms, &settings](NYdb::NTable::TSession) -> NYdb::TStatus { + return Transaction->GetSession().ExecuteDataQuery(query, + NYdb::NTable::TTxControl::Tx(*Transaction), + params, + settings).GetValueSync(); + }; + + auto beginTime = TInstant::Now(); + auto result = TableClient.RetryOperationSync(runQuery); + auto duration = TInstant::Now() - beginTime; + + EnsureSuccess(result, "SELECT"); - UPSERT INTO `)" + TableName + R"(` (SELECT id, value FROM AS_TABLE($rows)); - )"; + return duration; +} + +TDuration TTransactionSupport::UpsertIntoTable() +{ + Y_VERIFY(Transaction); + + TString query = " \ + DECLARE $rows AS List<Struct< \ + id: Uint64, \ + value: String \ + >>; \ + \ + UPSERT INTO `" + WriteOnlyTableName + "` (SELECT id, value FROM AS_TABLE($rows)); \ + "; NYdb::TParamsBuilder builder; @@ -106,17 +161,28 @@ void TTransactionSupport::UpsertIntoTable() settings).GetValueSync(); }; + + auto beginTime = TInstant::Now(); auto result = TableClient.RetryOperationSync(runQuery); + auto duration = TInstant::Now() - beginTime; + EnsureSuccess(result, "UPSERT"); + + return duration; } -void TTransactionSupport::Commit() +TDuration TTransactionSupport::Commit() { Y_VERIFY(Transaction); auto settings = NYdb::NTable::TCommitTxSettings(); + auto beginTime = TInstant::Now(); auto result = Transaction->Commit(settings).GetValueSync(); + auto duration = TInstant::Now() - beginTime; + EnsureSuccess(result, "COMMIT"); + + return duration; } } diff --git a/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_reader_transaction_support.h b/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_reader_transaction_support.h index 80f74fa2a2..ab549b6f71 100644 --- a/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_reader_transaction_support.h +++ b/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_reader_transaction_support.h @@ -6,11 +6,18 @@ namespace NYdb::NConsoleClient { class TTransactionSupport { public: + struct TExecutionTimes { + TDuration SelectTime; + TDuration UpsertTime; + TDuration CommitTime; + }; + TTransactionSupport(const NYdb::TDriver& driver, - const TString& tableName); + const TString& readOnlyTableName, + const TString& writeOnyTableName); void BeginTx(); - void CommitTx(); + TExecutionTimes CommitTx(bool useTableSelect, bool useTableUpsert); void AppendRow(const TString& data); struct TRow { @@ -23,11 +30,13 @@ public: private: void CreateSession(); - void UpsertIntoTable(); - void Commit(); + TDuration SelectFromTable(); + TDuration UpsertIntoTable(); + TDuration Commit(); NYdb::NTable::TTableClient TableClient; - TString TableName; + TString ReadOnlyTableName; + TString WriteOnlyTableName; std::optional<NYdb::NTable::TSession> Session; }; diff --git a/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_stats.cpp b/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_stats.cpp index 871078a8a6..c3ae438f5e 100644 --- a/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_stats.cpp +++ b/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_stats.cpp @@ -12,6 +12,8 @@ TTopicWorkloadStats::TTopicWorkloadStats() , ReadBytes(0) , ReadMessages(0) , FullTimeHist(HighestTrackableTime, 5) + , SelectTimeHist(HighestTrackableTime, 2) + , UpsertTimeHist(HighestTrackableTime, 2) , CommitTxTimeHist(HighestTrackableTime, 2) { } @@ -37,6 +39,16 @@ void TTopicWorkloadStats::AddEvent(const LagEvent& event) LagTimeHist.RecordValue(Min(event.LagTime, HighestTrackableTime)); } +void TTopicWorkloadStats::AddEvent(const SelectEvent& event) +{ + SelectTimeHist.RecordValue(Min(event.Time, HighestTrackableTime)); +} + +void TTopicWorkloadStats::AddEvent(const UpsertEvent& event) +{ + UpsertTimeHist.RecordValue(Min(event.Time, HighestTrackableTime)); +} + void TTopicWorkloadStats::AddEvent(const CommitTxEvent& event) { CommitTxTimeHist.RecordValue(Min(event.Time, HighestTrackableTime)); diff --git a/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_stats.h b/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_stats.h index c1a1026485..9d4cdb7fa5 100644 --- a/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_stats.h +++ b/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_stats.h @@ -19,6 +19,12 @@ namespace NYdb { ui64 LagMessages; ui64 LagTime; }; + struct SelectEvent { + ui64 Time; + }; + struct UpsertEvent { + ui64 Time; + }; struct CommitTxEvent { ui64 Time; }; @@ -28,6 +34,8 @@ namespace NYdb { void AddEvent(const WriterEvent& event); void AddEvent(const ReaderEvent& event); void AddEvent(const LagEvent& event); + void AddEvent(const SelectEvent& event); + void AddEvent(const UpsertEvent& event); void AddEvent(const CommitTxEvent& event); ui64 WriteBytes; @@ -39,6 +47,8 @@ namespace NYdb { ui64 ReadBytes; ui64 ReadMessages; NHdr::THistogram FullTimeHist; + NHdr::THistogram SelectTimeHist; + NHdr::THistogram UpsertTimeHist; NHdr::THistogram CommitTxTimeHist; private: diff --git a/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_stats_collector.cpp b/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_stats_collector.cpp index b1f57e45a8..a1d86a1754 100644 --- a/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_stats_collector.cpp +++ b/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_stats_collector.cpp @@ -11,9 +11,11 @@ TTopicWorkloadStatsCollector::TTopicWorkloadStatsCollector( bool quiet, bool printTimestamp, ui32 windowDurationSec, ui32 totalDurationSec, ui32 warmupSec, double percentile, - std::shared_ptr<std::atomic_bool> errorFlag) + std::shared_ptr<std::atomic_bool> errorFlag, + bool transferMode) : WriterCount(writerCount) , ReaderCount(readerCount) + , TransferMode(transferMode) , Quiet(quiet) , PrintTimestamp(printTimestamp) , WindowSec(windowDurationSec) @@ -30,6 +32,8 @@ TTopicWorkloadStatsCollector::TTopicWorkloadStatsCollector( for (size_t readerIdx = 0; readerIdx < readerCount; readerIdx++) { AddQueue(ReaderEventQueues); AddQueue(LagEventQueues); + AddQueue(SelectEventQueues); + AddQueue(UpsertEventQueues); AddQueue(CommitTxEventQueues); } } @@ -43,17 +47,39 @@ void TTopicWorkloadStatsCollector::PrintHeader(bool total) const { header << "Window\t"; if (WriterCount > 0) header << "Write speed\tWrite time\tInflight\t"; - if (ReaderCount > 0) - header << "Lag\t\tLag time\tRead speed\tFull time\t"; + if (ReaderCount > 0) { + if (!TransferMode) + header << "Lag\t\tLag time\t"; + header << "Read speed\t"; + if (TransferMode) { + header << "Topic time\t"; + } else { + header << "Full time\t"; + } + } + if (TransferMode) { + header << "Select time\t"; + header << "Upsert time\t"; + header << "Commit time\t"; + } if (PrintTimestamp) header << "Timestamp"; + header << "\n"; header << "#\t"; if (WriterCount > 0) header << "msg/s\tMB/s\tpercentile,ms\tpercentile,msg\t"; - if (ReaderCount > 0) - header << "percentile,msg\tpercentile,ms\tmsg/s\tMB/s\tpercentile,ms"; + if (ReaderCount > 0) { + if (!TransferMode) + header << "percentile,msg\tpercentile,ms\t"; + header << "msg/s\tMB/s\tpercentile,ms\t"; + } + if (TransferMode) { + header << "percentile,ms\t"; + header << "percentile,ms\t"; + header << "percentile,ms\t"; + } header << "\n"; Cout << header << Flush; @@ -108,12 +134,19 @@ void TTopicWorkloadStatsCollector::PrintStats(TMaybe<ui32> windowIt) const { << "\t" << stats.InflightMessagesHist.GetValueAtPercentile(Percentile) << "\t"; } if (ReaderCount > 0) { - Cout << "\t" << stats.LagMessagesHist.GetValueAtPercentile(Percentile) << "\t" - << "\t" << stats.LagTimeHist.GetValueAtPercentile(Percentile) << "\t" - << "\t" << (int)(stats.ReadMessages / seconds) + if (!TransferMode) { + Cout << "\t" << stats.LagMessagesHist.GetValueAtPercentile(Percentile) << "\t" + << "\t" << stats.LagTimeHist.GetValueAtPercentile(Percentile) << "\t"; + } + Cout << "\t" << (int)(stats.ReadMessages / seconds) << "\t" << (int)(stats.ReadBytes / seconds / 1024 / 1024) << "\t" << stats.FullTimeHist.GetValueAtPercentile(Percentile) << "\t"; } + if (TransferMode) { + Cout << "\t" << stats.SelectTimeHist.GetValueAtPercentile(Percentile) << "\t"; + Cout << "\t" << stats.UpsertTimeHist.GetValueAtPercentile(Percentile) << "\t"; + Cout << "\t" << stats.CommitTxTimeHist.GetValueAtPercentile(Percentile) << "\t"; + } if (PrintTimestamp) { Cout << "\t" << Now().ToStringUpToSeconds(); } @@ -125,6 +158,8 @@ void TTopicWorkloadStatsCollector::CollectThreadEvents() CollectThreadEvents(WriterEventQueues); CollectThreadEvents(ReaderEventQueues); CollectThreadEvents(LagEventQueues); + CollectThreadEvents(SelectEventQueues); + CollectThreadEvents(UpsertEventQueues); CollectThreadEvents(CommitTxEventQueues); } @@ -170,6 +205,16 @@ void TTopicWorkloadStatsCollector::AddLagEvent(size_t readerIdx, const TTopicWor AddEvent(readerIdx, LagEventQueues, event); } +void TTopicWorkloadStatsCollector::AddSelectEvent(size_t readerIdx, const TTopicWorkloadStats::SelectEvent& event) +{ + AddEvent(readerIdx, SelectEventQueues, event); +} + +void TTopicWorkloadStatsCollector::AddUpsertEvent(size_t readerIdx, const TTopicWorkloadStats::UpsertEvent& event) +{ + AddEvent(readerIdx, UpsertEventQueues, event); +} + void TTopicWorkloadStatsCollector::AddCommitTxEvent(size_t readerIdx, const TTopicWorkloadStats::CommitTxEvent& event) { AddEvent(readerIdx, CommitTxEventQueues, event); diff --git a/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_stats_collector.h b/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_stats_collector.h index b6d856a6ab..3e36baf372 100644 --- a/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_stats_collector.h +++ b/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_stats_collector.h @@ -15,7 +15,8 @@ namespace NYdb { bool quiet, bool printTimestamp, ui32 windowDurationSec, ui32 totalDurationSec, ui32 warmupSec, double Percentile, - std::shared_ptr<std::atomic_bool> errorFlag); + std::shared_ptr<std::atomic_bool> errorFlag, + bool transferMode); void PrintWindowStatsLoop(); @@ -25,6 +26,8 @@ namespace NYdb { void AddWriterEvent(size_t writerIdx, const TTopicWorkloadStats::WriterEvent& event); void AddReaderEvent(size_t readerIdx, const TTopicWorkloadStats::ReaderEvent& event); void AddLagEvent(size_t readerIdx, const TTopicWorkloadStats::LagEvent& event); + void AddSelectEvent(size_t readerIdx, const TTopicWorkloadStats::SelectEvent& event); + void AddUpsertEvent(size_t readerIdx, const TTopicWorkloadStats::UpsertEvent& event); void AddCommitTxEvent(size_t readerIdx, const TTopicWorkloadStats::CommitTxEvent& event); ui64 GetTotalReadMessages() const; @@ -49,10 +52,13 @@ namespace NYdb { size_t WriterCount; size_t ReaderCount; + bool TransferMode; TEventQueues<TTopicWorkloadStats::WriterEvent> WriterEventQueues; TEventQueues<TTopicWorkloadStats::ReaderEvent> ReaderEventQueues; TEventQueues<TTopicWorkloadStats::LagEvent> LagEventQueues; + TEventQueues<TTopicWorkloadStats::SelectEvent> SelectEventQueues; + TEventQueues<TTopicWorkloadStats::UpsertEvent> UpsertEventQueues; TEventQueues<TTopicWorkloadStats::CommitTxEvent> CommitTxEventQueues; bool Quiet; diff --git a/ydb/public/lib/ydb_cli/commands/transfer_workload/transfer_workload_topic_to_table_clean.cpp b/ydb/public/lib/ydb_cli/commands/transfer_workload/transfer_workload_topic_to_table_clean.cpp index fefd50eb56..2f16af1f56 100644 --- a/ydb/public/lib/ydb_cli/commands/transfer_workload/transfer_workload_topic_to_table_clean.cpp +++ b/ydb/public/lib/ydb_cli/commands/transfer_workload/transfer_workload_topic_to_table_clean.cpp @@ -8,7 +8,8 @@ using namespace NYdb::NConsoleClient; int TCommandWorkloadTransferTopicToTableClean::TScenario::DoRun(const TConfig& config) { DropTopic(config.Database, TopicName); - DropTable(config.Database, TableName); + DropTable(config.Database, GetWriteOnlyTableName()); + DropTable(config.Database, GetReadOnlyTableName()); return EXIT_SUCCESS; } diff --git a/ydb/public/lib/ydb_cli/commands/transfer_workload/transfer_workload_topic_to_table_init.cpp b/ydb/public/lib/ydb_cli/commands/transfer_workload/transfer_workload_topic_to_table_init.cpp index d8ee6ca626..abf94d8300 100644 --- a/ydb/public/lib/ydb_cli/commands/transfer_workload/transfer_workload_topic_to_table_init.cpp +++ b/ydb/public/lib/ydb_cli/commands/transfer_workload/transfer_workload_topic_to_table_init.cpp @@ -3,18 +3,21 @@ #include <ydb/public/lib/ydb_cli/commands/ydb_common.h> +#include <util/random/random.h> + using namespace NYdb::NConsoleClient; int TCommandWorkloadTransferTopicToTableInit::TScenario::DoRun(const TConfig& config) { CreateTopic(config.Database, TopicName, TopicPartitionCount, ConsumerCount); - CreateTable(TableName, TablePartitionCount); + CreateWriteOnlyTable(GetWriteOnlyTableName(), TablePartitionCount); + CreateReadOnlyTable(GetReadOnlyTableName(), TablePartitionCount); return EXIT_SUCCESS; } -void TCommandWorkloadTransferTopicToTableInit::TScenario::CreateTable(const TString& name, - ui32 partitionCount) +void TCommandWorkloadTransferTopicToTableInit::TScenario::CreateWriteOnlyTable(const TString& name, + ui32 partitionCount) { TStringBuilder query; query << "CREATE TABLE `"; @@ -28,6 +31,53 @@ void TCommandWorkloadTransferTopicToTableInit::TScenario::CreateTable(const TStr ExecSchemeQuery(query); } +void TCommandWorkloadTransferTopicToTableInit::TScenario::CreateReadOnlyTable(const TString& name, + ui32 partitionCount) +{ + TStringBuilder query; + query << "CREATE TABLE `"; + query << name; + query << "` (id Uint64, PRIMARY KEY (id)) WITH (UNIFORM_PARTITIONS = "; + query << partitionCount; + query << ", AUTO_PARTITIONING_MIN_PARTITIONS_COUNT = "; + query << partitionCount; + query << ")"; + + ExecSchemeQuery(query); + + for (int i = 0; i < 10; ++i) { + UpsertRandomKeyBlock(); + } +} + +void TCommandWorkloadTransferTopicToTableInit::TScenario::UpsertRandomKeyBlock() +{ + TString query = R"( + DECLARE $rows AS List<Struct< + id: Uint64 + >>; + + UPSERT INTO `)" + GetReadOnlyTableName() + R"(` (SELECT id FROM AS_TABLE($rows)); + )"; + + NYdb::TParamsBuilder builder; + + auto& rows = builder.AddParam("$rows"); + rows.BeginList(); + for (int i = 0; i < 100'000; ++i) { + rows.AddListItem() + .BeginStruct() + .AddMember("id").Uint64(RandomNumber<ui64>()) + .EndStruct(); + } + rows.EndList(); + rows.Build(); + + auto params = builder.Build(); + + ExecDataQuery(query, params); +} + TCommandWorkloadTransferTopicToTableInit::TCommandWorkloadTransferTopicToTableInit() : TWorkloadCommand("init", {}, "Creates and initializes objects") { diff --git a/ydb/public/lib/ydb_cli/commands/transfer_workload/transfer_workload_topic_to_table_init.h b/ydb/public/lib/ydb_cli/commands/transfer_workload/transfer_workload_topic_to_table_init.h index 1995406db4..417cb1c73f 100644 --- a/ydb/public/lib/ydb_cli/commands/transfer_workload/transfer_workload_topic_to_table_init.h +++ b/ydb/public/lib/ydb_cli/commands/transfer_workload/transfer_workload_topic_to_table_init.h @@ -17,7 +17,10 @@ private: class TScenario : public TTopicOperationsScenario { int DoRun(const TConfig& config) override; - void CreateTable(const TString& table, ui32 partitionCount); + void CreateWriteOnlyTable(const TString& table, ui32 partitionCount); + void CreateReadOnlyTable(const TString& table, ui32 partitionCount); + + void UpsertRandomKeyBlock(); }; TScenario Scenario; diff --git a/ydb/public/lib/ydb_cli/commands/transfer_workload/transfer_workload_topic_to_table_run.cpp b/ydb/public/lib/ydb_cli/commands/transfer_workload/transfer_workload_topic_to_table_run.cpp index 891d4518c5..057c8471e1 100644 --- a/ydb/public/lib/ydb_cli/commands/transfer_workload/transfer_workload_topic_to_table_run.cpp +++ b/ydb/public/lib/ydb_cli/commands/transfer_workload/transfer_workload_topic_to_table_run.cpp @@ -80,14 +80,24 @@ void TCommandWorkloadTransferTopicToTableRun::Config(TConfig& config) .Optional() .DefaultValue((TStringBuilder() << NTopic::ECodec::RAW)) .StoreMappedResultT<TString>(&Scenario.Codec, &TCommandWorkloadTopicParams::StrToCodec); + + config.Opts->MutuallyExclusive("message-rate", "byte-rate"); + config.Opts->AddLongOption("commit-period", "Waiting time between commit.") .DefaultValue(10) .StoreResult(&Scenario.CommitPeriod); - config.Opts->AddLongOption("use-topic-commit", "Use TopicAPI commit.") + config.Opts->AddLongOption("commit-messages", "Number of messages per transaction") + .DefaultValue(1'000'000) + .StoreResult(&Scenario.CommitMessages); + + config.Opts->AddLongOption("only-topic-in-tx", "Use only topic in transaction") + .DefaultValue(false) + .StoreTrue(&Scenario.OnlyTopicInTx); + config.Opts->AddLongOption("only-table-in-tx", "Use only table in transaction") .DefaultValue(false) - .StoreTrue(&Scenario.UseTopicApiCommit); + .StoreTrue(&Scenario.OnlyTableInTx); - config.Opts->MutuallyExclusive("message-rate", "byte-rate"); + config.Opts->MutuallyExclusive("only-topic-in-tx", "only-table-in-tx"); config.IsNetworkIntensive = true; } |