aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorEvgeniy Ivanov <eivanov89@ydb.tech>2025-06-02 14:06:47 +0200
committerGitHub <noreply@github.com>2025-06-02 15:06:47 +0300
commita435e8cf166dfaaa493ebd81987713728e16a079 (patch)
tree2e3629297ddadebc6296d6108d6bbf3cf7e9f889
parent79bb7f425a2992ac13c9df91654fe7582064caea (diff)
downloadydb-a435e8cf166dfaaa493ebd81987713728e16a079.tar.gz
TPC-C WIP #17333 (#18986)
-rw-r--r--ydb/library/workload/tpcc/histogram.cpp10
-rw-r--r--ydb/library/workload/tpcc/histogram.h1
-rw-r--r--ydb/library/workload/tpcc/runner.cpp30
-rw-r--r--ydb/library/workload/tpcc/runner.h4
-rw-r--r--ydb/library/workload/tpcc/terminal.cpp33
-rw-r--r--ydb/library/workload/tpcc/terminal.h27
-rw-r--r--ydb/library/workload/tpcc/transaction_delivery.cpp19
-rw-r--r--ydb/library/workload/tpcc/transaction_neworder.cpp18
-rw-r--r--ydb/library/workload/tpcc/transaction_orderstatus.cpp19
-rw-r--r--ydb/library/workload/tpcc/transaction_payment.cpp19
-rw-r--r--ydb/library/workload/tpcc/transaction_simulation.cpp109
-rw-r--r--ydb/library/workload/tpcc/transaction_stocklevel.cpp19
-rw-r--r--ydb/library/workload/tpcc/transactions.h12
-rw-r--r--ydb/library/workload/tpcc/ut/histogram_ut.cpp12
-rw-r--r--ydb/library/workload/tpcc/ya.make1
15 files changed, 286 insertions, 47 deletions
diff --git a/ydb/library/workload/tpcc/histogram.cpp b/ydb/library/workload/tpcc/histogram.cpp
index 8c45fdadda2..cb1e9caed6a 100644
--- a/ydb/library/workload/tpcc/histogram.cpp
+++ b/ydb/library/workload/tpcc/histogram.cpp
@@ -9,6 +9,7 @@ THistogram::THistogram(uint64_t hdrTill, uint64_t maxValue)
: HdrTill_(hdrTill)
, MaxValue_(maxValue)
, TotalCount_(0)
+ , MaxRecordedValue_(0)
{
if (hdrTill == 0 || maxValue == 0 || hdrTill > maxValue) {
throw std::invalid_argument("Invalid histogram parameters");
@@ -26,6 +27,7 @@ void THistogram::RecordValue(uint64_t value) {
Buckets_[bucketIndex]++;
TotalCount_++;
+ MaxRecordedValue_ = std::max(MaxRecordedValue_, value);
}
void THistogram::Add(const THistogram& other) {
@@ -37,6 +39,7 @@ void THistogram::Add(const THistogram& other) {
Buckets_[i] += other.Buckets_[i];
}
TotalCount_ += other.TotalCount_;
+ MaxRecordedValue_ = std::max(MaxRecordedValue_, other.MaxRecordedValue_);
}
void THistogram::Sub(const THistogram& other) {
@@ -48,6 +51,8 @@ void THistogram::Sub(const THistogram& other) {
Buckets_[i] -= other.Buckets_[i];
}
TotalCount_ -= other.TotalCount_;
+ // Note: We can't update MaxRecordedValue_ in Sub() as we don't know the actual max value
+ // after subtraction. We'll keep the current MaxRecordedValue_.
}
uint64_t THistogram::GetValueAtPercentile(double percentile) const {
@@ -76,6 +81,7 @@ uint64_t THistogram::GetValueAtPercentile(double percentile) const {
void THistogram::Reset() {
std::fill(Buckets_.begin(), Buckets_.end(), 0);
TotalCount_ = 0;
+ MaxRecordedValue_ = 0;
}
size_t THistogram::GetBucketIndex(uint64_t value) const {
@@ -108,9 +114,9 @@ uint64_t THistogram::GetBucketUpperBound(size_t bucketIndex) const {
// Linear buckets: bucket 0 -> [0,1), bucket 1 -> [1,2), etc.
return static_cast<uint64_t>(bucketIndex + 1);
} else {
- // Last bucket extends to infinity
+ // Last bucket extends to max recorded value
if (bucketIndex == Buckets_.size() - 1) {
- return std::numeric_limits<uint64_t>::max();
+ return MaxRecordedValue_ > 0 ? MaxRecordedValue_ : std::numeric_limits<uint64_t>::max();
}
// Exponential buckets
diff --git a/ydb/library/workload/tpcc/histogram.h b/ydb/library/workload/tpcc/histogram.h
index 9423bbb13f0..a3309fe8bb5 100644
--- a/ydb/library/workload/tpcc/histogram.h
+++ b/ydb/library/workload/tpcc/histogram.h
@@ -30,6 +30,7 @@ private:
uint64_t MaxValue_;
std::vector<uint64_t> Buckets_;
uint64_t TotalCount_;
+ uint64_t MaxRecordedValue_; // Track the maximum recorded value
};
} // namespace NYdb::NTPCC
diff --git a/ydb/library/workload/tpcc/runner.cpp b/ydb/library/workload/tpcc/runner.cpp
index 4bfc2c3f7af..6458e862da4 100644
--- a/ydb/library/workload/tpcc/runner.cpp
+++ b/ydb/library/workload/tpcc/runner.cpp
@@ -235,6 +235,8 @@ TPCCRunner::TPCCRunner(const NConsoleClient::TClientCommand::TConfig& connection
clients[i % drivers.size()],
Config.Path,
Config.NoSleep,
+ Config.SimulateTransactionMs,
+ Config.SimulateTransactionSelect1Count,
TerminalsStopSource.get_token(),
StopWarmup,
StatsVec[i % threadCount],
@@ -452,6 +454,8 @@ void TPCCRunner::DumpFinalStats() {
size_t totalFailed = 0;
size_t totalUserAborted = 0;
+ size_t tableWidth = Config.Developer ? 95 : 65;
+
// Print header
std::cout << "\nTransaction Statistics:\n";
std::cout << "----------------------\n";
@@ -459,9 +463,15 @@ void TPCCRunner::DumpFinalStats() {
<< std::setw(10) << "OK"
<< std::setw(10) << "Failed"
<< std::setw(15) << "User Aborted"
- << std::setw(20) << "Latency p90 (ms)"
- << std::endl;
- std::cout << std::string(65, '-') << std::endl;
+ << std::setw(10) << "p90 (ms)";
+
+ if (Config.Developer) {
+ std::cout << std::setw(15) << "terminal p90 (ms)"
+ << std::setw(15) << "pure p90 (ms)";
+ }
+
+ std::cout << std::endl;
+ std::cout << std::string(tableWidth, '-') << std::endl;
size_t totalNewOrders = 0;
@@ -483,18 +493,24 @@ void TPCCRunner::DumpFinalStats() {
<< std::setw(10) << txStats.OK
<< std::setw(10) << txStats.Failed
<< std::setw(15) << txStats.UserAborted
- << std::setw(15) << txStats.LatencyHistogramMs.GetValueAtPercentile(90)
- << std::endl;
+ << std::setw(10) << txStats.LatencyHistogramFullMs.GetValueAtPercentile(90);
+
+ if (Config.Developer) {
+ std::cout << std::setw(15) << txStats.LatencyHistogramMs.GetValueAtPercentile(90)
+ << std::setw(15) << txStats.LatencyHistogramPure.GetValueAtPercentile(90);
+ }
+
+ std::cout << std::endl;
}
// Print totals
- std::cout << std::string(65, '-') << std::endl;
+ std::cout << std::string(tableWidth, '-') << std::endl;
std::cout << std::setw(15) << "TOTAL"
<< std::setw(10) << totalOK
<< std::setw(10) << totalFailed
<< std::setw(15) << totalUserAborted
<< std::endl;
- std::cout << std::string(65, '-') << std::endl;
+ std::cout << std::string(tableWidth, '-') << std::endl;
if (minutesPassed >= 1) {
size_t tpmC = size_t(totalNewOrders / minutesPassed);
diff --git a/ydb/library/workload/tpcc/runner.h b/ydb/library/workload/tpcc/runner.h
index 5d0630e6256..670ae5ead65 100644
--- a/ydb/library/workload/tpcc/runner.h
+++ b/ydb/library/workload/tpcc/runner.h
@@ -24,6 +24,10 @@ struct TRunConfig {
ELogPriority LogPriority = ELogPriority::TLOG_INFO;
bool NoSleep = false;
bool Developer = false;
+
+ // instead of actual transaction just async sleep and return SUCCESS
+ int SimulateTransactionMs = 0;
+ int SimulateTransactionSelect1Count = 0;
};
void RunSync(const NConsoleClient::TClientCommand::TConfig& connectionConfig, const TRunConfig& runConfig);
diff --git a/ydb/library/workload/tpcc/terminal.cpp b/ydb/library/workload/tpcc/terminal.cpp
index 0190dc1ccea..ffacf5269fb 100644
--- a/ydb/library/workload/tpcc/terminal.cpp
+++ b/ydb/library/workload/tpcc/terminal.cpp
@@ -17,7 +17,7 @@ namespace NYdb::NTPCC {
namespace {
struct TTerminalTransaction {
- using TTaskFunc = NThreading::TFuture<TStatus> (*)(TTransactionContext&, NQuery::TSession);
+ using TTaskFunc = NThreading::TFuture<TStatus> (*)(TTransactionContext&, TDuration&, NQuery::TSession);
TString Name;
double Weight;
@@ -65,12 +65,14 @@ TTerminal::TTerminal(size_t terminalID,
std::shared_ptr<NQuery::TQueryClient>& client,
const TString& path,
bool noSleep,
+ int simulateTransactionMs,
+ int simulateTransactionSelect1Count,
std::stop_token stopToken,
std::atomic<bool>& stopWarmup,
std::shared_ptr<TTerminalStats>& stats,
std::shared_ptr<TLog>& log)
: TaskQueue(taskQueue)
- , Context(terminalID, warehouseID, warehouseCount, TaskQueue, client, path, log)
+ , Context(terminalID, warehouseID, warehouseCount, TaskQueue, simulateTransactionMs, simulateTransactionSelect1Count, client, path, log)
, NoSleep(noSleep)
, StopToken(stopToken)
, StopWarmup(stopWarmup)
@@ -110,6 +112,7 @@ TTerminalTask TTerminal::Run() {
}
}
+ auto startTime = std::chrono::steady_clock::now();
co_await TTaskHasInflight(TaskQueue, Context.TerminalID);
if (StopToken.stop_requested()) {
TaskQueue.DecInflight();
@@ -119,23 +122,31 @@ TTerminalTask TTerminal::Run() {
LOG_T("Terminal " << Context.TerminalID << " starting " << transaction.Name << " transaction");
size_t execCount = 0;
- auto startTime = std::chrono::steady_clock::now();
+ auto startTimeTransaction = std::chrono::steady_clock::now();
+ TDuration latencyPure;
// the block helps to ensure, that session is destroyed before we sleep right after the block
{
- auto future = Context.Client->RetryQuery([this, &transaction, &execCount](TSession session) mutable {
- auto& Log = Context.Log;
- LOG_T("Terminal " << Context.TerminalID << " started RetryQuery for " << transaction.Name);
- ++execCount;
- return transaction.TaskFunc(Context, session);
- });
+ bool real = Context.SimulateTransactionMs == 0 && Context.SimulateTransactionSelect1 == 0;
+ auto future = Context.Client->RetryQuery(
+ [this, real, &transaction, &execCount, &latencyPure](TSession session) mutable {
+ auto& Log = Context.Log;
+ LOG_T("Terminal " << Context.TerminalID << " started RetryQuery for " << transaction.Name);
+ ++execCount;
+ if (real) {
+ return transaction.TaskFunc(Context, latencyPure, session);
+ } else {
+ return GetSimulationTask(Context, latencyPure, session);
+ }
+ });
auto result = co_await TSuspendWithFuture(future, Context.TaskQueue, Context.TerminalID);
auto endTime = std::chrono::steady_clock::now();
- auto latency = std::chrono::duration_cast<std::chrono::milliseconds>(endTime - startTime);
+ auto latencyFull = std::chrono::duration_cast<std::chrono::milliseconds>(endTime - startTime);
+ auto latencyTransaction = std::chrono::duration_cast<std::chrono::milliseconds>(endTime - startTimeTransaction);
if (result.IsSuccess()) {
- Stats->AddOK(static_cast<TTerminalStats::ETransactionType>(txIndex), latency);
+ Stats->AddOK(static_cast<TTerminalStats::ETransactionType>(txIndex), latencyTransaction, latencyFull, latencyPure);
LOG_T("Terminal " << Context.TerminalID << " " << transaction.Name << " transaction finished in "
<< execCount << " execution(s): " << result.GetStatus());
} else {
diff --git a/ydb/library/workload/tpcc/terminal.h b/ydb/library/workload/tpcc/terminal.h
index b343e452225..7457130791b 100644
--- a/ydb/library/workload/tpcc/terminal.h
+++ b/ydb/library/workload/tpcc/terminal.h
@@ -40,6 +40,8 @@ public:
TGuard guard(HistLock);
dst.LatencyHistogramMs.Add(LatencyHistogramMs);
+ dst.LatencyHistogramFullMs.Add(LatencyHistogramMs);
+ dst.LatencyHistogramPure.Add(LatencyHistogramMs);
}
void Clear() {
@@ -48,14 +50,28 @@ public:
UserAborted.store(0, std::memory_order_relaxed);
TGuard guard(HistLock);
LatencyHistogramMs.Reset();
+ LatencyHistogramFullMs.Reset();
+ LatencyHistogramPure.Reset();
}
std::atomic<size_t> OK = 0;
std::atomic<size_t> Failed = 0;
std::atomic<size_t> UserAborted = 0;
+ // histograms protected by lock
+
mutable TSpinLock HistLock;
+
+ // Transaction latency observed by the terminal, i.e., includes session acquisition
+ // and retries performed by the SDK
THistogram LatencyHistogramMs{256, 32768};
+
+ // As LatencyHistogramMs plus inflight wait time in the terminal
+ THistogram LatencyHistogramFullMs{256, 32768};
+
+ // Latency of a successful transaction measured directly in the transaction code,
+ // when there is nothing to wait for except the queries
+ THistogram LatencyHistogramPure{256, 32768};
};
public:
@@ -65,12 +81,19 @@ public:
return Stats[type];
}
- void AddOK(ETransactionType type, std::chrono::milliseconds latency) {
+ void AddOK(
+ ETransactionType type,
+ std::chrono::milliseconds latency,
+ std::chrono::milliseconds latencyFull,
+ TDuration latencyPure)
+ {
auto& stats = Stats[type];
stats.OK.fetch_add(1, std::memory_order_relaxed);
{
TGuard guard(stats.HistLock);
stats.LatencyHistogramMs.RecordValue(latency.count());
+ stats.LatencyHistogramFullMs.RecordValue(latencyFull.count());
+ stats.LatencyHistogramPure.RecordValue(latencyPure.MilliSeconds());
}
}
@@ -122,6 +145,8 @@ public:
std::shared_ptr<NQuery::TQueryClient>& client,
const TString& path,
bool noSleep,
+ int simulateTransactionMs,
+ int simulateTransactionSelect1Count,
std::stop_token stopToken,
std::atomic<bool>& stopWarmup,
std::shared_ptr<TTerminalStats>& stats,
diff --git a/ydb/library/workload/tpcc/transaction_delivery.cpp b/ydb/library/workload/tpcc/transaction_delivery.cpp
index 8bfe94096bf..ebf5ed99d89 100644
--- a/ydb/library/workload/tpcc/transaction_delivery.cpp
+++ b/ydb/library/workload/tpcc/transaction_delivery.cpp
@@ -1,11 +1,13 @@
#include "transactions.h"
-#include <util/string/printf.h>
-
#include "constants.h"
#include "log.h"
#include "util.h"
+#include <library/cpp/time_provider/monotonic.h>
+
+#include <util/string/printf.h>
+
#include <format>
#include <string>
#include <vector>
@@ -325,9 +327,13 @@ TAsyncExecuteQueryResult UpdateCustomerBalanceAndDeliveryCount(
//-----------------------------------------------------------------------------
-NThreading::TFuture<TStatus> GetDeliveryTask(TTransactionContext& context,
+NThreading::TFuture<TStatus> GetDeliveryTask(
+ TTransactionContext& context,
+ TDuration& latency,
TSession session)
{
+ TMonotonic startTs = TMonotonic::Now();
+
TTransactionInflightGuard guard;
co_await TTaskReady(context.TaskQueue, context.TerminalID);
@@ -509,7 +515,12 @@ NThreading::TFuture<TStatus> GetDeliveryTask(TTransactionContext& context,
<< " is committing Delivery transaction, processed " << processedOrderCount << " districts");
auto commitFuture = tx->Commit();
- co_return co_await TSuspendWithFuture(commitFuture, context.TaskQueue, context.TerminalID);
+ auto commitResult = co_await TSuspendWithFuture(commitFuture, context.TaskQueue, context.TerminalID);
+
+ TMonotonic endTs = TMonotonic::Now();
+ latency = endTs - startTs;
+
+ co_return commitResult;
}
} // namespace NYdb::NTPCC
diff --git a/ydb/library/workload/tpcc/transaction_neworder.cpp b/ydb/library/workload/tpcc/transaction_neworder.cpp
index 2c24425cf08..703d748fd2a 100644
--- a/ydb/library/workload/tpcc/transaction_neworder.cpp
+++ b/ydb/library/workload/tpcc/transaction_neworder.cpp
@@ -1,12 +1,14 @@
#include "transactions.h"
-#include <util/generic/singleton.h>
-#include <util/string/printf.h>
-
#include "constants.h"
#include "log.h"
#include "util.h"
+#include <library/cpp/time_provider/monotonic.h>
+
+#include <util/generic/singleton.h>
+#include <util/string/printf.h>
+
#include <format>
#include <unordered_map>
@@ -530,8 +532,11 @@ TString GetDistInfo(int districtID, const Stock& stock) {
NThreading::TFuture<TStatus> GetNewOrderTask(
TTransactionContext& context,
+ TDuration& latency,
TSession session)
{
+ TMonotonic startTs = TMonotonic::Now();
+
TTransactionInflightGuard guard;
co_await TTaskReady(context.TaskQueue, context.TerminalID);
@@ -832,7 +837,12 @@ NThreading::TFuture<TStatus> GetNewOrderTask(
LOG_T("Terminal " << context.TerminalID << " is committing NewOrder transaction");
auto commitFuture = tx.Commit();
- co_return co_await TSuspendWithFuture(commitFuture, context.TaskQueue, context.TerminalID);
+ auto commitResult = co_await TSuspendWithFuture(commitFuture, context.TaskQueue, context.TerminalID);
+
+ TMonotonic endTs = TMonotonic::Now();
+ latency = endTs - startTs;
+
+ co_return commitResult;
}
} // namespace NYdb::NTPCC
diff --git a/ydb/library/workload/tpcc/transaction_orderstatus.cpp b/ydb/library/workload/tpcc/transaction_orderstatus.cpp
index 6acd765480d..072c8dfc31a 100644
--- a/ydb/library/workload/tpcc/transaction_orderstatus.cpp
+++ b/ydb/library/workload/tpcc/transaction_orderstatus.cpp
@@ -1,12 +1,14 @@
#include "transactions.h"
-#include <util/string/printf.h>
-
#include "common_queries.h"
#include "constants.h"
#include "log.h"
#include "util.h"
+#include <library/cpp/time_provider/monotonic.h>
+
+#include <util/string/printf.h>
+
#include <format>
#include <string>
@@ -107,9 +109,13 @@ TAsyncExecuteQueryResult GetOrderLines(
//-----------------------------------------------------------------------------
-NThreading::TFuture<TStatus> GetOrderStatusTask(TTransactionContext& context,
+NThreading::TFuture<TStatus> GetOrderStatusTask(
+ TTransactionContext& context,
+ TDuration& latency,
TSession session)
{
+ TMonotonic startTs = TMonotonic::Now();
+
TTransactionInflightGuard guard;
co_await TTaskReady(context.TaskQueue, context.TerminalID);
@@ -218,7 +224,12 @@ NThreading::TFuture<TStatus> GetOrderStatusTask(TTransactionContext& context,
<< ", lines " << orderLinesResult.GetResultSet(0).RowsCount());
auto commitFuture = tx->Commit();
- co_return co_await TSuspendWithFuture(commitFuture, context.TaskQueue, context.TerminalID);
+ auto commitResult = co_await TSuspendWithFuture(commitFuture, context.TaskQueue, context.TerminalID);
+
+ TMonotonic endTs = TMonotonic::Now();
+ latency = endTs - startTs;
+
+ co_return commitResult;
}
} // namespace NYdb::NTPCC
diff --git a/ydb/library/workload/tpcc/transaction_payment.cpp b/ydb/library/workload/tpcc/transaction_payment.cpp
index a5c181a1a97..8465b1eba99 100644
--- a/ydb/library/workload/tpcc/transaction_payment.cpp
+++ b/ydb/library/workload/tpcc/transaction_payment.cpp
@@ -1,12 +1,14 @@
#include "transactions.h"
-#include <util/string/printf.h>
-
#include "common_queries.h"
#include "constants.h"
#include "log.h"
#include "util.h"
+#include <library/cpp/time_provider/monotonic.h>
+
+#include <util/string/printf.h>
+
#include <format>
#include <string>
@@ -260,9 +262,13 @@ TAsyncExecuteQueryResult InsertHistoryRecord(
//-----------------------------------------------------------------------------
-NThreading::TFuture<TStatus> GetPaymentTask(TTransactionContext& context,
+NThreading::TFuture<TStatus> GetPaymentTask(
+ TTransactionContext& context,
+ TDuration& latency,
TSession session)
{
+ TMonotonic startTs = TMonotonic::Now();
+
TTransactionInflightGuard guard;
co_await TTaskReady(context.TaskQueue, context.TerminalID);
@@ -486,7 +492,12 @@ NThreading::TFuture<TStatus> GetPaymentTask(TTransactionContext& context,
<< "customer " << customer.c_id << ", amount " << paymentAmount);
auto commitFuture = tx.Commit();
- co_return co_await TSuspendWithFuture(commitFuture, context.TaskQueue, context.TerminalID);
+ auto commitResult = co_await TSuspendWithFuture(commitFuture, context.TaskQueue, context.TerminalID);
+
+ TMonotonic endTs = TMonotonic::Now();
+ latency = endTs - startTs;
+
+ co_return commitResult;
}
} // namespace NYdb::NTPCC
diff --git a/ydb/library/workload/tpcc/transaction_simulation.cpp b/ydb/library/workload/tpcc/transaction_simulation.cpp
new file mode 100644
index 00000000000..502996a4b4f
--- /dev/null
+++ b/ydb/library/workload/tpcc/transaction_simulation.cpp
@@ -0,0 +1,109 @@
+#include "transactions.h"
+
+#include "constants.h"
+#include "log.h"
+#include "util.h"
+
+#include <library/cpp/time_provider/monotonic.h>
+
+#include <util/generic/singleton.h>
+#include <util/string/printf.h>
+
+#include <format>
+#include <unordered_map>
+
+namespace NYdb::NTPCC {
+
+namespace {
+
+//-----------------------------------------------------------------------------
+
+using namespace NYdb::NQuery;
+
+//-----------------------------------------------------------------------------
+
+NYdb::NQuery::TAsyncExecuteQueryResult Select1(
+ NQuery::TSession& session,
+ const std::optional<NYdb::NQuery::TTransaction>& tx,
+ TTransactionContext& context)
+{
+ auto& Log = context.Log;
+ static std::string query = std::format(R"(
+ PRAGMA TablePathPrefix("{}");
+
+ DECLARE $count AS Int32;
+
+ SELECT $count;
+ )", context.Path.c_str());
+
+ auto params = TParamsBuilder()
+ .AddParam("$count").Int32(1).Build()
+ .Build();
+
+ auto txControl = tx ? TTxControl::Tx(*tx) : TTxControl::BeginTx(TTxSettings::SerializableRW());
+ auto result = session.ExecuteQuery(
+ query,
+ txControl,
+ std::move(params));
+
+ LOG_T("Terminal " << context.TerminalID << " waiting for select1");
+ return result;
+}
+
+} // anonymous
+
+//-----------------------------------------------------------------------------
+
+NThreading::TFuture<TStatus> GetSimulationTask(
+ TTransactionContext& context,
+ TDuration& latency,
+ NQuery::TSession session)
+{
+ TMonotonic startTs = TMonotonic::Now();
+
+ TTransactionInflightGuard guard;
+ co_await TTaskReady(context.TaskQueue, context.TerminalID);
+
+ auto& Log = context.Log;
+
+ LOG_T("Terminal " << context.TerminalID << " started simulated transaction");
+
+ // just to test if we have problems with generator (we don't)
+ for (size_t i = 0; i < 10; ++i) {
+ RandomNumber(DISTRICT_LOW_ID, DISTRICT_HIGH_ID);
+ }
+
+ if (context.SimulateTransactionMs != 0) {
+ std::chrono::milliseconds delay(context.SimulateTransactionMs);
+ co_await TSuspend(context.TaskQueue, context.TerminalID, delay);
+ TMonotonic endTs = TMonotonic::Now();
+ latency = endTs - startTs;
+ co_return TStatus(EStatus::SUCCESS, NIssue::TIssues());
+ }
+
+ // sleep 1 sumulation
+
+ std::optional<TTransaction> tx;
+ for (int i = 0; i < context.SimulateTransactionSelect1; ++i) {
+ auto future = Select1(session, tx, context);
+ auto result = co_await TSuspendWithFuture(future, context.TaskQueue, context.TerminalID);
+ if (!result.IsSuccess()) {
+ co_return result;
+ }
+
+ if (!tx) {
+ tx = *result.GetTransaction();
+ LOG_T("Terminal " << context.TerminalID << " simulated transaction txId " << tx->GetId());
+ }
+ }
+
+ auto commitFuture = tx->Commit();
+ auto commitResult = co_await TSuspendWithFuture(commitFuture, context.TaskQueue, context.TerminalID);
+
+ TMonotonic endTs = TMonotonic::Now();
+ latency = endTs - startTs;
+
+ co_return commitResult;
+}
+
+} // namespace NYdb::NTPCC
diff --git a/ydb/library/workload/tpcc/transaction_stocklevel.cpp b/ydb/library/workload/tpcc/transaction_stocklevel.cpp
index 6ab1f83d7c9..ecedbaca80c 100644
--- a/ydb/library/workload/tpcc/transaction_stocklevel.cpp
+++ b/ydb/library/workload/tpcc/transaction_stocklevel.cpp
@@ -1,11 +1,13 @@
#include "transactions.h"
-#include <util/string/printf.h>
-
#include "constants.h"
#include "log.h"
#include "util.h"
+#include <library/cpp/time_provider/monotonic.h>
+
+#include <util/string/printf.h>
+
#include <format>
#include <string>
@@ -98,9 +100,13 @@ TAsyncExecuteQueryResult GetStockCount(
//-----------------------------------------------------------------------------
-NThreading::TFuture<TStatus> GetStockLevelTask(TTransactionContext& context,
+NThreading::TFuture<TStatus> GetStockLevelTask(
+ TTransactionContext& context,
+ TDuration& latency,
TSession session)
{
+ TMonotonic startTs = TMonotonic::Now();
+
TTransactionInflightGuard guard;
co_await TTaskReady(context.TaskQueue, context.TerminalID);
@@ -152,7 +158,12 @@ NThreading::TFuture<TStatus> GetStockLevelTask(TTransactionContext& context,
LOG_T("Terminal " << context.TerminalID << " is committing StockLevel transaction");
auto commitFuture = tx.Commit();
- co_return co_await TSuspendWithFuture(commitFuture, context.TaskQueue, context.TerminalID);
+ auto commitResult = co_await TSuspendWithFuture(commitFuture, context.TaskQueue, context.TerminalID);
+
+ TMonotonic endTs = TMonotonic::Now();
+ latency = endTs - startTs;
+
+ co_return commitResult;
}
} // namespace NYdb::NTPCC
diff --git a/ydb/library/workload/tpcc/transactions.h b/ydb/library/workload/tpcc/transactions.h
index 1d3ad8e0201..7b5625d64d5 100644
--- a/ydb/library/workload/tpcc/transactions.h
+++ b/ydb/library/workload/tpcc/transactions.h
@@ -33,6 +33,8 @@ struct TTransactionContext {
size_t WarehouseID;
size_t WarehouseCount;
ITaskQueue& TaskQueue;
+ int SimulateTransactionMs;
+ int SimulateTransactionSelect1;
std::shared_ptr<NQuery::TQueryClient> Client;
const TString Path;
std::shared_ptr<TLog> Log;
@@ -45,22 +47,32 @@ struct TUserAbortedException : public yexception {
NThreading::TFuture<TStatus> GetNewOrderTask(
TTransactionContext& context,
+ TDuration& latency,
NQuery::TSession session);
NThreading::TFuture<TStatus> GetDeliveryTask(
TTransactionContext& context,
+ TDuration& latency,
NQuery::TSession session);
NThreading::TFuture<TStatus> GetOrderStatusTask(
TTransactionContext& context,
+ TDuration& latency,
NQuery::TSession session);
NThreading::TFuture<TStatus> GetPaymentTask(
TTransactionContext& context,
+ TDuration& latency,
NQuery::TSession session);
NThreading::TFuture<TStatus> GetStockLevelTask(
TTransactionContext& context,
+ TDuration& latency,
+ NQuery::TSession session);
+
+NThreading::TFuture<TStatus> GetSimulationTask(
+ TTransactionContext& context,
+ TDuration& latency,
NQuery::TSession session);
} // namespace NYdb::NTPCC
diff --git a/ydb/library/workload/tpcc/ut/histogram_ut.cpp b/ydb/library/workload/tpcc/ut/histogram_ut.cpp
index a50baefe163..914ccea52ea 100644
--- a/ydb/library/workload/tpcc/ut/histogram_ut.cpp
+++ b/ydb/library/workload/tpcc/ut/histogram_ut.cpp
@@ -52,7 +52,7 @@ Y_UNIT_TEST_SUITE(THistogramTest) {
hist.RecordValue(100);
hist.RecordValue(1000);
- UNIT_ASSERT_VALUES_EQUAL(hist.GetValueAtPercentile(100.0), std::numeric_limits<uint64_t>::max());
+ UNIT_ASSERT_VALUES_EQUAL(hist.GetValueAtPercentile(100.0), 1000); // Should return max recorded value
}
Y_UNIT_TEST(ShouldHandleZeroValues) {
@@ -165,7 +165,7 @@ Y_UNIT_TEST_SUITE(THistogramTest) {
UNIT_ASSERT_VALUES_EQUAL(hist.GetValueAtPercentile(44.4), 4); // ~44% of 9 values = 4th value (3)
UNIT_ASSERT_VALUES_EQUAL(hist.GetValueAtPercentile(55.5), 8); // ~55% of 9 values = 5th value (4)
UNIT_ASSERT_VALUES_EQUAL(hist.GetValueAtPercentile(77.7), 16); // ~77% of 9 values = 7th value (8)
- UNIT_ASSERT_VALUES_EQUAL(hist.GetValueAtPercentile(100.0), std::numeric_limits<uint64_t>::max()); // 100% of 9 values = 9th value (20)
+ UNIT_ASSERT_VALUES_EQUAL(hist.GetValueAtPercentile(100.0), 20); // 100% of 9 values = 9th value (20)
}
Y_UNIT_TEST(ShouldHandleLargeHistogram) {
@@ -190,7 +190,7 @@ Y_UNIT_TEST_SUITE(THistogramTest) {
UNIT_ASSERT_VALUES_EQUAL(hist.GetValueAtPercentile(50.0), 68); // 50% of 135 = 67.5, so 68th value (67), bucket 67, upper bound 68
UNIT_ASSERT_VALUES_EQUAL(hist.GetValueAtPercentile(95.0), 256); // 95% of 135 = 128.25, so 129th value (200), bucket 128, upper bound 256
UNIT_ASSERT_VALUES_EQUAL(hist.GetValueAtPercentile(99.0), 8192); // 99% of 135 = 133.65, so 134th value (6000), bucket 133, upper bound 8192
- UNIT_ASSERT_VALUES_EQUAL(hist.GetValueAtPercentile(100.0), std::numeric_limits<uint64_t>::max()); // Last value (10000)
+ UNIT_ASSERT_VALUES_EQUAL(hist.GetValueAtPercentile(100.0), 10000); // Last value (10000)
}
Y_UNIT_TEST(ShouldHandleBucketBoundaries) {
@@ -211,7 +211,7 @@ Y_UNIT_TEST_SUITE(THistogramTest) {
UNIT_ASSERT_VALUES_EQUAL(hist.GetValueAtPercentile(57.1), 4); // ~57% of 7 = 4th value (3), bucket 3, upper bound 4
UNIT_ASSERT_VALUES_EQUAL(hist.GetValueAtPercentile(71.4), 8); // ~71% of 7 = 5th value (4), bucket 4, upper bound 8
UNIT_ASSERT_VALUES_EQUAL(hist.GetValueAtPercentile(85.7), 16); // ~86% of 7 = 6th value (8), bucket 5, upper bound 16
- UNIT_ASSERT_VALUES_EQUAL(hist.GetValueAtPercentile(100.0), std::numeric_limits<uint64_t>::max()); // 7th value (16)
+ UNIT_ASSERT_VALUES_EQUAL(hist.GetValueAtPercentile(100.0), 16); // 7th value (16)
}
Y_UNIT_TEST(ShouldHandleVerySmallPercentiles) {
@@ -254,7 +254,7 @@ Y_UNIT_TEST_SUITE(THistogramTest) {
hist.RecordValue(17); // Above maxValue, should go to [16,∞)
UNIT_ASSERT_VALUES_EQUAL(hist.GetValueAtPercentile(33.3), 16); // 33% of 3 = 1st value (15)
- UNIT_ASSERT_VALUES_EQUAL(hist.GetValueAtPercentile(100.0), std::numeric_limits<uint64_t>::max()); // Last bucket
+ UNIT_ASSERT_VALUES_EQUAL(hist.GetValueAtPercentile(100.0), 17); // Last value (17)
}
Y_UNIT_TEST(ShouldHandleRepeatedValues) {
@@ -399,7 +399,7 @@ Y_UNIT_TEST_SUITE(THistogramTest) {
// Test corner case: exactly at maxValue
THistogram hist2(128, 8192);
hist2.RecordValue(8192); // Should go to overflow bucket
- UNIT_ASSERT_VALUES_EQUAL(hist2.GetValueAtPercentile(100.0), std::numeric_limits<uint64_t>::max());
+ UNIT_ASSERT_VALUES_EQUAL(hist2.GetValueAtPercentile(100.0), 8192);
// Test edge case: just below maxValue
THistogram hist3(128, 8192);
diff --git a/ydb/library/workload/tpcc/ya.make b/ydb/library/workload/tpcc/ya.make
index 842452e1bee..d10a3aee77d 100644
--- a/ydb/library/workload/tpcc/ya.make
+++ b/ydb/library/workload/tpcc/ya.make
@@ -10,6 +10,7 @@ SRCS(
transaction_neworder.cpp
transaction_orderstatus.cpp
transaction_payment.cpp
+ transaction_simulation.cpp
transaction_stocklevel.cpp
)