diff options
author | Evgeniy Ivanov <[email protected]> | 2025-07-23 17:38:59 +0200 |
---|---|---|
committer | GitHub <[email protected]> | 2025-07-23 18:38:59 +0300 |
commit | ed54f6b458f72dda750ba39a1cf61cc8f164006b (patch) | |
tree | 2f95b5a5899fbc09bfecddaed6e9e789985c57f7 | |
parent | 2dffb9996a88ae9f28c6c66db4ce9c5bc98f57a9 (diff) |
Tpcc checks improvements (#17333) (#21532)CLI_2.24.0
-rw-r--r-- | ydb/library/workload/tpcc/check.cpp | 160 |
1 files changed, 66 insertions, 94 deletions
diff --git a/ydb/library/workload/tpcc/check.cpp b/ydb/library/workload/tpcc/check.cpp index 49094fe7296..36c4fc719d5 100644 --- a/ydb/library/workload/tpcc/check.cpp +++ b/ydb/library/workload/tpcc/check.cpp @@ -21,6 +21,25 @@ using namespace NThreading; //----------------------------------------------------------------------------- +// waits all futures and returns either the one with exception, +// or void future. +TFuture<void> WaitAllAndCheck(const std::vector<TFuture<void>>& futures) { + auto result = WaitAll(futures).Apply([allFutures = std::move(futures)](const auto&) { + // return any with error + for (const auto& future: allFutures) { + if (future.HasException()) { + return future; + } + } + + return MakeFuture(); + }); + + return result; +} + +//----------------------------------------------------------------------------- + TFuture<void> BaseCheckWarehouseTable(TQueryClient& client, const TString& path, int expectedWhNumber) { // W_ID is PK, so we can take just min, max and count to check if all rows present TString query = std::format(R"( @@ -525,15 +544,18 @@ TFuture<void> BaseCheckHistoryTable(TQueryClient& client, const TString& path, i // based on checks in TPC-C for CockroachDB -TFuture<void> CheckNoRows(TQueryClient& client, const TString& query) { +TFuture<void> CheckNoRows(TQueryClient& client, const TString& query, const TString& description = {}) { return client.RetryQuery([query](TSession session) { return session.ExecuteQuery(query, TTxControl::NoTx()); - }).Apply([](const TFuture<TExecuteQueryResult>& future) { + }).Apply([description](const TFuture<TExecuteQueryResult>& future) { auto result = future.GetValueSync(); ThrowIfError(result, TStringBuilder() << "query failed"); TResultSetParser parser(result.GetResultSet(0)); if (parser.TryNextRow()) { + if (!description.empty()) { + ythrow yexception() << description; + } ythrow yexception(); } }); @@ -665,8 +687,8 @@ void TPCCChecker::CheckSync() { &TPCCChecker::ConsistencyCheck3325, &TPCCChecker::ConsistencyCheck3326, //&TPCCChecker::ConsistencyCheck3327, //mem - //&TPCCChecker::ConsistencyCheck3328, // possibly fails - //&TPCCChecker::ConsistencyCheck3329, // possibly fails (similar to 3328) + &TPCCChecker::ConsistencyCheck3328, + &TPCCChecker::ConsistencyCheck3329, //&TPCCChecker::ConsistencyCheck33210, // mem }; @@ -786,22 +808,14 @@ void TPCCChecker::ConsistencyCheck3324(TQueryClient& client) { )", Config.Path.c_str(), TABLE_OORDER, startWh, endWh, TABLE_ORDER_LINE, startWh, endWh); // because of #21490 we run queries 1 by one - auto future = CheckNoRows(client, query); + TStringStream ss; + ss << "w_id_from=" << startWh << ", w_id_to=" << endWh; + auto future = CheckNoRows(client, query, ss.Str()); future.Wait(); rangeFutures.push_back(future); } - auto result = WaitAll(rangeFutures).Apply([allFutures = std::move(rangeFutures)](const auto&) { - // return any with error - for (const auto& future: allFutures) { - if (future.HasException()) { - return future; - } - } - - return MakeFuture(); - }); - + auto result = WaitAllAndCheck(rangeFutures); RunningChecks.insert(RunningChecks.end(), { { result, "3.3.2.4" }, }); @@ -821,15 +835,16 @@ void TPCCChecker::ConsistencyCheck3325(TQueryClient& client) { $warehouse_to = {}; $missing_in_order = - SELECT no.NO_W_ID AS W_ID, no.NO_D_ID AS D_ID, no.NO_O_ID AS O_ID + SELECT no.NO_W_ID AS W_ID, no.NO_D_ID AS D_ID, no.NO_O_ID AS O_ID, o.O_W_ID as O_W_ID, o.O_CARRIER_ID as CID FROM `{}` AS no LEFT JOIN ( - SELECT * FROM `{}` + SELECT O_W_ID as O_W_ID, O_D_ID as O_D_ID, O_ID as O_ID, O_CARRIER_ID as O_CARRIER_ID FROM `{}` WHERE O_W_ID >= $warehouse_from AND O_W_ID <= $warehouse_to ) AS o ON no.NO_W_ID = o.O_W_ID AND no.NO_D_ID = o.O_D_ID AND no.NO_O_ID = o.O_ID WHERE no.NO_W_ID >= $warehouse_from AND no.NO_W_ID <= $warehouse_to - AND (o.O_W_ID IS NULL OR o.O_CARRIER_ID IS NOT NULL); + AND (o.O_W_ID IS NULL OR (o.O_CARRIER_ID IS NOT NULL AND o.O_CARRIER_ID != 0)) + LIMIT 1; $missing_in_new_order = SELECT o.O_W_ID AS W_ID, o.O_D_ID AS D_ID, o.O_ID AS O_ID @@ -842,7 +857,8 @@ void TPCCChecker::ConsistencyCheck3325(TQueryClient& client) { WHERE NO_W_ID >= $warehouse_from AND NO_W_ID <= $warehouse_to ) AS no ON o.O_W_ID = no.NO_W_ID AND o.O_D_ID = no.NO_D_ID AND o.O_ID = no.NO_O_ID - WHERE o.O_CARRIER_ID IS NULL AND no.NO_W_ID IS NULL; + WHERE (o.O_CARRIER_ID IS NULL OR o.O_CARRIER_ID = 0) AND no.NO_W_ID IS NULL + LIMIT 1; SELECT * FROM $missing_in_order @@ -855,27 +871,17 @@ void TPCCChecker::ConsistencyCheck3325(TQueryClient& client) { // because of #21490 we run queries 1 by one // also these queries consume a lot of memory, so probably 1 by one is better. - auto future = CheckNoRows(client, query); + TStringStream ss; + ss << "w_id_from=" << startWh << ", w_id_to=" << endWh; + auto future = CheckNoRows(client, query, ss.Str()); future.Wait(); rangeFutures.push_back(future); } - auto result = WaitAll(rangeFutures).Apply([allFutures = std::move(rangeFutures)](const auto&) { - // return any with error - for (const auto& future: allFutures) { - if (future.HasException()) { - return future; - } - } - - return MakeFuture(); - }); - + auto result = WaitAllAndCheck(rangeFutures); RunningChecks.insert(RunningChecks.end(), { { result, "3.3.2.5" }, }); - - WaitAll(rangeFutures).GetValueSync(); } void TPCCChecker::ConsistencyCheck3326(TQueryClient& client) { @@ -937,27 +943,18 @@ void TPCCChecker::ConsistencyCheck3326(TQueryClient& client) { )", Config.Path.c_str(), startWh, endWh, TABLE_ORDER_LINE, TABLE_OORDER, TABLE_OORDER ); - auto future = CheckNoRows(client, query); + // because of #21490 we run queries 1 by one + TStringStream ss; + ss << "w_id_from=" << startWh << ", w_id_to=" << endWh; + auto future = CheckNoRows(client, query, ss.Str()); future.Wait(); rangeFutures.push_back(future); } - auto result = WaitAll(rangeFutures).Apply([allFutures = std::move(rangeFutures)](const auto&) { - // return any with error - for (const auto& future: allFutures) { - if (future.HasException()) { - return future; - } - } - - return MakeFuture(); - }); - + auto result = WaitAllAndCheck(rangeFutures); RunningChecks.insert(RunningChecks.end(), { { result, "3.3.2.6" }, }); - - WaitAll(rangeFutures).GetValueSync(); } // TODO: rewrite (to much mem and materialization size) @@ -1001,27 +998,18 @@ void TPCCChecker::ConsistencyCheck3327(TQueryClient& client) { )", Config.Path.c_str(), startWh, endWh, TABLE_ORDER_LINE, TABLE_OORDER); - auto future = CheckNoRows(client, query); + // because of #21490 we run queries 1 by one + TStringStream ss; + ss << "w_id_from=" << startWh << ", w_id_to=" << endWh; + auto future = CheckNoRows(client, query, ss.Str()); future.Wait(); rangeFutures.push_back(future); } - auto result = WaitAll(rangeFutures).Apply([allFutures = std::move(rangeFutures)](const auto&) { - // return any with error - for (const auto& future: allFutures) { - if (future.HasException()) { - return future; - } - } - - return MakeFuture(); - }); - + auto result = WaitAllAndCheck(rangeFutures); RunningChecks.insert(RunningChecks.end(), { { result, "3.3.2.7" }, }); - - WaitAll(rangeFutures).GetValueSync(); } void TPCCChecker::ConsistencyCheck3328(TQueryClient& client) { @@ -1062,11 +1050,10 @@ void TPCCChecker::ConsistencyCheck3329(TQueryClient& client) { GROUP BY H_W_ID, H_D_ID; -- Join with district and compare D_YTD to summed H_AMOUNT - SELECT * SELECT h.W_ID, h.D_ID, h.SUM_H_AMOUNT, ABS(d.D_YTD - h.SUM_H_AMOUNT) as delta FROM `{}` as d JOIN $history_sums AS h - ON D_W_ID = h.W_ID AND D_ID = h.D_ID + ON d.D_W_ID = h.W_ID AND d.D_ID = h.D_ID WHERE ABS(d.D_YTD - h.SUM_H_AMOUNT) > 1e-3 LIMIT 1; )", Config.Path.c_str(), TABLE_HISTORY, TABLE_DISTRICT); @@ -1135,20 +1122,15 @@ void TPCCChecker::ConsistencyCheck33210(TQueryClient& client) { )", Config.Path.c_str(), startWh, endWh, TABLE_HISTORY, TABLE_OORDER, TABLE_ORDER_LINE, TABLE_CUSTOMER); - auto future = CheckNoRows(client, query); + // because of #21490 we run queries 1 by one + TStringStream ss; + ss << "w_id_from=" << startWh << ", w_id_to=" << endWh; + auto future = CheckNoRows(client, query, ss.Str()); future.Wait(); rangeFutures.push_back(future); } - auto result = WaitAll(rangeFutures).Apply([allFutures = std::move(rangeFutures)](const auto&) { - for (const auto& future : allFutures) { - if (future.HasException()) { - return future; - } - } - return MakeFuture(); - }); - + auto result = WaitAllAndCheck(rangeFutures); RunningChecks.push_back({ result, "3.3.2.10" }); } @@ -1185,20 +1167,15 @@ void TPCCChecker::ConsistencyCheck33211(TQueryClient& client) { )", Config.Path.c_str(), startWh, endWh, TABLE_OORDER, TABLE_NEW_ORDER); - auto future = CheckNoRows(client, query); + // because of #21490 we run queries 1 by one + TStringStream ss; + ss << "w_id_from=" << startWh << ", w_id_to=" << endWh; + auto future = CheckNoRows(client, query, ss.Str()); future.Wait(); rangeFutures.push_back(future); } - auto result = WaitAll(rangeFutures).Apply([allFutures = std::move(rangeFutures)](const auto&) { - for (const auto& future : allFutures) { - if (future.HasException()) { - return future; - } - } - return MakeFuture(); - }); - + auto result = WaitAllAndCheck(rangeFutures); RunningChecks.push_back({ result, "3.3.2.11" }); } @@ -1243,20 +1220,15 @@ void TPCCChecker::ConsistencyCheck33212(TQueryClient& client) { )", Config.Path.c_str(), startWh, endWh, TABLE_OORDER, TABLE_ORDER_LINE, TABLE_CUSTOMER); - auto future = CheckNoRows(client, query); + // because of #21490 we run queries 1 by one + TStringStream ss; + ss << "w_id_from=" << startWh << ", w_id_to=" << endWh; + auto future = CheckNoRows(client, query, ss.Str()); future.Wait(); rangeFutures.push_back(future); } - auto result = WaitAll(rangeFutures).Apply([allFutures = std::move(rangeFutures)](const auto&) { - for (const auto& future : allFutures) { - if (future.HasException()) { - return future; - } - } - return MakeFuture(); - }); - + auto result = WaitAllAndCheck(rangeFutures); RunningChecks.push_back({ result, "3.3.2.12" }); } |