aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorabcdef <akotov@ydb.tech>2023-09-29 09:29:51 +0300
committerabcdef <akotov@ydb.tech>2023-09-29 09:49:13 +0300
commitceda18005f87ee1bc87fbc1d516b8b9e39028df5 (patch)
tree703163946f24f566787d6c9dd5200ce9172282e7
parent82c27d07b8cca223d3ec075911da0541c782b7b7 (diff)
downloadydb-ceda18005f87ee1bc87fbc1d516b8b9e39028df5.tar.gz
- добавил опцию `--commit-messages`
- добавил чтение из таблицы в сценарии с транзакциями - добавил опции `--only-table-in-tx` и `--only-topic-in-tx` - поменял вывод в таблицу статистики
-rw-r--r--ydb/public/lib/ydb_cli/commands/topic_operations_scenario.cpp34
-rw-r--r--ydb/public/lib/ydb_cli/commands/topic_operations_scenario.h12
-rw-r--r--ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_reader.cpp20
-rw-r--r--ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_reader.h6
-rw-r--r--ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_reader_transaction_support.cpp96
-rw-r--r--ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_reader_transaction_support.h19
-rw-r--r--ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_stats.cpp12
-rw-r--r--ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_stats.h10
-rw-r--r--ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_stats_collector.cpp61
-rw-r--r--ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_stats_collector.h8
-rw-r--r--ydb/public/lib/ydb_cli/commands/transfer_workload/transfer_workload_topic_to_table_clean.cpp3
-rw-r--r--ydb/public/lib/ydb_cli/commands/transfer_workload/transfer_workload_topic_to_table_init.cpp56
-rw-r--r--ydb/public/lib/ydb_cli/commands/transfer_workload/transfer_workload_topic_to_table_init.h5
-rw-r--r--ydb/public/lib/ydb_cli/commands/transfer_workload/transfer_workload_topic_to_table_run.cpp16
14 files changed, 305 insertions, 53 deletions
diff --git a/ydb/public/lib/ydb_cli/commands/topic_operations_scenario.cpp b/ydb/public/lib/ydb_cli/commands/topic_operations_scenario.cpp
index 10410f2640..c4f7923350 100644
--- a/ydb/public/lib/ydb_cli/commands/topic_operations_scenario.cpp
+++ b/ydb/public/lib/ydb_cli/commands/topic_operations_scenario.cpp
@@ -44,6 +44,16 @@ void TTopicOperationsScenario::EnsureWarmupSecIsValid() const
}
}
+TString TTopicOperationsScenario::GetReadOnlyTableName() const
+{
+ return TableName + "-ro";
+}
+
+TString TTopicOperationsScenario::GetWriteOnlyTableName() const
+{
+ return TableName;
+}
+
THolder<TLogBackend> TTopicOperationsScenario::MakeLogBackend(TConfig::EVerbosityLevel level)
{
return CreateLogBackend("cerr",
@@ -74,7 +84,8 @@ void TTopicOperationsScenario::InitStatsCollector()
TotalSec.Seconds(),
WarmupSec.Seconds(),
Percentile,
- ErrorFlag);
+ ErrorFlag,
+ UseTransactions);
}
void TTopicOperationsScenario::CreateTopic(const TString& database,
@@ -118,6 +129,17 @@ void TTopicOperationsScenario::ExecSchemeQuery(const TString& query)
ThrowOnError(result);
}
+void TTopicOperationsScenario::ExecDataQuery(const TString& query,
+ const NYdb::TParams& params)
+{
+ NTable::TTableClient client(*Driver);
+ auto session = GetSession(client);
+ auto result = session.ExecuteDataQuery(query,
+ NTable::TTxControl::BeginTx(NTable::TTxSettings::SerializableRW()).CommitTx(),
+ params).ExtractValueSync();
+ ThrowOnError(result);
+}
+
void TTopicOperationsScenario::EnsureTopicNotExist(const TString& topic)
{
Y_VERIFY(Driver);
@@ -175,13 +197,17 @@ void TTopicOperationsScenario::StartConsumerThreads(std::vector<std::future<void
.StartedCount = count,
.Database = database,
.TopicName = TopicName,
- .TableName = TableName,
+ .TableName = GetWriteOnlyTableName(),
+ .ReadOnlyTableName = GetReadOnlyTableName(),
.ConsumerIdx = consumerIdx,
.ConsumerPrefix = ConsumerPrefix,
.ReaderIdx = readerIdx,
.UseTransactions = UseTransactions,
- .UseTopicApiCommit = UseTopicApiCommit,
- .CommitPeriod = CommitPeriod
+ .UseTopicCommit = OnlyTableInTx,
+ .UseTableSelect = UseTableSelect && !OnlyTopicInTx,
+ .UseTableUpsert = !OnlyTopicInTx,
+ .CommitPeriod = CommitPeriod,
+ .CommitMessages = CommitMessages
};
threads.push_back(std::async([readerParams = std::move(readerParams)]() mutable { TTopicWorkloadReader::RetryableReaderLoop(readerParams); }));
diff --git a/ydb/public/lib/ydb_cli/commands/topic_operations_scenario.h b/ydb/public/lib/ydb_cli/commands/topic_operations_scenario.h
index d08847a035..dc702ed79c 100644
--- a/ydb/public/lib/ydb_cli/commands/topic_operations_scenario.h
+++ b/ydb/public/lib/ydb_cli/commands/topic_operations_scenario.h
@@ -18,6 +18,7 @@ class TLog;
namespace NYdb {
class TDriver;
+class TParams;
}
@@ -41,6 +42,9 @@ public:
void EnsurePercentileIsValid() const;
void EnsureWarmupSecIsValid() const;
+ TString GetReadOnlyTableName() const;
+ TString GetWriteOnlyTableName() const;
+
TDuration TotalSec;
TDuration WindowSec;
TDuration WarmupSec;
@@ -61,8 +65,11 @@ public:
TString TableName;
ui32 TablePartitionCount = 1;
bool UseTransactions = false;
- size_t CommitPeriod = 15;
- bool UseTopicApiCommit = false;
+ size_t CommitPeriod = 10;
+ size_t CommitMessages = 1'000'000;
+ bool OnlyTopicInTx = false;
+ bool OnlyTableInTx = false;
+ bool UseTableSelect = true;
protected:
void CreateTopic(const TString& database,
@@ -75,6 +82,7 @@ protected:
void DropTable(const TString& database, const TString& table);
void ExecSchemeQuery(const TString& query);
+ void ExecDataQuery(const TString& query, const NYdb::TParams& params);
void StartConsumerThreads(std::vector<std::future<void>>& threads,
const TString& database);
diff --git a/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_reader.cpp b/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_reader.cpp
index aeca9f0004..d14840a6f1 100644
--- a/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_reader.cpp
+++ b/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_reader.cpp
@@ -35,7 +35,7 @@ void TTopicWorkloadReader::ReaderLoop(TTopicWorkloadReaderParams& params, TInsta
}
if (params.UseTransactions) {
- txSupport.emplace(params.Driver, params.TableName);
+ txSupport.emplace(params.Driver, params.ReadOnlyTableName, params.TableName);
}
NYdb::NTopic::TReadSessionSettings settings;
@@ -93,7 +93,7 @@ void TTopicWorkloadReader::ReaderLoop(TTopicWorkloadReaderParams& params, TInsta
<< " createTime " << message.GetCreateTime() << " fullTimeMs " << fullTime);
}
- if (!txSupport || params.UseTopicApiCommit) {
+ if (!txSupport || params.UseTopicCommit) {
dataEvent->Commit();
}
} else if (auto* createPartitionStreamEvent = std::get_if<NYdb::NTopic::TReadSessionEvent::TStartPartitionSessionEvent>(&event)) {
@@ -143,12 +143,14 @@ TVector<NYdb::NTopic::TReadSessionEvent::TEvent> TTopicWorkloadReader::GetEvents
settings.Block(false);
- if (txSupport && !params.UseTopicApiCommit) {
+ if (txSupport) {
if (!txSupport->Transaction) {
txSupport->BeginTx();
}
- settings.Tx(*txSupport->Transaction);
+ if (!params.UseTopicCommit) {
+ settings.Tx(*txSupport->Transaction);
+ }
}
return readSession.GetEvents(settings);
@@ -161,7 +163,7 @@ void TTopicWorkloadReader::TryCommitTx(TTopicWorkloadReaderParams& params,
{
Y_VERIFY(txSupport);
- if (commitTime > Now()) {
+ if ((commitTime > Now()) && (params.CommitMessages > txSupport->Rows.size())) {
return;
}
@@ -178,11 +180,11 @@ void TTopicWorkloadReader::TryCommitTableChanges(TTopicWorkloadReaderParams& par
return;
}
- auto begin = TInstant::Now();
- txSupport->CommitTx();
- ui64 duration = (TInstant::Now() - begin).MilliSeconds();
+ auto execTimes = txSupport->CommitTx(params.UseTableSelect, params.UseTableUpsert);
- params.StatsCollector->AddCommitTxEvent(params.ReaderIdx, {duration});
+ params.StatsCollector->AddSelectEvent(params.ReaderIdx, {execTimes.SelectTime.MilliSeconds()});
+ params.StatsCollector->AddUpsertEvent(params.ReaderIdx, {execTimes.UpsertTime.MilliSeconds()});
+ params.StatsCollector->AddCommitTxEvent(params.ReaderIdx, {execTimes.CommitTime.MilliSeconds()});
}
void TTopicWorkloadReader::GracefullShutdown(TVector<NYdb::NTopic::TReadSessionEvent::TStopPartitionSessionEvent>& stopPartitionSessionEvents)
diff --git a/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_reader.h b/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_reader.h
index 7a74f131e0..09ced96f17 100644
--- a/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_reader.h
+++ b/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_reader.h
@@ -22,12 +22,16 @@ namespace NYdb {
TString Database;
TString TopicName;
TString TableName;
+ TString ReadOnlyTableName;
ui32 ConsumerIdx;
TString ConsumerPrefix;
ui64 ReaderIdx;
bool UseTransactions = false;
- bool UseTopicApiCommit = false;
+ bool UseTopicCommit = false;
+ bool UseTableSelect = true;
+ bool UseTableUpsert = true;
size_t CommitPeriod = 15;
+ size_t CommitMessages = 1'000'000;
};
class TTransactionSupport;
diff --git a/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_reader_transaction_support.cpp b/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_reader_transaction_support.cpp
index 2669d95a67..93720fdc2e 100644
--- a/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_reader_transaction_support.cpp
+++ b/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_reader_transaction_support.cpp
@@ -6,17 +6,23 @@ namespace NYdb::NConsoleClient {
static void EnsureSuccess(const NYdb::TStatus& status, std::string_view name)
{
- Y_VERIFY(!status.IsTransportError());
+ if (status.IsTransportError()) {
+ Cerr << "transport error on " << name << ": " << status << Endl;
+ ythrow yexception() << "transport error on " << name << ": " << status;
+ }
if (!status.IsSuccess()) {
+ Cerr << "error on " << name << ": " << status << Endl;
ythrow yexception() << "error on " << name << ": " << status;
}
}
TTransactionSupport::TTransactionSupport(const NYdb::TDriver& driver,
- const TString& tableName) :
+ const TString& readOnlyTableName,
+ const TString& writeOnlyTableName) :
TableClient(driver),
- TableName(tableName)
+ ReadOnlyTableName(readOnlyTableName),
+ WriteOnlyTableName(writeOnlyTableName)
{
}
@@ -37,15 +43,20 @@ void TTransactionSupport::BeginTx()
Transaction = result.GetTransaction();
}
-void TTransactionSupport::CommitTx()
+auto TTransactionSupport::CommitTx(bool useTableSelect, bool useTableUpsert) -> TExecutionTimes
{
Y_VERIFY(Transaction);
- UpsertIntoTable();
- Commit();
+ TExecutionTimes result;
+
+ result.SelectTime = useTableSelect ? SelectFromTable() : TDuration::Seconds(0);
+ result.UpsertTime = useTableUpsert ? UpsertIntoTable() : TDuration::Seconds(0);
+ result.CommitTime = Commit();
Rows.clear();
Transaction = std::nullopt;
+
+ return result;
}
void TTransactionSupport::AppendRow(const TString& m)
@@ -67,18 +78,62 @@ void TTransactionSupport::CreateSession()
Session = result.GetSession();
}
-void TTransactionSupport::UpsertIntoTable()
+TDuration TTransactionSupport::SelectFromTable()
{
Y_VERIFY(Transaction);
- TString query = R"(
- DECLARE $rows AS List<Struct<
- id: Uint64,
- value: String
- >>;
+ ui64 left = RandomNumber<ui64>();
+ ui64 right = RandomNumber<ui64>();
+
+ if (right < left) {
+ std::swap(left, right);
+ }
+
+ TString query = " \
+ DECLARE $left AS Uint64; \
+ DECLARE $right AS Uint64; \
+ \
+ SELECT COUNT(id) FROM `" + ReadOnlyTableName + "` WHERE ($left <= id) AND (id < $right); \
+ ";
+
+ NYdb::TParamsBuilder builder;
+
+ builder.AddParam("$left").Uint64(left).Build();
+ builder.AddParam("$right").Uint64(right).Build();
+
+ auto params = builder.Build();
+
+ NYdb::NTable::TExecDataQuerySettings settings;
+ settings.KeepInQueryCache(true);
+
+ auto runQuery = [this, &query, &params, &settings](NYdb::NTable::TSession) -> NYdb::TStatus {
+ return Transaction->GetSession().ExecuteDataQuery(query,
+ NYdb::NTable::TTxControl::Tx(*Transaction),
+ params,
+ settings).GetValueSync();
+ };
+
+ auto beginTime = TInstant::Now();
+ auto result = TableClient.RetryOperationSync(runQuery);
+ auto duration = TInstant::Now() - beginTime;
+
+ EnsureSuccess(result, "SELECT");
- UPSERT INTO `)" + TableName + R"(` (SELECT id, value FROM AS_TABLE($rows));
- )";
+ return duration;
+}
+
+TDuration TTransactionSupport::UpsertIntoTable()
+{
+ Y_VERIFY(Transaction);
+
+ TString query = " \
+ DECLARE $rows AS List<Struct< \
+ id: Uint64, \
+ value: String \
+ >>; \
+ \
+ UPSERT INTO `" + WriteOnlyTableName + "` (SELECT id, value FROM AS_TABLE($rows)); \
+ ";
NYdb::TParamsBuilder builder;
@@ -106,17 +161,28 @@ void TTransactionSupport::UpsertIntoTable()
settings).GetValueSync();
};
+
+ auto beginTime = TInstant::Now();
auto result = TableClient.RetryOperationSync(runQuery);
+ auto duration = TInstant::Now() - beginTime;
+
EnsureSuccess(result, "UPSERT");
+
+ return duration;
}
-void TTransactionSupport::Commit()
+TDuration TTransactionSupport::Commit()
{
Y_VERIFY(Transaction);
auto settings = NYdb::NTable::TCommitTxSettings();
+ auto beginTime = TInstant::Now();
auto result = Transaction->Commit(settings).GetValueSync();
+ auto duration = TInstant::Now() - beginTime;
+
EnsureSuccess(result, "COMMIT");
+
+ return duration;
}
}
diff --git a/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_reader_transaction_support.h b/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_reader_transaction_support.h
index 80f74fa2a2..ab549b6f71 100644
--- a/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_reader_transaction_support.h
+++ b/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_reader_transaction_support.h
@@ -6,11 +6,18 @@ namespace NYdb::NConsoleClient {
class TTransactionSupport {
public:
+ struct TExecutionTimes {
+ TDuration SelectTime;
+ TDuration UpsertTime;
+ TDuration CommitTime;
+ };
+
TTransactionSupport(const NYdb::TDriver& driver,
- const TString& tableName);
+ const TString& readOnlyTableName,
+ const TString& writeOnyTableName);
void BeginTx();
- void CommitTx();
+ TExecutionTimes CommitTx(bool useTableSelect, bool useTableUpsert);
void AppendRow(const TString& data);
struct TRow {
@@ -23,11 +30,13 @@ public:
private:
void CreateSession();
- void UpsertIntoTable();
- void Commit();
+ TDuration SelectFromTable();
+ TDuration UpsertIntoTable();
+ TDuration Commit();
NYdb::NTable::TTableClient TableClient;
- TString TableName;
+ TString ReadOnlyTableName;
+ TString WriteOnlyTableName;
std::optional<NYdb::NTable::TSession> Session;
};
diff --git a/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_stats.cpp b/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_stats.cpp
index 871078a8a6..c3ae438f5e 100644
--- a/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_stats.cpp
+++ b/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_stats.cpp
@@ -12,6 +12,8 @@ TTopicWorkloadStats::TTopicWorkloadStats()
, ReadBytes(0)
, ReadMessages(0)
, FullTimeHist(HighestTrackableTime, 5)
+ , SelectTimeHist(HighestTrackableTime, 2)
+ , UpsertTimeHist(HighestTrackableTime, 2)
, CommitTxTimeHist(HighestTrackableTime, 2)
{
}
@@ -37,6 +39,16 @@ void TTopicWorkloadStats::AddEvent(const LagEvent& event)
LagTimeHist.RecordValue(Min(event.LagTime, HighestTrackableTime));
}
+void TTopicWorkloadStats::AddEvent(const SelectEvent& event)
+{
+ SelectTimeHist.RecordValue(Min(event.Time, HighestTrackableTime));
+}
+
+void TTopicWorkloadStats::AddEvent(const UpsertEvent& event)
+{
+ UpsertTimeHist.RecordValue(Min(event.Time, HighestTrackableTime));
+}
+
void TTopicWorkloadStats::AddEvent(const CommitTxEvent& event)
{
CommitTxTimeHist.RecordValue(Min(event.Time, HighestTrackableTime));
diff --git a/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_stats.h b/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_stats.h
index c1a1026485..9d4cdb7fa5 100644
--- a/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_stats.h
+++ b/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_stats.h
@@ -19,6 +19,12 @@ namespace NYdb {
ui64 LagMessages;
ui64 LagTime;
};
+ struct SelectEvent {
+ ui64 Time;
+ };
+ struct UpsertEvent {
+ ui64 Time;
+ };
struct CommitTxEvent {
ui64 Time;
};
@@ -28,6 +34,8 @@ namespace NYdb {
void AddEvent(const WriterEvent& event);
void AddEvent(const ReaderEvent& event);
void AddEvent(const LagEvent& event);
+ void AddEvent(const SelectEvent& event);
+ void AddEvent(const UpsertEvent& event);
void AddEvent(const CommitTxEvent& event);
ui64 WriteBytes;
@@ -39,6 +47,8 @@ namespace NYdb {
ui64 ReadBytes;
ui64 ReadMessages;
NHdr::THistogram FullTimeHist;
+ NHdr::THistogram SelectTimeHist;
+ NHdr::THistogram UpsertTimeHist;
NHdr::THistogram CommitTxTimeHist;
private:
diff --git a/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_stats_collector.cpp b/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_stats_collector.cpp
index b1f57e45a8..a1d86a1754 100644
--- a/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_stats_collector.cpp
+++ b/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_stats_collector.cpp
@@ -11,9 +11,11 @@ TTopicWorkloadStatsCollector::TTopicWorkloadStatsCollector(
bool quiet, bool printTimestamp,
ui32 windowDurationSec, ui32 totalDurationSec, ui32 warmupSec,
double percentile,
- std::shared_ptr<std::atomic_bool> errorFlag)
+ std::shared_ptr<std::atomic_bool> errorFlag,
+ bool transferMode)
: WriterCount(writerCount)
, ReaderCount(readerCount)
+ , TransferMode(transferMode)
, Quiet(quiet)
, PrintTimestamp(printTimestamp)
, WindowSec(windowDurationSec)
@@ -30,6 +32,8 @@ TTopicWorkloadStatsCollector::TTopicWorkloadStatsCollector(
for (size_t readerIdx = 0; readerIdx < readerCount; readerIdx++) {
AddQueue(ReaderEventQueues);
AddQueue(LagEventQueues);
+ AddQueue(SelectEventQueues);
+ AddQueue(UpsertEventQueues);
AddQueue(CommitTxEventQueues);
}
}
@@ -43,17 +47,39 @@ void TTopicWorkloadStatsCollector::PrintHeader(bool total) const {
header << "Window\t";
if (WriterCount > 0)
header << "Write speed\tWrite time\tInflight\t";
- if (ReaderCount > 0)
- header << "Lag\t\tLag time\tRead speed\tFull time\t";
+ if (ReaderCount > 0) {
+ if (!TransferMode)
+ header << "Lag\t\tLag time\t";
+ header << "Read speed\t";
+ if (TransferMode) {
+ header << "Topic time\t";
+ } else {
+ header << "Full time\t";
+ }
+ }
+ if (TransferMode) {
+ header << "Select time\t";
+ header << "Upsert time\t";
+ header << "Commit time\t";
+ }
if (PrintTimestamp)
header << "Timestamp";
+
header << "\n";
header << "#\t";
if (WriterCount > 0)
header << "msg/s\tMB/s\tpercentile,ms\tpercentile,msg\t";
- if (ReaderCount > 0)
- header << "percentile,msg\tpercentile,ms\tmsg/s\tMB/s\tpercentile,ms";
+ if (ReaderCount > 0) {
+ if (!TransferMode)
+ header << "percentile,msg\tpercentile,ms\t";
+ header << "msg/s\tMB/s\tpercentile,ms\t";
+ }
+ if (TransferMode) {
+ header << "percentile,ms\t";
+ header << "percentile,ms\t";
+ header << "percentile,ms\t";
+ }
header << "\n";
Cout << header << Flush;
@@ -108,12 +134,19 @@ void TTopicWorkloadStatsCollector::PrintStats(TMaybe<ui32> windowIt) const {
<< "\t" << stats.InflightMessagesHist.GetValueAtPercentile(Percentile) << "\t";
}
if (ReaderCount > 0) {
- Cout << "\t" << stats.LagMessagesHist.GetValueAtPercentile(Percentile) << "\t"
- << "\t" << stats.LagTimeHist.GetValueAtPercentile(Percentile) << "\t"
- << "\t" << (int)(stats.ReadMessages / seconds)
+ if (!TransferMode) {
+ Cout << "\t" << stats.LagMessagesHist.GetValueAtPercentile(Percentile) << "\t"
+ << "\t" << stats.LagTimeHist.GetValueAtPercentile(Percentile) << "\t";
+ }
+ Cout << "\t" << (int)(stats.ReadMessages / seconds)
<< "\t" << (int)(stats.ReadBytes / seconds / 1024 / 1024)
<< "\t" << stats.FullTimeHist.GetValueAtPercentile(Percentile) << "\t";
}
+ if (TransferMode) {
+ Cout << "\t" << stats.SelectTimeHist.GetValueAtPercentile(Percentile) << "\t";
+ Cout << "\t" << stats.UpsertTimeHist.GetValueAtPercentile(Percentile) << "\t";
+ Cout << "\t" << stats.CommitTxTimeHist.GetValueAtPercentile(Percentile) << "\t";
+ }
if (PrintTimestamp) {
Cout << "\t" << Now().ToStringUpToSeconds();
}
@@ -125,6 +158,8 @@ void TTopicWorkloadStatsCollector::CollectThreadEvents()
CollectThreadEvents(WriterEventQueues);
CollectThreadEvents(ReaderEventQueues);
CollectThreadEvents(LagEventQueues);
+ CollectThreadEvents(SelectEventQueues);
+ CollectThreadEvents(UpsertEventQueues);
CollectThreadEvents(CommitTxEventQueues);
}
@@ -170,6 +205,16 @@ void TTopicWorkloadStatsCollector::AddLagEvent(size_t readerIdx, const TTopicWor
AddEvent(readerIdx, LagEventQueues, event);
}
+void TTopicWorkloadStatsCollector::AddSelectEvent(size_t readerIdx, const TTopicWorkloadStats::SelectEvent& event)
+{
+ AddEvent(readerIdx, SelectEventQueues, event);
+}
+
+void TTopicWorkloadStatsCollector::AddUpsertEvent(size_t readerIdx, const TTopicWorkloadStats::UpsertEvent& event)
+{
+ AddEvent(readerIdx, UpsertEventQueues, event);
+}
+
void TTopicWorkloadStatsCollector::AddCommitTxEvent(size_t readerIdx, const TTopicWorkloadStats::CommitTxEvent& event)
{
AddEvent(readerIdx, CommitTxEventQueues, event);
diff --git a/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_stats_collector.h b/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_stats_collector.h
index b6d856a6ab..3e36baf372 100644
--- a/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_stats_collector.h
+++ b/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_stats_collector.h
@@ -15,7 +15,8 @@ namespace NYdb {
bool quiet, bool printTimestamp,
ui32 windowDurationSec, ui32 totalDurationSec, ui32 warmupSec,
double Percentile,
- std::shared_ptr<std::atomic_bool> errorFlag);
+ std::shared_ptr<std::atomic_bool> errorFlag,
+ bool transferMode);
void PrintWindowStatsLoop();
@@ -25,6 +26,8 @@ namespace NYdb {
void AddWriterEvent(size_t writerIdx, const TTopicWorkloadStats::WriterEvent& event);
void AddReaderEvent(size_t readerIdx, const TTopicWorkloadStats::ReaderEvent& event);
void AddLagEvent(size_t readerIdx, const TTopicWorkloadStats::LagEvent& event);
+ void AddSelectEvent(size_t readerIdx, const TTopicWorkloadStats::SelectEvent& event);
+ void AddUpsertEvent(size_t readerIdx, const TTopicWorkloadStats::UpsertEvent& event);
void AddCommitTxEvent(size_t readerIdx, const TTopicWorkloadStats::CommitTxEvent& event);
ui64 GetTotalReadMessages() const;
@@ -49,10 +52,13 @@ namespace NYdb {
size_t WriterCount;
size_t ReaderCount;
+ bool TransferMode;
TEventQueues<TTopicWorkloadStats::WriterEvent> WriterEventQueues;
TEventQueues<TTopicWorkloadStats::ReaderEvent> ReaderEventQueues;
TEventQueues<TTopicWorkloadStats::LagEvent> LagEventQueues;
+ TEventQueues<TTopicWorkloadStats::SelectEvent> SelectEventQueues;
+ TEventQueues<TTopicWorkloadStats::UpsertEvent> UpsertEventQueues;
TEventQueues<TTopicWorkloadStats::CommitTxEvent> CommitTxEventQueues;
bool Quiet;
diff --git a/ydb/public/lib/ydb_cli/commands/transfer_workload/transfer_workload_topic_to_table_clean.cpp b/ydb/public/lib/ydb_cli/commands/transfer_workload/transfer_workload_topic_to_table_clean.cpp
index fefd50eb56..2f16af1f56 100644
--- a/ydb/public/lib/ydb_cli/commands/transfer_workload/transfer_workload_topic_to_table_clean.cpp
+++ b/ydb/public/lib/ydb_cli/commands/transfer_workload/transfer_workload_topic_to_table_clean.cpp
@@ -8,7 +8,8 @@ using namespace NYdb::NConsoleClient;
int TCommandWorkloadTransferTopicToTableClean::TScenario::DoRun(const TConfig& config)
{
DropTopic(config.Database, TopicName);
- DropTable(config.Database, TableName);
+ DropTable(config.Database, GetWriteOnlyTableName());
+ DropTable(config.Database, GetReadOnlyTableName());
return EXIT_SUCCESS;
}
diff --git a/ydb/public/lib/ydb_cli/commands/transfer_workload/transfer_workload_topic_to_table_init.cpp b/ydb/public/lib/ydb_cli/commands/transfer_workload/transfer_workload_topic_to_table_init.cpp
index d8ee6ca626..abf94d8300 100644
--- a/ydb/public/lib/ydb_cli/commands/transfer_workload/transfer_workload_topic_to_table_init.cpp
+++ b/ydb/public/lib/ydb_cli/commands/transfer_workload/transfer_workload_topic_to_table_init.cpp
@@ -3,18 +3,21 @@
#include <ydb/public/lib/ydb_cli/commands/ydb_common.h>
+#include <util/random/random.h>
+
using namespace NYdb::NConsoleClient;
int TCommandWorkloadTransferTopicToTableInit::TScenario::DoRun(const TConfig& config)
{
CreateTopic(config.Database, TopicName, TopicPartitionCount, ConsumerCount);
- CreateTable(TableName, TablePartitionCount);
+ CreateWriteOnlyTable(GetWriteOnlyTableName(), TablePartitionCount);
+ CreateReadOnlyTable(GetReadOnlyTableName(), TablePartitionCount);
return EXIT_SUCCESS;
}
-void TCommandWorkloadTransferTopicToTableInit::TScenario::CreateTable(const TString& name,
- ui32 partitionCount)
+void TCommandWorkloadTransferTopicToTableInit::TScenario::CreateWriteOnlyTable(const TString& name,
+ ui32 partitionCount)
{
TStringBuilder query;
query << "CREATE TABLE `";
@@ -28,6 +31,53 @@ void TCommandWorkloadTransferTopicToTableInit::TScenario::CreateTable(const TStr
ExecSchemeQuery(query);
}
+void TCommandWorkloadTransferTopicToTableInit::TScenario::CreateReadOnlyTable(const TString& name,
+ ui32 partitionCount)
+{
+ TStringBuilder query;
+ query << "CREATE TABLE `";
+ query << name;
+ query << "` (id Uint64, PRIMARY KEY (id)) WITH (UNIFORM_PARTITIONS = ";
+ query << partitionCount;
+ query << ", AUTO_PARTITIONING_MIN_PARTITIONS_COUNT = ";
+ query << partitionCount;
+ query << ")";
+
+ ExecSchemeQuery(query);
+
+ for (int i = 0; i < 10; ++i) {
+ UpsertRandomKeyBlock();
+ }
+}
+
+void TCommandWorkloadTransferTopicToTableInit::TScenario::UpsertRandomKeyBlock()
+{
+ TString query = R"(
+ DECLARE $rows AS List<Struct<
+ id: Uint64
+ >>;
+
+ UPSERT INTO `)" + GetReadOnlyTableName() + R"(` (SELECT id FROM AS_TABLE($rows));
+ )";
+
+ NYdb::TParamsBuilder builder;
+
+ auto& rows = builder.AddParam("$rows");
+ rows.BeginList();
+ for (int i = 0; i < 100'000; ++i) {
+ rows.AddListItem()
+ .BeginStruct()
+ .AddMember("id").Uint64(RandomNumber<ui64>())
+ .EndStruct();
+ }
+ rows.EndList();
+ rows.Build();
+
+ auto params = builder.Build();
+
+ ExecDataQuery(query, params);
+}
+
TCommandWorkloadTransferTopicToTableInit::TCommandWorkloadTransferTopicToTableInit() :
TWorkloadCommand("init", {}, "Creates and initializes objects")
{
diff --git a/ydb/public/lib/ydb_cli/commands/transfer_workload/transfer_workload_topic_to_table_init.h b/ydb/public/lib/ydb_cli/commands/transfer_workload/transfer_workload_topic_to_table_init.h
index 1995406db4..417cb1c73f 100644
--- a/ydb/public/lib/ydb_cli/commands/transfer_workload/transfer_workload_topic_to_table_init.h
+++ b/ydb/public/lib/ydb_cli/commands/transfer_workload/transfer_workload_topic_to_table_init.h
@@ -17,7 +17,10 @@ private:
class TScenario : public TTopicOperationsScenario {
int DoRun(const TConfig& config) override;
- void CreateTable(const TString& table, ui32 partitionCount);
+ void CreateWriteOnlyTable(const TString& table, ui32 partitionCount);
+ void CreateReadOnlyTable(const TString& table, ui32 partitionCount);
+
+ void UpsertRandomKeyBlock();
};
TScenario Scenario;
diff --git a/ydb/public/lib/ydb_cli/commands/transfer_workload/transfer_workload_topic_to_table_run.cpp b/ydb/public/lib/ydb_cli/commands/transfer_workload/transfer_workload_topic_to_table_run.cpp
index 891d4518c5..057c8471e1 100644
--- a/ydb/public/lib/ydb_cli/commands/transfer_workload/transfer_workload_topic_to_table_run.cpp
+++ b/ydb/public/lib/ydb_cli/commands/transfer_workload/transfer_workload_topic_to_table_run.cpp
@@ -80,14 +80,24 @@ void TCommandWorkloadTransferTopicToTableRun::Config(TConfig& config)
.Optional()
.DefaultValue((TStringBuilder() << NTopic::ECodec::RAW))
.StoreMappedResultT<TString>(&Scenario.Codec, &TCommandWorkloadTopicParams::StrToCodec);
+
+ config.Opts->MutuallyExclusive("message-rate", "byte-rate");
+
config.Opts->AddLongOption("commit-period", "Waiting time between commit.")
.DefaultValue(10)
.StoreResult(&Scenario.CommitPeriod);
- config.Opts->AddLongOption("use-topic-commit", "Use TopicAPI commit.")
+ config.Opts->AddLongOption("commit-messages", "Number of messages per transaction")
+ .DefaultValue(1'000'000)
+ .StoreResult(&Scenario.CommitMessages);
+
+ config.Opts->AddLongOption("only-topic-in-tx", "Use only topic in transaction")
+ .DefaultValue(false)
+ .StoreTrue(&Scenario.OnlyTopicInTx);
+ config.Opts->AddLongOption("only-table-in-tx", "Use only table in transaction")
.DefaultValue(false)
- .StoreTrue(&Scenario.UseTopicApiCommit);
+ .StoreTrue(&Scenario.OnlyTableInTx);
- config.Opts->MutuallyExclusive("message-rate", "byte-rate");
+ config.Opts->MutuallyExclusive("only-topic-in-tx", "only-table-in-tx");
config.IsNetworkIntensive = true;
}