diff options
author | Evgeniy Ivanov <eivanov89@ydb.tech> | 2025-06-02 14:06:47 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2025-06-02 15:06:47 +0300 |
commit | a435e8cf166dfaaa493ebd81987713728e16a079 (patch) | |
tree | 2e3629297ddadebc6296d6108d6bbf3cf7e9f889 | |
parent | 79bb7f425a2992ac13c9df91654fe7582064caea (diff) | |
download | ydb-a435e8cf166dfaaa493ebd81987713728e16a079.tar.gz |
TPC-C WIP #17333 (#18986)
-rw-r--r-- | ydb/library/workload/tpcc/histogram.cpp | 10 | ||||
-rw-r--r-- | ydb/library/workload/tpcc/histogram.h | 1 | ||||
-rw-r--r-- | ydb/library/workload/tpcc/runner.cpp | 30 | ||||
-rw-r--r-- | ydb/library/workload/tpcc/runner.h | 4 | ||||
-rw-r--r-- | ydb/library/workload/tpcc/terminal.cpp | 33 | ||||
-rw-r--r-- | ydb/library/workload/tpcc/terminal.h | 27 | ||||
-rw-r--r-- | ydb/library/workload/tpcc/transaction_delivery.cpp | 19 | ||||
-rw-r--r-- | ydb/library/workload/tpcc/transaction_neworder.cpp | 18 | ||||
-rw-r--r-- | ydb/library/workload/tpcc/transaction_orderstatus.cpp | 19 | ||||
-rw-r--r-- | ydb/library/workload/tpcc/transaction_payment.cpp | 19 | ||||
-rw-r--r-- | ydb/library/workload/tpcc/transaction_simulation.cpp | 109 | ||||
-rw-r--r-- | ydb/library/workload/tpcc/transaction_stocklevel.cpp | 19 | ||||
-rw-r--r-- | ydb/library/workload/tpcc/transactions.h | 12 | ||||
-rw-r--r-- | ydb/library/workload/tpcc/ut/histogram_ut.cpp | 12 | ||||
-rw-r--r-- | ydb/library/workload/tpcc/ya.make | 1 |
15 files changed, 286 insertions, 47 deletions
diff --git a/ydb/library/workload/tpcc/histogram.cpp b/ydb/library/workload/tpcc/histogram.cpp index 8c45fdadda2..cb1e9caed6a 100644 --- a/ydb/library/workload/tpcc/histogram.cpp +++ b/ydb/library/workload/tpcc/histogram.cpp @@ -9,6 +9,7 @@ THistogram::THistogram(uint64_t hdrTill, uint64_t maxValue) : HdrTill_(hdrTill) , MaxValue_(maxValue) , TotalCount_(0) + , MaxRecordedValue_(0) { if (hdrTill == 0 || maxValue == 0 || hdrTill > maxValue) { throw std::invalid_argument("Invalid histogram parameters"); @@ -26,6 +27,7 @@ void THistogram::RecordValue(uint64_t value) { Buckets_[bucketIndex]++; TotalCount_++; + MaxRecordedValue_ = std::max(MaxRecordedValue_, value); } void THistogram::Add(const THistogram& other) { @@ -37,6 +39,7 @@ void THistogram::Add(const THistogram& other) { Buckets_[i] += other.Buckets_[i]; } TotalCount_ += other.TotalCount_; + MaxRecordedValue_ = std::max(MaxRecordedValue_, other.MaxRecordedValue_); } void THistogram::Sub(const THistogram& other) { @@ -48,6 +51,8 @@ void THistogram::Sub(const THistogram& other) { Buckets_[i] -= other.Buckets_[i]; } TotalCount_ -= other.TotalCount_; + // Note: We can't update MaxRecordedValue_ in Sub() as we don't know the actual max value + // after subtraction. We'll keep the current MaxRecordedValue_. } uint64_t THistogram::GetValueAtPercentile(double percentile) const { @@ -76,6 +81,7 @@ uint64_t THistogram::GetValueAtPercentile(double percentile) const { void THistogram::Reset() { std::fill(Buckets_.begin(), Buckets_.end(), 0); TotalCount_ = 0; + MaxRecordedValue_ = 0; } size_t THistogram::GetBucketIndex(uint64_t value) const { @@ -108,9 +114,9 @@ uint64_t THistogram::GetBucketUpperBound(size_t bucketIndex) const { // Linear buckets: bucket 0 -> [0,1), bucket 1 -> [1,2), etc. return static_cast<uint64_t>(bucketIndex + 1); } else { - // Last bucket extends to infinity + // Last bucket extends to max recorded value if (bucketIndex == Buckets_.size() - 1) { - return std::numeric_limits<uint64_t>::max(); + return MaxRecordedValue_ > 0 ? MaxRecordedValue_ : std::numeric_limits<uint64_t>::max(); } // Exponential buckets diff --git a/ydb/library/workload/tpcc/histogram.h b/ydb/library/workload/tpcc/histogram.h index 9423bbb13f0..a3309fe8bb5 100644 --- a/ydb/library/workload/tpcc/histogram.h +++ b/ydb/library/workload/tpcc/histogram.h @@ -30,6 +30,7 @@ private: uint64_t MaxValue_; std::vector<uint64_t> Buckets_; uint64_t TotalCount_; + uint64_t MaxRecordedValue_; // Track the maximum recorded value }; } // namespace NYdb::NTPCC diff --git a/ydb/library/workload/tpcc/runner.cpp b/ydb/library/workload/tpcc/runner.cpp index 4bfc2c3f7af..6458e862da4 100644 --- a/ydb/library/workload/tpcc/runner.cpp +++ b/ydb/library/workload/tpcc/runner.cpp @@ -235,6 +235,8 @@ TPCCRunner::TPCCRunner(const NConsoleClient::TClientCommand::TConfig& connection clients[i % drivers.size()], Config.Path, Config.NoSleep, + Config.SimulateTransactionMs, + Config.SimulateTransactionSelect1Count, TerminalsStopSource.get_token(), StopWarmup, StatsVec[i % threadCount], @@ -452,6 +454,8 @@ void TPCCRunner::DumpFinalStats() { size_t totalFailed = 0; size_t totalUserAborted = 0; + size_t tableWidth = Config.Developer ? 95 : 65; + // Print header std::cout << "\nTransaction Statistics:\n"; std::cout << "----------------------\n"; @@ -459,9 +463,15 @@ void TPCCRunner::DumpFinalStats() { << std::setw(10) << "OK" << std::setw(10) << "Failed" << std::setw(15) << "User Aborted" - << std::setw(20) << "Latency p90 (ms)" - << std::endl; - std::cout << std::string(65, '-') << std::endl; + << std::setw(10) << "p90 (ms)"; + + if (Config.Developer) { + std::cout << std::setw(15) << "terminal p90 (ms)" + << std::setw(15) << "pure p90 (ms)"; + } + + std::cout << std::endl; + std::cout << std::string(tableWidth, '-') << std::endl; size_t totalNewOrders = 0; @@ -483,18 +493,24 @@ void TPCCRunner::DumpFinalStats() { << std::setw(10) << txStats.OK << std::setw(10) << txStats.Failed << std::setw(15) << txStats.UserAborted - << std::setw(15) << txStats.LatencyHistogramMs.GetValueAtPercentile(90) - << std::endl; + << std::setw(10) << txStats.LatencyHistogramFullMs.GetValueAtPercentile(90); + + if (Config.Developer) { + std::cout << std::setw(15) << txStats.LatencyHistogramMs.GetValueAtPercentile(90) + << std::setw(15) << txStats.LatencyHistogramPure.GetValueAtPercentile(90); + } + + std::cout << std::endl; } // Print totals - std::cout << std::string(65, '-') << std::endl; + std::cout << std::string(tableWidth, '-') << std::endl; std::cout << std::setw(15) << "TOTAL" << std::setw(10) << totalOK << std::setw(10) << totalFailed << std::setw(15) << totalUserAborted << std::endl; - std::cout << std::string(65, '-') << std::endl; + std::cout << std::string(tableWidth, '-') << std::endl; if (minutesPassed >= 1) { size_t tpmC = size_t(totalNewOrders / minutesPassed); diff --git a/ydb/library/workload/tpcc/runner.h b/ydb/library/workload/tpcc/runner.h index 5d0630e6256..670ae5ead65 100644 --- a/ydb/library/workload/tpcc/runner.h +++ b/ydb/library/workload/tpcc/runner.h @@ -24,6 +24,10 @@ struct TRunConfig { ELogPriority LogPriority = ELogPriority::TLOG_INFO; bool NoSleep = false; bool Developer = false; + + // instead of actual transaction just async sleep and return SUCCESS + int SimulateTransactionMs = 0; + int SimulateTransactionSelect1Count = 0; }; void RunSync(const NConsoleClient::TClientCommand::TConfig& connectionConfig, const TRunConfig& runConfig); diff --git a/ydb/library/workload/tpcc/terminal.cpp b/ydb/library/workload/tpcc/terminal.cpp index 0190dc1ccea..ffacf5269fb 100644 --- a/ydb/library/workload/tpcc/terminal.cpp +++ b/ydb/library/workload/tpcc/terminal.cpp @@ -17,7 +17,7 @@ namespace NYdb::NTPCC { namespace { struct TTerminalTransaction { - using TTaskFunc = NThreading::TFuture<TStatus> (*)(TTransactionContext&, NQuery::TSession); + using TTaskFunc = NThreading::TFuture<TStatus> (*)(TTransactionContext&, TDuration&, NQuery::TSession); TString Name; double Weight; @@ -65,12 +65,14 @@ TTerminal::TTerminal(size_t terminalID, std::shared_ptr<NQuery::TQueryClient>& client, const TString& path, bool noSleep, + int simulateTransactionMs, + int simulateTransactionSelect1Count, std::stop_token stopToken, std::atomic<bool>& stopWarmup, std::shared_ptr<TTerminalStats>& stats, std::shared_ptr<TLog>& log) : TaskQueue(taskQueue) - , Context(terminalID, warehouseID, warehouseCount, TaskQueue, client, path, log) + , Context(terminalID, warehouseID, warehouseCount, TaskQueue, simulateTransactionMs, simulateTransactionSelect1Count, client, path, log) , NoSleep(noSleep) , StopToken(stopToken) , StopWarmup(stopWarmup) @@ -110,6 +112,7 @@ TTerminalTask TTerminal::Run() { } } + auto startTime = std::chrono::steady_clock::now(); co_await TTaskHasInflight(TaskQueue, Context.TerminalID); if (StopToken.stop_requested()) { TaskQueue.DecInflight(); @@ -119,23 +122,31 @@ TTerminalTask TTerminal::Run() { LOG_T("Terminal " << Context.TerminalID << " starting " << transaction.Name << " transaction"); size_t execCount = 0; - auto startTime = std::chrono::steady_clock::now(); + auto startTimeTransaction = std::chrono::steady_clock::now(); + TDuration latencyPure; // the block helps to ensure, that session is destroyed before we sleep right after the block { - auto future = Context.Client->RetryQuery([this, &transaction, &execCount](TSession session) mutable { - auto& Log = Context.Log; - LOG_T("Terminal " << Context.TerminalID << " started RetryQuery for " << transaction.Name); - ++execCount; - return transaction.TaskFunc(Context, session); - }); + bool real = Context.SimulateTransactionMs == 0 && Context.SimulateTransactionSelect1 == 0; + auto future = Context.Client->RetryQuery( + [this, real, &transaction, &execCount, &latencyPure](TSession session) mutable { + auto& Log = Context.Log; + LOG_T("Terminal " << Context.TerminalID << " started RetryQuery for " << transaction.Name); + ++execCount; + if (real) { + return transaction.TaskFunc(Context, latencyPure, session); + } else { + return GetSimulationTask(Context, latencyPure, session); + } + }); auto result = co_await TSuspendWithFuture(future, Context.TaskQueue, Context.TerminalID); auto endTime = std::chrono::steady_clock::now(); - auto latency = std::chrono::duration_cast<std::chrono::milliseconds>(endTime - startTime); + auto latencyFull = std::chrono::duration_cast<std::chrono::milliseconds>(endTime - startTime); + auto latencyTransaction = std::chrono::duration_cast<std::chrono::milliseconds>(endTime - startTimeTransaction); if (result.IsSuccess()) { - Stats->AddOK(static_cast<TTerminalStats::ETransactionType>(txIndex), latency); + Stats->AddOK(static_cast<TTerminalStats::ETransactionType>(txIndex), latencyTransaction, latencyFull, latencyPure); LOG_T("Terminal " << Context.TerminalID << " " << transaction.Name << " transaction finished in " << execCount << " execution(s): " << result.GetStatus()); } else { diff --git a/ydb/library/workload/tpcc/terminal.h b/ydb/library/workload/tpcc/terminal.h index b343e452225..7457130791b 100644 --- a/ydb/library/workload/tpcc/terminal.h +++ b/ydb/library/workload/tpcc/terminal.h @@ -40,6 +40,8 @@ public: TGuard guard(HistLock); dst.LatencyHistogramMs.Add(LatencyHistogramMs); + dst.LatencyHistogramFullMs.Add(LatencyHistogramMs); + dst.LatencyHistogramPure.Add(LatencyHistogramMs); } void Clear() { @@ -48,14 +50,28 @@ public: UserAborted.store(0, std::memory_order_relaxed); TGuard guard(HistLock); LatencyHistogramMs.Reset(); + LatencyHistogramFullMs.Reset(); + LatencyHistogramPure.Reset(); } std::atomic<size_t> OK = 0; std::atomic<size_t> Failed = 0; std::atomic<size_t> UserAborted = 0; + // histograms protected by lock + mutable TSpinLock HistLock; + + // Transaction latency observed by the terminal, i.e., includes session acquisition + // and retries performed by the SDK THistogram LatencyHistogramMs{256, 32768}; + + // As LatencyHistogramMs plus inflight wait time in the terminal + THistogram LatencyHistogramFullMs{256, 32768}; + + // Latency of a successful transaction measured directly in the transaction code, + // when there is nothing to wait for except the queries + THistogram LatencyHistogramPure{256, 32768}; }; public: @@ -65,12 +81,19 @@ public: return Stats[type]; } - void AddOK(ETransactionType type, std::chrono::milliseconds latency) { + void AddOK( + ETransactionType type, + std::chrono::milliseconds latency, + std::chrono::milliseconds latencyFull, + TDuration latencyPure) + { auto& stats = Stats[type]; stats.OK.fetch_add(1, std::memory_order_relaxed); { TGuard guard(stats.HistLock); stats.LatencyHistogramMs.RecordValue(latency.count()); + stats.LatencyHistogramFullMs.RecordValue(latencyFull.count()); + stats.LatencyHistogramPure.RecordValue(latencyPure.MilliSeconds()); } } @@ -122,6 +145,8 @@ public: std::shared_ptr<NQuery::TQueryClient>& client, const TString& path, bool noSleep, + int simulateTransactionMs, + int simulateTransactionSelect1Count, std::stop_token stopToken, std::atomic<bool>& stopWarmup, std::shared_ptr<TTerminalStats>& stats, diff --git a/ydb/library/workload/tpcc/transaction_delivery.cpp b/ydb/library/workload/tpcc/transaction_delivery.cpp index 8bfe94096bf..ebf5ed99d89 100644 --- a/ydb/library/workload/tpcc/transaction_delivery.cpp +++ b/ydb/library/workload/tpcc/transaction_delivery.cpp @@ -1,11 +1,13 @@ #include "transactions.h" -#include <util/string/printf.h> - #include "constants.h" #include "log.h" #include "util.h" +#include <library/cpp/time_provider/monotonic.h> + +#include <util/string/printf.h> + #include <format> #include <string> #include <vector> @@ -325,9 +327,13 @@ TAsyncExecuteQueryResult UpdateCustomerBalanceAndDeliveryCount( //----------------------------------------------------------------------------- -NThreading::TFuture<TStatus> GetDeliveryTask(TTransactionContext& context, +NThreading::TFuture<TStatus> GetDeliveryTask( + TTransactionContext& context, + TDuration& latency, TSession session) { + TMonotonic startTs = TMonotonic::Now(); + TTransactionInflightGuard guard; co_await TTaskReady(context.TaskQueue, context.TerminalID); @@ -509,7 +515,12 @@ NThreading::TFuture<TStatus> GetDeliveryTask(TTransactionContext& context, << " is committing Delivery transaction, processed " << processedOrderCount << " districts"); auto commitFuture = tx->Commit(); - co_return co_await TSuspendWithFuture(commitFuture, context.TaskQueue, context.TerminalID); + auto commitResult = co_await TSuspendWithFuture(commitFuture, context.TaskQueue, context.TerminalID); + + TMonotonic endTs = TMonotonic::Now(); + latency = endTs - startTs; + + co_return commitResult; } } // namespace NYdb::NTPCC diff --git a/ydb/library/workload/tpcc/transaction_neworder.cpp b/ydb/library/workload/tpcc/transaction_neworder.cpp index 2c24425cf08..703d748fd2a 100644 --- a/ydb/library/workload/tpcc/transaction_neworder.cpp +++ b/ydb/library/workload/tpcc/transaction_neworder.cpp @@ -1,12 +1,14 @@ #include "transactions.h" -#include <util/generic/singleton.h> -#include <util/string/printf.h> - #include "constants.h" #include "log.h" #include "util.h" +#include <library/cpp/time_provider/monotonic.h> + +#include <util/generic/singleton.h> +#include <util/string/printf.h> + #include <format> #include <unordered_map> @@ -530,8 +532,11 @@ TString GetDistInfo(int districtID, const Stock& stock) { NThreading::TFuture<TStatus> GetNewOrderTask( TTransactionContext& context, + TDuration& latency, TSession session) { + TMonotonic startTs = TMonotonic::Now(); + TTransactionInflightGuard guard; co_await TTaskReady(context.TaskQueue, context.TerminalID); @@ -832,7 +837,12 @@ NThreading::TFuture<TStatus> GetNewOrderTask( LOG_T("Terminal " << context.TerminalID << " is committing NewOrder transaction"); auto commitFuture = tx.Commit(); - co_return co_await TSuspendWithFuture(commitFuture, context.TaskQueue, context.TerminalID); + auto commitResult = co_await TSuspendWithFuture(commitFuture, context.TaskQueue, context.TerminalID); + + TMonotonic endTs = TMonotonic::Now(); + latency = endTs - startTs; + + co_return commitResult; } } // namespace NYdb::NTPCC diff --git a/ydb/library/workload/tpcc/transaction_orderstatus.cpp b/ydb/library/workload/tpcc/transaction_orderstatus.cpp index 6acd765480d..072c8dfc31a 100644 --- a/ydb/library/workload/tpcc/transaction_orderstatus.cpp +++ b/ydb/library/workload/tpcc/transaction_orderstatus.cpp @@ -1,12 +1,14 @@ #include "transactions.h" -#include <util/string/printf.h> - #include "common_queries.h" #include "constants.h" #include "log.h" #include "util.h" +#include <library/cpp/time_provider/monotonic.h> + +#include <util/string/printf.h> + #include <format> #include <string> @@ -107,9 +109,13 @@ TAsyncExecuteQueryResult GetOrderLines( //----------------------------------------------------------------------------- -NThreading::TFuture<TStatus> GetOrderStatusTask(TTransactionContext& context, +NThreading::TFuture<TStatus> GetOrderStatusTask( + TTransactionContext& context, + TDuration& latency, TSession session) { + TMonotonic startTs = TMonotonic::Now(); + TTransactionInflightGuard guard; co_await TTaskReady(context.TaskQueue, context.TerminalID); @@ -218,7 +224,12 @@ NThreading::TFuture<TStatus> GetOrderStatusTask(TTransactionContext& context, << ", lines " << orderLinesResult.GetResultSet(0).RowsCount()); auto commitFuture = tx->Commit(); - co_return co_await TSuspendWithFuture(commitFuture, context.TaskQueue, context.TerminalID); + auto commitResult = co_await TSuspendWithFuture(commitFuture, context.TaskQueue, context.TerminalID); + + TMonotonic endTs = TMonotonic::Now(); + latency = endTs - startTs; + + co_return commitResult; } } // namespace NYdb::NTPCC diff --git a/ydb/library/workload/tpcc/transaction_payment.cpp b/ydb/library/workload/tpcc/transaction_payment.cpp index a5c181a1a97..8465b1eba99 100644 --- a/ydb/library/workload/tpcc/transaction_payment.cpp +++ b/ydb/library/workload/tpcc/transaction_payment.cpp @@ -1,12 +1,14 @@ #include "transactions.h" -#include <util/string/printf.h> - #include "common_queries.h" #include "constants.h" #include "log.h" #include "util.h" +#include <library/cpp/time_provider/monotonic.h> + +#include <util/string/printf.h> + #include <format> #include <string> @@ -260,9 +262,13 @@ TAsyncExecuteQueryResult InsertHistoryRecord( //----------------------------------------------------------------------------- -NThreading::TFuture<TStatus> GetPaymentTask(TTransactionContext& context, +NThreading::TFuture<TStatus> GetPaymentTask( + TTransactionContext& context, + TDuration& latency, TSession session) { + TMonotonic startTs = TMonotonic::Now(); + TTransactionInflightGuard guard; co_await TTaskReady(context.TaskQueue, context.TerminalID); @@ -486,7 +492,12 @@ NThreading::TFuture<TStatus> GetPaymentTask(TTransactionContext& context, << "customer " << customer.c_id << ", amount " << paymentAmount); auto commitFuture = tx.Commit(); - co_return co_await TSuspendWithFuture(commitFuture, context.TaskQueue, context.TerminalID); + auto commitResult = co_await TSuspendWithFuture(commitFuture, context.TaskQueue, context.TerminalID); + + TMonotonic endTs = TMonotonic::Now(); + latency = endTs - startTs; + + co_return commitResult; } } // namespace NYdb::NTPCC diff --git a/ydb/library/workload/tpcc/transaction_simulation.cpp b/ydb/library/workload/tpcc/transaction_simulation.cpp new file mode 100644 index 00000000000..502996a4b4f --- /dev/null +++ b/ydb/library/workload/tpcc/transaction_simulation.cpp @@ -0,0 +1,109 @@ +#include "transactions.h" + +#include "constants.h" +#include "log.h" +#include "util.h" + +#include <library/cpp/time_provider/monotonic.h> + +#include <util/generic/singleton.h> +#include <util/string/printf.h> + +#include <format> +#include <unordered_map> + +namespace NYdb::NTPCC { + +namespace { + +//----------------------------------------------------------------------------- + +using namespace NYdb::NQuery; + +//----------------------------------------------------------------------------- + +NYdb::NQuery::TAsyncExecuteQueryResult Select1( + NQuery::TSession& session, + const std::optional<NYdb::NQuery::TTransaction>& tx, + TTransactionContext& context) +{ + auto& Log = context.Log; + static std::string query = std::format(R"( + PRAGMA TablePathPrefix("{}"); + + DECLARE $count AS Int32; + + SELECT $count; + )", context.Path.c_str()); + + auto params = TParamsBuilder() + .AddParam("$count").Int32(1).Build() + .Build(); + + auto txControl = tx ? TTxControl::Tx(*tx) : TTxControl::BeginTx(TTxSettings::SerializableRW()); + auto result = session.ExecuteQuery( + query, + txControl, + std::move(params)); + + LOG_T("Terminal " << context.TerminalID << " waiting for select1"); + return result; +} + +} // anonymous + +//----------------------------------------------------------------------------- + +NThreading::TFuture<TStatus> GetSimulationTask( + TTransactionContext& context, + TDuration& latency, + NQuery::TSession session) +{ + TMonotonic startTs = TMonotonic::Now(); + + TTransactionInflightGuard guard; + co_await TTaskReady(context.TaskQueue, context.TerminalID); + + auto& Log = context.Log; + + LOG_T("Terminal " << context.TerminalID << " started simulated transaction"); + + // just to test if we have problems with generator (we don't) + for (size_t i = 0; i < 10; ++i) { + RandomNumber(DISTRICT_LOW_ID, DISTRICT_HIGH_ID); + } + + if (context.SimulateTransactionMs != 0) { + std::chrono::milliseconds delay(context.SimulateTransactionMs); + co_await TSuspend(context.TaskQueue, context.TerminalID, delay); + TMonotonic endTs = TMonotonic::Now(); + latency = endTs - startTs; + co_return TStatus(EStatus::SUCCESS, NIssue::TIssues()); + } + + // sleep 1 sumulation + + std::optional<TTransaction> tx; + for (int i = 0; i < context.SimulateTransactionSelect1; ++i) { + auto future = Select1(session, tx, context); + auto result = co_await TSuspendWithFuture(future, context.TaskQueue, context.TerminalID); + if (!result.IsSuccess()) { + co_return result; + } + + if (!tx) { + tx = *result.GetTransaction(); + LOG_T("Terminal " << context.TerminalID << " simulated transaction txId " << tx->GetId()); + } + } + + auto commitFuture = tx->Commit(); + auto commitResult = co_await TSuspendWithFuture(commitFuture, context.TaskQueue, context.TerminalID); + + TMonotonic endTs = TMonotonic::Now(); + latency = endTs - startTs; + + co_return commitResult; +} + +} // namespace NYdb::NTPCC diff --git a/ydb/library/workload/tpcc/transaction_stocklevel.cpp b/ydb/library/workload/tpcc/transaction_stocklevel.cpp index 6ab1f83d7c9..ecedbaca80c 100644 --- a/ydb/library/workload/tpcc/transaction_stocklevel.cpp +++ b/ydb/library/workload/tpcc/transaction_stocklevel.cpp @@ -1,11 +1,13 @@ #include "transactions.h" -#include <util/string/printf.h> - #include "constants.h" #include "log.h" #include "util.h" +#include <library/cpp/time_provider/monotonic.h> + +#include <util/string/printf.h> + #include <format> #include <string> @@ -98,9 +100,13 @@ TAsyncExecuteQueryResult GetStockCount( //----------------------------------------------------------------------------- -NThreading::TFuture<TStatus> GetStockLevelTask(TTransactionContext& context, +NThreading::TFuture<TStatus> GetStockLevelTask( + TTransactionContext& context, + TDuration& latency, TSession session) { + TMonotonic startTs = TMonotonic::Now(); + TTransactionInflightGuard guard; co_await TTaskReady(context.TaskQueue, context.TerminalID); @@ -152,7 +158,12 @@ NThreading::TFuture<TStatus> GetStockLevelTask(TTransactionContext& context, LOG_T("Terminal " << context.TerminalID << " is committing StockLevel transaction"); auto commitFuture = tx.Commit(); - co_return co_await TSuspendWithFuture(commitFuture, context.TaskQueue, context.TerminalID); + auto commitResult = co_await TSuspendWithFuture(commitFuture, context.TaskQueue, context.TerminalID); + + TMonotonic endTs = TMonotonic::Now(); + latency = endTs - startTs; + + co_return commitResult; } } // namespace NYdb::NTPCC diff --git a/ydb/library/workload/tpcc/transactions.h b/ydb/library/workload/tpcc/transactions.h index 1d3ad8e0201..7b5625d64d5 100644 --- a/ydb/library/workload/tpcc/transactions.h +++ b/ydb/library/workload/tpcc/transactions.h @@ -33,6 +33,8 @@ struct TTransactionContext { size_t WarehouseID; size_t WarehouseCount; ITaskQueue& TaskQueue; + int SimulateTransactionMs; + int SimulateTransactionSelect1; std::shared_ptr<NQuery::TQueryClient> Client; const TString Path; std::shared_ptr<TLog> Log; @@ -45,22 +47,32 @@ struct TUserAbortedException : public yexception { NThreading::TFuture<TStatus> GetNewOrderTask( TTransactionContext& context, + TDuration& latency, NQuery::TSession session); NThreading::TFuture<TStatus> GetDeliveryTask( TTransactionContext& context, + TDuration& latency, NQuery::TSession session); NThreading::TFuture<TStatus> GetOrderStatusTask( TTransactionContext& context, + TDuration& latency, NQuery::TSession session); NThreading::TFuture<TStatus> GetPaymentTask( TTransactionContext& context, + TDuration& latency, NQuery::TSession session); NThreading::TFuture<TStatus> GetStockLevelTask( TTransactionContext& context, + TDuration& latency, + NQuery::TSession session); + +NThreading::TFuture<TStatus> GetSimulationTask( + TTransactionContext& context, + TDuration& latency, NQuery::TSession session); } // namespace NYdb::NTPCC diff --git a/ydb/library/workload/tpcc/ut/histogram_ut.cpp b/ydb/library/workload/tpcc/ut/histogram_ut.cpp index a50baefe163..914ccea52ea 100644 --- a/ydb/library/workload/tpcc/ut/histogram_ut.cpp +++ b/ydb/library/workload/tpcc/ut/histogram_ut.cpp @@ -52,7 +52,7 @@ Y_UNIT_TEST_SUITE(THistogramTest) { hist.RecordValue(100); hist.RecordValue(1000); - UNIT_ASSERT_VALUES_EQUAL(hist.GetValueAtPercentile(100.0), std::numeric_limits<uint64_t>::max()); + UNIT_ASSERT_VALUES_EQUAL(hist.GetValueAtPercentile(100.0), 1000); // Should return max recorded value } Y_UNIT_TEST(ShouldHandleZeroValues) { @@ -165,7 +165,7 @@ Y_UNIT_TEST_SUITE(THistogramTest) { UNIT_ASSERT_VALUES_EQUAL(hist.GetValueAtPercentile(44.4), 4); // ~44% of 9 values = 4th value (3) UNIT_ASSERT_VALUES_EQUAL(hist.GetValueAtPercentile(55.5), 8); // ~55% of 9 values = 5th value (4) UNIT_ASSERT_VALUES_EQUAL(hist.GetValueAtPercentile(77.7), 16); // ~77% of 9 values = 7th value (8) - UNIT_ASSERT_VALUES_EQUAL(hist.GetValueAtPercentile(100.0), std::numeric_limits<uint64_t>::max()); // 100% of 9 values = 9th value (20) + UNIT_ASSERT_VALUES_EQUAL(hist.GetValueAtPercentile(100.0), 20); // 100% of 9 values = 9th value (20) } Y_UNIT_TEST(ShouldHandleLargeHistogram) { @@ -190,7 +190,7 @@ Y_UNIT_TEST_SUITE(THistogramTest) { UNIT_ASSERT_VALUES_EQUAL(hist.GetValueAtPercentile(50.0), 68); // 50% of 135 = 67.5, so 68th value (67), bucket 67, upper bound 68 UNIT_ASSERT_VALUES_EQUAL(hist.GetValueAtPercentile(95.0), 256); // 95% of 135 = 128.25, so 129th value (200), bucket 128, upper bound 256 UNIT_ASSERT_VALUES_EQUAL(hist.GetValueAtPercentile(99.0), 8192); // 99% of 135 = 133.65, so 134th value (6000), bucket 133, upper bound 8192 - UNIT_ASSERT_VALUES_EQUAL(hist.GetValueAtPercentile(100.0), std::numeric_limits<uint64_t>::max()); // Last value (10000) + UNIT_ASSERT_VALUES_EQUAL(hist.GetValueAtPercentile(100.0), 10000); // Last value (10000) } Y_UNIT_TEST(ShouldHandleBucketBoundaries) { @@ -211,7 +211,7 @@ Y_UNIT_TEST_SUITE(THistogramTest) { UNIT_ASSERT_VALUES_EQUAL(hist.GetValueAtPercentile(57.1), 4); // ~57% of 7 = 4th value (3), bucket 3, upper bound 4 UNIT_ASSERT_VALUES_EQUAL(hist.GetValueAtPercentile(71.4), 8); // ~71% of 7 = 5th value (4), bucket 4, upper bound 8 UNIT_ASSERT_VALUES_EQUAL(hist.GetValueAtPercentile(85.7), 16); // ~86% of 7 = 6th value (8), bucket 5, upper bound 16 - UNIT_ASSERT_VALUES_EQUAL(hist.GetValueAtPercentile(100.0), std::numeric_limits<uint64_t>::max()); // 7th value (16) + UNIT_ASSERT_VALUES_EQUAL(hist.GetValueAtPercentile(100.0), 16); // 7th value (16) } Y_UNIT_TEST(ShouldHandleVerySmallPercentiles) { @@ -254,7 +254,7 @@ Y_UNIT_TEST_SUITE(THistogramTest) { hist.RecordValue(17); // Above maxValue, should go to [16,∞) UNIT_ASSERT_VALUES_EQUAL(hist.GetValueAtPercentile(33.3), 16); // 33% of 3 = 1st value (15) - UNIT_ASSERT_VALUES_EQUAL(hist.GetValueAtPercentile(100.0), std::numeric_limits<uint64_t>::max()); // Last bucket + UNIT_ASSERT_VALUES_EQUAL(hist.GetValueAtPercentile(100.0), 17); // Last value (17) } Y_UNIT_TEST(ShouldHandleRepeatedValues) { @@ -399,7 +399,7 @@ Y_UNIT_TEST_SUITE(THistogramTest) { // Test corner case: exactly at maxValue THistogram hist2(128, 8192); hist2.RecordValue(8192); // Should go to overflow bucket - UNIT_ASSERT_VALUES_EQUAL(hist2.GetValueAtPercentile(100.0), std::numeric_limits<uint64_t>::max()); + UNIT_ASSERT_VALUES_EQUAL(hist2.GetValueAtPercentile(100.0), 8192); // Test edge case: just below maxValue THistogram hist3(128, 8192); diff --git a/ydb/library/workload/tpcc/ya.make b/ydb/library/workload/tpcc/ya.make index 842452e1bee..d10a3aee77d 100644 --- a/ydb/library/workload/tpcc/ya.make +++ b/ydb/library/workload/tpcc/ya.make @@ -10,6 +10,7 @@ SRCS( transaction_neworder.cpp transaction_orderstatus.cpp transaction_payment.cpp + transaction_simulation.cpp transaction_stocklevel.cpp ) |