summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorEvgeniy Ivanov <[email protected]>2025-07-23 17:38:59 +0200
committerGitHub <[email protected]>2025-07-23 18:38:59 +0300
commited54f6b458f72dda750ba39a1cf61cc8f164006b (patch)
tree2f95b5a5899fbc09bfecddaed6e9e789985c57f7
parent2dffb9996a88ae9f28c6c66db4ce9c5bc98f57a9 (diff)
Tpcc checks improvements (#17333) (#21532)CLI_2.24.0
-rw-r--r--ydb/library/workload/tpcc/check.cpp160
1 files changed, 66 insertions, 94 deletions
diff --git a/ydb/library/workload/tpcc/check.cpp b/ydb/library/workload/tpcc/check.cpp
index 49094fe7296..36c4fc719d5 100644
--- a/ydb/library/workload/tpcc/check.cpp
+++ b/ydb/library/workload/tpcc/check.cpp
@@ -21,6 +21,25 @@ using namespace NThreading;
//-----------------------------------------------------------------------------
+// waits all futures and returns either the one with exception,
+// or void future.
+TFuture<void> WaitAllAndCheck(const std::vector<TFuture<void>>& futures) {
+ auto result = WaitAll(futures).Apply([allFutures = std::move(futures)](const auto&) {
+ // return any with error
+ for (const auto& future: allFutures) {
+ if (future.HasException()) {
+ return future;
+ }
+ }
+
+ return MakeFuture();
+ });
+
+ return result;
+}
+
+//-----------------------------------------------------------------------------
+
TFuture<void> BaseCheckWarehouseTable(TQueryClient& client, const TString& path, int expectedWhNumber) {
// W_ID is PK, so we can take just min, max and count to check if all rows present
TString query = std::format(R"(
@@ -525,15 +544,18 @@ TFuture<void> BaseCheckHistoryTable(TQueryClient& client, const TString& path, i
// based on checks in TPC-C for CockroachDB
-TFuture<void> CheckNoRows(TQueryClient& client, const TString& query) {
+TFuture<void> CheckNoRows(TQueryClient& client, const TString& query, const TString& description = {}) {
return client.RetryQuery([query](TSession session) {
return session.ExecuteQuery(query, TTxControl::NoTx());
- }).Apply([](const TFuture<TExecuteQueryResult>& future) {
+ }).Apply([description](const TFuture<TExecuteQueryResult>& future) {
auto result = future.GetValueSync();
ThrowIfError(result, TStringBuilder() << "query failed");
TResultSetParser parser(result.GetResultSet(0));
if (parser.TryNextRow()) {
+ if (!description.empty()) {
+ ythrow yexception() << description;
+ }
ythrow yexception();
}
});
@@ -665,8 +687,8 @@ void TPCCChecker::CheckSync() {
&TPCCChecker::ConsistencyCheck3325,
&TPCCChecker::ConsistencyCheck3326,
//&TPCCChecker::ConsistencyCheck3327, //mem
- //&TPCCChecker::ConsistencyCheck3328, // possibly fails
- //&TPCCChecker::ConsistencyCheck3329, // possibly fails (similar to 3328)
+ &TPCCChecker::ConsistencyCheck3328,
+ &TPCCChecker::ConsistencyCheck3329,
//&TPCCChecker::ConsistencyCheck33210, // mem
};
@@ -786,22 +808,14 @@ void TPCCChecker::ConsistencyCheck3324(TQueryClient& client) {
)", Config.Path.c_str(), TABLE_OORDER, startWh, endWh, TABLE_ORDER_LINE, startWh, endWh);
// because of #21490 we run queries 1 by one
- auto future = CheckNoRows(client, query);
+ TStringStream ss;
+ ss << "w_id_from=" << startWh << ", w_id_to=" << endWh;
+ auto future = CheckNoRows(client, query, ss.Str());
future.Wait();
rangeFutures.push_back(future);
}
- auto result = WaitAll(rangeFutures).Apply([allFutures = std::move(rangeFutures)](const auto&) {
- // return any with error
- for (const auto& future: allFutures) {
- if (future.HasException()) {
- return future;
- }
- }
-
- return MakeFuture();
- });
-
+ auto result = WaitAllAndCheck(rangeFutures);
RunningChecks.insert(RunningChecks.end(), {
{ result, "3.3.2.4" },
});
@@ -821,15 +835,16 @@ void TPCCChecker::ConsistencyCheck3325(TQueryClient& client) {
$warehouse_to = {};
$missing_in_order =
- SELECT no.NO_W_ID AS W_ID, no.NO_D_ID AS D_ID, no.NO_O_ID AS O_ID
+ SELECT no.NO_W_ID AS W_ID, no.NO_D_ID AS D_ID, no.NO_O_ID AS O_ID, o.O_W_ID as O_W_ID, o.O_CARRIER_ID as CID
FROM `{}` AS no
LEFT JOIN (
- SELECT * FROM `{}`
+ SELECT O_W_ID as O_W_ID, O_D_ID as O_D_ID, O_ID as O_ID, O_CARRIER_ID as O_CARRIER_ID FROM `{}`
WHERE O_W_ID >= $warehouse_from AND O_W_ID <= $warehouse_to
) AS o
ON no.NO_W_ID = o.O_W_ID AND no.NO_D_ID = o.O_D_ID AND no.NO_O_ID = o.O_ID
WHERE no.NO_W_ID >= $warehouse_from AND no.NO_W_ID <= $warehouse_to
- AND (o.O_W_ID IS NULL OR o.O_CARRIER_ID IS NOT NULL);
+ AND (o.O_W_ID IS NULL OR (o.O_CARRIER_ID IS NOT NULL AND o.O_CARRIER_ID != 0))
+ LIMIT 1;
$missing_in_new_order =
SELECT o.O_W_ID AS W_ID, o.O_D_ID AS D_ID, o.O_ID AS O_ID
@@ -842,7 +857,8 @@ void TPCCChecker::ConsistencyCheck3325(TQueryClient& client) {
WHERE NO_W_ID >= $warehouse_from AND NO_W_ID <= $warehouse_to
) AS no
ON o.O_W_ID = no.NO_W_ID AND o.O_D_ID = no.NO_D_ID AND o.O_ID = no.NO_O_ID
- WHERE o.O_CARRIER_ID IS NULL AND no.NO_W_ID IS NULL;
+ WHERE (o.O_CARRIER_ID IS NULL OR o.O_CARRIER_ID = 0) AND no.NO_W_ID IS NULL
+ LIMIT 1;
SELECT *
FROM $missing_in_order
@@ -855,27 +871,17 @@ void TPCCChecker::ConsistencyCheck3325(TQueryClient& client) {
// because of #21490 we run queries 1 by one
// also these queries consume a lot of memory, so probably 1 by one is better.
- auto future = CheckNoRows(client, query);
+ TStringStream ss;
+ ss << "w_id_from=" << startWh << ", w_id_to=" << endWh;
+ auto future = CheckNoRows(client, query, ss.Str());
future.Wait();
rangeFutures.push_back(future);
}
- auto result = WaitAll(rangeFutures).Apply([allFutures = std::move(rangeFutures)](const auto&) {
- // return any with error
- for (const auto& future: allFutures) {
- if (future.HasException()) {
- return future;
- }
- }
-
- return MakeFuture();
- });
-
+ auto result = WaitAllAndCheck(rangeFutures);
RunningChecks.insert(RunningChecks.end(), {
{ result, "3.3.2.5" },
});
-
- WaitAll(rangeFutures).GetValueSync();
}
void TPCCChecker::ConsistencyCheck3326(TQueryClient& client) {
@@ -937,27 +943,18 @@ void TPCCChecker::ConsistencyCheck3326(TQueryClient& client) {
)", Config.Path.c_str(), startWh, endWh, TABLE_ORDER_LINE, TABLE_OORDER, TABLE_OORDER
);
- auto future = CheckNoRows(client, query);
+ // because of #21490 we run queries 1 by one
+ TStringStream ss;
+ ss << "w_id_from=" << startWh << ", w_id_to=" << endWh;
+ auto future = CheckNoRows(client, query, ss.Str());
future.Wait();
rangeFutures.push_back(future);
}
- auto result = WaitAll(rangeFutures).Apply([allFutures = std::move(rangeFutures)](const auto&) {
- // return any with error
- for (const auto& future: allFutures) {
- if (future.HasException()) {
- return future;
- }
- }
-
- return MakeFuture();
- });
-
+ auto result = WaitAllAndCheck(rangeFutures);
RunningChecks.insert(RunningChecks.end(), {
{ result, "3.3.2.6" },
});
-
- WaitAll(rangeFutures).GetValueSync();
}
// TODO: rewrite (to much mem and materialization size)
@@ -1001,27 +998,18 @@ void TPCCChecker::ConsistencyCheck3327(TQueryClient& client) {
)", Config.Path.c_str(), startWh, endWh,
TABLE_ORDER_LINE, TABLE_OORDER);
- auto future = CheckNoRows(client, query);
+ // because of #21490 we run queries 1 by one
+ TStringStream ss;
+ ss << "w_id_from=" << startWh << ", w_id_to=" << endWh;
+ auto future = CheckNoRows(client, query, ss.Str());
future.Wait();
rangeFutures.push_back(future);
}
- auto result = WaitAll(rangeFutures).Apply([allFutures = std::move(rangeFutures)](const auto&) {
- // return any with error
- for (const auto& future: allFutures) {
- if (future.HasException()) {
- return future;
- }
- }
-
- return MakeFuture();
- });
-
+ auto result = WaitAllAndCheck(rangeFutures);
RunningChecks.insert(RunningChecks.end(), {
{ result, "3.3.2.7" },
});
-
- WaitAll(rangeFutures).GetValueSync();
}
void TPCCChecker::ConsistencyCheck3328(TQueryClient& client) {
@@ -1062,11 +1050,10 @@ void TPCCChecker::ConsistencyCheck3329(TQueryClient& client) {
GROUP BY H_W_ID, H_D_ID;
-- Join with district and compare D_YTD to summed H_AMOUNT
- SELECT *
SELECT h.W_ID, h.D_ID, h.SUM_H_AMOUNT, ABS(d.D_YTD - h.SUM_H_AMOUNT) as delta
FROM `{}` as d
JOIN $history_sums AS h
- ON D_W_ID = h.W_ID AND D_ID = h.D_ID
+ ON d.D_W_ID = h.W_ID AND d.D_ID = h.D_ID
WHERE ABS(d.D_YTD - h.SUM_H_AMOUNT) > 1e-3
LIMIT 1;
)", Config.Path.c_str(), TABLE_HISTORY, TABLE_DISTRICT);
@@ -1135,20 +1122,15 @@ void TPCCChecker::ConsistencyCheck33210(TQueryClient& client) {
)", Config.Path.c_str(), startWh, endWh,
TABLE_HISTORY, TABLE_OORDER, TABLE_ORDER_LINE, TABLE_CUSTOMER);
- auto future = CheckNoRows(client, query);
+ // because of #21490 we run queries 1 by one
+ TStringStream ss;
+ ss << "w_id_from=" << startWh << ", w_id_to=" << endWh;
+ auto future = CheckNoRows(client, query, ss.Str());
future.Wait();
rangeFutures.push_back(future);
}
- auto result = WaitAll(rangeFutures).Apply([allFutures = std::move(rangeFutures)](const auto&) {
- for (const auto& future : allFutures) {
- if (future.HasException()) {
- return future;
- }
- }
- return MakeFuture();
- });
-
+ auto result = WaitAllAndCheck(rangeFutures);
RunningChecks.push_back({ result, "3.3.2.10" });
}
@@ -1185,20 +1167,15 @@ void TPCCChecker::ConsistencyCheck33211(TQueryClient& client) {
)", Config.Path.c_str(), startWh, endWh,
TABLE_OORDER, TABLE_NEW_ORDER);
- auto future = CheckNoRows(client, query);
+ // because of #21490 we run queries 1 by one
+ TStringStream ss;
+ ss << "w_id_from=" << startWh << ", w_id_to=" << endWh;
+ auto future = CheckNoRows(client, query, ss.Str());
future.Wait();
rangeFutures.push_back(future);
}
- auto result = WaitAll(rangeFutures).Apply([allFutures = std::move(rangeFutures)](const auto&) {
- for (const auto& future : allFutures) {
- if (future.HasException()) {
- return future;
- }
- }
- return MakeFuture();
- });
-
+ auto result = WaitAllAndCheck(rangeFutures);
RunningChecks.push_back({ result, "3.3.2.11" });
}
@@ -1243,20 +1220,15 @@ void TPCCChecker::ConsistencyCheck33212(TQueryClient& client) {
)", Config.Path.c_str(), startWh, endWh,
TABLE_OORDER, TABLE_ORDER_LINE, TABLE_CUSTOMER);
- auto future = CheckNoRows(client, query);
+ // because of #21490 we run queries 1 by one
+ TStringStream ss;
+ ss << "w_id_from=" << startWh << ", w_id_to=" << endWh;
+ auto future = CheckNoRows(client, query, ss.Str());
future.Wait();
rangeFutures.push_back(future);
}
- auto result = WaitAll(rangeFutures).Apply([allFutures = std::move(rangeFutures)](const auto&) {
- for (const auto& future : allFutures) {
- if (future.HasException()) {
- return future;
- }
- }
- return MakeFuture();
- });
-
+ auto result = WaitAllAndCheck(rangeFutures);
RunningChecks.push_back({ result, "3.3.2.12" });
}