diff options
author | turlybekovol <turlybekovol@yandex-team.com> | 2023-08-28 12:03:55 +0300 |
---|---|---|
committer | turlybekovol <turlybekovol@yandex-team.com> | 2023-08-28 12:37:00 +0300 |
commit | da025ccc23fafcc3a3a572d83f82996be433fa89 (patch) | |
tree | 453e8ee6eca255335f4a08be466b62313600ef5f | |
parent | bfdb8734c1c53000f3c653c2c9218c3cca1f952e (diff) | |
download | ydb-da025ccc23fafcc3a3a572d83f82996be433fa89.tar.gz |
Workload TPC-C init
41 files changed, 2135 insertions, 1 deletions
diff --git a/ydb/library/workload/CMakeLists.darwin-x86_64.txt b/ydb/library/workload/CMakeLists.darwin-x86_64.txt index 69416c94e7..4b6d17589e 100644 --- a/ydb/library/workload/CMakeLists.darwin-x86_64.txt +++ b/ydb/library/workload/CMakeLists.darwin-x86_64.txt @@ -6,11 +6,18 @@ # original buildsystem will not be accepted. +get_built_tool_path( + TOOL_enum_parser_bin + TOOL_enum_parser_dependency + tools/enum_parser/enum_parser + enum_parser +) add_library(ydb-library-workload) target_link_libraries(ydb-library-workload PUBLIC contrib-libs-cxxsupp yutil + tools-enum_parser-enum_serialization_runtime api-protos cpp-client-ydb_table ) @@ -18,4 +25,21 @@ target_sources(ydb-library-workload PRIVATE ${CMAKE_SOURCE_DIR}/ydb/library/workload/stock_workload.cpp ${CMAKE_SOURCE_DIR}/ydb/library/workload/kv_workload.cpp ${CMAKE_SOURCE_DIR}/ydb/library/workload/workload_factory.cpp + ${CMAKE_SOURCE_DIR}/ydb/library/workload/tpcc/tpcc_workload.cpp + ${CMAKE_SOURCE_DIR}/ydb/library/workload/tpcc/load_data/customer.cpp + ${CMAKE_SOURCE_DIR}/ydb/library/workload/tpcc/load_data/district.cpp + ${CMAKE_SOURCE_DIR}/ydb/library/workload/tpcc/load_data/history.cpp + ${CMAKE_SOURCE_DIR}/ydb/library/workload/tpcc/load_data/item.cpp + ${CMAKE_SOURCE_DIR}/ydb/library/workload/tpcc/load_data/new_order.cpp + ${CMAKE_SOURCE_DIR}/ydb/library/workload/tpcc/load_data/oorder.cpp + ${CMAKE_SOURCE_DIR}/ydb/library/workload/tpcc/load_data/order_line.cpp + ${CMAKE_SOURCE_DIR}/ydb/library/workload/tpcc/load_data/query_generator.cpp + ${CMAKE_SOURCE_DIR}/ydb/library/workload/tpcc/load_data/stock.cpp + ${CMAKE_SOURCE_DIR}/ydb/library/workload/tpcc/load_data/warehouse.cpp + ${CMAKE_SOURCE_DIR}/ydb/library/workload/tpcc/load_data/load_thread_pool.cpp +) +generate_enum_serilization(ydb-library-workload + ${CMAKE_SOURCE_DIR}/ydb/library/workload/tpcc/tpcc_config.h + INCLUDE_HEADERS + ydb/library/workload/tpcc/tpcc_config.h ) diff --git a/ydb/library/workload/CMakeLists.linux-aarch64.txt b/ydb/library/workload/CMakeLists.linux-aarch64.txt index d5dba36558..6006afe3cc 100644 --- a/ydb/library/workload/CMakeLists.linux-aarch64.txt +++ b/ydb/library/workload/CMakeLists.linux-aarch64.txt @@ -6,12 +6,19 @@ # original buildsystem will not be accepted. +get_built_tool_path( + TOOL_enum_parser_bin + TOOL_enum_parser_dependency + tools/enum_parser/enum_parser + enum_parser +) add_library(ydb-library-workload) target_link_libraries(ydb-library-workload PUBLIC contrib-libs-linux-headers contrib-libs-cxxsupp yutil + tools-enum_parser-enum_serialization_runtime api-protos cpp-client-ydb_table ) @@ -19,4 +26,21 @@ target_sources(ydb-library-workload PRIVATE ${CMAKE_SOURCE_DIR}/ydb/library/workload/stock_workload.cpp ${CMAKE_SOURCE_DIR}/ydb/library/workload/kv_workload.cpp ${CMAKE_SOURCE_DIR}/ydb/library/workload/workload_factory.cpp + ${CMAKE_SOURCE_DIR}/ydb/library/workload/tpcc/tpcc_workload.cpp + ${CMAKE_SOURCE_DIR}/ydb/library/workload/tpcc/load_data/customer.cpp + ${CMAKE_SOURCE_DIR}/ydb/library/workload/tpcc/load_data/district.cpp + ${CMAKE_SOURCE_DIR}/ydb/library/workload/tpcc/load_data/history.cpp + ${CMAKE_SOURCE_DIR}/ydb/library/workload/tpcc/load_data/item.cpp + ${CMAKE_SOURCE_DIR}/ydb/library/workload/tpcc/load_data/new_order.cpp + ${CMAKE_SOURCE_DIR}/ydb/library/workload/tpcc/load_data/oorder.cpp + ${CMAKE_SOURCE_DIR}/ydb/library/workload/tpcc/load_data/order_line.cpp + ${CMAKE_SOURCE_DIR}/ydb/library/workload/tpcc/load_data/query_generator.cpp + ${CMAKE_SOURCE_DIR}/ydb/library/workload/tpcc/load_data/stock.cpp + ${CMAKE_SOURCE_DIR}/ydb/library/workload/tpcc/load_data/warehouse.cpp + ${CMAKE_SOURCE_DIR}/ydb/library/workload/tpcc/load_data/load_thread_pool.cpp +) +generate_enum_serilization(ydb-library-workload + ${CMAKE_SOURCE_DIR}/ydb/library/workload/tpcc/tpcc_config.h + INCLUDE_HEADERS + ydb/library/workload/tpcc/tpcc_config.h ) diff --git a/ydb/library/workload/CMakeLists.linux-x86_64.txt b/ydb/library/workload/CMakeLists.linux-x86_64.txt index d5dba36558..6006afe3cc 100644 --- a/ydb/library/workload/CMakeLists.linux-x86_64.txt +++ b/ydb/library/workload/CMakeLists.linux-x86_64.txt @@ -6,12 +6,19 @@ # original buildsystem will not be accepted. +get_built_tool_path( + TOOL_enum_parser_bin + TOOL_enum_parser_dependency + tools/enum_parser/enum_parser + enum_parser +) add_library(ydb-library-workload) target_link_libraries(ydb-library-workload PUBLIC contrib-libs-linux-headers contrib-libs-cxxsupp yutil + tools-enum_parser-enum_serialization_runtime api-protos cpp-client-ydb_table ) @@ -19,4 +26,21 @@ target_sources(ydb-library-workload PRIVATE ${CMAKE_SOURCE_DIR}/ydb/library/workload/stock_workload.cpp ${CMAKE_SOURCE_DIR}/ydb/library/workload/kv_workload.cpp ${CMAKE_SOURCE_DIR}/ydb/library/workload/workload_factory.cpp + ${CMAKE_SOURCE_DIR}/ydb/library/workload/tpcc/tpcc_workload.cpp + ${CMAKE_SOURCE_DIR}/ydb/library/workload/tpcc/load_data/customer.cpp + ${CMAKE_SOURCE_DIR}/ydb/library/workload/tpcc/load_data/district.cpp + ${CMAKE_SOURCE_DIR}/ydb/library/workload/tpcc/load_data/history.cpp + ${CMAKE_SOURCE_DIR}/ydb/library/workload/tpcc/load_data/item.cpp + ${CMAKE_SOURCE_DIR}/ydb/library/workload/tpcc/load_data/new_order.cpp + ${CMAKE_SOURCE_DIR}/ydb/library/workload/tpcc/load_data/oorder.cpp + ${CMAKE_SOURCE_DIR}/ydb/library/workload/tpcc/load_data/order_line.cpp + ${CMAKE_SOURCE_DIR}/ydb/library/workload/tpcc/load_data/query_generator.cpp + ${CMAKE_SOURCE_DIR}/ydb/library/workload/tpcc/load_data/stock.cpp + ${CMAKE_SOURCE_DIR}/ydb/library/workload/tpcc/load_data/warehouse.cpp + ${CMAKE_SOURCE_DIR}/ydb/library/workload/tpcc/load_data/load_thread_pool.cpp +) +generate_enum_serilization(ydb-library-workload + ${CMAKE_SOURCE_DIR}/ydb/library/workload/tpcc/tpcc_config.h + INCLUDE_HEADERS + ydb/library/workload/tpcc/tpcc_config.h ) diff --git a/ydb/library/workload/CMakeLists.windows-x86_64.txt b/ydb/library/workload/CMakeLists.windows-x86_64.txt index 69416c94e7..4b6d17589e 100644 --- a/ydb/library/workload/CMakeLists.windows-x86_64.txt +++ b/ydb/library/workload/CMakeLists.windows-x86_64.txt @@ -6,11 +6,18 @@ # original buildsystem will not be accepted. +get_built_tool_path( + TOOL_enum_parser_bin + TOOL_enum_parser_dependency + tools/enum_parser/enum_parser + enum_parser +) add_library(ydb-library-workload) target_link_libraries(ydb-library-workload PUBLIC contrib-libs-cxxsupp yutil + tools-enum_parser-enum_serialization_runtime api-protos cpp-client-ydb_table ) @@ -18,4 +25,21 @@ target_sources(ydb-library-workload PRIVATE ${CMAKE_SOURCE_DIR}/ydb/library/workload/stock_workload.cpp ${CMAKE_SOURCE_DIR}/ydb/library/workload/kv_workload.cpp ${CMAKE_SOURCE_DIR}/ydb/library/workload/workload_factory.cpp + ${CMAKE_SOURCE_DIR}/ydb/library/workload/tpcc/tpcc_workload.cpp + ${CMAKE_SOURCE_DIR}/ydb/library/workload/tpcc/load_data/customer.cpp + ${CMAKE_SOURCE_DIR}/ydb/library/workload/tpcc/load_data/district.cpp + ${CMAKE_SOURCE_DIR}/ydb/library/workload/tpcc/load_data/history.cpp + ${CMAKE_SOURCE_DIR}/ydb/library/workload/tpcc/load_data/item.cpp + ${CMAKE_SOURCE_DIR}/ydb/library/workload/tpcc/load_data/new_order.cpp + ${CMAKE_SOURCE_DIR}/ydb/library/workload/tpcc/load_data/oorder.cpp + ${CMAKE_SOURCE_DIR}/ydb/library/workload/tpcc/load_data/order_line.cpp + ${CMAKE_SOURCE_DIR}/ydb/library/workload/tpcc/load_data/query_generator.cpp + ${CMAKE_SOURCE_DIR}/ydb/library/workload/tpcc/load_data/stock.cpp + ${CMAKE_SOURCE_DIR}/ydb/library/workload/tpcc/load_data/warehouse.cpp + ${CMAKE_SOURCE_DIR}/ydb/library/workload/tpcc/load_data/load_thread_pool.cpp +) +generate_enum_serilization(ydb-library-workload + ${CMAKE_SOURCE_DIR}/ydb/library/workload/tpcc/tpcc_config.h + INCLUDE_HEADERS + ydb/library/workload/tpcc/tpcc_config.h ) diff --git a/ydb/library/workload/tpcc/load_data/customer.cpp b/ydb/library/workload/tpcc/load_data/customer.cpp new file mode 100644 index 0000000000..9c491acc15 --- /dev/null +++ b/ydb/library/workload/tpcc/load_data/customer.cpp @@ -0,0 +1,126 @@ +#include "customer.h" + +#include <util/string/printf.h> + +#include <sstream> + +namespace NYdbWorkload { +namespace NTPCC { + +TCustomerLoadQueryGenerator::TCustomerLoadQueryGenerator(TTPCCWorkloadParams& params, ui32 partNum, ui64 seed) + : TLoadQueryGenerator(params, seed) + , PartNum(partNum) +{ + WarehouseId = (partNum - 1) * params.Warehouses / params.Threads + 1; + DistrictId = 1; + CustomerId = 1; +} + +bool TCustomerLoadQueryGenerator::Finished() { + i32 whEnd = PartNum * Params.Warehouses / Params.Threads; + return (WarehouseId > whEnd); +} + +TString TCustomerLoadQueryGenerator::GetCreateDDL(TString path, ui32 partNum, TString partAtKeys) { + std::string partNumS = std::to_string(partNum); + TString query = Sprintf(R"( + CREATE TABLE `%s/customer` ( + C_W_ID Int32 NOT NULL, + C_D_ID Int32 NOT NULL, + C_ID Int32 NOT NULL, + C_DISCOUNT Double, + C_CREDIT Utf8, + C_LAST Utf8, + C_FIRST Utf8, + C_CREDIT_LIM Double, + C_BALANCE Double, + C_YTD_PAYMENT Double, + C_PAYMENT_CNT Int32, + C_DELIVERY_CNT Int32, + C_STREET_1 Utf8, + C_STREET_2 Utf8, + C_CITY Utf8, + C_STATE Utf8, + C_ZIP Utf8, + C_PHONE Utf8, + C_SINCE Timestamp, + C_MIDDLE Utf8, + C_DATA Utf8, + PRIMARY KEY (C_W_ID, C_D_ID, C_ID) + ) WITH ( + AUTO_PARTITIONING_BY_LOAD = DISABLED, + AUTO_PARTITIONING_MIN_PARTITIONS_COUNT = %s, + AUTO_PARTITIONING_MAX_PARTITIONS_COUNT = %s, + PARTITION_AT_KEYS = (%s) + ); + )", path.c_str(), partNumS.c_str(), partNumS.c_str(), partAtKeys.c_str()); + + return query; +} + +std::string TCustomerLoadQueryGenerator::GetCleanDDL() { + std::string cleanQuery = "DROP TABLE `customer`;"; + + return cleanQuery; +} + +NYdb::TValue TCustomerLoadQueryGenerator::GetNextBatchLoadDDL() { + NYdb::TValueBuilder rows; + if (Finished()) { + return rows.Build(); + } + + rows.BeginList(); + for (i32 id = 1; id <= Params.LoadBatchSize; ++id) { + rows.AddListItem().BeginStruct(); + rows.AddMember("C_W_ID").Int32(WarehouseId); + rows.AddMember("C_D_ID").Int32(DistrictId); + rows.AddMember("C_ID").Int32(CustomerId); + rows.AddMember("C_DISCOUNT").Double(static_cast<double>(UniformRandom32(1, 5000, Rng)) / 10000.0); + if (UniformRandom32(1, 100, Rng) <= 10) { + rows.AddMember("C_CREDIT").Utf8("BC"); + } else { + rows.AddMember("C_CREDIT").Utf8("GC"); + } + if (CustomerId <= 1000) { + rows.AddMember("C_LAST").Utf8(GetLastName(CustomerId - 1)); + } else { + rows.AddMember("C_LAST").Utf8(GetNonUniformRandomLastNameForLoad(Rng)); + } + rows.AddMember("C_FIRST").Utf8(RandomString(UniformRandom32(8, 16, Rng))); + rows.AddMember("C_CREDIT_LIM").Double(50000.0); + rows.AddMember("C_BALANCE").Double(-10.0); + rows.AddMember("C_YTD_PAYMENT").Double(10.0); + rows.AddMember("C_PAYMENT_CNT").Int32(1); + rows.AddMember("C_DELIVERY_CNT").Int32(0); + rows.AddMember("C_STREET_1").Utf8(RandomString(UniformRandom32(10, 20, Rng))); + rows.AddMember("C_STREET_2").Utf8(RandomString(UniformRandom32(10, 20, Rng))); + rows.AddMember("C_CITY").Utf8(RandomString(UniformRandom32(10, 20, Rng))); + rows.AddMember("C_STATE").Utf8(RandomString(2)); + rows.AddMember("C_ZIP").Utf8(RandomNumberString(4) + "11111"); + rows.AddMember("C_PHONE").Utf8(RandomNumberString(16)); + rows.AddMember("C_SINCE").Timestamp(Now()); + rows.AddMember("C_MIDDLE").Utf8("OE"); + rows.AddMember("C_DATA").Utf8(RandomString(UniformRandom32(300, 500, Rng))); + rows.EndStruct(); + + CustomerId++; + if (CustomerId > ETPCCWorkloadConstants::TPCC_CUST_PER_DIST) { + CustomerId = 1; + DistrictId++; + if (DistrictId > ETPCCWorkloadConstants::TPCC_DIST_PER_WH) { + DistrictId = 1; + WarehouseId++; + if (Finished()) { + break; + } + } + } + } + rows.EndList(); + + return rows.Build(); +} + +} +} diff --git a/ydb/library/workload/tpcc/load_data/customer.h b/ydb/library/workload/tpcc/load_data/customer.h new file mode 100644 index 0000000000..1f380d5f73 --- /dev/null +++ b/ydb/library/workload/tpcc/load_data/customer.h @@ -0,0 +1,30 @@ +#pragma once + +#include "query_generator.h" + + +namespace NYdbWorkload { +namespace NTPCC { + +class TCustomerLoadQueryGenerator : public TLoadQueryGenerator { +public: + + TCustomerLoadQueryGenerator(TTPCCWorkloadParams& params, ui32 partNum, ui64 seed); + + static TString GetCreateDDL(TString path, ui32 partNum, TString partAtKeys); + + static std::string GetCleanDDL(); + + NYdb::TValue GetNextBatchLoadDDL() override; + + bool Finished() override; + +private: + ui32 PartNum; + i32 WarehouseId; + i32 DistrictId; + i32 CustomerId; +}; + +} +} diff --git a/ydb/library/workload/tpcc/load_data/district.cpp b/ydb/library/workload/tpcc/load_data/district.cpp new file mode 100644 index 0000000000..debd5dea8e --- /dev/null +++ b/ydb/library/workload/tpcc/load_data/district.cpp @@ -0,0 +1,88 @@ +#include "district.h" + +#include <util/string/printf.h> + +#include <sstream> + +namespace NYdbWorkload { +namespace NTPCC { + +TDistrictLoadQueryGenerator::TDistrictLoadQueryGenerator(TTPCCWorkloadParams& params, ui64 seed) + : TLoadQueryGenerator(params, seed) +{ + WarehouseId = 1; + DistrictId = 1; +} + +bool TDistrictLoadQueryGenerator::Finished() { + return (WarehouseId > Params.Warehouses); +} + + +TString TDistrictLoadQueryGenerator::GetCreateDDL(TString path) { + TString query = Sprintf(R"( + CREATE TABLE `%s/district` ( + D_W_ID Int32 NOT NULL, + D_ID Int32 NOT NULL, + D_YTD Double, + D_TAX Double, + D_NEXT_O_ID Int32, + D_NAME Utf8, + D_STREET_1 Utf8, + D_STREET_2 Utf8, + D_CITY Utf8, + D_STATE Utf8, + D_ZIP Utf8, + PRIMARY KEY (D_W_ID, D_ID) + ) WITH ( + AUTO_PARTITIONING_BY_LOAD = DISABLED + ); + )", path.c_str()); + + return query; +} + +std::string TDistrictLoadQueryGenerator::GetCleanDDL() { + std::string cleanQuery = "DROP TABLE `district`;"; + + return cleanQuery; +} + +NYdb::TValue TDistrictLoadQueryGenerator::GetNextBatchLoadDDL() { + NYdb::TValueBuilder rows; + if (Finished()) { + return rows.Build(); + } + + rows.BeginList(); + for (i32 id = 1; id <= Params.LoadBatchSize; ++id) { + rows.AddListItem().BeginStruct(); + rows.AddMember("D_W_ID").Int32(WarehouseId); + rows.AddMember("D_ID").Int32(DistrictId); + rows.AddMember("D_YTD").Double(300000); + rows.AddMember("D_TAX").Double(static_cast<double>(UniformRandom32(0, 2000, Rng)) / 10000.0); + rows.AddMember("D_NEXT_O_ID").Int32(ETPCCWorkloadConstants::TPCC_CUST_PER_DIST + 1); + rows.AddMember("D_NAME").Utf8(RandomString(UniformRandom32(6, 10, Rng))); + rows.AddMember("D_STREET_1").Utf8(RandomString(UniformRandom32(10, 20, Rng))); + rows.AddMember("D_STREET_2").Utf8(RandomString(UniformRandom32(10, 20, Rng))); + rows.AddMember("D_CITY").Utf8(RandomString(UniformRandom32(10, 20, Rng))); + rows.AddMember("D_STATE").Utf8(RandomString(2)); + rows.AddMember("D_ZIP").Utf8("123456789"); + rows.EndStruct(); + + DistrictId++; + if (DistrictId > ETPCCWorkloadConstants::TPCC_DIST_PER_WH) { + DistrictId = 1; + WarehouseId++; + if (Finished()) { + break; + } + } + } + rows.EndList(); + + return rows.Build(); +} + +} +} diff --git a/ydb/library/workload/tpcc/load_data/district.h b/ydb/library/workload/tpcc/load_data/district.h new file mode 100644 index 0000000000..721e0825c1 --- /dev/null +++ b/ydb/library/workload/tpcc/load_data/district.h @@ -0,0 +1,28 @@ +#pragma once + +#include "query_generator.h" + + +namespace NYdbWorkload { +namespace NTPCC { + +class TDistrictLoadQueryGenerator : public TLoadQueryGenerator { +public: + + TDistrictLoadQueryGenerator(TTPCCWorkloadParams& params, ui64 seed); + + static TString GetCreateDDL(TString path); + + static std::string GetCleanDDL(); + + NYdb::TValue GetNextBatchLoadDDL() override; + + bool Finished() override; + +private: + i32 WarehouseId; + i32 DistrictId; +}; + +} +} diff --git a/ydb/library/workload/tpcc/load_data/history.cpp b/ydb/library/workload/tpcc/load_data/history.cpp new file mode 100644 index 0000000000..5f9b2a95ed --- /dev/null +++ b/ydb/library/workload/tpcc/load_data/history.cpp @@ -0,0 +1,94 @@ +#include "history.h" + +#include <util/string/printf.h> + +#include <sstream> + +namespace NYdbWorkload { +namespace NTPCC { + +THistoryLoadQueryGenerator::THistoryLoadQueryGenerator(TTPCCWorkloadParams& params, ui32 partNum, ui64 seed) + : TLoadQueryGenerator(params, seed) + , PartNum(partNum) +{ + WarehouseId = (partNum - 1) * params.Warehouses / params.Threads + 1; + DistrictId = 1; + CustomerId = 1; +} + +bool THistoryLoadQueryGenerator::Finished() { + i32 whEnd = PartNum * Params.Warehouses / Params.Threads; + return (WarehouseId > whEnd); +} + +TString THistoryLoadQueryGenerator::GetCreateDDL(TString path, ui32 partNum, TString partAtKeys) { + std::string partNumS = std::to_string(partNum); + TString query = Sprintf(R"( + CREATE TABLE `%s/history` ( + H_C_W_ID Int32, + H_C_ID Int32, + H_C_D_ID Int32, + H_D_ID Int32, + H_W_ID Int32, + H_DATE Timestamp, + H_AMOUNT Double, + H_DATA Utf8, + H_C_NANO_TS Int64 NOT NULL, + PRIMARY KEY (H_C_W_ID, H_C_NANO_TS) + ) WITH ( + AUTO_PARTITIONING_BY_LOAD = DISABLED, + AUTO_PARTITIONING_MIN_PARTITIONS_COUNT = %s, + AUTO_PARTITIONING_MAX_PARTITIONS_COUNT = %s, + PARTITION_AT_KEYS = (%s) + ); + )", path.c_str(), partNumS.c_str(), partNumS.c_str(), partAtKeys.c_str()); + + return query; +} + +std::string THistoryLoadQueryGenerator::GetCleanDDL() { + std::string cleanQuery = "DROP TABLE `history`;"; + + return cleanQuery; +} + +NYdb::TValue THistoryLoadQueryGenerator::GetNextBatchLoadDDL() { + NYdb::TValueBuilder rows; + if (Finished()) { + return rows.Build(); + } + + rows.BeginList(); + for (i32 id = 1; id <= Params.LoadBatchSize; ++id) { + rows.AddListItem().BeginStruct(); + rows.AddMember("H_C_ID").Int32(CustomerId); + rows.AddMember("H_C_D_ID").Int32(DistrictId); + rows.AddMember("H_C_W_ID").Int32(WarehouseId); + rows.AddMember("H_D_ID").Int32(DistrictId); + rows.AddMember("H_W_ID").Int32(WarehouseId); + rows.AddMember("H_DATE").Timestamp(Now()); + rows.AddMember("H_AMOUNT").Double(10.0); + rows.AddMember("H_DATA").Utf8(RandomString(UniformRandom32(10, 24, Rng))); + rows.AddMember("H_C_NANO_TS").Int64(GetNowNanoSeconds()); + rows.EndStruct(); + + CustomerId++; + if (CustomerId > ETPCCWorkloadConstants::TPCC_CUST_PER_DIST) { + CustomerId = 1; + DistrictId++; + if (DistrictId > ETPCCWorkloadConstants::TPCC_DIST_PER_WH) { + DistrictId = 1; + WarehouseId++; + if (Finished()) { + break; + } + } + } + } + rows.EndList(); + + return rows.Build(); +} + +} +} diff --git a/ydb/library/workload/tpcc/load_data/history.h b/ydb/library/workload/tpcc/load_data/history.h new file mode 100644 index 0000000000..5aac3a8369 --- /dev/null +++ b/ydb/library/workload/tpcc/load_data/history.h @@ -0,0 +1,37 @@ +#pragma once + +#include "query_generator.h" + + +namespace NYdbWorkload { +namespace NTPCC { + +class THistoryLoadQueryGenerator : public TLoadQueryGenerator { +public: + + THistoryLoadQueryGenerator(TTPCCWorkloadParams& params, ui32 partNum, ui64 seed); + + static TString GetCreateDDL(TString path, ui32 partNum, TString partAtKeys); + + static std::string GetCleanDDL(); + + NYdb::TValue GetNextBatchLoadDDL() override; + + bool Finished() override; + +private: + ui32 PartNum; + i32 WarehouseId; + i32 DistrictId; + i32 CustomerId; + + ui64 GetNowNanoSeconds() { + std::chrono::time_point<std::chrono::system_clock> now = std::chrono::system_clock::now(); + auto duration = now.time_since_epoch(); + auto nanoseconds = std::chrono::duration_cast<std::chrono::nanoseconds>(duration); + return static_cast<ui64>(nanoseconds.count()); + } +}; + +} +} diff --git a/ydb/library/workload/tpcc/load_data/item.cpp b/ydb/library/workload/tpcc/load_data/item.cpp new file mode 100644 index 0000000000..0e7779bade --- /dev/null +++ b/ydb/library/workload/tpcc/load_data/item.cpp @@ -0,0 +1,82 @@ +#include "item.h" + +#include <util/string/printf.h> + +#include <sstream> + +namespace NYdbWorkload { +namespace NTPCC { + +TItemLoadQueryGenerator::TItemLoadQueryGenerator(TTPCCWorkloadParams& params, ui64 seed) + : TLoadQueryGenerator(params, seed) +{ + ItemId = 1; +} + +bool TItemLoadQueryGenerator::Finished() { + return (ItemId > ETPCCWorkloadConstants::TPCC_ITEM_COUNT); +} + +TString TItemLoadQueryGenerator::GetCreateDDL(TString path) { + TString query = Sprintf(R"( + CREATE TABLE `%s/item` ( + I_ID Int32 NOT NULL, + I_NAME Utf8, + I_PRICE Double, + I_DATA Utf8, + I_IM_ID Int32, + PRIMARY KEY (I_ID) + ) WITH ( + AUTO_PARTITIONING_BY_LOAD = DISABLED + ); + )", path.c_str()); + + return query; +} + +std::string TItemLoadQueryGenerator::GetCleanDDL() { + std::string cleanQuery = "DROP TABLE `item`;"; + + return cleanQuery; +} + +NYdb::TValue TItemLoadQueryGenerator::GetNextBatchLoadDDL() { + NYdb::TValueBuilder rows; + if (Finished()) { + return rows.Build(); + } + i32 partStart = ItemId; + i32 partEnd = std::min(static_cast<i32>(ETPCCWorkloadConstants::TPCC_ITEM_COUNT), + partStart + Params.LoadBatchSize - 1); + + rows.BeginList(); + for (i32 row = partStart; row <= partEnd; ++row) { + rows.AddListItem().BeginStruct(); + rows.AddMember("I_ID").Int32(row); + rows.AddMember("I_NAME").Utf8(RandomString(UniformRandom32(14, 24, Rng))); + rows.AddMember("I_PRICE").Double(static_cast<double>(UniformRandom32(100, 10000, Rng)) / 100.0); + + ui64 dataType = UniformRandom32(1, 100, Rng); + ui64 length = UniformRandom32(26, 50, Rng); + if (dataType > 10) { + // 90% of time i_data isa random string of length [26 .. 50] + rows.AddMember("I_DATA").Utf8(RandomString(length)); + } else { + // 10% of time i_data has "ORIGINAL" crammed somewhere in middle + ui64 placeForOriginal = UniformRandom32(1, length - 9, Rng); + rows.AddMember("I_DATA").Utf8( + RandomString(placeForOriginal) + "ORIGINAL" + RandomString(length - 8 - placeForOriginal) + ); + } + rows.AddMember("I_IM_ID").Int32(UniformRandom32(1, 10000, Rng)); + rows.EndStruct(); + } + rows.EndList(); + + ItemId = partEnd + 1; + + return rows.Build(); +} + +} +} diff --git a/ydb/library/workload/tpcc/load_data/item.h b/ydb/library/workload/tpcc/load_data/item.h new file mode 100644 index 0000000000..a73140f016 --- /dev/null +++ b/ydb/library/workload/tpcc/load_data/item.h @@ -0,0 +1,27 @@ +#pragma once + +#include "query_generator.h" + + +namespace NYdbWorkload { +namespace NTPCC { + +class TItemLoadQueryGenerator : public TLoadQueryGenerator { +public: + + TItemLoadQueryGenerator(TTPCCWorkloadParams& params, ui64 seed); + + static TString GetCreateDDL(TString path); + + static std::string GetCleanDDL(); + + NYdb::TValue GetNextBatchLoadDDL() override; + + bool Finished() override; + +private: + i32 ItemId; +}; + +} +} diff --git a/ydb/library/workload/tpcc/load_data/load_thread_pool.cpp b/ydb/library/workload/tpcc/load_data/load_thread_pool.cpp new file mode 100644 index 0000000000..95489b174b --- /dev/null +++ b/ydb/library/workload/tpcc/load_data/load_thread_pool.cpp @@ -0,0 +1,83 @@ +#include "load_thread_pool.h" + +#include <ydb/library/workload/tpcc/tpcc_thread_resource.h> + +#include <ydb/public/api/protos/ydb_value.pb.h> +#include <ydb/public/lib/ydb_cli/commands/ydb_common.h> + +namespace NYdbWorkload { +namespace NTPCC { + +TLoadThreadPool::TLoadThreadPool(std::shared_ptr<TDriver>& driver) + : TThreadPool(TThreadPool::TParams().SetBlocking(true).SetCatching(true)) + , Driver(driver) {} + +void* TLoadThreadPool::CreateThreadSpecificResource() { + auto tableClientSettings = NTable::TClientSettings() + .SessionPoolSettings( + NTable::TSessionPoolSettings() + .MaxActiveSessions(1)); + return new TThreadResource(std::make_shared<NTable::TTableClient>(*Driver, tableClientSettings)); +} + +void TLoadThreadPool::DestroyThreadSpecificResource(void* resource) { + delete static_cast<TThreadResource*>(resource); +} + +TLoadTask::TLoadTask(std::unique_ptr<TLoadQueryGenerator>&& queryGen) + : QueryGen(std::move(queryGen)) {} + +void TLoadTask::Process(void* threadResource) { + try { + int retryCount = 0; + auto query = QueryGen->GetNextBatchLoadDDL(); + auto retrySettings = NYdb::NTable::TRetryOperationSettings() + .MaxRetries(QueryGen->GetParams().RetryCount); + auto& db = static_cast<TThreadResource*>(threadResource)->Client; + + auto upsertQuery = [this, &query, &retryCount] (NTable::TTableClient& tableClient) { + TValue queryCopy(query); + + auto future = tableClient.BulkUpsert( + QueryGen->GetParams().DbPath, + std::move(queryCopy) + ); + + return future.Apply([&retryCount](const NYdb::NTable::TAsyncBulkUpsertResult& bulkUpsertResult) { + NYdb::TStatus status = bulkUpsertResult.GetValueSync(); + if (!status.IsSuccess()) { + ++retryCount; + } else if (status.GetIssues()) { + Cerr << status.GetIssues().ToString() << "\n"; + } + return NThreading::MakeFuture(status); + }); + }; + + while (true) { + auto statusF = db->RetryOperation(upsertQuery, retrySettings); + + // We want to prepare the request while the previous one is running + if (!QueryGen->Finished()) { + query = QueryGen->GetNextBatchLoadDDL(); + } else { + NConsoleClient::ThrowOnError(statusF.GetValueSync()); + break; + } + NConsoleClient::ThrowOnError(statusF.GetValueSync()); + + // TODO: Retry monitoring + retryCount = 0; + } + + } catch(NConsoleClient::TYdbErrorException ex) { + Cerr << ex; + throw; + } catch(...) { + throw; + } +} + + +} +} diff --git a/ydb/library/workload/tpcc/load_data/load_thread_pool.h b/ydb/library/workload/tpcc/load_data/load_thread_pool.h new file mode 100644 index 0000000000..c8995e308a --- /dev/null +++ b/ydb/library/workload/tpcc/load_data/load_thread_pool.h @@ -0,0 +1,39 @@ +#pragma once + +#include <ydb/public/sdk/cpp/client/ydb_table/table.h> +#include <ydb/library/workload/tpcc/load_data/query_generator.h> + +#include <util/thread/pool.h> +#include <library/cpp/logger/log.h> + +namespace NYdbWorkload { +namespace NTPCC { + +using namespace NYdb; + +class TLoadThreadPool : public TThreadPool { +public: + TLoadThreadPool(std::shared_ptr<TDriver>& driver); + +protected: + void* CreateThreadSpecificResource() override; + + void DestroyThreadSpecificResource(void* resource) override; + +private: + std::shared_ptr<TDriver> Driver; +}; + + +class TLoadTask : public IObjectInQueue { +public: + TLoadTask(std::unique_ptr<TLoadQueryGenerator>&& queryGen); + + void Process(void* threadResource) override; + +private: + std::unique_ptr<TLoadQueryGenerator> QueryGen; +}; + +} +} diff --git a/ydb/library/workload/tpcc/load_data/new_order.cpp b/ydb/library/workload/tpcc/load_data/new_order.cpp new file mode 100644 index 0000000000..bb31672f0d --- /dev/null +++ b/ydb/library/workload/tpcc/load_data/new_order.cpp @@ -0,0 +1,85 @@ +#include "new_order.h" + +#include <util/string/printf.h> + +#include <sstream> + +namespace NYdbWorkload { +namespace NTPCC { + +TNewOrderLoadQueryGenerator::TNewOrderLoadQueryGenerator(TTPCCWorkloadParams& params, ui32 partNum, ui64 seed) + : TLoadQueryGenerator(params, seed) + , PartNum(partNum) +{ + WarehouseId = (partNum - 1) * params.Warehouses / params.Threads + 1; + DistrictId = 1; + // 900 rows in the NEW-ORDER table corresponding to the last + // 900 rows in the ORDER table for that district + // (i.e., with NO_O_ID between 2,101 and 3,000) + CustomerId = ETPCCWorkloadConstants::TPCC_FIRST_UNPROCESSED_O_ID; +} + +bool TNewOrderLoadQueryGenerator::Finished() { + i32 whEnd = PartNum * Params.Warehouses / Params.Threads; + return (WarehouseId > whEnd); +} + +TString TNewOrderLoadQueryGenerator::GetCreateDDL(TString path, ui32 partNum, TString partAtKeys) { + std::string partNumS = std::to_string(partNum); + TString query = Sprintf(R"( + CREATE TABLE `%s/new_order` ( + NO_W_ID Int32 NOT NULL, + NO_D_ID Int32 NOT NULL, + NO_O_ID Int32 NOT NULL, + PRIMARY KEY (NO_W_ID, NO_D_ID, NO_O_ID) + ) WITH ( + AUTO_PARTITIONING_BY_LOAD = DISABLED, + AUTO_PARTITIONING_MIN_PARTITIONS_COUNT = %s, + AUTO_PARTITIONING_MAX_PARTITIONS_COUNT = %s, + PARTITION_AT_KEYS = (%s) + ); + )", path.c_str(), partNumS.c_str(), partNumS.c_str(), partAtKeys.c_str()); + + return query; +} + +std::string TNewOrderLoadQueryGenerator::GetCleanDDL() { + std::string cleanQuery = "DROP TABLE `new_order`;"; + + return cleanQuery; +} + +NYdb::TValue TNewOrderLoadQueryGenerator::GetNextBatchLoadDDL() { + NYdb::TValueBuilder rows; + if (Finished()) { + return rows.Build(); + } + + rows.BeginList(); + for (i32 id = 1; id <= Params.LoadBatchSize; ++id) { + rows.AddListItem().BeginStruct(); + rows.AddMember("NO_W_ID").Int32(WarehouseId); + rows.AddMember("NO_D_ID").Int32(DistrictId); + rows.AddMember("NO_O_ID").Int32(CustomerId); + rows.EndStruct(); + + CustomerId++; + if (CustomerId > ETPCCWorkloadConstants::TPCC_CUST_PER_DIST) { + CustomerId = ETPCCWorkloadConstants::TPCC_FIRST_UNPROCESSED_O_ID; + DistrictId++; + if (DistrictId > ETPCCWorkloadConstants::TPCC_DIST_PER_WH) { + DistrictId = 1; + WarehouseId++; + if (Finished()) { + break; + } + } + } + } + rows.EndList(); + + return rows.Build(); +} + +} +} diff --git a/ydb/library/workload/tpcc/load_data/new_order.h b/ydb/library/workload/tpcc/load_data/new_order.h new file mode 100644 index 0000000000..e751962c6d --- /dev/null +++ b/ydb/library/workload/tpcc/load_data/new_order.h @@ -0,0 +1,30 @@ +#pragma once + +#include "query_generator.h" + + +namespace NYdbWorkload { +namespace NTPCC { + +class TNewOrderLoadQueryGenerator : public TLoadQueryGenerator { +public: + + TNewOrderLoadQueryGenerator(TTPCCWorkloadParams& params, ui32 partNum, ui64 seed); + + static TString GetCreateDDL(TString path, ui32 partNum, TString partAtKeys); + + static std::string GetCleanDDL(); + + NYdb::TValue GetNextBatchLoadDDL() override; + + bool Finished() override; + +private: + ui32 PartNum; + i32 WarehouseId; + i32 DistrictId; + i32 CustomerId; +}; + +} +} diff --git a/ydb/library/workload/tpcc/load_data/oorder.cpp b/ydb/library/workload/tpcc/load_data/oorder.cpp new file mode 100644 index 0000000000..f0eda4e532 --- /dev/null +++ b/ydb/library/workload/tpcc/load_data/oorder.cpp @@ -0,0 +1,102 @@ +#include "oorder.h" + +#include <util/string/printf.h> + +#include <sstream> + +namespace NYdbWorkload { +namespace NTPCC { + +TOorderLoadQueryGenerator::TOorderLoadQueryGenerator(TTPCCWorkloadParams& params, ui32 partNum, ui64 seed) + : TLoadQueryGenerator(params, seed) + , PartNum(partNum) + , CustShuffle(ETPCCWorkloadConstants::TPCC_CUST_PER_DIST) +{ + WarehouseId = (partNum - 1) * params.Warehouses / params.Threads + 1; + DistrictId = 1; + CustomerId = 0; + for (ui64 id = 0; id < CustShuffle.size(); ++id) { + CustShuffle[id] = id + 1; + } + std::shuffle(CustShuffle.begin(), CustShuffle.end(), Rng); +} + +bool TOorderLoadQueryGenerator::Finished() { + i32 whEnd = PartNum * Params.Warehouses / Params.Threads; + return (WarehouseId > whEnd); +} + +TString TOorderLoadQueryGenerator::GetCreateDDL(TString path, ui32 partNum, TString partAtKeys) { + std::string partNumS = std::to_string(partNum); + TString query = Sprintf(R"( + CREATE TABLE `%s/oorder` ( + O_W_ID Int32 NOT NULL, + O_D_ID Int32 NOT NULL, + O_ID Int32 NOT NULL, + O_C_ID Int32, + O_CARRIER_ID Int32, + O_OL_CNT Int32, + O_ALL_LOCAL Int32, + O_ENTRY_D Timestamp, + PRIMARY KEY (O_W_ID, O_D_ID, O_ID) + ) WITH ( + AUTO_PARTITIONING_BY_LOAD = DISABLED, + AUTO_PARTITIONING_MIN_PARTITIONS_COUNT = %s, + AUTO_PARTITIONING_MAX_PARTITIONS_COUNT = %s, + PARTITION_AT_KEYS = (%s) + ); + )", path.c_str(), partNumS.c_str(), partNumS.c_str(), partAtKeys.c_str()); + + return query; +} + +std::string TOorderLoadQueryGenerator::GetCleanDDL() { + std::string cleanQuery = "DROP TABLE `oorder`;"; + + return cleanQuery; +} + +NYdb::TValue TOorderLoadQueryGenerator::GetNextBatchLoadDDL() { + NYdb::TValueBuilder rows; + if (Finished()) { + return rows.Build(); + } + + rows.BeginList(); + for (i32 id = 1; id <= Params.LoadBatchSize; ++id) { + rows.AddListItem().BeginStruct(); + rows.AddMember("O_W_ID").Int32(WarehouseId); + rows.AddMember("O_D_ID").Int32(DistrictId); + rows.AddMember("O_ID").Int32(CustomerId); + rows.AddMember("O_C_ID").Int32(CustShuffle[CustomerId]); + if (CustomerId + 1 < ETPCCWorkloadConstants::TPCC_FIRST_UNPROCESSED_O_ID) { + rows.AddMember("O_CARRIER_ID").Int32(UniformRandom32(10, 24, Rng)); + } else { + rows.AddMember("O_CARRIER_ID").Int32(0); + } + rows.AddMember("O_OL_CNT").Int32(UniformRandom32(5, 15, Rng)); + rows.AddMember("O_ALL_LOCAL").Int32(1); + rows.AddMember("O_ENTRY_D").Timestamp(Now()); + rows.EndStruct(); + + CustomerId++; + if (CustomerId == ETPCCWorkloadConstants::TPCC_CUST_PER_DIST) { + CustomerId = 0; + DistrictId++; + std::shuffle(CustShuffle.begin(), CustShuffle.end(), Rng); + if (DistrictId > ETPCCWorkloadConstants::TPCC_DIST_PER_WH) { + DistrictId = 1; + WarehouseId++; + if (Finished()) { + break; + } + } + } + } + rows.EndList(); + + return rows.Build(); +} + +} +} diff --git a/ydb/library/workload/tpcc/load_data/oorder.h b/ydb/library/workload/tpcc/load_data/oorder.h new file mode 100644 index 0000000000..160e0b8118 --- /dev/null +++ b/ydb/library/workload/tpcc/load_data/oorder.h @@ -0,0 +1,31 @@ +#pragma once + +#include "query_generator.h" + + +namespace NYdbWorkload { +namespace NTPCC { + +class TOorderLoadQueryGenerator : public TLoadQueryGenerator { +public: + + TOorderLoadQueryGenerator(TTPCCWorkloadParams& params, ui32 partNum, ui64 seed); + + static TString GetCreateDDL(TString path, ui32 partNum, TString partAtKeys); + + static std::string GetCleanDDL(); + + NYdb::TValue GetNextBatchLoadDDL() override; + + bool Finished() override; + +private: + ui32 PartNum; + i32 WarehouseId; + i32 DistrictId; + i32 CustomerId; + std::vector<ui64> CustShuffle; +}; + +} +} diff --git a/ydb/library/workload/tpcc/load_data/order_line.cpp b/ydb/library/workload/tpcc/load_data/order_line.cpp new file mode 100644 index 0000000000..b254f6e61e --- /dev/null +++ b/ydb/library/workload/tpcc/load_data/order_line.cpp @@ -0,0 +1,111 @@ +#include "order_line.h" + +#include <util/string/printf.h> + +#include <sstream> + +namespace NYdbWorkload { +namespace NTPCC { + +TOrderLineLoadQueryGenerator::TOrderLineLoadQueryGenerator(TTPCCWorkloadParams& params, ui32 partNum, ui64 seed) + : TLoadQueryGenerator(params, seed) + , PartNum(partNum) +{ + WarehouseId = (partNum - 1) * params.Warehouses / params.Threads + 1; + DistrictId = 1; + CustomerId = 1; + OlNumber = 1; + RandomCount = UniformRandom32(5, 15, Rng); +} + +bool TOrderLineLoadQueryGenerator::Finished() { + i32 whEnd = PartNum * Params.Warehouses / Params.Threads; + return (WarehouseId > whEnd); +} + +TString TOrderLineLoadQueryGenerator::GetCreateDDL(TString path, ui32 partNum, TString partAtKeys) { + std::string partNumS = std::to_string(partNum); + TString query = Sprintf(R"( + CREATE TABLE `%s/order_line` ( + OL_W_ID Int32 NOT NULL, + OL_D_ID Int32 NOT NULL, + OL_O_ID Int32 NOT NULL, + OL_NUMBER Int32 NOT NULL, + OL_I_ID Int32, + OL_DELIVERY_D Timestamp, + OL_AMOUNT Double, + OL_SUPPLY_W_ID Int32, + OL_QUANTITY Double, + OL_DIST_INFO Utf8, + PRIMARY KEY (OL_W_ID, OL_D_ID, OL_O_ID, OL_NUMBER) + ) WITH ( + AUTO_PARTITIONING_BY_LOAD = DISABLED, + AUTO_PARTITIONING_MIN_PARTITIONS_COUNT = %s, + AUTO_PARTITIONING_MAX_PARTITIONS_COUNT = %s, + PARTITION_AT_KEYS = (%s) + ); + )", path.c_str(), partNumS.c_str(), partNumS.c_str(), partAtKeys.c_str()); + + return query; +} + +std::string TOrderLineLoadQueryGenerator::GetCleanDDL() { + std::string cleanQuery = "DROP TABLE `order_line`;"; + + return cleanQuery; +} + +NYdb::TValue TOrderLineLoadQueryGenerator::GetNextBatchLoadDDL() { + NYdb::TValueBuilder rows; + if (Finished()) { + return rows.Build(); + } + + rows.BeginList(); + for (i32 id = 1; id <= Params.LoadBatchSize; ++id) { + rows.AddListItem().BeginStruct(); + rows.AddMember("OL_W_ID").Int32(WarehouseId); + rows.AddMember("OL_D_ID").Int32(DistrictId); + rows.AddMember("OL_O_ID").Int32(CustomerId); + rows.AddMember("OL_NUMBER").Int32(OlNumber); + + ui64 itemId = UniformRandom32(1, ETPCCWorkloadConstants::TPCC_ITEM_COUNT, Rng); + rows.AddMember("OL_I_ID").Int32(itemId); + if (itemId < ETPCCWorkloadConstants::TPCC_FIRST_UNPROCESSED_O_ID) { + rows.AddMember("OL_DELIVERY_D").Timestamp(Now()); + rows.AddMember("OL_AMOUNT").Double(0.0); + } else { + rows.AddMember("OL_DELIVERY_D").Timestamp(TInstant::Zero()); + rows.AddMember("OL_AMOUNT").Double(static_cast<double>(UniformRandom32(1, 999999, Rng)) / 100.0); + } + + rows.AddMember("OL_SUPPLY_W_ID").Int32(WarehouseId); + rows.AddMember("OL_QUANTITY").Double(5.0); + rows.AddMember("OL_DIST_INFO").Utf8(RandomString(24)); + rows.EndStruct(); + + OlNumber++; + if (OlNumber > RandomCount) { + CustomerId++; + OlNumber = 1; + RandomCount = UniformRandom32(5, 15, Rng); + if (CustomerId > ETPCCWorkloadConstants::TPCC_CUST_PER_DIST) { + CustomerId = 1; + DistrictId++; + if (DistrictId > ETPCCWorkloadConstants::TPCC_DIST_PER_WH) { + DistrictId = 1; + WarehouseId++; + if (Finished()) { + break; + } + } + } + } + } + rows.EndList(); + + return rows.Build(); +} + +} +} diff --git a/ydb/library/workload/tpcc/load_data/order_line.h b/ydb/library/workload/tpcc/load_data/order_line.h new file mode 100644 index 0000000000..7762cf9352 --- /dev/null +++ b/ydb/library/workload/tpcc/load_data/order_line.h @@ -0,0 +1,34 @@ +#pragma once + +#include "query_generator.h" + + +namespace NYdbWorkload { +namespace NTPCC { + +class TOrderLineLoadQueryGenerator : public TLoadQueryGenerator { +public: + + TOrderLineLoadQueryGenerator(TTPCCWorkloadParams& params, ui32 partNum, ui64 seed); + + static TString GetCreateDDL(TString path, ui32 partNum, TString partAtKeys); + + static std::string GetCleanDDL(); + + NYdb::TValue GetNextBatchLoadDDL() override; + + bool Finished() override; + +private: + ui32 PartNum; + i32 WarehouseId; + i32 DistrictId; + i32 CustomerId; + ui64 RandomCount; + ui64 OlNumber; + + std::uniform_int_distribution<ui64> RandomCountGen; +}; + +} +} diff --git a/ydb/library/workload/tpcc/load_data/query_generator.cpp b/ydb/library/workload/tpcc/load_data/query_generator.cpp new file mode 100644 index 0000000000..d5cc5bec67 --- /dev/null +++ b/ydb/library/workload/tpcc/load_data/query_generator.cpp @@ -0,0 +1,35 @@ +#include "query_generator.h" + +namespace NYdbWorkload { +namespace NTPCC { + +TLoadQueryGenerator::TLoadQueryGenerator(TTPCCWorkloadParams& params, ui64 seed) + : Params(params) + , Rng(seed, 0) +{ +} + +TTPCCWorkloadParams TLoadQueryGenerator::GetParams() { + return Params; +} + +TString TLoadQueryGenerator::RandomString(ui64 length) { + char str[length + 1]; + for (ui64 i = 0; i < length; ++i) { + str[i] = static_cast<char>(UniformRandom32('A', 'z', Rng)); + } + str[length] = '\0'; + return TString(str); +} + +TString TLoadQueryGenerator::RandomNumberString(ui64 length) { + char str[length + 1]; + for (ui64 i = 0; i < length; ++i) { + str[i] = static_cast<char>(UniformRandom32('0', '9', Rng)); + } + str[length] = '\0'; + return TString(str); +} + +} +} diff --git a/ydb/library/workload/tpcc/load_data/query_generator.h b/ydb/library/workload/tpcc/load_data/query_generator.h new file mode 100644 index 0000000000..537e7c7da3 --- /dev/null +++ b/ydb/library/workload/tpcc/load_data/query_generator.h @@ -0,0 +1,36 @@ +#pragma once + +#include <ydb/library/workload/tpcc/tpcc_config.h> +#include <ydb/library/workload/tpcc/tpcc_util.h> + +#include <util/random/fast.h> + +#include <random> + +namespace NYdbWorkload { +namespace NTPCC { + +class TLoadQueryGenerator { +public: + + TLoadQueryGenerator(TTPCCWorkloadParams& params, ui64 seed); + + virtual ~TLoadQueryGenerator() = default; + + virtual NYdb::TValue GetNextBatchLoadDDL() = 0; + + virtual bool Finished() = 0; + + TTPCCWorkloadParams GetParams(); + +protected: + TString RandomString(ui64 length); + TString RandomNumberString(ui64 length); + + TTPCCWorkloadParams Params; + + TFastRng32 Rng; +}; + +} +} diff --git a/ydb/library/workload/tpcc/load_data/stock.cpp b/ydb/library/workload/tpcc/load_data/stock.cpp new file mode 100644 index 0000000000..67c4904d1d --- /dev/null +++ b/ydb/library/workload/tpcc/load_data/stock.cpp @@ -0,0 +1,115 @@ +#include "stock.h" + +#include <util/string/printf.h> + +#include <sstream> + +namespace NYdbWorkload { +namespace NTPCC { + +TStockLoadQueryGenerator::TStockLoadQueryGenerator(TTPCCWorkloadParams& params, ui32 partNum, ui64 seed) + : TLoadQueryGenerator(params, seed) + , PartNum(partNum) +{ + WarehouseId = (partNum - 1) * params.Warehouses / params.Threads + 1; + ItemId = 1; +} + +bool TStockLoadQueryGenerator::Finished() { + i32 whEnd = PartNum * Params.Warehouses / Params.Threads; + return (WarehouseId > whEnd); +} + +TString TStockLoadQueryGenerator::GetCreateDDL(TString path, ui32 partNum, TString partAtKeys) { + std::string partNumS = std::to_string(partNum); + TString query = Sprintf(R"( + CREATE TABLE `%s/stock` ( + S_W_ID Int32 NOT NULL, + S_I_ID Int32 NOT NULL, + S_QUANTITY Int32, + S_YTD Double, + S_ORDER_CNT Int32, + S_REMOTE_CNT Int32, + S_DATA Utf8, + S_DIST_01 Utf8, + S_DIST_02 Utf8, + S_DIST_03 Utf8, + S_DIST_04 Utf8, + S_DIST_05 Utf8, + S_DIST_06 Utf8, + S_DIST_07 Utf8, + S_DIST_08 Utf8, + S_DIST_09 Utf8, + S_DIST_10 Utf8, + PRIMARY KEY (S_W_ID, S_I_ID) + ) WITH ( + AUTO_PARTITIONING_BY_LOAD = DISABLED, + AUTO_PARTITIONING_MIN_PARTITIONS_COUNT = %s, + AUTO_PARTITIONING_MAX_PARTITIONS_COUNT = %s, + PARTITION_AT_KEYS = (%s) + ); + )", path.c_str(), partNumS.c_str(), partNumS.c_str(), partAtKeys.c_str()); + + return query; +} + +std::string TStockLoadQueryGenerator::GetCleanDDL() { + std::string cleanQuery = "DROP TABLE `stock`;"; + + return cleanQuery; +} + +NYdb::TValue TStockLoadQueryGenerator::GetNextBatchLoadDDL() { + NYdb::TValueBuilder rows; + if (Finished()) { + return rows.Build(); + } + + std::uniform_int_distribution<ui64> quantityGen(10, 100); + std::uniform_int_distribution<ui64> dataLengthGen(26, 50); + std::uniform_int_distribution<ui64> dataTypeGen(1, 100); + + rows.BeginList(); + for (i32 id = 1; id <= Params.LoadBatchSize; ++id) { + rows.AddListItem().BeginStruct(); + rows.AddMember("S_W_ID").Int32(WarehouseId); + rows.AddMember("S_I_ID").Int32(ItemId); + rows.AddMember("S_QUANTITY").Int32(UniformRandom32(10, 100, Rng)); + rows.AddMember("S_ORDER_CNT").Int32(0); + rows.AddMember("S_REMOTE_CNT").Int32(0); + + ui64 dataType = UniformRandom32(1, 100, Rng); + ui64 length = UniformRandom32(26, 50, Rng); + if (dataType > 10) { + // 90% of time i_data isa random string of length [26 .. 50] + rows.AddMember("S_DATA").Utf8(RandomString(length)); + } else { + // 10% of time i_data has "ORIGINAL" crammed somewhere in middle + ui64 placeForOriginal = UniformRandom32(1, length - 9, Rng); + rows.AddMember("S_DATA").Utf8( + RandomString(placeForOriginal) + "ORIGINAL" + RandomString(length - 8 - placeForOriginal) + ); + } + + for (int distNum = 1; distNum <= 9; ++distNum) { + rows.AddMember("S_DIST_0" + std::to_string(distNum)).Utf8(RandomString(24)); + } + rows.AddMember("S_DIST_10").Utf8(RandomString(24)); + rows.EndStruct(); + + ItemId++; + if (ItemId > ETPCCWorkloadConstants::TPCC_ITEM_COUNT) { + ItemId = 1; + WarehouseId++; + if (Finished()) { + break; + } + } + } + rows.EndList(); + + return rows.Build(); +} + +} +} diff --git a/ydb/library/workload/tpcc/load_data/stock.h b/ydb/library/workload/tpcc/load_data/stock.h new file mode 100644 index 0000000000..3e725062cf --- /dev/null +++ b/ydb/library/workload/tpcc/load_data/stock.h @@ -0,0 +1,29 @@ +#pragma once + +#include "query_generator.h" + + +namespace NYdbWorkload { +namespace NTPCC { + +class TStockLoadQueryGenerator : public TLoadQueryGenerator { +public: + + TStockLoadQueryGenerator(TTPCCWorkloadParams& params, ui32 partNum, ui64 seed); + + static TString GetCreateDDL(TString path, ui32 partNum, TString partAtKeys); + + static std::string GetCleanDDL(); + + NYdb::TValue GetNextBatchLoadDDL() override; + + bool Finished() override; + +private: + ui32 PartNum; + i32 WarehouseId; + i32 ItemId; +}; + +} +} diff --git a/ydb/library/workload/tpcc/load_data/warehouse.cpp b/ydb/library/workload/tpcc/load_data/warehouse.cpp new file mode 100644 index 0000000000..63784ddb37 --- /dev/null +++ b/ydb/library/workload/tpcc/load_data/warehouse.cpp @@ -0,0 +1,77 @@ +#include "warehouse.h" + +#include <util/string/printf.h> + +#include <sstream> + +namespace NYdbWorkload { +namespace NTPCC { + +TWarehouseLoadQueryGenerator::TWarehouseLoadQueryGenerator(TTPCCWorkloadParams& params, ui64 seed) + : TLoadQueryGenerator(params, seed) +{ + WarehouseId = 1; +} + +bool TWarehouseLoadQueryGenerator::Finished() { + return (WarehouseId > Params.Warehouses); +} + +TString TWarehouseLoadQueryGenerator::GetCreateDDL(TString path) { + TString query = Sprintf(R"( + CREATE TABLE `%s/warehouse` ( + W_ID Int32 NOT NULL, + W_YTD Double, + W_TAX Double, + W_NAME Utf8, + W_STREET_1 Utf8, + W_STREET_2 Utf8, + W_CITY Utf8, + W_STATE Utf8, + W_ZIP Utf8, + PRIMARY KEY (W_ID) + ) WITH ( + AUTO_PARTITIONING_BY_LOAD = DISABLED + ); + )", path.c_str()); + + return query; +} + +std::string TWarehouseLoadQueryGenerator::GetCleanDDL() { + std::string cleanQuery = "DROP TABLE `warehouse`;"; + + return cleanQuery; +} + +NYdb::TValue TWarehouseLoadQueryGenerator::GetNextBatchLoadDDL() { + NYdb::TValueBuilder rows; + if (Finished()) { + return rows.Build(); + } + + i32 whEnd = std::min(Params.Warehouses, WarehouseId + Params.LoadBatchSize - 1); + + rows.BeginList(); + for (i32 row = WarehouseId; row <= whEnd; ++row) { + rows.AddListItem().BeginStruct(); + rows.AddMember("W_ID").Int32(row); + rows.AddMember("W_YTD").Double(300000); + rows.AddMember("W_TAX").Double(static_cast<double>(UniformRandom32(0, 2000, Rng)) / 10000.0); + rows.AddMember("W_NAME").Utf8(RandomString(UniformRandom32(6, 10, Rng))); + rows.AddMember("W_STREET_1").Utf8(RandomString(UniformRandom32(10, 20, Rng))); + rows.AddMember("W_STREET_2").Utf8(RandomString(UniformRandom32(10, 20, Rng))); + rows.AddMember("W_CITY").Utf8(RandomString(UniformRandom32(10, 20, Rng))); + rows.AddMember("W_STATE").Utf8(RandomString(2)); + rows.AddMember("W_ZIP").Utf8("123456789"); + rows.EndStruct(); + } + rows.EndList(); + + WarehouseId = whEnd + 1; + + return rows.Build(); +} + +} +} diff --git a/ydb/library/workload/tpcc/load_data/warehouse.h b/ydb/library/workload/tpcc/load_data/warehouse.h new file mode 100644 index 0000000000..4e1dfe5ee6 --- /dev/null +++ b/ydb/library/workload/tpcc/load_data/warehouse.h @@ -0,0 +1,27 @@ +#pragma once + +#include "query_generator.h" + + +namespace NYdbWorkload { +namespace NTPCC { + +class TWarehouseLoadQueryGenerator : public TLoadQueryGenerator { +public: + + TWarehouseLoadQueryGenerator(TTPCCWorkloadParams& params, ui64 seed); + + static TString GetCreateDDL(TString path); + + static std::string GetCleanDDL(); + + NYdb::TValue GetNextBatchLoadDDL() override; + + bool Finished() override; + +private: + i32 WarehouseId; +}; + +} +} diff --git a/ydb/library/workload/tpcc/tpcc_config.h b/ydb/library/workload/tpcc/tpcc_config.h new file mode 100644 index 0000000000..47bf656be1 --- /dev/null +++ b/ydb/library/workload/tpcc/tpcc_config.h @@ -0,0 +1,73 @@ +#pragma once + +#include <ydb/library/workload/workload_query_generator.h> + +namespace NYdbWorkload { +namespace NTPCC { + +static const std::vector<std::string> TPCCNameTokens { + "BAR", "OUGHT", "ABLE", "PRI", + "PRES", "ESE", "ANTI", "CALLY", + "ATION", "EING" +}; + +enum ETPCCWorkloadConstants : i32 { + TPCC_WAREHOUSES = 10, + TPCC_DIST_PER_WH = 10, + TPCC_CUST_PER_DIST = 3000, + TPCC_ITEM_COUNT = 100000, + TPCC_TERMINAL_PER_WH = 10, + + TPCC_THREADS = 10, + TPCC_RETRY_COUNT = 10, + TPCC_MIN_PARTITIONS = 50, + TPCC_MAX_PARTITIONS = 500000, + TPCC_AUTO_PARTITIONING = true, + TPCC_LOAD_BATCH_SIZE = 128, + + TPCC_NEWORDER_WEIGHT = 45, + TPCC_PAYMENT_WEIGHT = 43, + TPCC_ORDERSTATUS_WEIGHT = 4, + TPCC_DELIVERY_WEIGHT = 4, + TPCC_STOCKLEVEL_WEIGHT = 4, + + TPCC_NEWORDER_PRE_EXEC_WAIT = 18000, + TPCC_NEWORDER_POST_EXEC_WAIT = 12000, + TPCC_PAYMENT_PRE_EXEC_WAIT = 3000, + TPCC_PAYMENT_POST_EXEC_WAIT = 12000, + TPCC_ORDERSTATUS_PRE_EXEC_WAIT = 2000, + TPCC_ORDERSTATUS_POST_EXEC_WAIT = 10000, + TPCC_DELIVERY_PRE_EXEC_WAIT = 2000, + TPCC_DELIVERY_POST_EXEC_WAIT = 5000, + TPCC_STOCKLEVEL_PRE_EXEC_WAIT = 2000, + TPCC_STOCKLEVEL_POST_EXEC_WAIT = 5000, + TPCC_WORKLOAD_DURATION_SECONDS = 600, + + TPCC_FIRST_UNPROCESSED_O_ID = 2101, + TPCC_C_LAST_LOAD_C = 157, + TPCC_C_LAST_RUN_C = 223, + TPCC_INVALID_ITEM_ID = -12345, +}; + +struct TTPCCWorkloadParams : public TWorkloadParams { + i32 Warehouses = ETPCCWorkloadConstants::TPCC_WAREHOUSES; + i32 Threads = ETPCCWorkloadConstants::TPCC_THREADS; + i32 MinPartitions = ETPCCWorkloadConstants::TPCC_MIN_PARTITIONS; + i32 MaxPartitions = ETPCCWorkloadConstants::TPCC_MAX_PARTITIONS; + bool AutoPartitioning = ETPCCWorkloadConstants::TPCC_AUTO_PARTITIONING; + i32 LoadBatchSize = ETPCCWorkloadConstants::TPCC_LOAD_BATCH_SIZE; + i32 RetryCount = ETPCCWorkloadConstants::TPCC_RETRY_COUNT; + i32 Duration = ETPCCWorkloadConstants::TPCC_WORKLOAD_DURATION_SECONDS; + i32 OrderLineIdConst; +}; + +enum ETPCCProcedureType { + NewOrder, + Payment, + Delivery, + OrderStatus, + StockLevel +}; + +} +} diff --git a/ydb/library/workload/tpcc/tpcc_thread_resource.h b/ydb/library/workload/tpcc/tpcc_thread_resource.h new file mode 100644 index 0000000000..7cd7e7cd50 --- /dev/null +++ b/ydb/library/workload/tpcc/tpcc_thread_resource.h @@ -0,0 +1,22 @@ +#pragma once + +#include <ydb/public/lib/ydb_cli/commands/ydb_common.h> +#include <ydb/public/sdk/cpp/client/ydb_table/table.h> + +namespace NYdbWorkload { +namespace NTPCC { + +struct TThreadResource { + std::shared_ptr<NYdb::NTable::TTableClient> Client; + std::shared_ptr<NYdb::NTable::TSession> Session; + TThreadResource(std::shared_ptr<NYdb::NTable::TTableClient>&& client) + : Client(std::move(client)) + { + auto status = Client->CreateSession().GetValueSync(); + NYdb::NConsoleClient::ThrowOnError(status); + Session = std::make_shared<NYdb::NTable::TSession>(status.GetSession()); + } +}; + +} +} diff --git a/ydb/library/workload/tpcc/tpcc_util.h b/ydb/library/workload/tpcc/tpcc_util.h new file mode 100644 index 0000000000..051a62a7ba --- /dev/null +++ b/ydb/library/workload/tpcc/tpcc_util.h @@ -0,0 +1,39 @@ +#pragma once + +#include <ydb/library/workload/tpcc/tpcc_config.h> + +#include <util/random/fast.h> + +namespace NYdbWorkload { +namespace NTPCC { + +inline ui32 UniformRandom32(ui32 min, ui32 max, TFastRng32& rng) { + return (rng.GenRand() % (max - min + 1) + min); +} + +inline ui32 NonUniformRandom32(ui32 maxOr, ui32 delta, ui32 min, ui32 max, TFastRng32& rng) { + return ((UniformRandom32(0, maxOr, rng) | UniformRandom32(min, max, rng)) + delta) % (max - min + 1) + min; +} + +inline std::string GetLastName(ui64 num) { + return TPCCNameTokens[num / 100] + TPCCNameTokens[(num / 10) % 10] + TPCCNameTokens[num % 10]; +} + +inline std::string GetNonUniformRandomLastNameForLoad(TFastRng32& rng) { + return GetLastName(NonUniformRandom32(255, ETPCCWorkloadConstants::TPCC_C_LAST_LOAD_C, 0, 999, rng)); +} + +inline std::string GetNonUniformRandomLastNameForRun(TFastRng32& rng) { + return GetLastName(NonUniformRandom32(255, ETPCCWorkloadConstants::TPCC_C_LAST_RUN_C, 0, 999, rng)); +} + +inline ui32 GetNonUniformCustomerId(ui32 delta, TFastRng32& rng) { + return NonUniformRandom32(1023, delta, 1, ETPCCWorkloadConstants::TPCC_CUST_PER_DIST, rng); +} + +inline ui32 GetNonUniformItemId(ui32 delta, TFastRng32& rng) { + return NonUniformRandom32(8191, delta, 1, ETPCCWorkloadConstants::TPCC_CUST_PER_DIST, rng); +} + +} +} diff --git a/ydb/library/workload/tpcc/tpcc_workload.cpp b/ydb/library/workload/tpcc/tpcc_workload.cpp new file mode 100644 index 0000000000..991322c892 --- /dev/null +++ b/ydb/library/workload/tpcc/tpcc_workload.cpp @@ -0,0 +1,281 @@ +#include "tpcc_workload.h" + +#include <ydb/library/workload/tpcc/tpcc_util.h> +#include <ydb/library/workload/tpcc/tpcc_thread_resource.h> +#include <ydb/library/workload/tpcc/load_data/customer.h> +#include <ydb/library/workload/tpcc/load_data/district.h> +#include <ydb/library/workload/tpcc/load_data/history.h> +#include <ydb/library/workload/tpcc/load_data/item.h> +#include <ydb/library/workload/tpcc/load_data/new_order.h> +#include <ydb/library/workload/tpcc/load_data/oorder.h> +#include <ydb/library/workload/tpcc/load_data/order_line.h> +#include <ydb/library/workload/tpcc/load_data/stock.h> +#include <ydb/library/workload/tpcc/load_data/warehouse.h> + +#include <ydb/public/lib/ydb_cli/commands/ydb_common.h> +#include <ydb/public/sdk/cpp/client/ydb_table/table.h> + +#include <util/string/printf.h> +#include <library/cpp/logger/stream_creator.h> + + +namespace NYdbWorkload { +namespace NTPCC { + +TTPCCWorkload::TTPCCWorkload(std::shared_ptr<NYdb::TDriver>& driver, TTPCCWorkloadParams& params) + : Params(params) + , Driver(driver) + , Log(TCoutLogBackendCreator().CreateLogBackend()) + , Seed(Now().MicroSeconds()) + , Rng(Seed) +{ + std::string message = "Seed: " + std::to_string(Seed) + "\n"; + Log.Write(message.c_str(), message.size()); +} + +TString TTPCCWorkload::CreateTablesQuery() { + TStringStream query; + query << "--!syntax_v1"; + TString whParts = PartitionByKeys(Params.Warehouses, Params.Threads); + query << TWarehouseLoadQueryGenerator::GetCreateDDL(TString(Params.DbPath)); + query << TItemLoadQueryGenerator::GetCreateDDL(TString(Params.DbPath)); + query << TDistrictLoadQueryGenerator::GetCreateDDL(TString(Params.DbPath)); + query << TStockLoadQueryGenerator::GetCreateDDL(TString(Params.DbPath), Params.Threads, whParts); + query << TCustomerLoadQueryGenerator::GetCreateDDL(TString(Params.DbPath), Params.Threads, whParts); + query << TOorderLoadQueryGenerator::GetCreateDDL(TString(Params.DbPath), Params.Threads, whParts); + query << TOrderLineLoadQueryGenerator::GetCreateDDL(TString(Params.DbPath), Params.Threads, whParts); + query << TNewOrderLoadQueryGenerator::GetCreateDDL(TString(Params.DbPath), Params.Threads, whParts); + query << THistoryLoadQueryGenerator::GetCreateDDL(TString(Params.DbPath), Params.Threads, whParts); + return query.Str(); +} + +TString TTPCCWorkload::CleanTablesQuery() { + TStringStream query; + query << THistoryLoadQueryGenerator::GetCleanDDL(); + query << TNewOrderLoadQueryGenerator::GetCleanDDL(); + query << TOrderLineLoadQueryGenerator::GetCleanDDL(); + query << TOorderLoadQueryGenerator::GetCleanDDL(); + query << TCustomerLoadQueryGenerator::GetCleanDDL(); + query << TDistrictLoadQueryGenerator::GetCleanDDL(); + query << TStockLoadQueryGenerator::GetCleanDDL(); + query << TItemLoadQueryGenerator::GetCleanDDL(); + query << TWarehouseLoadQueryGenerator::GetCleanDDL(); + return query.Str(); +} + +int TTPCCWorkload::InitTables() { + try { + // Create Tables + + std::string message = "Creating tables...\n"; + Log.Write(message.c_str(), message.size()); + + LoadThreadPool = std::make_unique<TLoadThreadPool>(Driver); + + auto tableClientSettings = NTable::TClientSettings() + .SessionPoolSettings( + NTable::TSessionPoolSettings() + .MaxActiveSessions(1)); + + auto db = std::make_unique<NTable::TTableClient>(*Driver, tableClientSettings); + + auto sessionResult = db->GetSession().GetValueSync(); + ThrowOnError(sessionResult); + + auto result = sessionResult.GetSession().ExecuteSchemeQuery(CreateTablesQuery()).GetValueSync(); + ThrowOnError(result); + + message = "DONE.\nLoading data...\n"; + Log.Write(message.c_str(), message.size()); + + auto started = Now(); + + + // Load Data + + auto genParams = Params; + LoadThreadPool->Start(Params.Threads); + + genParams.DbPath = Params.DbPath + "/warehouse"; + LoadThreadPool->SafeAddAndOwn(MakeHolder<TLoadTask>( + std::make_unique<TWarehouseLoadQueryGenerator>(genParams, Rng.GenRand())) + ); + + genParams.DbPath = Params.DbPath + "/item"; + LoadThreadPool->SafeAddAndOwn(MakeHolder<TLoadTask>( + std::make_unique<TItemLoadQueryGenerator>(genParams, Rng.GenRand())) + ); + + genParams.DbPath = Params.DbPath + "/district"; + LoadThreadPool->SafeAddAndOwn(MakeHolder<TLoadTask>( + std::make_unique<TDistrictLoadQueryGenerator>(genParams, Rng.GenRand())) + ); + + for (i32 threadNum = 1; threadNum <= Params.Threads; ++threadNum) { + genParams.DbPath = Params.DbPath + "/customer"; + LoadThreadPool->SafeAddAndOwn(MakeHolder<TLoadTask>( + std::make_unique<TCustomerLoadQueryGenerator>(genParams, threadNum, Rng.GenRand())) + ) +; + } + for (i32 threadNum = 1; threadNum <= Params.Threads; ++threadNum) { + genParams.DbPath = Params.DbPath + "/history"; + LoadThreadPool->SafeAddAndOwn(MakeHolder<TLoadTask>( + std::make_unique<THistoryLoadQueryGenerator>(genParams, threadNum, Rng.GenRand())) + ) +; + } + for (i32 threadNum = 1; threadNum <= Params.Threads; ++threadNum) { + genParams.DbPath = Params.DbPath + "/new_order"; + LoadThreadPool->SafeAddAndOwn(MakeHolder<TLoadTask>( + std::make_unique<TNewOrderLoadQueryGenerator>(genParams, threadNum, Rng.GenRand())) + ) +; + } + for (i32 threadNum = 1; threadNum <= Params.Threads; ++threadNum) { + genParams.DbPath = Params.DbPath + "/oorder"; + LoadThreadPool->SafeAddAndOwn(MakeHolder<TLoadTask>( + std::make_unique<TOorderLoadQueryGenerator>(genParams, threadNum, Rng.GenRand())) + ) +; + } + for (i32 threadNum = 1; threadNum <= Params.Threads; ++threadNum) { + genParams.DbPath = Params.DbPath + "/stock"; + LoadThreadPool->SafeAddAndOwn(MakeHolder<TLoadTask>( + std::make_unique<TStockLoadQueryGenerator>(genParams, threadNum, Rng.GenRand())) + ) +; + } + for (i32 threadNum = 1; threadNum <= Params.Threads; ++threadNum) { + genParams.DbPath = Params.DbPath + "/order_line"; + LoadThreadPool->SafeAddAndOwn(MakeHolder<TLoadTask>( + std::make_unique<TOrderLineLoadQueryGenerator>(genParams, threadNum, Rng.GenRand())) + ) +; + } + LoadThreadPool->Stop(); + + message = "Duration:" + (Now() - started).ToString() + "\nDONE\nConfiguring partitioning\n"; + Log.Write(message.c_str(), message.size()); + + ConfigurePartitioning(); + + message = "DONE\n"; + Log.Write(message.c_str(), message.size()); + + } catch (const TYdbErrorException& ex) { + Cerr << ex; + return EXIT_FAILURE; + } catch (...) { + return EXIT_FAILURE; + } + + return EXIT_SUCCESS; +} + +int TTPCCWorkload::CleanTables() { + try { + auto db = NTable::TTableClient(*Driver); + + auto sessionResult = db.GetSession().GetValueSync(); + ThrowOnError(sessionResult); + + auto result = sessionResult.GetSession().ExecuteSchemeQuery(CleanTablesQuery()).GetValueSync(); + ThrowOnError(result); + } catch (TYdbErrorException ex) { + Cerr << ex; + return EXIT_FAILURE; + } catch (...) { + return EXIT_FAILURE; + } + + return EXIT_SUCCESS; +} + +void TTPCCWorkload::ConfigurePartitioning() { + static const std::vector<TString> heavyTables { + "customer", "history", "new_order", + "oorder", "order_line", "stock" + }; + + auto tableClientSettings = NTable::TClientSettings() + .SessionPoolSettings( + NTable::TSessionPoolSettings() + .MaxActiveSessions(1)); + + TThreadResource rsc(std::make_shared<NTable::TTableClient>(*Driver, tableClientSettings)); + + TString query; + for (auto table: heavyTables) { + query += Sprintf(R"( + ALTER TABLE `%s` SET ( + AUTO_PARTITIONING_BY_LOAD = %s, + AUTO_PARTITIONING_MIN_PARTITIONS_COUNT = %s, + AUTO_PARTITIONING_MAX_PARTITIONS_COUNT = %s + ); + )", table.c_str(), Params.AutoPartitioning ? "ENABLED" : "DISABLED" + , ToString(Params.MinPartitions).c_str(), ToString(Params.MaxPartitions).c_str()); + } + + TStatus result = rsc.Session->ExecuteSchemeQuery(query.c_str()).GetValueSync(); + if (!result.IsSuccess()) { + ythrow yexception() << "Configuring partitioning failed with issues: " << result.GetIssues().ToString(); + } + + std::vector<ui32> prevPartition(heavyTables.size()); + TVector<NThreading::TFuture<void>> toWait; + + while (true) { + toWait.clear(); + + std::atomic_bool done = true; + for (ui32 i = 0; i < heavyTables.size(); ++i) { + NThreading::TFuture<void> future = rsc.Session->DescribeTable( + Params.DbPath + "/" + heavyTables[i], + NTable::TDescribeTableSettings().WithTableStatistics(true) + ).Apply( + [&prevPartition, i, &done](const NTable::TAsyncDescribeTableResult& future) { + NTable::TDescribeTableResult result = future.GetValueSync(); + + if (!result.IsSuccess()) { + ythrow yexception() << "Getting describe table failed with issues: " << result.GetIssues().ToString(); + } + + ui32 partCount = result.GetTableDescription().GetPartitionsCount(); + if (partCount != prevPartition[i]) { + prevPartition[i] = partCount; + done.store(false); + } + } + ); + toWait.emplace_back(std::move(future)); + } + + NThreading::WaitExceptionOrAll(toWait).GetValueSync(); + + if (done.load()) { + break; + } else { + Sleep(TDuration::Seconds(1)); + } + } +} + +TString TTPCCWorkload::PartitionByKeys(i32 keysCount, i32 partsCount) { + TStringStream partition; + for (i32 i = 1; i < partsCount; i++) { + partition << i * keysCount / partsCount; + if (i != partsCount - 1) { + partition << ", "; + } + } + return partition.Str(); +} + +int TTPCCWorkload::RunWorkload() { + Params.OrderLineIdConst = Rng.GenRand() % 8192; + return EXIT_FAILURE; +} + +} +} diff --git a/ydb/library/workload/tpcc/tpcc_workload.h b/ydb/library/workload/tpcc/tpcc_workload.h new file mode 100644 index 0000000000..485e571bd2 --- /dev/null +++ b/ydb/library/workload/tpcc/tpcc_workload.h @@ -0,0 +1,50 @@ +#pragma once + +#include "tpcc_config.h" + +#include <ydb/library/workload/tpcc/load_data/load_thread_pool.h> + +#include <ydb/public/lib/ydb_cli/common/command.h> +#include <ydb/public/sdk/cpp/client/ydb_driver/driver.h> + +#include <library/cpp/logger/log.h> + + +namespace NYdbWorkload { +namespace NTPCC { + +using namespace NYdb; +using namespace NConsoleClient; + +class TTPCCWorkload { +public: + TTPCCWorkload(std::shared_ptr<TDriver>& driver, TTPCCWorkloadParams& params); + + ~TTPCCWorkload() = default; + + int InitTables(); + + int CleanTables(); + + int RunWorkload(); + +private: + void ConfigurePartitioning(); + + TString CreateTablesQuery(); + TString CleanTablesQuery(); + + TString PartitionByKeys(i32 keysCount, i32 partsCount); + + TTPCCWorkloadParams Params; + std::unique_ptr<TLoadThreadPool> LoadThreadPool; + + std::shared_ptr<TDriver> Driver; + + TLog Log; + ui64 Seed; + TFastRng64 Rng; +}; + +} +} diff --git a/ydb/library/workload/workload_factory.h b/ydb/library/workload/workload_factory.h index a786fae2c4..42f42e7035 100644 --- a/ydb/library/workload/workload_factory.h +++ b/ydb/library/workload/workload_factory.h @@ -9,6 +9,7 @@ namespace NYdbWorkload { enum class EWorkload { STOCK, KV, + TPCC, }; class TWorkloadFactory { diff --git a/ydb/library/workload/ya.make b/ydb/library/workload/ya.make index 1e593d3d97..42b58b4059 100644 --- a/ydb/library/workload/ya.make +++ b/ydb/library/workload/ya.make @@ -4,8 +4,22 @@ SRCS( stock_workload.cpp kv_workload.cpp workload_factory.cpp + tpcc/tpcc_workload.cpp + tpcc/load_data/customer.cpp + tpcc/load_data/district.cpp + tpcc/load_data/history.cpp + tpcc/load_data/item.cpp + tpcc/load_data/new_order.cpp + tpcc/load_data/oorder.cpp + tpcc/load_data/order_line.cpp + tpcc/load_data/query_generator.cpp + tpcc/load_data/stock.cpp + tpcc/load_data/warehouse.cpp + tpcc/load_data/load_thread_pool.cpp ) +GENERATE_ENUM_SERIALIZATION(tpcc/tpcc_config.h) + PEERDIR( ydb/public/api/protos ydb/public/sdk/cpp/client/ydb_table diff --git a/ydb/public/lib/ydb_cli/commands/CMakeLists.darwin-x86_64.txt b/ydb/public/lib/ydb_cli/commands/CMakeLists.darwin-x86_64.txt index 2dedacd78f..7096310791 100644 --- a/ydb/public/lib/ydb_cli/commands/CMakeLists.darwin-x86_64.txt +++ b/ydb/public/lib/ydb_cli/commands/CMakeLists.darwin-x86_64.txt @@ -60,6 +60,7 @@ target_sources(clicommands PRIVATE ${CMAKE_SOURCE_DIR}/ydb/public/lib/ydb_cli/commands/topic_write_scenario.cpp ${CMAKE_SOURCE_DIR}/ydb/public/lib/ydb_cli/commands/topic_readwrite_scenario.cpp ${CMAKE_SOURCE_DIR}/ydb/public/lib/ydb_cli/commands/tpch.cpp + ${CMAKE_SOURCE_DIR}/ydb/public/lib/ydb_cli/commands/tpcc_workload.cpp ${CMAKE_SOURCE_DIR}/ydb/public/lib/ydb_cli/commands/ydb_sdk_core_access.cpp ${CMAKE_SOURCE_DIR}/ydb/public/lib/ydb_cli/commands/ydb_command.cpp ${CMAKE_SOURCE_DIR}/ydb/public/lib/ydb_cli/commands/ydb_profile.cpp diff --git a/ydb/public/lib/ydb_cli/commands/CMakeLists.linux-aarch64.txt b/ydb/public/lib/ydb_cli/commands/CMakeLists.linux-aarch64.txt index 6ff42c7956..fb96913a4d 100644 --- a/ydb/public/lib/ydb_cli/commands/CMakeLists.linux-aarch64.txt +++ b/ydb/public/lib/ydb_cli/commands/CMakeLists.linux-aarch64.txt @@ -61,6 +61,7 @@ target_sources(clicommands PRIVATE ${CMAKE_SOURCE_DIR}/ydb/public/lib/ydb_cli/commands/topic_write_scenario.cpp ${CMAKE_SOURCE_DIR}/ydb/public/lib/ydb_cli/commands/topic_readwrite_scenario.cpp ${CMAKE_SOURCE_DIR}/ydb/public/lib/ydb_cli/commands/tpch.cpp + ${CMAKE_SOURCE_DIR}/ydb/public/lib/ydb_cli/commands/tpcc_workload.cpp ${CMAKE_SOURCE_DIR}/ydb/public/lib/ydb_cli/commands/ydb_sdk_core_access.cpp ${CMAKE_SOURCE_DIR}/ydb/public/lib/ydb_cli/commands/ydb_command.cpp ${CMAKE_SOURCE_DIR}/ydb/public/lib/ydb_cli/commands/ydb_profile.cpp diff --git a/ydb/public/lib/ydb_cli/commands/CMakeLists.linux-x86_64.txt b/ydb/public/lib/ydb_cli/commands/CMakeLists.linux-x86_64.txt index 6ff42c7956..fb96913a4d 100644 --- a/ydb/public/lib/ydb_cli/commands/CMakeLists.linux-x86_64.txt +++ b/ydb/public/lib/ydb_cli/commands/CMakeLists.linux-x86_64.txt @@ -61,6 +61,7 @@ target_sources(clicommands PRIVATE ${CMAKE_SOURCE_DIR}/ydb/public/lib/ydb_cli/commands/topic_write_scenario.cpp ${CMAKE_SOURCE_DIR}/ydb/public/lib/ydb_cli/commands/topic_readwrite_scenario.cpp ${CMAKE_SOURCE_DIR}/ydb/public/lib/ydb_cli/commands/tpch.cpp + ${CMAKE_SOURCE_DIR}/ydb/public/lib/ydb_cli/commands/tpcc_workload.cpp ${CMAKE_SOURCE_DIR}/ydb/public/lib/ydb_cli/commands/ydb_sdk_core_access.cpp ${CMAKE_SOURCE_DIR}/ydb/public/lib/ydb_cli/commands/ydb_command.cpp ${CMAKE_SOURCE_DIR}/ydb/public/lib/ydb_cli/commands/ydb_profile.cpp diff --git a/ydb/public/lib/ydb_cli/commands/CMakeLists.windows-x86_64.txt b/ydb/public/lib/ydb_cli/commands/CMakeLists.windows-x86_64.txt index 2dedacd78f..7096310791 100644 --- a/ydb/public/lib/ydb_cli/commands/CMakeLists.windows-x86_64.txt +++ b/ydb/public/lib/ydb_cli/commands/CMakeLists.windows-x86_64.txt @@ -60,6 +60,7 @@ target_sources(clicommands PRIVATE ${CMAKE_SOURCE_DIR}/ydb/public/lib/ydb_cli/commands/topic_write_scenario.cpp ${CMAKE_SOURCE_DIR}/ydb/public/lib/ydb_cli/commands/topic_readwrite_scenario.cpp ${CMAKE_SOURCE_DIR}/ydb/public/lib/ydb_cli/commands/tpch.cpp + ${CMAKE_SOURCE_DIR}/ydb/public/lib/ydb_cli/commands/tpcc_workload.cpp ${CMAKE_SOURCE_DIR}/ydb/public/lib/ydb_cli/commands/ydb_sdk_core_access.cpp ${CMAKE_SOURCE_DIR}/ydb/public/lib/ydb_cli/commands/ydb_command.cpp ${CMAKE_SOURCE_DIR}/ydb/public/lib/ydb_cli/commands/ydb_profile.cpp diff --git a/ydb/public/lib/ydb_cli/commands/tpcc_workload.cpp b/ydb/public/lib/ydb_cli/commands/tpcc_workload.cpp new file mode 100644 index 0000000000..53f4b5e6d8 --- /dev/null +++ b/ydb/public/lib/ydb_cli/commands/tpcc_workload.cpp @@ -0,0 +1,121 @@ +#include "tpcc_workload.h" + +#include <ydb/library/workload/tpcc/tpcc_workload.h> + +namespace NYdb { +namespace NConsoleClient { + +TCommandTPCCWorkload::TCommandTPCCWorkload() + : TClientCommandTree("tpcc", {}, "YDB TPCC workload") +{ + AddCommand(std::make_unique<TTPCCInitCommand>()); + AddCommand(std::make_unique<TTPCCCleanCommand>()); +} + +TTPCCWorkloadCommand::TTPCCWorkloadCommand(const TString& name, const std::initializer_list<TString>& aliases, const TString& description) + : TYdbCommand(name, aliases, description) + , ClientTimeoutMs(0) + , OperationTimeoutMs(0) + , CancelAfterTimeoutMs(0) + , WindowDurationSec(0) + , Quiet(false) + , PrintTimestamp(false) + , WindowHist(60000, 2) // highestTrackableValue 60000ms = 60s, precision 2 + , TotalHist(60000, 2) + , TotalRetries(0) + , WindowRetryCount(0) + , TotalErrors(0) + , WindowErrors(0) {} + +struct TTPCCWorkloadStats { + ui64 OpsCount = 0; + ui64 Percentile50 = 0; + ui64 Percentile95 = 0; + ui64 Percentile99 = 0; + ui64 Percentile100 = 0; +}; + +void TTPCCWorkloadCommand::Config(TConfig& config) { + TYdbCommand::Config(config); + + config.Opts->AddLongOption("quiet", "Quiet mode. Doesn't print statistics each second.") + .StoreTrue(&Quiet); + config.Opts->AddLongOption("print-timestamp", "Print timestamp each second with statistics.") + .StoreTrue(&PrintTimestamp); + config.Opts->AddLongOption("client-timeout", "Client timeout in ms.") + .DefaultValue(1000).StoreResult(&ClientTimeoutMs); + config.Opts->AddLongOption("operation-timeout", "Operation timeout in ms.") + .DefaultValue(800).StoreResult(&OperationTimeoutMs); + config.Opts->AddLongOption("cancel-after", "Cancel after timeout in ms.") + .DefaultValue(800).StoreResult(&CancelAfterTimeoutMs); + config.Opts->AddLongOption("window", "Window duration in seconds.") + .DefaultValue(1).StoreResult(&WindowDurationSec); +} + +void TTPCCWorkloadCommand::PrepareForRun(TConfig& config) { + auto driverConfig = TDriverConfig() + .SetEndpoint(config.Address) + .SetDatabase(config.Database) + .SetBalancingPolicy(EBalancingPolicy::UseAllNodes) + .SetCredentialsProviderFactory(config.CredentialsGetter(config)); + Params.DbPath = config.Database; + + if (config.EnableSsl) { + driverConfig.UseSecureConnection(config.CaCerts); + } + Driver = std::make_shared<NYdb::TDriver>(NYdb::TDriver(driverConfig)); + Workload = std::unique_ptr<TTPCCWorkload>(new TTPCCWorkload(Driver, Params)); +} + +TTPCCInitCommand::TTPCCInitCommand() + : TTPCCWorkloadCommand("init", {}, "Create and initialize tables for TPCC workload") {} + +void TTPCCInitCommand::Config(TConfig& config) { + TTPCCWorkloadCommand::Config(config); + + config.SetFreeArgsNum(0); + + config.Opts->AddLongOption('w', "warehouses", "The number of TPC-C warehouses.") + .DefaultValue(static_cast<int>(ETPCCWorkloadConstants::TPCC_WAREHOUSES)).StoreResult(&Params.Warehouses); + config.Opts->AddLongOption('t', "threads", "Number of parallel threads in workload.") + .DefaultValue(static_cast<int>(ETPCCWorkloadConstants::TPCC_THREADS)).StoreResult(&Params.Threads); + config.Opts->AddLongOption("load-batch-size", "The size of the batch of queries with which data is loaded.") + .DefaultValue(static_cast<int>(ETPCCWorkloadConstants::TPCC_LOAD_BATCH_SIZE)).StoreResult(&Params.LoadBatchSize); + config.Opts->AddLongOption("min-partitions", "Minimum partitioning of tables.") + .DefaultValue(static_cast<int>(ETPCCWorkloadConstants::TPCC_MIN_PARTITIONS)).StoreResult(&Params.MinPartitions); + config.Opts->AddLongOption("max-partitions", "Maximum partitioning of tables.") + .DefaultValue(static_cast<int>(ETPCCWorkloadConstants::TPCC_MAX_PARTITIONS)).StoreResult(&Params.MaxPartitions); + config.Opts->AddLongOption("auto-partitioning", "Enable auto partitioning by load.") + .DefaultValue(static_cast<int>(ETPCCWorkloadConstants::TPCC_AUTO_PARTITIONING)).StoreResult(&Params.AutoPartitioning, true); +} + +int TTPCCInitCommand::Run(TConfig& config) { + PrepareForRun(config); + Cout << "INIT COMMAND\n"; + Cout << "Warehouses: " << Params.Warehouses << "\nThreads: " << Params.Threads << "\n"; + return Workload->InitTables(); +} + +void TTPCCInitCommand::Parse(TConfig& config) { + TClientCommand::Parse(config); +} + +TTPCCCleanCommand::TTPCCCleanCommand() + : TTPCCWorkloadCommand("clean", {}, "Drop the TPCC tables.") {} + +void TTPCCCleanCommand::Config(TConfig& config) { + TTPCCWorkloadCommand::Config(config); + config.SetFreeArgsNum(0); +} + +int TTPCCCleanCommand::Run(TConfig& config) { + PrepareForRun(config); + return Workload->CleanTables(); +} + +void TTPCCCleanCommand::Parse(TConfig& config) { + TClientCommand::Parse(config); +} + +} +} diff --git a/ydb/public/lib/ydb_cli/commands/tpcc_workload.h b/ydb/public/lib/ydb_cli/commands/tpcc_workload.h new file mode 100644 index 0000000000..e8a73f4c7c --- /dev/null +++ b/ydb/public/lib/ydb_cli/commands/tpcc_workload.h @@ -0,0 +1,84 @@ +#pragma once + +#include <ydb/library/workload/tpcc/tpcc_config.h> +#include <ydb/library/workload/tpcc/tpcc_workload.h> +#include <ydb/public/lib/ydb_cli/commands/ydb_command.h> + +#include <library/cpp/histogram/hdr/histogram.h> +#include <util/datetime/base.h> +#include <util/system/spinlock.h> + +#include <memory> +#include <string> + +namespace NYdb { +namespace NConsoleClient { + +using namespace NYdbWorkload; +using namespace NTPCC; + +class TCommandTPCCWorkload : public TClientCommandTree { +public: + TCommandTPCCWorkload(); +}; + +class TTPCCWorkloadCommand : public TYdbCommand { +public: + TTPCCWorkloadCommand( + const TString& name, + const std::initializer_list<TString>& aliases = std::initializer_list<TString>(), + const TString& description = TString() + ); + + void Config(TConfig& config) override; + NTable::TSession GetSession(); + +protected: + void PrepareForRun(TConfig& config); + + std::shared_ptr<NYdb::TDriver> Driver; + std::unique_ptr<NTable::TTableClient> TableClient; + + TTPCCWorkloadParams Params; + std::unique_ptr<TTPCCWorkload> Workload; + unsigned int ClientTimeoutMs; + unsigned int OperationTimeoutMs; + unsigned int CancelAfterTimeoutMs; + unsigned int WindowDurationSec; + bool Quiet; + bool PrintTimestamp; + + TInstant StartTime; + TInstant StopTime; + + // Think about moving histograms to workload library. + // Histograms will also be useful in actor system workload. + TSpinLock HdrLock; + NHdr::THistogram WindowHist; + NHdr::THistogram TotalHist; + + std::atomic_uint64_t TotalRetries; + std::atomic_uint64_t WindowRetryCount; + std::atomic_uint64_t TotalErrors; + std::atomic_uint64_t WindowErrors; +}; + +class TTPCCInitCommand : public TTPCCWorkloadCommand { +public: + TTPCCInitCommand(); + virtual void Config(TConfig& config) override; + virtual void Parse(TConfig& config) override; + virtual int Run(TConfig& config) override; +}; + +class TTPCCCleanCommand : public TTPCCWorkloadCommand { +public: + TTPCCCleanCommand(); + virtual void Config(TConfig& config) override; + virtual void Parse(TConfig& config) override; + virtual int Run(TConfig& config) override; +}; + +} + +} diff --git a/ydb/public/lib/ydb_cli/commands/ya.make b/ydb/public/lib/ydb_cli/commands/ya.make index 4626a01b41..f6d63294c0 100644 --- a/ydb/public/lib/ydb_cli/commands/ya.make +++ b/ydb/public/lib/ydb_cli/commands/ya.make @@ -12,6 +12,7 @@ SRCS( topic_write_scenario.cpp topic_readwrite_scenario.cpp tpch.cpp + tpcc_workload.cpp ydb_sdk_core_access.cpp ydb_command.cpp ydb_profile.cpp diff --git a/ydb/public/lib/ydb_cli/commands/ydb_workload.cpp b/ydb/public/lib/ydb_cli/commands/ydb_workload.cpp index baf6454146..9649ed13d9 100644 --- a/ydb/public/lib/ydb_cli/commands/ydb_workload.cpp +++ b/ydb/public/lib/ydb_cli/commands/ydb_workload.cpp @@ -4,14 +4,15 @@ #include "kv_workload.h" #include "click_bench.h" #include "tpch.h" +#include "tpcc_workload.h" #include "topic_workload/topic_workload.h" #include "transfer_workload/transfer_workload.h" -#include "util/random/random.h" #include "ydb/library/yverify_stream/yverify_stream.h" #include <ydb/library/workload/workload_factory.h> #include <ydb/public/lib/ydb_cli/commands/ydb_common.h> +#include <util/random/random.h> #include <library/cpp/threading/local_executor/local_executor.h> #include <atomic> @@ -46,6 +47,7 @@ TCommandWorkload::TCommandWorkload() AddCommand(std::make_unique<TCommandWorkloadTopic>()); AddCommand(std::make_unique<TCommandWorkloadTransfer>()); AddCommand(std::make_unique<TCommandTpch>()); + AddCommand(std::make_unique<TCommandTPCCWorkload>()); } TWorkloadCommand::TWorkloadCommand(const TString& name, const std::initializer_list<TString>& aliases, const TString& description) |