diff options
| -rw-r--r-- | ydb/library/workload/tpcc/common_queries.cpp | 9 | ||||
| -rw-r--r-- | ydb/library/workload/tpcc/constants.h | 15 | ||||
| -rw-r--r-- | ydb/library/workload/tpcc/transaction_delivery.cpp | 69 | ||||
| -rw-r--r-- | ydb/library/workload/tpcc/transaction_neworder.cpp | 88 | ||||
| -rw-r--r-- | ydb/library/workload/tpcc/transaction_orderstatus.cpp | 28 | ||||
| -rw-r--r-- | ydb/library/workload/tpcc/transaction_payment.cpp | 87 | ||||
| -rw-r--r-- | ydb/library/workload/tpcc/transaction_simulation.cpp | 2 | ||||
| -rw-r--r-- | ydb/library/workload/tpcc/transaction_stocklevel.cpp | 22 |
8 files changed, 197 insertions, 123 deletions
diff --git a/ydb/library/workload/tpcc/common_queries.cpp b/ydb/library/workload/tpcc/common_queries.cpp index bbef44dd4c3..341e8cacb87 100644 --- a/ydb/library/workload/tpcc/common_queries.cpp +++ b/ydb/library/workload/tpcc/common_queries.cpp @@ -1,4 +1,5 @@ #include "common_queries.h" +#include "constants.h" #include "log.h" #include "transactions.h" @@ -64,11 +65,11 @@ TAsyncExecuteQueryResult GetCustomerById( SELECT C_FIRST, C_MIDDLE, C_LAST, C_STREET_1, C_STREET_2, C_CITY, C_STATE, C_ZIP, C_PHONE, C_CREDIT, C_CREDIT_LIM, C_DISCOUNT, C_BALANCE, C_YTD_PAYMENT, C_PAYMENT_CNT, C_SINCE - FROM `customer` + FROM `{}` WHERE C_W_ID = $c_w_id AND C_D_ID = $c_d_id AND C_ID = $c_id; - )", context.Path.c_str()); + )", context.Path.c_str(), TABLE_CUSTOMER); auto params = TParamsBuilder() .AddParam("$c_w_id").Int32(warehouseID).Build() @@ -108,12 +109,12 @@ TAsyncExecuteQueryResult GetCustomersByLastName( SELECT C_FIRST, C_MIDDLE, C_ID, C_STREET_1, C_STREET_2, C_CITY, C_STATE, C_ZIP, C_PHONE, C_CREDIT, C_CREDIT_LIM, C_DISCOUNT, C_BALANCE, C_YTD_PAYMENT, C_PAYMENT_CNT, C_SINCE - FROM `customer` VIEW idx_customer_name AS idx + FROM `{}` VIEW `{}` AS idx WHERE idx.C_W_ID = $c_w_id AND idx.C_D_ID = $c_d_id AND idx.C_LAST = $c_last ORDER BY idx.C_FIRST; - )", context.Path.c_str()); + )", context.Path.c_str(), TABLE_CUSTOMER, INDEX_CUSTOMER_NAME); auto params = TParamsBuilder() .AddParam("$c_w_id").Int32(warehouseID).Build() diff --git a/ydb/library/workload/tpcc/constants.h b/ydb/library/workload/tpcc/constants.h index bde60422abf..89ad7f86088 100644 --- a/ydb/library/workload/tpcc/constants.h +++ b/ydb/library/workload/tpcc/constants.h @@ -52,4 +52,19 @@ constexpr std::chrono::seconds STOCK_LEVEL_THINK_TIME{5}; // thinking and keying times and number of terminals per warehouse constexpr double MAX_TPMC_PER_WAREHOUSE = 12.86; +// Table names +constexpr const char* TABLE_CUSTOMER = "customer"; +constexpr const char* TABLE_WAREHOUSE = "warehouse"; +constexpr const char* TABLE_DISTRICT = "district"; +constexpr const char* TABLE_NEW_ORDER = "new_order"; +constexpr const char* TABLE_OORDER = "oorder"; +constexpr const char* TABLE_ITEM = "item"; +constexpr const char* TABLE_STOCK = "stock"; +constexpr const char* TABLE_ORDER_LINE = "order_line"; +constexpr const char* TABLE_HISTORY = "history"; + +// Index/View names +constexpr const char* INDEX_CUSTOMER_NAME = "idx_customer_name"; +constexpr const char* INDEX_ORDER = "idx_order"; + } // namespace NYdb::NTPCC diff --git a/ydb/library/workload/tpcc/transaction_delivery.cpp b/ydb/library/workload/tpcc/transaction_delivery.cpp index ebf5ed99d89..4a34ddf14e8 100644 --- a/ydb/library/workload/tpcc/transaction_delivery.cpp +++ b/ydb/library/workload/tpcc/transaction_delivery.cpp @@ -48,12 +48,12 @@ TAsyncExecuteQueryResult GetNewOrder( DECLARE $no_d_id AS Int32; DECLARE $no_w_id AS Int32; - SELECT NO_W_ID, NO_D_ID, NO_O_ID FROM `new_order` + SELECT NO_W_ID, NO_D_ID, NO_O_ID FROM `{}` WHERE NO_D_ID = $no_d_id AND NO_W_ID = $no_w_id ORDER BY NO_W_ID ASC, NO_D_ID ASC, NO_O_ID ASC LIMIT 1; - )", context.Path.c_str()); + )", context.Path.c_str(), TABLE_NEW_ORDER); auto params = TParamsBuilder() .AddParam("$no_d_id").Int32(districtID).Build() @@ -84,11 +84,11 @@ TAsyncExecuteQueryResult DeleteNewOrder( DECLARE $no_d_id AS Int32; DECLARE $no_w_id AS Int32; - DELETE FROM `new_order` + DELETE FROM `{}` WHERE NO_O_ID = $no_o_id AND NO_D_ID = $no_d_id AND NO_W_ID = $no_w_id; - )", context.Path.c_str()); + )", context.Path.c_str(), TABLE_NEW_ORDER); auto params = TParamsBuilder() .AddParam("$no_o_id").Int32(orderID).Build() @@ -120,11 +120,11 @@ TAsyncExecuteQueryResult GetCustomerID( DECLARE $o_w_id AS Int32; SELECT O_C_ID - FROM `oorder` + FROM `{}` WHERE O_ID = $o_id AND O_D_ID = $o_d_id AND O_W_ID = $o_w_id; - )", context.Path.c_str()); + )", context.Path.c_str(), TABLE_OORDER); auto params = TParamsBuilder() .AddParam("$o_id").Int32(orderID).Build() @@ -156,9 +156,9 @@ TAsyncExecuteQueryResult UpdateCarrierID( DECLARE $o_w_id AS Int32; DECLARE $o_carrier_id AS Int32; - UPSERT INTO `oorder` (O_W_ID, O_D_ID, O_ID, O_CARRIER_ID) + UPSERT INTO `{}` (O_W_ID, O_D_ID, O_ID, O_CARRIER_ID) VALUES ($o_w_id, $o_d_id, $o_id, $o_carrier_id); - )", context.Path.c_str()); + )", context.Path.c_str(), TABLE_OORDER); auto params = TParamsBuilder() .AddParam("$o_w_id").Int32(warehouseID).Build() @@ -188,8 +188,8 @@ TAsyncExecuteQueryResult UpdateDeliveryDate( $mapper = ($row) -> (AsStruct( $row.p1 as OL_W_ID, $row.p2 as OL_D_ID, $row.p3 as OL_O_ID, $row.p4 as OL_NUMBER, $row.p5 as OL_DELIVERY_D)); - UPSERT INTO `order_line` SELECT * FROM as_table(ListMap($values, $mapper)); - )", context.Path.c_str()); + UPSERT INTO `{}` SELECT * FROM as_table(ListMap($values, $mapper)); + )", context.Path.c_str(), TABLE_ORDER_LINE); auto paramsBuilder = TParamsBuilder(); auto& listBuilder = paramsBuilder.AddParam("$values").BeginList(); @@ -229,11 +229,11 @@ TAsyncExecuteQueryResult GetOrderLines( DECLARE $ol_w_id AS Int32; SELECT OL_NUMBER, OL_AMOUNT - FROM `order_line` + FROM `{}` WHERE OL_O_ID = $ol_o_id AND OL_D_ID = $ol_d_id AND OL_W_ID = $ol_w_id; - )", context.Path.c_str()); + )", context.Path.c_str(), TABLE_ORDER_LINE); auto params = TParamsBuilder() .AddParam("$ol_o_id").Int32(orderID).Build() @@ -265,11 +265,11 @@ TAsyncExecuteQueryResult GetCustomerData( DECLARE $c_id AS Int32; SELECT C_BALANCE, C_DELIVERY_CNT - FROM `customer` + FROM `{}` WHERE C_W_ID = $c_w_id AND C_D_ID = $c_d_id AND C_ID = $c_id; - )", context.Path.c_str()); + )", context.Path.c_str(), TABLE_CUSTOMER); auto params = TParamsBuilder() .AddParam("$c_w_id").Int32(warehouseID).Build() @@ -302,9 +302,9 @@ TAsyncExecuteQueryResult UpdateCustomerBalanceAndDeliveryCount( DECLARE $c_balance AS Double; DECLARE $c_delivery_cnt AS Int32; - UPSERT INTO `customer` (C_W_ID, C_D_ID, C_ID, C_BALANCE, C_DELIVERY_CNT) + UPSERT INTO `{}` (C_W_ID, C_D_ID, C_ID, C_BALANCE, C_DELIVERY_CNT) VALUES ($c_w_id, $c_d_id, $c_id, $c_balance, $c_delivery_cnt); - )", context.Path.c_str()); + )", context.Path.c_str(), TABLE_CUSTOMER); auto params = TParamsBuilder() .AddParam("$c_w_id").Int32(warehouseID).Build() @@ -342,7 +342,8 @@ NThreading::TFuture<TStatus> GetDeliveryTask( const int warehouseID = context.WarehouseID; const int carrierID = RandomNumber(1, 10); - LOG_T("Terminal " << context.TerminalID << " started Delivery transaction in " << warehouseID); + LOG_T("Terminal " << context.TerminalID << " started Delivery transaction in " << warehouseID + << ", session: " << session.GetId()); size_t processedOrderCount = 0; std::optional<TTransaction> tx; @@ -356,9 +357,11 @@ NThreading::TFuture<TStatus> GetDeliveryTask( if (!newOrderResult.IsSuccess()) { if (ShouldExit(newOrderResult)) { LOG_E("Terminal " << context.TerminalID << " new order query failed: " - << newOrderResult.GetIssues().ToOneLineString()); + << newOrderResult.GetIssues().ToOneLineString() << ", session: " << session.GetId()); std::quick_exit(1); } + LOG_T("Terminal " << context.TerminalID << " new order query failed: " + << newOrderResult.GetIssues().ToOneLineString() << ", session: " << session.GetId()); co_return newOrderResult; } @@ -383,9 +386,11 @@ NThreading::TFuture<TStatus> GetDeliveryTask( if (!customerIdResult.IsSuccess()) { if (ShouldExit(customerIdResult)) { LOG_E("Terminal " << context.TerminalID << " get customer ID failed: " - << customerIdResult.GetIssues().ToOneLineString()); + << customerIdResult.GetIssues().ToOneLineString() << ", session: " << session.GetId()); std::quick_exit(1); } + LOG_T("Terminal " << context.TerminalID << " get customer ID failed: " + << customerIdResult.GetIssues().ToOneLineString() << ", session: " << session.GetId()); co_return customerIdResult; } @@ -403,9 +408,11 @@ NThreading::TFuture<TStatus> GetDeliveryTask( if (!customerDataResult.IsSuccess()) { if (ShouldExit(customerDataResult)) { LOG_E("Terminal " << context.TerminalID << " get customer data failed: " - << customerDataResult.GetIssues().ToOneLineString()); + << customerDataResult.GetIssues().ToOneLineString() << ", session: " << session.GetId()); std::quick_exit(1); } + LOG_T("Terminal " << context.TerminalID << " get customer data failed: " + << customerDataResult.GetIssues().ToOneLineString() << ", session: " << session.GetId()); co_return customerDataResult; } @@ -425,9 +432,11 @@ NThreading::TFuture<TStatus> GetDeliveryTask( if (!orderLinesResult.IsSuccess()) { if (ShouldExit(orderLinesResult)) { LOG_E("Terminal " << context.TerminalID << " get order lines failed: " - << orderLinesResult.GetIssues().ToOneLineString()); + << orderLinesResult.GetIssues().ToOneLineString() << ", session: " << session.GetId()); std::quick_exit(1); } + LOG_T("Terminal " << context.TerminalID << " get order lines failed: " + << orderLinesResult.GetIssues().ToOneLineString() << ", session: " << session.GetId()); co_return orderLinesResult; } @@ -463,9 +472,11 @@ NThreading::TFuture<TStatus> GetDeliveryTask( if (!deleteOrderResult.IsSuccess()) { if (ShouldExit(deleteOrderResult)) { LOG_E("Terminal " << context.TerminalID << " delete order failed: " - << deleteOrderResult.GetIssues().ToOneLineString()); + << deleteOrderResult.GetIssues().ToOneLineString() << ", session: " << session.GetId()); std::quick_exit(1); } + LOG_T("Terminal " << context.TerminalID << " delete order failed: " + << deleteOrderResult.GetIssues().ToOneLineString() << ", session: " << session.GetId()); co_return deleteOrderResult; } @@ -475,9 +486,11 @@ NThreading::TFuture<TStatus> GetDeliveryTask( if (!updateCarrierResult.IsSuccess()) { if (ShouldExit(updateCarrierResult)) { LOG_E("Terminal " << context.TerminalID << " update carrier ID failed: " - << updateCarrierResult.GetIssues().ToOneLineString()); + << updateCarrierResult.GetIssues().ToOneLineString() << ", session: " << session.GetId()); std::quick_exit(1); } + LOG_T("Terminal " << context.TerminalID << " update carrier ID failed: " + << updateCarrierResult.GetIssues().ToOneLineString() << ", session: " << session.GetId()); co_return updateCarrierResult; } @@ -489,9 +502,11 @@ NThreading::TFuture<TStatus> GetDeliveryTask( if (!updateDeliveryResult.IsSuccess()) { if (ShouldExit(updateDeliveryResult)) { LOG_E("Terminal " << context.TerminalID << " update delivery date failed: " - << updateDeliveryResult.GetIssues().ToOneLineString()); + << updateDeliveryResult.GetIssues().ToOneLineString() << ", session: " << session.GetId()); std::quick_exit(1); } + LOG_T("Terminal " << context.TerminalID << " update delivery date failed: " + << updateDeliveryResult.GetIssues().ToOneLineString() << ", session: " << session.GetId()); co_return updateDeliveryResult; } @@ -501,9 +516,11 @@ NThreading::TFuture<TStatus> GetDeliveryTask( if (!updateCustomerResult.IsSuccess()) { if (ShouldExit(updateCustomerResult)) { LOG_E("Terminal " << context.TerminalID << " update customer failed: " - << updateCustomerResult.GetIssues().ToOneLineString()); + << updateCustomerResult.GetIssues().ToOneLineString() << ", session: " << session.GetId()); std::quick_exit(1); } + LOG_T("Terminal " << context.TerminalID << " update customer failed: " + << updateCustomerResult.GetIssues().ToOneLineString() << ", session: " << session.GetId()); co_return updateCustomerResult; } @@ -512,7 +529,7 @@ NThreading::TFuture<TStatus> GetDeliveryTask( } LOG_T("Terminal " << context.TerminalID - << " is committing Delivery transaction, processed " << processedOrderCount << " districts"); + << " is committing Delivery transaction, processed " << processedOrderCount << " districts, session: " << session.GetId()); auto commitFuture = tx->Commit(); auto commitResult = co_await TSuspendWithFuture(commitFuture, context.TaskQueue, context.TerminalID); diff --git a/ydb/library/workload/tpcc/transaction_neworder.cpp b/ydb/library/workload/tpcc/transaction_neworder.cpp index 703d748fd2a..31ef64f6cbf 100644 --- a/ydb/library/workload/tpcc/transaction_neworder.cpp +++ b/ydb/library/workload/tpcc/transaction_neworder.cpp @@ -40,7 +40,7 @@ struct TSqlHelper { ss << Sprintf("DECLARE $item%d AS Int32;\n", j); } - ss << "SELECT I_ID, I_PRICE, I_NAME, I_DATA FROM item WHERE I_ID IN ("; + ss << "SELECT I_ID, I_PRICE, I_NAME, I_DATA FROM `" << TABLE_ITEM << "` WHERE I_ID IN ("; for (int j = 1; j <= i; ++j) { if (j == 1) { ss << "$item1"; @@ -70,7 +70,7 @@ struct TSqlHelper { ss << "SELECT S_W_ID, S_I_ID, S_QUANTITY, S_DATA, S_YTD, S_ORDER_CNT, S_REMOTE_CNT, " << "S_DIST_01, S_DIST_02, S_DIST_03, S_DIST_04, S_DIST_05, " << "S_DIST_06, S_DIST_07, S_DIST_08, S_DIST_09, S_DIST_10 " << - "FROM stock WHERE (S_W_ID, S_I_ID) IN ("; + "FROM `" << TABLE_STOCK << "` WHERE (S_W_ID, S_I_ID) IN ("; for (int j = 1; j <= i; ++j) { if (j == 1) { @@ -151,11 +151,11 @@ TAsyncExecuteQueryResult GetCustomer( DECLARE $c_id AS Int32; SELECT C_DISCOUNT, C_LAST, C_CREDIT - FROM `customer` + FROM `{}` WHERE C_W_ID = $c_w_id AND C_D_ID = $c_d_id AND C_ID = $c_id; - )", context.Path.c_str()); + )", context.Path.c_str(), TABLE_CUSTOMER); auto params = TParamsBuilder() .AddParam("$c_w_id").Int32(warehouseID).Build() @@ -169,7 +169,7 @@ TAsyncExecuteQueryResult GetCustomer( std::move(params)); LOG_T("Terminal " << context.TerminalID << " waiting for customer result for " - << warehouseID << ", " << districtID << ", " << customerID); + << warehouseID << ", " << districtID << ", " << customerID << ", session: " << session.GetId()); return result; } @@ -185,9 +185,9 @@ TAsyncExecuteQueryResult GetWarehouseTax( DECLARE $w_id AS Int32; SELECT W_TAX - FROM `warehouse` + FROM `{}` WHERE W_ID = $w_id; - )", context.Path.c_str()); + )", context.Path.c_str(), TABLE_WAREHOUSE); auto params = TParamsBuilder() .AddParam("$w_id").Int32(warehouseID).Build() @@ -198,7 +198,7 @@ TAsyncExecuteQueryResult GetWarehouseTax( TTxControl::Tx(tx), std::move(params)); - LOG_T("Terminal " << context.TerminalID << " waiting for warehouse result for " << warehouseID); + LOG_T("Terminal " << context.TerminalID << " waiting for warehouse result for " << warehouseID << ", session: " << session.GetId()); return result; } @@ -215,10 +215,10 @@ TAsyncExecuteQueryResult GetDistrict( DECLARE $d_id AS Int32; SELECT D_NEXT_O_ID, D_TAX - FROM `district` + FROM `{}` WHERE D_W_ID = $d_w_id AND D_ID = $d_id; - )", context.Path.c_str()); + )", context.Path.c_str(), TABLE_DISTRICT); auto params = TParamsBuilder() .AddParam("$d_w_id").Int32(warehouseID).Build() @@ -231,7 +231,7 @@ TAsyncExecuteQueryResult GetDistrict( std::move(params)); LOG_T("Terminal " << context.TerminalID << " waiting for district result for " - << warehouseID << ", " << districtID); + << warehouseID << ", " << districtID << ", session: " << session.GetId()); return result; } @@ -253,9 +253,9 @@ TAsyncExecuteQueryResult UpdateDistrict( DECLARE $d_id AS Int32; DECLARE $d_next_o_id AS Int32; - UPSERT INTO `district` (D_W_ID, D_ID, D_NEXT_O_ID) + UPSERT INTO `{}` (D_W_ID, D_ID, D_NEXT_O_ID) VALUES ($d_w_id, $d_id, $d_next_o_id); - )", context.Path.c_str()); + )", context.Path.c_str(), TABLE_DISTRICT); auto params = TParamsBuilder() .AddParam("$d_w_id").Int32(warehouseID).Build() @@ -290,9 +290,9 @@ TAsyncExecuteQueryResult InsertNewOrder( DECLARE $no_d_id AS Int32; DECLARE $no_w_id AS Int32; - INSERT INTO `new_order` (NO_O_ID, NO_D_ID, NO_W_ID) + UPSERT INTO `{}` (NO_O_ID, NO_D_ID, NO_W_ID) VALUES ($no_o_id, $no_d_id, $no_w_id); - )", context.Path.c_str()); + )", context.Path.c_str(), TABLE_NEW_ORDER); auto params = TParamsBuilder() .AddParam("$no_o_id").Int32(orderID).Build() @@ -334,9 +334,9 @@ TAsyncExecuteQueryResult InsertOpenOrder( DECLARE $o_ol_cnt AS Int32; DECLARE $o_all_local AS Int32; - INSERT INTO `oorder` (O_ID, O_D_ID, O_W_ID, O_C_ID, O_ENTRY_D, O_OL_CNT, O_ALL_LOCAL) + UPSERT INTO `{}` (O_ID, O_D_ID, O_W_ID, O_C_ID, O_ENTRY_D, O_OL_CNT, O_ALL_LOCAL) VALUES ($o_id, $o_d_id, $o_w_id, $o_c_id, $o_entry_d, $o_ol_cnt, $o_all_local); - )", context.Path.c_str()); + )", context.Path.c_str(), TABLE_OORDER); auto params = TParamsBuilder() .AddParam("$o_id").Int32(orderID).Build() @@ -437,8 +437,8 @@ TAsyncExecuteQueryResult UpdateStocks( $row.p4 as S_YTD, $row.p5 as S_ORDER_CNT, $row.p6 as S_REMOTE_CNT)); - UPSERT INTO `stock` SELECT * FROM AS_TABLE(ListMap($values, $mapper)); - )", context.Path.c_str()); + UPSERT INTO `{}` SELECT * FROM AS_TABLE(ListMap($values, $mapper)); + )", context.Path.c_str(), TABLE_STOCK); auto paramsBuilder = TParamsBuilder(); auto& listBuilder = paramsBuilder.AddParam("$values").BeginList(); @@ -479,8 +479,8 @@ TAsyncExecuteQueryResult InsertOrderLines( $row.p6 as OL_DELIVERY_D, $row.p7 as OL_AMOUNT, $row.p8 as OL_SUPPLY_W_ID, $row.p9 as OL_QUANTITY, $row.p10 as OL_DIST_INFO)); - UPSERT INTO order_line SELECT * FROM AS_TABLE(ListMap($values, $mapper)); - )", context.Path.c_str()); + UPSERT INTO `{}` SELECT * FROM AS_TABLE(ListMap($values, $mapper)); + )", context.Path.c_str(), TABLE_ORDER_LINE); auto paramsBuilder = TParamsBuilder(); auto& listBuilder = paramsBuilder.AddParam("$values").BeginList(); @@ -547,7 +547,7 @@ NThreading::TFuture<TStatus> GetNewOrderTask( const int customerID = GetRandomCustomerID(); LOG_T("Terminal " << context.TerminalID << " started NewOrder transaction in " - << warehouseID << ", " << districtID << " for " << customerID); + << warehouseID << ", " << districtID << " for " << customerID << ", session: " << session.GetId()); // Generate order line items @@ -590,9 +590,11 @@ NThreading::TFuture<TStatus> GetNewOrderTask( if (!customerResult.IsSuccess()) { if (ShouldExit(customerResult)) { LOG_E("Terminal " << context.TerminalID << " customer query failed: " << customerResult.GetStatus() << ", " - << customerResult.GetIssues().ToOneLineString()); + << customerResult.GetIssues().ToOneLineString() << ", session: " << session.GetId()); std::quick_exit(1); } + LOG_T("Terminal " << context.TerminalID << " customer query failed: " << customerResult.GetStatus() << ", " + << customerResult.GetIssues().ToOneLineString() << ", session: " << session.GetId()); co_return customerResult; } @@ -606,9 +608,11 @@ NThreading::TFuture<TStatus> GetNewOrderTask( if (!warehouseResult.IsSuccess()) { if (ShouldExit(warehouseResult)) { LOG_E("Terminal " << context.TerminalID << " warehouse query failed: " - << warehouseResult.GetIssues().ToOneLineString()); + << warehouseResult.GetIssues().ToOneLineString() << ", session: " << session.GetId()); std::quick_exit(1); } + LOG_T("Terminal " << context.TerminalID << " warehouse query failed: " + << warehouseResult.GetIssues().ToOneLineString() << ", session: " << session.GetId()); co_return warehouseResult; } @@ -618,10 +622,12 @@ NThreading::TFuture<TStatus> GetNewOrderTask( auto districtResult = co_await TSuspendWithFuture(districtFuture, context.TaskQueue, context.TerminalID); if (!districtResult.IsSuccess()) { if (ShouldExit(districtResult)) { - LOG_E("Terminal " << context.TerminalID << " district query failed: " - << districtResult.GetIssues().ToOneLineString()); + LOG_E("Terminal " << context.TerminalID << " district query (neworder) failed: " + << districtResult.GetIssues().ToOneLineString() << ", session: " << session.GetId()); std::quick_exit(1); } + LOG_T("Terminal " << context.TerminalID << " district query (neworder) failed: " + << districtResult.GetIssues().ToOneLineString() << ", session: " << session.GetId()); co_return districtResult; } @@ -640,9 +646,11 @@ NThreading::TFuture<TStatus> GetNewOrderTask( if (!updateDistrictResult.IsSuccess()) { if (ShouldExit(updateDistrictResult)) { LOG_E("Terminal " << context.TerminalID << " district update failed: " - << updateDistrictResult.GetIssues().ToOneLineString()); + << updateDistrictResult.GetIssues().ToOneLineString() << ", session: " << session.GetId()); std::quick_exit(1); } + LOG_T("Terminal " << context.TerminalID << " district update failed: " + << updateDistrictResult.GetIssues().ToOneLineString() << ", session: " << session.GetId()); co_return updateDistrictResult; } @@ -653,9 +661,11 @@ NThreading::TFuture<TStatus> GetNewOrderTask( if (!newOrderResult.IsSuccess()) { if (ShouldExit(newOrderResult)) { LOG_E("Terminal " << context.TerminalID << " new order insert failed: " - << newOrderResult.GetIssues().ToOneLineString()); + << newOrderResult.GetIssues().ToOneLineString() << ", session: " << session.GetId()); std::quick_exit(1); } + LOG_T("Terminal " << context.TerminalID << " new order insert failed: " + << newOrderResult.GetIssues().ToOneLineString() << ", session: " << session.GetId()); co_return newOrderResult; } @@ -667,9 +677,11 @@ NThreading::TFuture<TStatus> GetNewOrderTask( if (!openOrderResult.IsSuccess()) { if (ShouldExit(openOrderResult)) { LOG_E("Terminal " << context.TerminalID << " open order insert failed: " - << openOrderResult.GetIssues().ToOneLineString()); + << openOrderResult.GetIssues().ToOneLineString() << ", session: " << session.GetId()); std::quick_exit(1); } + LOG_T("Terminal " << context.TerminalID << " open order insert failed: " + << openOrderResult.GetIssues().ToOneLineString() << ", session: " << session.GetId()); co_return openOrderResult; } @@ -680,9 +692,11 @@ NThreading::TFuture<TStatus> GetNewOrderTask( if (!itemsResult.IsSuccess()) { if (ShouldExit(itemsResult)) { LOG_E("Terminal " << context.TerminalID << " items query failed: " - << itemsResult.GetIssues().ToOneLineString()); + << itemsResult.GetIssues().ToOneLineString() << ", session: " << session.GetId()); std::quick_exit(1); } + LOG_T("Terminal " << context.TerminalID << " items query failed: " + << itemsResult.GetIssues().ToOneLineString() << ", session: " << session.GetId()); co_return itemsResult; } @@ -712,9 +726,11 @@ NThreading::TFuture<TStatus> GetNewOrderTask( if (!stocksResult.IsSuccess()) { if (ShouldExit(stocksResult)) { LOG_E("Terminal " << context.TerminalID << " stocks query failed: " - << stocksResult.GetIssues().ToOneLineString()); + << stocksResult.GetIssues().ToOneLineString() << ", session: " << session.GetId()); std::quick_exit(1); } + LOG_T("Terminal " << context.TerminalID << " stocks query failed: " + << stocksResult.GetIssues().ToOneLineString() << ", session: " << session.GetId()); co_return stocksResult; } @@ -815,9 +831,11 @@ NThreading::TFuture<TStatus> GetNewOrderTask( if (!updateStocksResult.IsSuccess()) { if (ShouldExit(updateStocksResult)) { LOG_E("Terminal " << context.TerminalID << " stocks update failed: " - << updateStocksResult.GetIssues().ToOneLineString()); + << updateStocksResult.GetIssues().ToOneLineString() << ", session: " << session.GetId()); std::quick_exit(1); } + LOG_T("Terminal " << context.TerminalID << " stocks update failed: " + << updateStocksResult.GetIssues().ToOneLineString() << ", session: " << session.GetId()); co_return updateStocksResult; } @@ -828,13 +846,15 @@ NThreading::TFuture<TStatus> GetNewOrderTask( if (!orderLinesResult.IsSuccess()) { if (ShouldExit(orderLinesResult)) { LOG_E("Terminal " << context.TerminalID << " order lines insert failed: " - << orderLinesResult.GetIssues().ToOneLineString()); + << orderLinesResult.GetIssues().ToOneLineString() << ", session: " << session.GetId()); std::quick_exit(1); } + LOG_T("Terminal " << context.TerminalID << " order lines insert failed: " + << orderLinesResult.GetIssues().ToOneLineString() << ", session: " << session.GetId()); co_return orderLinesResult; } - LOG_T("Terminal " << context.TerminalID << " is committing NewOrder transaction"); + LOG_T("Terminal " << context.TerminalID << " is committing NewOrder transaction, session: " << session.GetId()); auto commitFuture = tx.Commit(); auto commitResult = co_await TSuspendWithFuture(commitFuture, context.TaskQueue, context.TerminalID); diff --git a/ydb/library/workload/tpcc/transaction_orderstatus.cpp b/ydb/library/workload/tpcc/transaction_orderstatus.cpp index 072c8dfc31a..76c102b6f11 100644 --- a/ydb/library/workload/tpcc/transaction_orderstatus.cpp +++ b/ydb/library/workload/tpcc/transaction_orderstatus.cpp @@ -46,13 +46,13 @@ TAsyncExecuteQueryResult GetOrderByCustomer( DECLARE $o_c_id AS Int32; SELECT O_W_ID, O_D_ID, O_C_ID, O_ID, O_CARRIER_ID, O_ENTRY_D - FROM `oorder` VIEW idx_order AS idx + FROM `{}` VIEW `{}` AS idx WHERE idx.O_W_ID = $o_w_id AND idx.O_D_ID = $o_d_id AND idx.O_C_ID = $o_c_id ORDER BY idx.O_W_ID DESC, idx.O_D_ID DESC, idx.O_C_ID DESC, idx.O_ID DESC LIMIT 1; - )", context.Path.c_str()); + )", context.Path.c_str(), TABLE_OORDER, INDEX_ORDER); auto params = TParamsBuilder() .AddParam("$o_w_id").Int32(warehouseID).Build() @@ -84,11 +84,11 @@ TAsyncExecuteQueryResult GetOrderLines( DECLARE $ol_o_id AS Int32; SELECT OL_I_ID, OL_SUPPLY_W_ID, OL_QUANTITY, OL_AMOUNT, OL_DELIVERY_D - FROM `order_line` + FROM `{}` WHERE OL_O_ID = $ol_o_id AND OL_D_ID = $ol_d_id AND OL_W_ID = $ol_w_id; - )", context.Path.c_str()); + )", context.Path.c_str(), TABLE_ORDER_LINE); auto params = TParamsBuilder() .AddParam("$ol_w_id").Int32(warehouseID).Build() @@ -125,7 +125,7 @@ NThreading::TFuture<TStatus> GetOrderStatusTask( const int districtID = RandomNumber(DISTRICT_LOW_ID, DISTRICT_HIGH_ID); LOG_T("Terminal " << context.TerminalID << " started OrderStatus transaction in " - << warehouseID << ", " << districtID); + << warehouseID << ", " << districtID << ", session: " << session.GetId()); // Determine lookup method (60% by name, 40% by id) bool lookupByName = RandomNumber(1, 100) <= 60; @@ -144,9 +144,11 @@ NThreading::TFuture<TStatus> GetOrderStatusTask( if (!customersResult.IsSuccess()) { if (ShouldExit(customersResult)) { LOG_E("Terminal " << context.TerminalID << " customers query failed: " - << customersResult.GetIssues().ToOneLineString()); + << customersResult.GetIssues().ToOneLineString() << ", session: " << session.GetId()); std::quick_exit(1); } + LOG_T("Terminal " << context.TerminalID << " customers query failed: " + << customersResult.GetIssues().ToOneLineString() << ", session: " << session.GetId()); co_return customersResult; } @@ -168,9 +170,11 @@ NThreading::TFuture<TStatus> GetOrderStatusTask( if (!customerResult.IsSuccess()) { if (ShouldExit(customerResult)) { LOG_E("Terminal " << context.TerminalID << " customer query failed: " - << customerResult.GetIssues().ToOneLineString()); + << customerResult.GetIssues().ToOneLineString() << ", session: " << session.GetId()); std::quick_exit(1); } + LOG_T("Terminal " << context.TerminalID << " customer query failed: " + << customerResult.GetIssues().ToOneLineString() << ", session: " << session.GetId()); co_return customerResult; } @@ -193,9 +197,11 @@ NThreading::TFuture<TStatus> GetOrderStatusTask( if (!orderResult.IsSuccess()) { if (ShouldExit(orderResult)) { LOG_E("Terminal " << context.TerminalID << " order query failed: " - << orderResult.GetIssues().ToOneLineString()); + << orderResult.GetIssues().ToOneLineString() << ", session: " << session.GetId()); std::quick_exit(1); } + LOG_T("Terminal " << context.TerminalID << " order query failed: " + << orderResult.GetIssues().ToOneLineString() << ", session: " << session.GetId()); co_return orderResult; } @@ -213,15 +219,17 @@ NThreading::TFuture<TStatus> GetOrderStatusTask( if (!orderLinesResult.IsSuccess()) { if (ShouldExit(orderLinesResult)) { LOG_E("Terminal " << context.TerminalID << " order lines query failed: " - << orderLinesResult.GetIssues().ToOneLineString()); + << orderLinesResult.GetIssues().ToOneLineString() << ", session: " << session.GetId()); std::quick_exit(1); } + LOG_T("Terminal " << context.TerminalID << " order lines query failed: " + << orderLinesResult.GetIssues().ToOneLineString() << ", session: " << session.GetId()); co_return orderLinesResult; } LOG_T("Terminal " << context.TerminalID << " is committing OrderStatus transaction: " << "customer " << customer.c_id << ", order " << orderID - << ", lines " << orderLinesResult.GetResultSet(0).RowsCount()); + << ", lines " << orderLinesResult.GetResultSet(0).RowsCount() << ", session: " << session.GetId()); auto commitFuture = tx->Commit(); auto commitResult = co_await TSuspendWithFuture(commitFuture, context.TaskQueue, context.TerminalID); diff --git a/ydb/library/workload/tpcc/transaction_payment.cpp b/ydb/library/workload/tpcc/transaction_payment.cpp index 8465b1eba99..3f271016ad1 100644 --- a/ydb/library/workload/tpcc/transaction_payment.cpp +++ b/ydb/library/workload/tpcc/transaction_payment.cpp @@ -32,11 +32,11 @@ TAsyncExecuteQueryResult UpdateWarehouse( DECLARE $w_id AS Int32; DECLARE $payment AS Double; - UPDATE `warehouse` + UPDATE `{}` SET W_YTD = W_YTD + $payment WHERE W_ID = $w_id RETURNING W_STREET_1, W_STREET_2, W_CITY, W_STATE, W_ZIP, W_NAME; - )", context.Path.c_str()); + )", context.Path.c_str(), TABLE_WAREHOUSE); auto params = TParamsBuilder() .AddParam("$w_id").Int32(warehouseID).Build() @@ -66,12 +66,12 @@ TAsyncExecuteQueryResult UpdateDistrict( DECLARE $d_id AS Int32; DECLARE $payment AS Double; - UPDATE `district` + UPDATE `{}` SET D_YTD = D_YTD + $payment WHERE D_W_ID = $d_w_id AND D_ID = $d_id RETURNING D_STREET_1, D_STREET_2, D_CITY, D_STATE, D_ZIP, D_NAME; - )", context.Path.c_str()); + )", context.Path.c_str(), TABLE_DISTRICT); auto params = TParamsBuilder() .AddParam("$d_w_id").Int32(warehouseID).Build() @@ -103,11 +103,11 @@ TAsyncExecuteQueryResult GetCustomerCData( DECLARE $c_id AS Int32; SELECT C_DATA - FROM `customer` + FROM `{}` WHERE C_W_ID = $c_w_id AND C_D_ID = $c_d_id AND C_ID = $c_id; - )", context.Path.c_str()); + )", context.Path.c_str(), TABLE_CUSTOMER); auto params = TParamsBuilder() .AddParam("$c_w_id").Int32(warehouseID).Build() @@ -143,10 +143,10 @@ TAsyncExecuteQueryResult UpdateCustomerWithCData( DECLARE $c_payment_cnt AS Int32; DECLARE $c_data AS Utf8; - UPSERT INTO `customer` + UPSERT INTO `{}` (C_W_ID, C_D_ID, C_ID, C_BALANCE, C_YTD_PAYMENT, C_PAYMENT_CNT, C_DATA) VALUES ($c_w_id, $c_d_id, $c_id, $c_balance, $c_ytd_payment, $c_payment_cnt, $c_data); - )", context.Path.c_str()); + )", context.Path.c_str(), TABLE_CUSTOMER); auto params = TParamsBuilder() .AddParam("$c_w_id").Int32(warehouseID).Build() @@ -185,10 +185,10 @@ TAsyncExecuteQueryResult UpdateCustomer( DECLARE $c_ytd_payment AS Double; DECLARE $c_payment_cnt AS Int32; - UPSERT INTO `customer` + UPSERT INTO `{}` (C_W_ID, C_D_ID, C_ID, C_BALANCE, C_YTD_PAYMENT, C_PAYMENT_CNT) VALUES ($c_w_id, $c_d_id, $c_id, $c_balance, $c_ytd_payment, $c_payment_cnt); - )", context.Path.c_str()); + )", context.Path.c_str(), TABLE_CUSTOMER); auto params = TParamsBuilder() .AddParam("$c_w_id").Int32(warehouseID).Build() @@ -229,10 +229,10 @@ TAsyncExecuteQueryResult InsertHistoryRecord( DECLARE $h_data AS Utf8; DECLARE $h_c_nano_ts AS Int64; - INSERT INTO `history` + UPSERT INTO `{}` (H_C_D_ID, H_C_W_ID, H_C_ID, H_D_ID, H_W_ID, H_DATE, H_AMOUNT, H_DATA, H_C_NANO_TS) VALUES ($h_c_d_id, $h_c_w_id, $h_c_id, $h_d_id, $h_w_id, $h_date, $h_amount, $h_data, $h_c_nano_ts); - )", context.Path.c_str()); + )", context.Path.c_str(), TABLE_HISTORY); auto now = std::chrono::system_clock::now(); auto nanoTs = std::chrono::duration_cast<std::chrono::nanoseconds>(now.time_since_epoch()).count(); @@ -279,7 +279,7 @@ NThreading::TFuture<TStatus> GetPaymentTask( const double paymentAmount = static_cast<double>(RandomNumber(100, 500000)) / 100.0; LOG_T("Terminal " << context.TerminalID << " started Payment transaction in " - << warehouseID << ", " << districtID); + << warehouseID << ", " << districtID << ", session: " << session.GetId()); // Update warehouse YTD @@ -288,9 +288,11 @@ NThreading::TFuture<TStatus> GetPaymentTask( if (!warehouseResult.IsSuccess()) { if (ShouldExit(warehouseResult)) { LOG_E("Terminal " << context.TerminalID << " warehouse update failed: " - << warehouseResult.GetIssues().ToOneLineString()); + << warehouseResult.GetIssues().ToOneLineString() << ", session: " << session.GetId()); std::quick_exit(1); } + LOG_T("Terminal " << context.TerminalID << " warehouse update failed: " + << warehouseResult.GetIssues().ToOneLineString() << ", session: " << session.GetId()); co_return warehouseResult; } @@ -299,11 +301,8 @@ NThreading::TFuture<TStatus> GetPaymentTask( TResultSetParser warehouseParser(warehouseResult.GetResultSet(0)); if (!warehouseParser.TryNextRow()) { - if (ShouldExit(warehouseResult)) { - LOG_E("Terminal " << context.TerminalID << " warehouse not found: " << warehouseID); - std::quick_exit(1); - } - co_return warehouseResult; + LOG_E("Terminal " << context.TerminalID << " warehouse not found: " << warehouseID << ", session: " << session.GetId()); + std::quick_exit(1); } std::string warehouseName = *warehouseParser.ColumnParser("W_NAME").GetOptionalUtf8(); @@ -314,20 +313,19 @@ NThreading::TFuture<TStatus> GetPaymentTask( if (!districtResult.IsSuccess()) { if (ShouldExit(districtResult)) { LOG_E("Terminal " << context.TerminalID << " district update failed: " - << districtResult.GetIssues().ToOneLineString()); + << districtResult.GetIssues().ToOneLineString() << ", session: " << session.GetId()); std::quick_exit(1); } + LOG_T("Terminal " << context.TerminalID << " district update failed: " + << districtResult.GetIssues().ToOneLineString() << ", session: " << session.GetId()); co_return districtResult; } TResultSetParser districtParser(districtResult.GetResultSet(0)); if (!districtParser.TryNextRow()) { - if (ShouldExit(districtResult)) { - LOG_E("Terminal " << context.TerminalID << " district not found: W_ID=" << warehouseID - << ", D_ID=" << districtID); - std::quick_exit(1); - } - co_return districtResult; + LOG_E("Terminal " << context.TerminalID << " district not found: W_ID=" << warehouseID + << ", D_ID=" << districtID << ", session: " << session.GetId()); + std::quick_exit(1); } std::string districtName = *districtParser.ColumnParser("D_NAME").GetOptionalUtf8(); @@ -359,9 +357,11 @@ NThreading::TFuture<TStatus> GetPaymentTask( if (!customersResult.IsSuccess()) { if (ShouldExit(customersResult)) { LOG_E("Terminal " << context.TerminalID << " customers query failed: " - << customersResult.GetIssues().ToOneLineString()); + << customersResult.GetIssues().ToOneLineString() << ", session: " << session.GetId()); std::quick_exit(1); } + LOG_T("Terminal " << context.TerminalID << " customers query failed: " + << customersResult.GetIssues().ToOneLineString() << ", session: " << session.GetId()); co_return customersResult; } @@ -370,7 +370,7 @@ NThreading::TFuture<TStatus> GetPaymentTask( auto selectedCustomer = SelectCustomerFromResultSet(customersResult.GetResultSet(0)); if (!selectedCustomer) { LOG_E("Terminal " << context.TerminalID << " no customer found by name: " - << warehouseID << ", " << districtID << ", " << lastName); + << warehouseID << ", " << districtID << ", " << lastName << ", session: " << session.GetId()); std::quick_exit(1); } customer = std::move(*selectedCustomer); @@ -382,18 +382,21 @@ NThreading::TFuture<TStatus> GetPaymentTask( if (!customerResult.IsSuccess()) { if (ShouldExit(customerResult)) { LOG_E("Terminal " << context.TerminalID << " customer query failed: " - << customerResult.GetIssues().ToOneLineString()); + << customerResult.GetIssues().ToOneLineString() << ", session: " << session.GetId()); std::quick_exit(1); } + LOG_T("Terminal " << context.TerminalID << " customer query failed: " + << customerResult.GetIssues().ToOneLineString() << ", session: " << session.GetId()); co_return customerResult; } TResultSetParser customerParser(customerResult.GetResultSet(0)); if (!customerParser.TryNextRow()) { LOG_E("Terminal " << context.TerminalID << " no customer found by id: " - << warehouseID << ", " << districtID << ", " << customerID); + << warehouseID << ", " << districtID << ", " << customerID << ", session: " << session.GetId()); } customer = ParseCustomerFromResult(customerParser); + customer.c_id = customerID; } // Update customer balance @@ -410,18 +413,18 @@ NThreading::TFuture<TStatus> GetPaymentTask( if (!cDataResult.IsSuccess()) { if (ShouldExit(cDataResult)) { LOG_E("Terminal " << context.TerminalID << " customer data query failed: " - << cDataResult.GetIssues().ToOneLineString()); + << cDataResult.GetIssues().ToOneLineString() << ", session: " << session.GetId()); std::quick_exit(1); } + LOG_T("Terminal " << context.TerminalID << " customer data query failed: " + << cDataResult.GetIssues().ToOneLineString() << ", session: " << session.GetId()); co_return cDataResult; } TResultSetParser cDataParser(cDataResult.GetResultSet(0)); if (!cDataParser.TryNextRow()) { - if (ShouldExit(cDataResult)) { - LOG_E("Terminal " << context.TerminalID << " customer data not found: " << customer.c_id); - std::quick_exit(1); - } + LOG_E("Terminal " << context.TerminalID << " customer data not found: " << customer.c_id << ", session: " << session.GetId()); + std::quick_exit(1); co_return cDataResult; } customer.c_data = cDataParser.ColumnParser("C_DATA").GetOptionalUtf8().value_or(""); @@ -446,9 +449,11 @@ NThreading::TFuture<TStatus> GetPaymentTask( if (!updateCustResult.IsSuccess()) { if (ShouldExit(updateCustResult)) { LOG_E("Terminal " << context.TerminalID << " customer update with data failed: " - << updateCustResult.GetIssues().ToOneLineString()); + << updateCustResult.GetIssues().ToOneLineString() << ", session: " << session.GetId()); std::quick_exit(1); } + LOG_T("Terminal " << context.TerminalID << " customer update with data failed: " + << updateCustResult.GetIssues().ToOneLineString() << ", session: " << session.GetId()); co_return updateCustResult; } } else { @@ -461,9 +466,11 @@ NThreading::TFuture<TStatus> GetPaymentTask( if (!updateCustResult.IsSuccess()) { if (ShouldExit(updateCustResult)) { LOG_E("Terminal " << context.TerminalID << " customer update failed: " - << updateCustResult.GetIssues().ToOneLineString()); + << updateCustResult.GetIssues().ToOneLineString() << ", session: " << session.GetId()); std::quick_exit(1); } + LOG_T("Terminal " << context.TerminalID << " customer update failed: " + << updateCustResult.GetIssues().ToOneLineString() << ", session: " << session.GetId()); co_return updateCustResult; } } @@ -482,14 +489,16 @@ NThreading::TFuture<TStatus> GetPaymentTask( if (!historyResult.IsSuccess()) { if (ShouldExit(historyResult)) { LOG_E("Terminal " << context.TerminalID << " history insert failed: " - << historyResult.GetIssues().ToOneLineString()); + << historyResult.GetIssues().ToOneLineString() << ", session: " << session.GetId()); std::quick_exit(1); } + LOG_T("Terminal " << context.TerminalID << " history insert failed: " + << historyResult.GetIssues().ToOneLineString() << ", session: " << session.GetId()); co_return historyResult; } LOG_T("Terminal " << context.TerminalID << " is committing Payment transaction: " - << "customer " << customer.c_id << ", amount " << paymentAmount); + << "customer " << customer.c_id << ", amount " << paymentAmount << ", session: " << session.GetId()); auto commitFuture = tx.Commit(); auto commitResult = co_await TSuspendWithFuture(commitFuture, context.TaskQueue, context.TerminalID); diff --git a/ydb/library/workload/tpcc/transaction_simulation.cpp b/ydb/library/workload/tpcc/transaction_simulation.cpp index 502996a4b4f..079dbec60b9 100644 --- a/ydb/library/workload/tpcc/transaction_simulation.cpp +++ b/ydb/library/workload/tpcc/transaction_simulation.cpp @@ -66,7 +66,7 @@ NThreading::TFuture<TStatus> GetSimulationTask( auto& Log = context.Log; - LOG_T("Terminal " << context.TerminalID << " started simulated transaction"); + LOG_T("Terminal " << context.TerminalID << " started simulated transaction, session: " << session.GetId()); // just to test if we have problems with generator (we don't) for (size_t i = 0; i < 10; ++i) { diff --git a/ydb/library/workload/tpcc/transaction_stocklevel.cpp b/ydb/library/workload/tpcc/transaction_stocklevel.cpp index ecedbaca80c..bf48a29c40a 100644 --- a/ydb/library/workload/tpcc/transaction_stocklevel.cpp +++ b/ydb/library/workload/tpcc/transaction_stocklevel.cpp @@ -32,10 +32,10 @@ TAsyncExecuteQueryResult GetDistrictOrderId( DECLARE $d_id AS Int32; SELECT D_NEXT_O_ID - FROM `district` + FROM `{}` WHERE D_W_ID = $d_w_id AND D_ID = $d_id; - )", context.Path.c_str()); + )", context.Path.c_str(), TABLE_DISTRICT); auto params = TParamsBuilder() .AddParam("$d_w_id").Int32(warehouseID).Build() @@ -69,14 +69,14 @@ TAsyncExecuteQueryResult GetStockCount( DECLARE $s_quantity AS Int32; SELECT COUNT(DISTINCT (s.S_I_ID)) AS STOCK_COUNT - FROM `order_line` as ol INNER JOIN `stock` as s ON s.S_I_ID = ol.OL_I_ID + FROM `{}` as ol INNER JOIN `{}` as s ON s.S_I_ID = ol.OL_I_ID WHERE ol.OL_W_ID = $ol_w_id AND ol.OL_D_ID = $ol_d_id AND ol.OL_O_ID < $ol_o_id_high AND ol.OL_O_ID >= $ol_o_id_low AND s.S_W_ID = $s_w_id AND s.S_QUANTITY < $s_quantity; - )", context.Path.c_str()); + )", context.Path.c_str(), TABLE_ORDER_LINE, TABLE_STOCK); auto params = TParamsBuilder() .AddParam("$ol_w_id").Int32(warehouseID).Build() @@ -117,17 +117,19 @@ NThreading::TFuture<TStatus> GetStockLevelTask( const int threshold = RandomNumber(10, 20); LOG_T("Terminal " << context.TerminalID << " started StockLevel transaction in " - << warehouseID << ", " << districtID); + << warehouseID << ", " << districtID << ", session: " << session.GetId()); // Get next order ID from district auto districtFuture = GetDistrictOrderId(session, context, warehouseID, districtID); auto districtResult = co_await TSuspendWithFuture(districtFuture, context.TaskQueue, context.TerminalID); if (!districtResult.IsSuccess()) { - LOG_E("Terminal " << context.TerminalID << " district query failed: " - << districtResult.GetIssues().ToOneLineString()); if (ShouldExit(districtResult)) { + LOG_E("Terminal " << context.TerminalID << " district query (stockleve) failed: " + << districtResult.GetIssues().ToOneLineString() << ", session: " << session.GetId()); std::quick_exit(1); } + LOG_T("Terminal " << context.TerminalID << " district query (stockleve) failed: " + << districtResult.GetIssues().ToOneLineString() << ", session: " << session.GetId()); co_return districtResult; } @@ -149,13 +151,15 @@ NThreading::TFuture<TStatus> GetStockLevelTask( if (!stockCountResult.IsSuccess()) { if (ShouldExit(stockCountResult)) { LOG_E("Terminal " << context.TerminalID << " stock count query failed: " - << stockCountResult.GetIssues().ToOneLineString()); + << stockCountResult.GetIssues().ToOneLineString() << ", session: " << session.GetId()); std::quick_exit(1); } + LOG_T("Terminal " << context.TerminalID << " stock count query failed: " + << stockCountResult.GetIssues().ToOneLineString() << ", session: " << session.GetId()); co_return stockCountResult; } - LOG_T("Terminal " << context.TerminalID << " is committing StockLevel transaction"); + LOG_T("Terminal " << context.TerminalID << " is committing StockLevel transaction, session: " << session.GetId()); auto commitFuture = tx.Commit(); auto commitResult = co_await TSuspendWithFuture(commitFuture, context.TaskQueue, context.TerminalID); |
