aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorturlybekovol <turlybekovol@yandex-team.com>2023-08-28 12:03:55 +0300
committerturlybekovol <turlybekovol@yandex-team.com>2023-08-28 12:37:00 +0300
commitda025ccc23fafcc3a3a572d83f82996be433fa89 (patch)
tree453e8ee6eca255335f4a08be466b62313600ef5f
parentbfdb8734c1c53000f3c653c2c9218c3cca1f952e (diff)
downloadydb-da025ccc23fafcc3a3a572d83f82996be433fa89.tar.gz
Workload TPC-C init
-rw-r--r--ydb/library/workload/CMakeLists.darwin-x86_64.txt24
-rw-r--r--ydb/library/workload/CMakeLists.linux-aarch64.txt24
-rw-r--r--ydb/library/workload/CMakeLists.linux-x86_64.txt24
-rw-r--r--ydb/library/workload/CMakeLists.windows-x86_64.txt24
-rw-r--r--ydb/library/workload/tpcc/load_data/customer.cpp126
-rw-r--r--ydb/library/workload/tpcc/load_data/customer.h30
-rw-r--r--ydb/library/workload/tpcc/load_data/district.cpp88
-rw-r--r--ydb/library/workload/tpcc/load_data/district.h28
-rw-r--r--ydb/library/workload/tpcc/load_data/history.cpp94
-rw-r--r--ydb/library/workload/tpcc/load_data/history.h37
-rw-r--r--ydb/library/workload/tpcc/load_data/item.cpp82
-rw-r--r--ydb/library/workload/tpcc/load_data/item.h27
-rw-r--r--ydb/library/workload/tpcc/load_data/load_thread_pool.cpp83
-rw-r--r--ydb/library/workload/tpcc/load_data/load_thread_pool.h39
-rw-r--r--ydb/library/workload/tpcc/load_data/new_order.cpp85
-rw-r--r--ydb/library/workload/tpcc/load_data/new_order.h30
-rw-r--r--ydb/library/workload/tpcc/load_data/oorder.cpp102
-rw-r--r--ydb/library/workload/tpcc/load_data/oorder.h31
-rw-r--r--ydb/library/workload/tpcc/load_data/order_line.cpp111
-rw-r--r--ydb/library/workload/tpcc/load_data/order_line.h34
-rw-r--r--ydb/library/workload/tpcc/load_data/query_generator.cpp35
-rw-r--r--ydb/library/workload/tpcc/load_data/query_generator.h36
-rw-r--r--ydb/library/workload/tpcc/load_data/stock.cpp115
-rw-r--r--ydb/library/workload/tpcc/load_data/stock.h29
-rw-r--r--ydb/library/workload/tpcc/load_data/warehouse.cpp77
-rw-r--r--ydb/library/workload/tpcc/load_data/warehouse.h27
-rw-r--r--ydb/library/workload/tpcc/tpcc_config.h73
-rw-r--r--ydb/library/workload/tpcc/tpcc_thread_resource.h22
-rw-r--r--ydb/library/workload/tpcc/tpcc_util.h39
-rw-r--r--ydb/library/workload/tpcc/tpcc_workload.cpp281
-rw-r--r--ydb/library/workload/tpcc/tpcc_workload.h50
-rw-r--r--ydb/library/workload/workload_factory.h1
-rw-r--r--ydb/library/workload/ya.make14
-rw-r--r--ydb/public/lib/ydb_cli/commands/CMakeLists.darwin-x86_64.txt1
-rw-r--r--ydb/public/lib/ydb_cli/commands/CMakeLists.linux-aarch64.txt1
-rw-r--r--ydb/public/lib/ydb_cli/commands/CMakeLists.linux-x86_64.txt1
-rw-r--r--ydb/public/lib/ydb_cli/commands/CMakeLists.windows-x86_64.txt1
-rw-r--r--ydb/public/lib/ydb_cli/commands/tpcc_workload.cpp121
-rw-r--r--ydb/public/lib/ydb_cli/commands/tpcc_workload.h84
-rw-r--r--ydb/public/lib/ydb_cli/commands/ya.make1
-rw-r--r--ydb/public/lib/ydb_cli/commands/ydb_workload.cpp4
41 files changed, 2135 insertions, 1 deletions
diff --git a/ydb/library/workload/CMakeLists.darwin-x86_64.txt b/ydb/library/workload/CMakeLists.darwin-x86_64.txt
index 69416c94e7..4b6d17589e 100644
--- a/ydb/library/workload/CMakeLists.darwin-x86_64.txt
+++ b/ydb/library/workload/CMakeLists.darwin-x86_64.txt
@@ -6,11 +6,18 @@
# original buildsystem will not be accepted.
+get_built_tool_path(
+ TOOL_enum_parser_bin
+ TOOL_enum_parser_dependency
+ tools/enum_parser/enum_parser
+ enum_parser
+)
add_library(ydb-library-workload)
target_link_libraries(ydb-library-workload PUBLIC
contrib-libs-cxxsupp
yutil
+ tools-enum_parser-enum_serialization_runtime
api-protos
cpp-client-ydb_table
)
@@ -18,4 +25,21 @@ target_sources(ydb-library-workload PRIVATE
${CMAKE_SOURCE_DIR}/ydb/library/workload/stock_workload.cpp
${CMAKE_SOURCE_DIR}/ydb/library/workload/kv_workload.cpp
${CMAKE_SOURCE_DIR}/ydb/library/workload/workload_factory.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/library/workload/tpcc/tpcc_workload.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/library/workload/tpcc/load_data/customer.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/library/workload/tpcc/load_data/district.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/library/workload/tpcc/load_data/history.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/library/workload/tpcc/load_data/item.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/library/workload/tpcc/load_data/new_order.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/library/workload/tpcc/load_data/oorder.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/library/workload/tpcc/load_data/order_line.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/library/workload/tpcc/load_data/query_generator.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/library/workload/tpcc/load_data/stock.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/library/workload/tpcc/load_data/warehouse.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/library/workload/tpcc/load_data/load_thread_pool.cpp
+)
+generate_enum_serilization(ydb-library-workload
+ ${CMAKE_SOURCE_DIR}/ydb/library/workload/tpcc/tpcc_config.h
+ INCLUDE_HEADERS
+ ydb/library/workload/tpcc/tpcc_config.h
)
diff --git a/ydb/library/workload/CMakeLists.linux-aarch64.txt b/ydb/library/workload/CMakeLists.linux-aarch64.txt
index d5dba36558..6006afe3cc 100644
--- a/ydb/library/workload/CMakeLists.linux-aarch64.txt
+++ b/ydb/library/workload/CMakeLists.linux-aarch64.txt
@@ -6,12 +6,19 @@
# original buildsystem will not be accepted.
+get_built_tool_path(
+ TOOL_enum_parser_bin
+ TOOL_enum_parser_dependency
+ tools/enum_parser/enum_parser
+ enum_parser
+)
add_library(ydb-library-workload)
target_link_libraries(ydb-library-workload PUBLIC
contrib-libs-linux-headers
contrib-libs-cxxsupp
yutil
+ tools-enum_parser-enum_serialization_runtime
api-protos
cpp-client-ydb_table
)
@@ -19,4 +26,21 @@ target_sources(ydb-library-workload PRIVATE
${CMAKE_SOURCE_DIR}/ydb/library/workload/stock_workload.cpp
${CMAKE_SOURCE_DIR}/ydb/library/workload/kv_workload.cpp
${CMAKE_SOURCE_DIR}/ydb/library/workload/workload_factory.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/library/workload/tpcc/tpcc_workload.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/library/workload/tpcc/load_data/customer.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/library/workload/tpcc/load_data/district.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/library/workload/tpcc/load_data/history.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/library/workload/tpcc/load_data/item.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/library/workload/tpcc/load_data/new_order.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/library/workload/tpcc/load_data/oorder.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/library/workload/tpcc/load_data/order_line.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/library/workload/tpcc/load_data/query_generator.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/library/workload/tpcc/load_data/stock.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/library/workload/tpcc/load_data/warehouse.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/library/workload/tpcc/load_data/load_thread_pool.cpp
+)
+generate_enum_serilization(ydb-library-workload
+ ${CMAKE_SOURCE_DIR}/ydb/library/workload/tpcc/tpcc_config.h
+ INCLUDE_HEADERS
+ ydb/library/workload/tpcc/tpcc_config.h
)
diff --git a/ydb/library/workload/CMakeLists.linux-x86_64.txt b/ydb/library/workload/CMakeLists.linux-x86_64.txt
index d5dba36558..6006afe3cc 100644
--- a/ydb/library/workload/CMakeLists.linux-x86_64.txt
+++ b/ydb/library/workload/CMakeLists.linux-x86_64.txt
@@ -6,12 +6,19 @@
# original buildsystem will not be accepted.
+get_built_tool_path(
+ TOOL_enum_parser_bin
+ TOOL_enum_parser_dependency
+ tools/enum_parser/enum_parser
+ enum_parser
+)
add_library(ydb-library-workload)
target_link_libraries(ydb-library-workload PUBLIC
contrib-libs-linux-headers
contrib-libs-cxxsupp
yutil
+ tools-enum_parser-enum_serialization_runtime
api-protos
cpp-client-ydb_table
)
@@ -19,4 +26,21 @@ target_sources(ydb-library-workload PRIVATE
${CMAKE_SOURCE_DIR}/ydb/library/workload/stock_workload.cpp
${CMAKE_SOURCE_DIR}/ydb/library/workload/kv_workload.cpp
${CMAKE_SOURCE_DIR}/ydb/library/workload/workload_factory.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/library/workload/tpcc/tpcc_workload.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/library/workload/tpcc/load_data/customer.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/library/workload/tpcc/load_data/district.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/library/workload/tpcc/load_data/history.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/library/workload/tpcc/load_data/item.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/library/workload/tpcc/load_data/new_order.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/library/workload/tpcc/load_data/oorder.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/library/workload/tpcc/load_data/order_line.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/library/workload/tpcc/load_data/query_generator.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/library/workload/tpcc/load_data/stock.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/library/workload/tpcc/load_data/warehouse.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/library/workload/tpcc/load_data/load_thread_pool.cpp
+)
+generate_enum_serilization(ydb-library-workload
+ ${CMAKE_SOURCE_DIR}/ydb/library/workload/tpcc/tpcc_config.h
+ INCLUDE_HEADERS
+ ydb/library/workload/tpcc/tpcc_config.h
)
diff --git a/ydb/library/workload/CMakeLists.windows-x86_64.txt b/ydb/library/workload/CMakeLists.windows-x86_64.txt
index 69416c94e7..4b6d17589e 100644
--- a/ydb/library/workload/CMakeLists.windows-x86_64.txt
+++ b/ydb/library/workload/CMakeLists.windows-x86_64.txt
@@ -6,11 +6,18 @@
# original buildsystem will not be accepted.
+get_built_tool_path(
+ TOOL_enum_parser_bin
+ TOOL_enum_parser_dependency
+ tools/enum_parser/enum_parser
+ enum_parser
+)
add_library(ydb-library-workload)
target_link_libraries(ydb-library-workload PUBLIC
contrib-libs-cxxsupp
yutil
+ tools-enum_parser-enum_serialization_runtime
api-protos
cpp-client-ydb_table
)
@@ -18,4 +25,21 @@ target_sources(ydb-library-workload PRIVATE
${CMAKE_SOURCE_DIR}/ydb/library/workload/stock_workload.cpp
${CMAKE_SOURCE_DIR}/ydb/library/workload/kv_workload.cpp
${CMAKE_SOURCE_DIR}/ydb/library/workload/workload_factory.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/library/workload/tpcc/tpcc_workload.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/library/workload/tpcc/load_data/customer.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/library/workload/tpcc/load_data/district.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/library/workload/tpcc/load_data/history.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/library/workload/tpcc/load_data/item.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/library/workload/tpcc/load_data/new_order.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/library/workload/tpcc/load_data/oorder.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/library/workload/tpcc/load_data/order_line.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/library/workload/tpcc/load_data/query_generator.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/library/workload/tpcc/load_data/stock.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/library/workload/tpcc/load_data/warehouse.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/library/workload/tpcc/load_data/load_thread_pool.cpp
+)
+generate_enum_serilization(ydb-library-workload
+ ${CMAKE_SOURCE_DIR}/ydb/library/workload/tpcc/tpcc_config.h
+ INCLUDE_HEADERS
+ ydb/library/workload/tpcc/tpcc_config.h
)
diff --git a/ydb/library/workload/tpcc/load_data/customer.cpp b/ydb/library/workload/tpcc/load_data/customer.cpp
new file mode 100644
index 0000000000..9c491acc15
--- /dev/null
+++ b/ydb/library/workload/tpcc/load_data/customer.cpp
@@ -0,0 +1,126 @@
+#include "customer.h"
+
+#include <util/string/printf.h>
+
+#include <sstream>
+
+namespace NYdbWorkload {
+namespace NTPCC {
+
+TCustomerLoadQueryGenerator::TCustomerLoadQueryGenerator(TTPCCWorkloadParams& params, ui32 partNum, ui64 seed)
+ : TLoadQueryGenerator(params, seed)
+ , PartNum(partNum)
+{
+ WarehouseId = (partNum - 1) * params.Warehouses / params.Threads + 1;
+ DistrictId = 1;
+ CustomerId = 1;
+}
+
+bool TCustomerLoadQueryGenerator::Finished() {
+ i32 whEnd = PartNum * Params.Warehouses / Params.Threads;
+ return (WarehouseId > whEnd);
+}
+
+TString TCustomerLoadQueryGenerator::GetCreateDDL(TString path, ui32 partNum, TString partAtKeys) {
+ std::string partNumS = std::to_string(partNum);
+ TString query = Sprintf(R"(
+ CREATE TABLE `%s/customer` (
+ C_W_ID Int32 NOT NULL,
+ C_D_ID Int32 NOT NULL,
+ C_ID Int32 NOT NULL,
+ C_DISCOUNT Double,
+ C_CREDIT Utf8,
+ C_LAST Utf8,
+ C_FIRST Utf8,
+ C_CREDIT_LIM Double,
+ C_BALANCE Double,
+ C_YTD_PAYMENT Double,
+ C_PAYMENT_CNT Int32,
+ C_DELIVERY_CNT Int32,
+ C_STREET_1 Utf8,
+ C_STREET_2 Utf8,
+ C_CITY Utf8,
+ C_STATE Utf8,
+ C_ZIP Utf8,
+ C_PHONE Utf8,
+ C_SINCE Timestamp,
+ C_MIDDLE Utf8,
+ C_DATA Utf8,
+ PRIMARY KEY (C_W_ID, C_D_ID, C_ID)
+ ) WITH (
+ AUTO_PARTITIONING_BY_LOAD = DISABLED,
+ AUTO_PARTITIONING_MIN_PARTITIONS_COUNT = %s,
+ AUTO_PARTITIONING_MAX_PARTITIONS_COUNT = %s,
+ PARTITION_AT_KEYS = (%s)
+ );
+ )", path.c_str(), partNumS.c_str(), partNumS.c_str(), partAtKeys.c_str());
+
+ return query;
+}
+
+std::string TCustomerLoadQueryGenerator::GetCleanDDL() {
+ std::string cleanQuery = "DROP TABLE `customer`;";
+
+ return cleanQuery;
+}
+
+NYdb::TValue TCustomerLoadQueryGenerator::GetNextBatchLoadDDL() {
+ NYdb::TValueBuilder rows;
+ if (Finished()) {
+ return rows.Build();
+ }
+
+ rows.BeginList();
+ for (i32 id = 1; id <= Params.LoadBatchSize; ++id) {
+ rows.AddListItem().BeginStruct();
+ rows.AddMember("C_W_ID").Int32(WarehouseId);
+ rows.AddMember("C_D_ID").Int32(DistrictId);
+ rows.AddMember("C_ID").Int32(CustomerId);
+ rows.AddMember("C_DISCOUNT").Double(static_cast<double>(UniformRandom32(1, 5000, Rng)) / 10000.0);
+ if (UniformRandom32(1, 100, Rng) <= 10) {
+ rows.AddMember("C_CREDIT").Utf8("BC");
+ } else {
+ rows.AddMember("C_CREDIT").Utf8("GC");
+ }
+ if (CustomerId <= 1000) {
+ rows.AddMember("C_LAST").Utf8(GetLastName(CustomerId - 1));
+ } else {
+ rows.AddMember("C_LAST").Utf8(GetNonUniformRandomLastNameForLoad(Rng));
+ }
+ rows.AddMember("C_FIRST").Utf8(RandomString(UniformRandom32(8, 16, Rng)));
+ rows.AddMember("C_CREDIT_LIM").Double(50000.0);
+ rows.AddMember("C_BALANCE").Double(-10.0);
+ rows.AddMember("C_YTD_PAYMENT").Double(10.0);
+ rows.AddMember("C_PAYMENT_CNT").Int32(1);
+ rows.AddMember("C_DELIVERY_CNT").Int32(0);
+ rows.AddMember("C_STREET_1").Utf8(RandomString(UniformRandom32(10, 20, Rng)));
+ rows.AddMember("C_STREET_2").Utf8(RandomString(UniformRandom32(10, 20, Rng)));
+ rows.AddMember("C_CITY").Utf8(RandomString(UniformRandom32(10, 20, Rng)));
+ rows.AddMember("C_STATE").Utf8(RandomString(2));
+ rows.AddMember("C_ZIP").Utf8(RandomNumberString(4) + "11111");
+ rows.AddMember("C_PHONE").Utf8(RandomNumberString(16));
+ rows.AddMember("C_SINCE").Timestamp(Now());
+ rows.AddMember("C_MIDDLE").Utf8("OE");
+ rows.AddMember("C_DATA").Utf8(RandomString(UniformRandom32(300, 500, Rng)));
+ rows.EndStruct();
+
+ CustomerId++;
+ if (CustomerId > ETPCCWorkloadConstants::TPCC_CUST_PER_DIST) {
+ CustomerId = 1;
+ DistrictId++;
+ if (DistrictId > ETPCCWorkloadConstants::TPCC_DIST_PER_WH) {
+ DistrictId = 1;
+ WarehouseId++;
+ if (Finished()) {
+ break;
+ }
+ }
+ }
+ }
+ rows.EndList();
+
+ return rows.Build();
+}
+
+}
+}
diff --git a/ydb/library/workload/tpcc/load_data/customer.h b/ydb/library/workload/tpcc/load_data/customer.h
new file mode 100644
index 0000000000..1f380d5f73
--- /dev/null
+++ b/ydb/library/workload/tpcc/load_data/customer.h
@@ -0,0 +1,30 @@
+#pragma once
+
+#include "query_generator.h"
+
+
+namespace NYdbWorkload {
+namespace NTPCC {
+
+class TCustomerLoadQueryGenerator : public TLoadQueryGenerator {
+public:
+
+ TCustomerLoadQueryGenerator(TTPCCWorkloadParams& params, ui32 partNum, ui64 seed);
+
+ static TString GetCreateDDL(TString path, ui32 partNum, TString partAtKeys);
+
+ static std::string GetCleanDDL();
+
+ NYdb::TValue GetNextBatchLoadDDL() override;
+
+ bool Finished() override;
+
+private:
+ ui32 PartNum;
+ i32 WarehouseId;
+ i32 DistrictId;
+ i32 CustomerId;
+};
+
+}
+}
diff --git a/ydb/library/workload/tpcc/load_data/district.cpp b/ydb/library/workload/tpcc/load_data/district.cpp
new file mode 100644
index 0000000000..debd5dea8e
--- /dev/null
+++ b/ydb/library/workload/tpcc/load_data/district.cpp
@@ -0,0 +1,88 @@
+#include "district.h"
+
+#include <util/string/printf.h>
+
+#include <sstream>
+
+namespace NYdbWorkload {
+namespace NTPCC {
+
+TDistrictLoadQueryGenerator::TDistrictLoadQueryGenerator(TTPCCWorkloadParams& params, ui64 seed)
+ : TLoadQueryGenerator(params, seed)
+{
+ WarehouseId = 1;
+ DistrictId = 1;
+}
+
+bool TDistrictLoadQueryGenerator::Finished() {
+ return (WarehouseId > Params.Warehouses);
+}
+
+
+TString TDistrictLoadQueryGenerator::GetCreateDDL(TString path) {
+ TString query = Sprintf(R"(
+ CREATE TABLE `%s/district` (
+ D_W_ID Int32 NOT NULL,
+ D_ID Int32 NOT NULL,
+ D_YTD Double,
+ D_TAX Double,
+ D_NEXT_O_ID Int32,
+ D_NAME Utf8,
+ D_STREET_1 Utf8,
+ D_STREET_2 Utf8,
+ D_CITY Utf8,
+ D_STATE Utf8,
+ D_ZIP Utf8,
+ PRIMARY KEY (D_W_ID, D_ID)
+ ) WITH (
+ AUTO_PARTITIONING_BY_LOAD = DISABLED
+ );
+ )", path.c_str());
+
+ return query;
+}
+
+std::string TDistrictLoadQueryGenerator::GetCleanDDL() {
+ std::string cleanQuery = "DROP TABLE `district`;";
+
+ return cleanQuery;
+}
+
+NYdb::TValue TDistrictLoadQueryGenerator::GetNextBatchLoadDDL() {
+ NYdb::TValueBuilder rows;
+ if (Finished()) {
+ return rows.Build();
+ }
+
+ rows.BeginList();
+ for (i32 id = 1; id <= Params.LoadBatchSize; ++id) {
+ rows.AddListItem().BeginStruct();
+ rows.AddMember("D_W_ID").Int32(WarehouseId);
+ rows.AddMember("D_ID").Int32(DistrictId);
+ rows.AddMember("D_YTD").Double(300000);
+ rows.AddMember("D_TAX").Double(static_cast<double>(UniformRandom32(0, 2000, Rng)) / 10000.0);
+ rows.AddMember("D_NEXT_O_ID").Int32(ETPCCWorkloadConstants::TPCC_CUST_PER_DIST + 1);
+ rows.AddMember("D_NAME").Utf8(RandomString(UniformRandom32(6, 10, Rng)));
+ rows.AddMember("D_STREET_1").Utf8(RandomString(UniformRandom32(10, 20, Rng)));
+ rows.AddMember("D_STREET_2").Utf8(RandomString(UniformRandom32(10, 20, Rng)));
+ rows.AddMember("D_CITY").Utf8(RandomString(UniformRandom32(10, 20, Rng)));
+ rows.AddMember("D_STATE").Utf8(RandomString(2));
+ rows.AddMember("D_ZIP").Utf8("123456789");
+ rows.EndStruct();
+
+ DistrictId++;
+ if (DistrictId > ETPCCWorkloadConstants::TPCC_DIST_PER_WH) {
+ DistrictId = 1;
+ WarehouseId++;
+ if (Finished()) {
+ break;
+ }
+ }
+ }
+ rows.EndList();
+
+ return rows.Build();
+}
+
+}
+}
diff --git a/ydb/library/workload/tpcc/load_data/district.h b/ydb/library/workload/tpcc/load_data/district.h
new file mode 100644
index 0000000000..721e0825c1
--- /dev/null
+++ b/ydb/library/workload/tpcc/load_data/district.h
@@ -0,0 +1,28 @@
+#pragma once
+
+#include "query_generator.h"
+
+
+namespace NYdbWorkload {
+namespace NTPCC {
+
+class TDistrictLoadQueryGenerator : public TLoadQueryGenerator {
+public:
+
+ TDistrictLoadQueryGenerator(TTPCCWorkloadParams& params, ui64 seed);
+
+ static TString GetCreateDDL(TString path);
+
+ static std::string GetCleanDDL();
+
+ NYdb::TValue GetNextBatchLoadDDL() override;
+
+ bool Finished() override;
+
+private:
+ i32 WarehouseId;
+ i32 DistrictId;
+};
+
+}
+}
diff --git a/ydb/library/workload/tpcc/load_data/history.cpp b/ydb/library/workload/tpcc/load_data/history.cpp
new file mode 100644
index 0000000000..5f9b2a95ed
--- /dev/null
+++ b/ydb/library/workload/tpcc/load_data/history.cpp
@@ -0,0 +1,94 @@
+#include "history.h"
+
+#include <util/string/printf.h>
+
+#include <sstream>
+
+namespace NYdbWorkload {
+namespace NTPCC {
+
+THistoryLoadQueryGenerator::THistoryLoadQueryGenerator(TTPCCWorkloadParams& params, ui32 partNum, ui64 seed)
+ : TLoadQueryGenerator(params, seed)
+ , PartNum(partNum)
+{
+ WarehouseId = (partNum - 1) * params.Warehouses / params.Threads + 1;
+ DistrictId = 1;
+ CustomerId = 1;
+}
+
+bool THistoryLoadQueryGenerator::Finished() {
+ i32 whEnd = PartNum * Params.Warehouses / Params.Threads;
+ return (WarehouseId > whEnd);
+}
+
+TString THistoryLoadQueryGenerator::GetCreateDDL(TString path, ui32 partNum, TString partAtKeys) {
+ std::string partNumS = std::to_string(partNum);
+ TString query = Sprintf(R"(
+ CREATE TABLE `%s/history` (
+ H_C_W_ID Int32,
+ H_C_ID Int32,
+ H_C_D_ID Int32,
+ H_D_ID Int32,
+ H_W_ID Int32,
+ H_DATE Timestamp,
+ H_AMOUNT Double,
+ H_DATA Utf8,
+ H_C_NANO_TS Int64 NOT NULL,
+ PRIMARY KEY (H_C_W_ID, H_C_NANO_TS)
+ ) WITH (
+ AUTO_PARTITIONING_BY_LOAD = DISABLED,
+ AUTO_PARTITIONING_MIN_PARTITIONS_COUNT = %s,
+ AUTO_PARTITIONING_MAX_PARTITIONS_COUNT = %s,
+ PARTITION_AT_KEYS = (%s)
+ );
+ )", path.c_str(), partNumS.c_str(), partNumS.c_str(), partAtKeys.c_str());
+
+ return query;
+}
+
+std::string THistoryLoadQueryGenerator::GetCleanDDL() {
+ std::string cleanQuery = "DROP TABLE `history`;";
+
+ return cleanQuery;
+}
+
+NYdb::TValue THistoryLoadQueryGenerator::GetNextBatchLoadDDL() {
+ NYdb::TValueBuilder rows;
+ if (Finished()) {
+ return rows.Build();
+ }
+
+ rows.BeginList();
+ for (i32 id = 1; id <= Params.LoadBatchSize; ++id) {
+ rows.AddListItem().BeginStruct();
+ rows.AddMember("H_C_ID").Int32(CustomerId);
+ rows.AddMember("H_C_D_ID").Int32(DistrictId);
+ rows.AddMember("H_C_W_ID").Int32(WarehouseId);
+ rows.AddMember("H_D_ID").Int32(DistrictId);
+ rows.AddMember("H_W_ID").Int32(WarehouseId);
+ rows.AddMember("H_DATE").Timestamp(Now());
+ rows.AddMember("H_AMOUNT").Double(10.0);
+ rows.AddMember("H_DATA").Utf8(RandomString(UniformRandom32(10, 24, Rng)));
+ rows.AddMember("H_C_NANO_TS").Int64(GetNowNanoSeconds());
+ rows.EndStruct();
+
+ CustomerId++;
+ if (CustomerId > ETPCCWorkloadConstants::TPCC_CUST_PER_DIST) {
+ CustomerId = 1;
+ DistrictId++;
+ if (DistrictId > ETPCCWorkloadConstants::TPCC_DIST_PER_WH) {
+ DistrictId = 1;
+ WarehouseId++;
+ if (Finished()) {
+ break;
+ }
+ }
+ }
+ }
+ rows.EndList();
+
+ return rows.Build();
+}
+
+}
+}
diff --git a/ydb/library/workload/tpcc/load_data/history.h b/ydb/library/workload/tpcc/load_data/history.h
new file mode 100644
index 0000000000..5aac3a8369
--- /dev/null
+++ b/ydb/library/workload/tpcc/load_data/history.h
@@ -0,0 +1,37 @@
+#pragma once
+
+#include "query_generator.h"
+
+
+namespace NYdbWorkload {
+namespace NTPCC {
+
+class THistoryLoadQueryGenerator : public TLoadQueryGenerator {
+public:
+
+ THistoryLoadQueryGenerator(TTPCCWorkloadParams& params, ui32 partNum, ui64 seed);
+
+ static TString GetCreateDDL(TString path, ui32 partNum, TString partAtKeys);
+
+ static std::string GetCleanDDL();
+
+ NYdb::TValue GetNextBatchLoadDDL() override;
+
+ bool Finished() override;
+
+private:
+ ui32 PartNum;
+ i32 WarehouseId;
+ i32 DistrictId;
+ i32 CustomerId;
+
+ ui64 GetNowNanoSeconds() {
+ std::chrono::time_point<std::chrono::system_clock> now = std::chrono::system_clock::now();
+ auto duration = now.time_since_epoch();
+ auto nanoseconds = std::chrono::duration_cast<std::chrono::nanoseconds>(duration);
+ return static_cast<ui64>(nanoseconds.count());
+ }
+};
+
+}
+}
diff --git a/ydb/library/workload/tpcc/load_data/item.cpp b/ydb/library/workload/tpcc/load_data/item.cpp
new file mode 100644
index 0000000000..0e7779bade
--- /dev/null
+++ b/ydb/library/workload/tpcc/load_data/item.cpp
@@ -0,0 +1,82 @@
+#include "item.h"
+
+#include <util/string/printf.h>
+
+#include <sstream>
+
+namespace NYdbWorkload {
+namespace NTPCC {
+
+TItemLoadQueryGenerator::TItemLoadQueryGenerator(TTPCCWorkloadParams& params, ui64 seed)
+ : TLoadQueryGenerator(params, seed)
+{
+ ItemId = 1;
+}
+
+bool TItemLoadQueryGenerator::Finished() {
+ return (ItemId > ETPCCWorkloadConstants::TPCC_ITEM_COUNT);
+}
+
+TString TItemLoadQueryGenerator::GetCreateDDL(TString path) {
+ TString query = Sprintf(R"(
+ CREATE TABLE `%s/item` (
+ I_ID Int32 NOT NULL,
+ I_NAME Utf8,
+ I_PRICE Double,
+ I_DATA Utf8,
+ I_IM_ID Int32,
+ PRIMARY KEY (I_ID)
+ ) WITH (
+ AUTO_PARTITIONING_BY_LOAD = DISABLED
+ );
+ )", path.c_str());
+
+ return query;
+}
+
+std::string TItemLoadQueryGenerator::GetCleanDDL() {
+ std::string cleanQuery = "DROP TABLE `item`;";
+
+ return cleanQuery;
+}
+
+NYdb::TValue TItemLoadQueryGenerator::GetNextBatchLoadDDL() {
+ NYdb::TValueBuilder rows;
+ if (Finished()) {
+ return rows.Build();
+ }
+ i32 partStart = ItemId;
+ i32 partEnd = std::min(static_cast<i32>(ETPCCWorkloadConstants::TPCC_ITEM_COUNT),
+ partStart + Params.LoadBatchSize - 1);
+
+ rows.BeginList();
+ for (i32 row = partStart; row <= partEnd; ++row) {
+ rows.AddListItem().BeginStruct();
+ rows.AddMember("I_ID").Int32(row);
+ rows.AddMember("I_NAME").Utf8(RandomString(UniformRandom32(14, 24, Rng)));
+ rows.AddMember("I_PRICE").Double(static_cast<double>(UniformRandom32(100, 10000, Rng)) / 100.0);
+
+ ui64 dataType = UniformRandom32(1, 100, Rng);
+ ui64 length = UniformRandom32(26, 50, Rng);
+ if (dataType > 10) {
+ // 90% of time i_data isa random string of length [26 .. 50]
+ rows.AddMember("I_DATA").Utf8(RandomString(length));
+ } else {
+ // 10% of time i_data has "ORIGINAL" crammed somewhere in middle
+ ui64 placeForOriginal = UniformRandom32(1, length - 9, Rng);
+ rows.AddMember("I_DATA").Utf8(
+ RandomString(placeForOriginal) + "ORIGINAL" + RandomString(length - 8 - placeForOriginal)
+ );
+ }
+ rows.AddMember("I_IM_ID").Int32(UniformRandom32(1, 10000, Rng));
+ rows.EndStruct();
+ }
+ rows.EndList();
+
+ ItemId = partEnd + 1;
+
+ return rows.Build();
+}
+
+}
+}
diff --git a/ydb/library/workload/tpcc/load_data/item.h b/ydb/library/workload/tpcc/load_data/item.h
new file mode 100644
index 0000000000..a73140f016
--- /dev/null
+++ b/ydb/library/workload/tpcc/load_data/item.h
@@ -0,0 +1,27 @@
+#pragma once
+
+#include "query_generator.h"
+
+
+namespace NYdbWorkload {
+namespace NTPCC {
+
+class TItemLoadQueryGenerator : public TLoadQueryGenerator {
+public:
+
+ TItemLoadQueryGenerator(TTPCCWorkloadParams& params, ui64 seed);
+
+ static TString GetCreateDDL(TString path);
+
+ static std::string GetCleanDDL();
+
+ NYdb::TValue GetNextBatchLoadDDL() override;
+
+ bool Finished() override;
+
+private:
+ i32 ItemId;
+};
+
+}
+}
diff --git a/ydb/library/workload/tpcc/load_data/load_thread_pool.cpp b/ydb/library/workload/tpcc/load_data/load_thread_pool.cpp
new file mode 100644
index 0000000000..95489b174b
--- /dev/null
+++ b/ydb/library/workload/tpcc/load_data/load_thread_pool.cpp
@@ -0,0 +1,83 @@
+#include "load_thread_pool.h"
+
+#include <ydb/library/workload/tpcc/tpcc_thread_resource.h>
+
+#include <ydb/public/api/protos/ydb_value.pb.h>
+#include <ydb/public/lib/ydb_cli/commands/ydb_common.h>
+
+namespace NYdbWorkload {
+namespace NTPCC {
+
+TLoadThreadPool::TLoadThreadPool(std::shared_ptr<TDriver>& driver)
+ : TThreadPool(TThreadPool::TParams().SetBlocking(true).SetCatching(true))
+ , Driver(driver) {}
+
+void* TLoadThreadPool::CreateThreadSpecificResource() {
+ auto tableClientSettings = NTable::TClientSettings()
+ .SessionPoolSettings(
+ NTable::TSessionPoolSettings()
+ .MaxActiveSessions(1));
+ return new TThreadResource(std::make_shared<NTable::TTableClient>(*Driver, tableClientSettings));
+}
+
+void TLoadThreadPool::DestroyThreadSpecificResource(void* resource) {
+ delete static_cast<TThreadResource*>(resource);
+}
+
+TLoadTask::TLoadTask(std::unique_ptr<TLoadQueryGenerator>&& queryGen)
+ : QueryGen(std::move(queryGen)) {}
+
+void TLoadTask::Process(void* threadResource) {
+ try {
+ int retryCount = 0;
+ auto query = QueryGen->GetNextBatchLoadDDL();
+ auto retrySettings = NYdb::NTable::TRetryOperationSettings()
+ .MaxRetries(QueryGen->GetParams().RetryCount);
+ auto& db = static_cast<TThreadResource*>(threadResource)->Client;
+
+ auto upsertQuery = [this, &query, &retryCount] (NTable::TTableClient& tableClient) {
+ TValue queryCopy(query);
+
+ auto future = tableClient.BulkUpsert(
+ QueryGen->GetParams().DbPath,
+ std::move(queryCopy)
+ );
+
+ return future.Apply([&retryCount](const NYdb::NTable::TAsyncBulkUpsertResult& bulkUpsertResult) {
+ NYdb::TStatus status = bulkUpsertResult.GetValueSync();
+ if (!status.IsSuccess()) {
+ ++retryCount;
+ } else if (status.GetIssues()) {
+ Cerr << status.GetIssues().ToString() << "\n";
+ }
+ return NThreading::MakeFuture(status);
+ });
+ };
+
+ while (true) {
+ auto statusF = db->RetryOperation(upsertQuery, retrySettings);
+
+ // We want to prepare the request while the previous one is running
+ if (!QueryGen->Finished()) {
+ query = QueryGen->GetNextBatchLoadDDL();
+ } else {
+ NConsoleClient::ThrowOnError(statusF.GetValueSync());
+ break;
+ }
+ NConsoleClient::ThrowOnError(statusF.GetValueSync());
+
+ // TODO: Retry monitoring
+ retryCount = 0;
+ }
+
+ } catch(NConsoleClient::TYdbErrorException ex) {
+ Cerr << ex;
+ throw;
+ } catch(...) {
+ throw;
+ }
+}
+
+
+}
+}
diff --git a/ydb/library/workload/tpcc/load_data/load_thread_pool.h b/ydb/library/workload/tpcc/load_data/load_thread_pool.h
new file mode 100644
index 0000000000..c8995e308a
--- /dev/null
+++ b/ydb/library/workload/tpcc/load_data/load_thread_pool.h
@@ -0,0 +1,39 @@
+#pragma once
+
+#include <ydb/public/sdk/cpp/client/ydb_table/table.h>
+#include <ydb/library/workload/tpcc/load_data/query_generator.h>
+
+#include <util/thread/pool.h>
+#include <library/cpp/logger/log.h>
+
+namespace NYdbWorkload {
+namespace NTPCC {
+
+using namespace NYdb;
+
+class TLoadThreadPool : public TThreadPool {
+public:
+ TLoadThreadPool(std::shared_ptr<TDriver>& driver);
+
+protected:
+ void* CreateThreadSpecificResource() override;
+
+ void DestroyThreadSpecificResource(void* resource) override;
+
+private:
+ std::shared_ptr<TDriver> Driver;
+};
+
+
+class TLoadTask : public IObjectInQueue {
+public:
+ TLoadTask(std::unique_ptr<TLoadQueryGenerator>&& queryGen);
+
+ void Process(void* threadResource) override;
+
+private:
+ std::unique_ptr<TLoadQueryGenerator> QueryGen;
+};
+
+}
+}
diff --git a/ydb/library/workload/tpcc/load_data/new_order.cpp b/ydb/library/workload/tpcc/load_data/new_order.cpp
new file mode 100644
index 0000000000..bb31672f0d
--- /dev/null
+++ b/ydb/library/workload/tpcc/load_data/new_order.cpp
@@ -0,0 +1,85 @@
+#include "new_order.h"
+
+#include <util/string/printf.h>
+
+#include <sstream>
+
+namespace NYdbWorkload {
+namespace NTPCC {
+
+TNewOrderLoadQueryGenerator::TNewOrderLoadQueryGenerator(TTPCCWorkloadParams& params, ui32 partNum, ui64 seed)
+ : TLoadQueryGenerator(params, seed)
+ , PartNum(partNum)
+{
+ WarehouseId = (partNum - 1) * params.Warehouses / params.Threads + 1;
+ DistrictId = 1;
+ // 900 rows in the NEW-ORDER table corresponding to the last
+ // 900 rows in the ORDER table for that district
+ // (i.e., with NO_O_ID between 2,101 and 3,000)
+ CustomerId = ETPCCWorkloadConstants::TPCC_FIRST_UNPROCESSED_O_ID;
+}
+
+bool TNewOrderLoadQueryGenerator::Finished() {
+ i32 whEnd = PartNum * Params.Warehouses / Params.Threads;
+ return (WarehouseId > whEnd);
+}
+
+TString TNewOrderLoadQueryGenerator::GetCreateDDL(TString path, ui32 partNum, TString partAtKeys) {
+ std::string partNumS = std::to_string(partNum);
+ TString query = Sprintf(R"(
+ CREATE TABLE `%s/new_order` (
+ NO_W_ID Int32 NOT NULL,
+ NO_D_ID Int32 NOT NULL,
+ NO_O_ID Int32 NOT NULL,
+ PRIMARY KEY (NO_W_ID, NO_D_ID, NO_O_ID)
+ ) WITH (
+ AUTO_PARTITIONING_BY_LOAD = DISABLED,
+ AUTO_PARTITIONING_MIN_PARTITIONS_COUNT = %s,
+ AUTO_PARTITIONING_MAX_PARTITIONS_COUNT = %s,
+ PARTITION_AT_KEYS = (%s)
+ );
+ )", path.c_str(), partNumS.c_str(), partNumS.c_str(), partAtKeys.c_str());
+
+ return query;
+}
+
+std::string TNewOrderLoadQueryGenerator::GetCleanDDL() {
+ std::string cleanQuery = "DROP TABLE `new_order`;";
+
+ return cleanQuery;
+}
+
+NYdb::TValue TNewOrderLoadQueryGenerator::GetNextBatchLoadDDL() {
+ NYdb::TValueBuilder rows;
+ if (Finished()) {
+ return rows.Build();
+ }
+
+ rows.BeginList();
+ for (i32 id = 1; id <= Params.LoadBatchSize; ++id) {
+ rows.AddListItem().BeginStruct();
+ rows.AddMember("NO_W_ID").Int32(WarehouseId);
+ rows.AddMember("NO_D_ID").Int32(DistrictId);
+ rows.AddMember("NO_O_ID").Int32(CustomerId);
+ rows.EndStruct();
+
+ CustomerId++;
+ if (CustomerId > ETPCCWorkloadConstants::TPCC_CUST_PER_DIST) {
+ CustomerId = ETPCCWorkloadConstants::TPCC_FIRST_UNPROCESSED_O_ID;
+ DistrictId++;
+ if (DistrictId > ETPCCWorkloadConstants::TPCC_DIST_PER_WH) {
+ DistrictId = 1;
+ WarehouseId++;
+ if (Finished()) {
+ break;
+ }
+ }
+ }
+ }
+ rows.EndList();
+
+ return rows.Build();
+}
+
+}
+}
diff --git a/ydb/library/workload/tpcc/load_data/new_order.h b/ydb/library/workload/tpcc/load_data/new_order.h
new file mode 100644
index 0000000000..e751962c6d
--- /dev/null
+++ b/ydb/library/workload/tpcc/load_data/new_order.h
@@ -0,0 +1,30 @@
+#pragma once
+
+#include "query_generator.h"
+
+
+namespace NYdbWorkload {
+namespace NTPCC {
+
+class TNewOrderLoadQueryGenerator : public TLoadQueryGenerator {
+public:
+
+ TNewOrderLoadQueryGenerator(TTPCCWorkloadParams& params, ui32 partNum, ui64 seed);
+
+ static TString GetCreateDDL(TString path, ui32 partNum, TString partAtKeys);
+
+ static std::string GetCleanDDL();
+
+ NYdb::TValue GetNextBatchLoadDDL() override;
+
+ bool Finished() override;
+
+private:
+ ui32 PartNum;
+ i32 WarehouseId;
+ i32 DistrictId;
+ i32 CustomerId;
+};
+
+}
+}
diff --git a/ydb/library/workload/tpcc/load_data/oorder.cpp b/ydb/library/workload/tpcc/load_data/oorder.cpp
new file mode 100644
index 0000000000..f0eda4e532
--- /dev/null
+++ b/ydb/library/workload/tpcc/load_data/oorder.cpp
@@ -0,0 +1,102 @@
+#include "oorder.h"
+
+#include <util/string/printf.h>
+
+#include <sstream>
+
+namespace NYdbWorkload {
+namespace NTPCC {
+
+TOorderLoadQueryGenerator::TOorderLoadQueryGenerator(TTPCCWorkloadParams& params, ui32 partNum, ui64 seed)
+ : TLoadQueryGenerator(params, seed)
+ , PartNum(partNum)
+ , CustShuffle(ETPCCWorkloadConstants::TPCC_CUST_PER_DIST)
+{
+ WarehouseId = (partNum - 1) * params.Warehouses / params.Threads + 1;
+ DistrictId = 1;
+ CustomerId = 0;
+ for (ui64 id = 0; id < CustShuffle.size(); ++id) {
+ CustShuffle[id] = id + 1;
+ }
+ std::shuffle(CustShuffle.begin(), CustShuffle.end(), Rng);
+}
+
+bool TOorderLoadQueryGenerator::Finished() {
+ i32 whEnd = PartNum * Params.Warehouses / Params.Threads;
+ return (WarehouseId > whEnd);
+}
+
+TString TOorderLoadQueryGenerator::GetCreateDDL(TString path, ui32 partNum, TString partAtKeys) {
+ std::string partNumS = std::to_string(partNum);
+ TString query = Sprintf(R"(
+ CREATE TABLE `%s/oorder` (
+ O_W_ID Int32 NOT NULL,
+ O_D_ID Int32 NOT NULL,
+ O_ID Int32 NOT NULL,
+ O_C_ID Int32,
+ O_CARRIER_ID Int32,
+ O_OL_CNT Int32,
+ O_ALL_LOCAL Int32,
+ O_ENTRY_D Timestamp,
+ PRIMARY KEY (O_W_ID, O_D_ID, O_ID)
+ ) WITH (
+ AUTO_PARTITIONING_BY_LOAD = DISABLED,
+ AUTO_PARTITIONING_MIN_PARTITIONS_COUNT = %s,
+ AUTO_PARTITIONING_MAX_PARTITIONS_COUNT = %s,
+ PARTITION_AT_KEYS = (%s)
+ );
+ )", path.c_str(), partNumS.c_str(), partNumS.c_str(), partAtKeys.c_str());
+
+ return query;
+}
+
+std::string TOorderLoadQueryGenerator::GetCleanDDL() {
+ std::string cleanQuery = "DROP TABLE `oorder`;";
+
+ return cleanQuery;
+}
+
+NYdb::TValue TOorderLoadQueryGenerator::GetNextBatchLoadDDL() {
+ NYdb::TValueBuilder rows;
+ if (Finished()) {
+ return rows.Build();
+ }
+
+ rows.BeginList();
+ for (i32 id = 1; id <= Params.LoadBatchSize; ++id) {
+ rows.AddListItem().BeginStruct();
+ rows.AddMember("O_W_ID").Int32(WarehouseId);
+ rows.AddMember("O_D_ID").Int32(DistrictId);
+ rows.AddMember("O_ID").Int32(CustomerId);
+ rows.AddMember("O_C_ID").Int32(CustShuffle[CustomerId]);
+ if (CustomerId + 1 < ETPCCWorkloadConstants::TPCC_FIRST_UNPROCESSED_O_ID) {
+ rows.AddMember("O_CARRIER_ID").Int32(UniformRandom32(10, 24, Rng));
+ } else {
+ rows.AddMember("O_CARRIER_ID").Int32(0);
+ }
+ rows.AddMember("O_OL_CNT").Int32(UniformRandom32(5, 15, Rng));
+ rows.AddMember("O_ALL_LOCAL").Int32(1);
+ rows.AddMember("O_ENTRY_D").Timestamp(Now());
+ rows.EndStruct();
+
+ CustomerId++;
+ if (CustomerId == ETPCCWorkloadConstants::TPCC_CUST_PER_DIST) {
+ CustomerId = 0;
+ DistrictId++;
+ std::shuffle(CustShuffle.begin(), CustShuffle.end(), Rng);
+ if (DistrictId > ETPCCWorkloadConstants::TPCC_DIST_PER_WH) {
+ DistrictId = 1;
+ WarehouseId++;
+ if (Finished()) {
+ break;
+ }
+ }
+ }
+ }
+ rows.EndList();
+
+ return rows.Build();
+}
+
+}
+}
diff --git a/ydb/library/workload/tpcc/load_data/oorder.h b/ydb/library/workload/tpcc/load_data/oorder.h
new file mode 100644
index 0000000000..160e0b8118
--- /dev/null
+++ b/ydb/library/workload/tpcc/load_data/oorder.h
@@ -0,0 +1,31 @@
+#pragma once
+
+#include "query_generator.h"
+
+
+namespace NYdbWorkload {
+namespace NTPCC {
+
+class TOorderLoadQueryGenerator : public TLoadQueryGenerator {
+public:
+
+ TOorderLoadQueryGenerator(TTPCCWorkloadParams& params, ui32 partNum, ui64 seed);
+
+ static TString GetCreateDDL(TString path, ui32 partNum, TString partAtKeys);
+
+ static std::string GetCleanDDL();
+
+ NYdb::TValue GetNextBatchLoadDDL() override;
+
+ bool Finished() override;
+
+private:
+ ui32 PartNum;
+ i32 WarehouseId;
+ i32 DistrictId;
+ i32 CustomerId;
+ std::vector<ui64> CustShuffle;
+};
+
+}
+}
diff --git a/ydb/library/workload/tpcc/load_data/order_line.cpp b/ydb/library/workload/tpcc/load_data/order_line.cpp
new file mode 100644
index 0000000000..b254f6e61e
--- /dev/null
+++ b/ydb/library/workload/tpcc/load_data/order_line.cpp
@@ -0,0 +1,111 @@
+#include "order_line.h"
+
+#include <util/string/printf.h>
+
+#include <sstream>
+
+namespace NYdbWorkload {
+namespace NTPCC {
+
+TOrderLineLoadQueryGenerator::TOrderLineLoadQueryGenerator(TTPCCWorkloadParams& params, ui32 partNum, ui64 seed)
+ : TLoadQueryGenerator(params, seed)
+ , PartNum(partNum)
+{
+ WarehouseId = (partNum - 1) * params.Warehouses / params.Threads + 1;
+ DistrictId = 1;
+ CustomerId = 1;
+ OlNumber = 1;
+ RandomCount = UniformRandom32(5, 15, Rng);
+}
+
+bool TOrderLineLoadQueryGenerator::Finished() {
+ i32 whEnd = PartNum * Params.Warehouses / Params.Threads;
+ return (WarehouseId > whEnd);
+}
+
+TString TOrderLineLoadQueryGenerator::GetCreateDDL(TString path, ui32 partNum, TString partAtKeys) {
+ std::string partNumS = std::to_string(partNum);
+ TString query = Sprintf(R"(
+ CREATE TABLE `%s/order_line` (
+ OL_W_ID Int32 NOT NULL,
+ OL_D_ID Int32 NOT NULL,
+ OL_O_ID Int32 NOT NULL,
+ OL_NUMBER Int32 NOT NULL,
+ OL_I_ID Int32,
+ OL_DELIVERY_D Timestamp,
+ OL_AMOUNT Double,
+ OL_SUPPLY_W_ID Int32,
+ OL_QUANTITY Double,
+ OL_DIST_INFO Utf8,
+ PRIMARY KEY (OL_W_ID, OL_D_ID, OL_O_ID, OL_NUMBER)
+ ) WITH (
+ AUTO_PARTITIONING_BY_LOAD = DISABLED,
+ AUTO_PARTITIONING_MIN_PARTITIONS_COUNT = %s,
+ AUTO_PARTITIONING_MAX_PARTITIONS_COUNT = %s,
+ PARTITION_AT_KEYS = (%s)
+ );
+ )", path.c_str(), partNumS.c_str(), partNumS.c_str(), partAtKeys.c_str());
+
+ return query;
+}
+
+std::string TOrderLineLoadQueryGenerator::GetCleanDDL() {
+ std::string cleanQuery = "DROP TABLE `order_line`;";
+
+ return cleanQuery;
+}
+
+NYdb::TValue TOrderLineLoadQueryGenerator::GetNextBatchLoadDDL() {
+ NYdb::TValueBuilder rows;
+ if (Finished()) {
+ return rows.Build();
+ }
+
+ rows.BeginList();
+ for (i32 id = 1; id <= Params.LoadBatchSize; ++id) {
+ rows.AddListItem().BeginStruct();
+ rows.AddMember("OL_W_ID").Int32(WarehouseId);
+ rows.AddMember("OL_D_ID").Int32(DistrictId);
+ rows.AddMember("OL_O_ID").Int32(CustomerId);
+ rows.AddMember("OL_NUMBER").Int32(OlNumber);
+
+ ui64 itemId = UniformRandom32(1, ETPCCWorkloadConstants::TPCC_ITEM_COUNT, Rng);
+ rows.AddMember("OL_I_ID").Int32(itemId);
+ if (itemId < ETPCCWorkloadConstants::TPCC_FIRST_UNPROCESSED_O_ID) {
+ rows.AddMember("OL_DELIVERY_D").Timestamp(Now());
+ rows.AddMember("OL_AMOUNT").Double(0.0);
+ } else {
+ rows.AddMember("OL_DELIVERY_D").Timestamp(TInstant::Zero());
+ rows.AddMember("OL_AMOUNT").Double(static_cast<double>(UniformRandom32(1, 999999, Rng)) / 100.0);
+ }
+
+ rows.AddMember("OL_SUPPLY_W_ID").Int32(WarehouseId);
+ rows.AddMember("OL_QUANTITY").Double(5.0);
+ rows.AddMember("OL_DIST_INFO").Utf8(RandomString(24));
+ rows.EndStruct();
+
+ OlNumber++;
+ if (OlNumber > RandomCount) {
+ CustomerId++;
+ OlNumber = 1;
+ RandomCount = UniformRandom32(5, 15, Rng);
+ if (CustomerId > ETPCCWorkloadConstants::TPCC_CUST_PER_DIST) {
+ CustomerId = 1;
+ DistrictId++;
+ if (DistrictId > ETPCCWorkloadConstants::TPCC_DIST_PER_WH) {
+ DistrictId = 1;
+ WarehouseId++;
+ if (Finished()) {
+ break;
+ }
+ }
+ }
+ }
+ }
+ rows.EndList();
+
+ return rows.Build();
+}
+
+}
+}
diff --git a/ydb/library/workload/tpcc/load_data/order_line.h b/ydb/library/workload/tpcc/load_data/order_line.h
new file mode 100644
index 0000000000..7762cf9352
--- /dev/null
+++ b/ydb/library/workload/tpcc/load_data/order_line.h
@@ -0,0 +1,34 @@
+#pragma once
+
+#include "query_generator.h"
+
+
+namespace NYdbWorkload {
+namespace NTPCC {
+
+class TOrderLineLoadQueryGenerator : public TLoadQueryGenerator {
+public:
+
+ TOrderLineLoadQueryGenerator(TTPCCWorkloadParams& params, ui32 partNum, ui64 seed);
+
+ static TString GetCreateDDL(TString path, ui32 partNum, TString partAtKeys);
+
+ static std::string GetCleanDDL();
+
+ NYdb::TValue GetNextBatchLoadDDL() override;
+
+ bool Finished() override;
+
+private:
+ ui32 PartNum;
+ i32 WarehouseId;
+ i32 DistrictId;
+ i32 CustomerId;
+ ui64 RandomCount;
+ ui64 OlNumber;
+
+ std::uniform_int_distribution<ui64> RandomCountGen;
+};
+
+}
+}
diff --git a/ydb/library/workload/tpcc/load_data/query_generator.cpp b/ydb/library/workload/tpcc/load_data/query_generator.cpp
new file mode 100644
index 0000000000..d5cc5bec67
--- /dev/null
+++ b/ydb/library/workload/tpcc/load_data/query_generator.cpp
@@ -0,0 +1,35 @@
+#include "query_generator.h"
+
+namespace NYdbWorkload {
+namespace NTPCC {
+
+TLoadQueryGenerator::TLoadQueryGenerator(TTPCCWorkloadParams& params, ui64 seed)
+ : Params(params)
+ , Rng(seed, 0)
+{
+}
+
+TTPCCWorkloadParams TLoadQueryGenerator::GetParams() {
+ return Params;
+}
+
+TString TLoadQueryGenerator::RandomString(ui64 length) {
+ char str[length + 1];
+ for (ui64 i = 0; i < length; ++i) {
+ str[i] = static_cast<char>(UniformRandom32('A', 'z', Rng));
+ }
+ str[length] = '\0';
+ return TString(str);
+}
+
+TString TLoadQueryGenerator::RandomNumberString(ui64 length) {
+ char str[length + 1];
+ for (ui64 i = 0; i < length; ++i) {
+ str[i] = static_cast<char>(UniformRandom32('0', '9', Rng));
+ }
+ str[length] = '\0';
+ return TString(str);
+}
+
+}
+}
diff --git a/ydb/library/workload/tpcc/load_data/query_generator.h b/ydb/library/workload/tpcc/load_data/query_generator.h
new file mode 100644
index 0000000000..537e7c7da3
--- /dev/null
+++ b/ydb/library/workload/tpcc/load_data/query_generator.h
@@ -0,0 +1,36 @@
+#pragma once
+
+#include <ydb/library/workload/tpcc/tpcc_config.h>
+#include <ydb/library/workload/tpcc/tpcc_util.h>
+
+#include <util/random/fast.h>
+
+#include <random>
+
+namespace NYdbWorkload {
+namespace NTPCC {
+
+class TLoadQueryGenerator {
+public:
+
+ TLoadQueryGenerator(TTPCCWorkloadParams& params, ui64 seed);
+
+ virtual ~TLoadQueryGenerator() = default;
+
+ virtual NYdb::TValue GetNextBatchLoadDDL() = 0;
+
+ virtual bool Finished() = 0;
+
+ TTPCCWorkloadParams GetParams();
+
+protected:
+ TString RandomString(ui64 length);
+ TString RandomNumberString(ui64 length);
+
+ TTPCCWorkloadParams Params;
+
+ TFastRng32 Rng;
+};
+
+}
+}
diff --git a/ydb/library/workload/tpcc/load_data/stock.cpp b/ydb/library/workload/tpcc/load_data/stock.cpp
new file mode 100644
index 0000000000..67c4904d1d
--- /dev/null
+++ b/ydb/library/workload/tpcc/load_data/stock.cpp
@@ -0,0 +1,115 @@
+#include "stock.h"
+
+#include <util/string/printf.h>
+
+#include <sstream>
+
+namespace NYdbWorkload {
+namespace NTPCC {
+
+TStockLoadQueryGenerator::TStockLoadQueryGenerator(TTPCCWorkloadParams& params, ui32 partNum, ui64 seed)
+ : TLoadQueryGenerator(params, seed)
+ , PartNum(partNum)
+{
+ WarehouseId = (partNum - 1) * params.Warehouses / params.Threads + 1;
+ ItemId = 1;
+}
+
+bool TStockLoadQueryGenerator::Finished() {
+ i32 whEnd = PartNum * Params.Warehouses / Params.Threads;
+ return (WarehouseId > whEnd);
+}
+
+TString TStockLoadQueryGenerator::GetCreateDDL(TString path, ui32 partNum, TString partAtKeys) {
+ std::string partNumS = std::to_string(partNum);
+ TString query = Sprintf(R"(
+ CREATE TABLE `%s/stock` (
+ S_W_ID Int32 NOT NULL,
+ S_I_ID Int32 NOT NULL,
+ S_QUANTITY Int32,
+ S_YTD Double,
+ S_ORDER_CNT Int32,
+ S_REMOTE_CNT Int32,
+ S_DATA Utf8,
+ S_DIST_01 Utf8,
+ S_DIST_02 Utf8,
+ S_DIST_03 Utf8,
+ S_DIST_04 Utf8,
+ S_DIST_05 Utf8,
+ S_DIST_06 Utf8,
+ S_DIST_07 Utf8,
+ S_DIST_08 Utf8,
+ S_DIST_09 Utf8,
+ S_DIST_10 Utf8,
+ PRIMARY KEY (S_W_ID, S_I_ID)
+ ) WITH (
+ AUTO_PARTITIONING_BY_LOAD = DISABLED,
+ AUTO_PARTITIONING_MIN_PARTITIONS_COUNT = %s,
+ AUTO_PARTITIONING_MAX_PARTITIONS_COUNT = %s,
+ PARTITION_AT_KEYS = (%s)
+ );
+ )", path.c_str(), partNumS.c_str(), partNumS.c_str(), partAtKeys.c_str());
+
+ return query;
+}
+
+std::string TStockLoadQueryGenerator::GetCleanDDL() {
+ std::string cleanQuery = "DROP TABLE `stock`;";
+
+ return cleanQuery;
+}
+
+NYdb::TValue TStockLoadQueryGenerator::GetNextBatchLoadDDL() {
+ NYdb::TValueBuilder rows;
+ if (Finished()) {
+ return rows.Build();
+ }
+
+ std::uniform_int_distribution<ui64> quantityGen(10, 100);
+ std::uniform_int_distribution<ui64> dataLengthGen(26, 50);
+ std::uniform_int_distribution<ui64> dataTypeGen(1, 100);
+
+ rows.BeginList();
+ for (i32 id = 1; id <= Params.LoadBatchSize; ++id) {
+ rows.AddListItem().BeginStruct();
+ rows.AddMember("S_W_ID").Int32(WarehouseId);
+ rows.AddMember("S_I_ID").Int32(ItemId);
+ rows.AddMember("S_QUANTITY").Int32(UniformRandom32(10, 100, Rng));
+ rows.AddMember("S_ORDER_CNT").Int32(0);
+ rows.AddMember("S_REMOTE_CNT").Int32(0);
+
+ ui64 dataType = UniformRandom32(1, 100, Rng);
+ ui64 length = UniformRandom32(26, 50, Rng);
+ if (dataType > 10) {
+ // 90% of time i_data isa random string of length [26 .. 50]
+ rows.AddMember("S_DATA").Utf8(RandomString(length));
+ } else {
+ // 10% of time i_data has "ORIGINAL" crammed somewhere in middle
+ ui64 placeForOriginal = UniformRandom32(1, length - 9, Rng);
+ rows.AddMember("S_DATA").Utf8(
+ RandomString(placeForOriginal) + "ORIGINAL" + RandomString(length - 8 - placeForOriginal)
+ );
+ }
+
+ for (int distNum = 1; distNum <= 9; ++distNum) {
+ rows.AddMember("S_DIST_0" + std::to_string(distNum)).Utf8(RandomString(24));
+ }
+ rows.AddMember("S_DIST_10").Utf8(RandomString(24));
+ rows.EndStruct();
+
+ ItemId++;
+ if (ItemId > ETPCCWorkloadConstants::TPCC_ITEM_COUNT) {
+ ItemId = 1;
+ WarehouseId++;
+ if (Finished()) {
+ break;
+ }
+ }
+ }
+ rows.EndList();
+
+ return rows.Build();
+}
+
+}
+}
diff --git a/ydb/library/workload/tpcc/load_data/stock.h b/ydb/library/workload/tpcc/load_data/stock.h
new file mode 100644
index 0000000000..3e725062cf
--- /dev/null
+++ b/ydb/library/workload/tpcc/load_data/stock.h
@@ -0,0 +1,29 @@
+#pragma once
+
+#include "query_generator.h"
+
+
+namespace NYdbWorkload {
+namespace NTPCC {
+
+class TStockLoadQueryGenerator : public TLoadQueryGenerator {
+public:
+
+ TStockLoadQueryGenerator(TTPCCWorkloadParams& params, ui32 partNum, ui64 seed);
+
+ static TString GetCreateDDL(TString path, ui32 partNum, TString partAtKeys);
+
+ static std::string GetCleanDDL();
+
+ NYdb::TValue GetNextBatchLoadDDL() override;
+
+ bool Finished() override;
+
+private:
+ ui32 PartNum;
+ i32 WarehouseId;
+ i32 ItemId;
+};
+
+}
+}
diff --git a/ydb/library/workload/tpcc/load_data/warehouse.cpp b/ydb/library/workload/tpcc/load_data/warehouse.cpp
new file mode 100644
index 0000000000..63784ddb37
--- /dev/null
+++ b/ydb/library/workload/tpcc/load_data/warehouse.cpp
@@ -0,0 +1,77 @@
+#include "warehouse.h"
+
+#include <util/string/printf.h>
+
+#include <sstream>
+
+namespace NYdbWorkload {
+namespace NTPCC {
+
+TWarehouseLoadQueryGenerator::TWarehouseLoadQueryGenerator(TTPCCWorkloadParams& params, ui64 seed)
+ : TLoadQueryGenerator(params, seed)
+{
+ WarehouseId = 1;
+}
+
+bool TWarehouseLoadQueryGenerator::Finished() {
+ return (WarehouseId > Params.Warehouses);
+}
+
+TString TWarehouseLoadQueryGenerator::GetCreateDDL(TString path) {
+ TString query = Sprintf(R"(
+ CREATE TABLE `%s/warehouse` (
+ W_ID Int32 NOT NULL,
+ W_YTD Double,
+ W_TAX Double,
+ W_NAME Utf8,
+ W_STREET_1 Utf8,
+ W_STREET_2 Utf8,
+ W_CITY Utf8,
+ W_STATE Utf8,
+ W_ZIP Utf8,
+ PRIMARY KEY (W_ID)
+ ) WITH (
+ AUTO_PARTITIONING_BY_LOAD = DISABLED
+ );
+ )", path.c_str());
+
+ return query;
+}
+
+std::string TWarehouseLoadQueryGenerator::GetCleanDDL() {
+ std::string cleanQuery = "DROP TABLE `warehouse`;";
+
+ return cleanQuery;
+}
+
+NYdb::TValue TWarehouseLoadQueryGenerator::GetNextBatchLoadDDL() {
+ NYdb::TValueBuilder rows;
+ if (Finished()) {
+ return rows.Build();
+ }
+
+ i32 whEnd = std::min(Params.Warehouses, WarehouseId + Params.LoadBatchSize - 1);
+
+ rows.BeginList();
+ for (i32 row = WarehouseId; row <= whEnd; ++row) {
+ rows.AddListItem().BeginStruct();
+ rows.AddMember("W_ID").Int32(row);
+ rows.AddMember("W_YTD").Double(300000);
+ rows.AddMember("W_TAX").Double(static_cast<double>(UniformRandom32(0, 2000, Rng)) / 10000.0);
+ rows.AddMember("W_NAME").Utf8(RandomString(UniformRandom32(6, 10, Rng)));
+ rows.AddMember("W_STREET_1").Utf8(RandomString(UniformRandom32(10, 20, Rng)));
+ rows.AddMember("W_STREET_2").Utf8(RandomString(UniformRandom32(10, 20, Rng)));
+ rows.AddMember("W_CITY").Utf8(RandomString(UniformRandom32(10, 20, Rng)));
+ rows.AddMember("W_STATE").Utf8(RandomString(2));
+ rows.AddMember("W_ZIP").Utf8("123456789");
+ rows.EndStruct();
+ }
+ rows.EndList();
+
+ WarehouseId = whEnd + 1;
+
+ return rows.Build();
+}
+
+}
+}
diff --git a/ydb/library/workload/tpcc/load_data/warehouse.h b/ydb/library/workload/tpcc/load_data/warehouse.h
new file mode 100644
index 0000000000..4e1dfe5ee6
--- /dev/null
+++ b/ydb/library/workload/tpcc/load_data/warehouse.h
@@ -0,0 +1,27 @@
+#pragma once
+
+#include "query_generator.h"
+
+
+namespace NYdbWorkload {
+namespace NTPCC {
+
+class TWarehouseLoadQueryGenerator : public TLoadQueryGenerator {
+public:
+
+ TWarehouseLoadQueryGenerator(TTPCCWorkloadParams& params, ui64 seed);
+
+ static TString GetCreateDDL(TString path);
+
+ static std::string GetCleanDDL();
+
+ NYdb::TValue GetNextBatchLoadDDL() override;
+
+ bool Finished() override;
+
+private:
+ i32 WarehouseId;
+};
+
+}
+}
diff --git a/ydb/library/workload/tpcc/tpcc_config.h b/ydb/library/workload/tpcc/tpcc_config.h
new file mode 100644
index 0000000000..47bf656be1
--- /dev/null
+++ b/ydb/library/workload/tpcc/tpcc_config.h
@@ -0,0 +1,73 @@
+#pragma once
+
+#include <ydb/library/workload/workload_query_generator.h>
+
+namespace NYdbWorkload {
+namespace NTPCC {
+
+static const std::vector<std::string> TPCCNameTokens {
+ "BAR", "OUGHT", "ABLE", "PRI",
+ "PRES", "ESE", "ANTI", "CALLY",
+ "ATION", "EING"
+};
+
+enum ETPCCWorkloadConstants : i32 {
+ TPCC_WAREHOUSES = 10,
+ TPCC_DIST_PER_WH = 10,
+ TPCC_CUST_PER_DIST = 3000,
+ TPCC_ITEM_COUNT = 100000,
+ TPCC_TERMINAL_PER_WH = 10,
+
+ TPCC_THREADS = 10,
+ TPCC_RETRY_COUNT = 10,
+ TPCC_MIN_PARTITIONS = 50,
+ TPCC_MAX_PARTITIONS = 500000,
+ TPCC_AUTO_PARTITIONING = true,
+ TPCC_LOAD_BATCH_SIZE = 128,
+
+ TPCC_NEWORDER_WEIGHT = 45,
+ TPCC_PAYMENT_WEIGHT = 43,
+ TPCC_ORDERSTATUS_WEIGHT = 4,
+ TPCC_DELIVERY_WEIGHT = 4,
+ TPCC_STOCKLEVEL_WEIGHT = 4,
+
+ TPCC_NEWORDER_PRE_EXEC_WAIT = 18000,
+ TPCC_NEWORDER_POST_EXEC_WAIT = 12000,
+ TPCC_PAYMENT_PRE_EXEC_WAIT = 3000,
+ TPCC_PAYMENT_POST_EXEC_WAIT = 12000,
+ TPCC_ORDERSTATUS_PRE_EXEC_WAIT = 2000,
+ TPCC_ORDERSTATUS_POST_EXEC_WAIT = 10000,
+ TPCC_DELIVERY_PRE_EXEC_WAIT = 2000,
+ TPCC_DELIVERY_POST_EXEC_WAIT = 5000,
+ TPCC_STOCKLEVEL_PRE_EXEC_WAIT = 2000,
+ TPCC_STOCKLEVEL_POST_EXEC_WAIT = 5000,
+ TPCC_WORKLOAD_DURATION_SECONDS = 600,
+
+ TPCC_FIRST_UNPROCESSED_O_ID = 2101,
+ TPCC_C_LAST_LOAD_C = 157,
+ TPCC_C_LAST_RUN_C = 223,
+ TPCC_INVALID_ITEM_ID = -12345,
+};
+
+struct TTPCCWorkloadParams : public TWorkloadParams {
+ i32 Warehouses = ETPCCWorkloadConstants::TPCC_WAREHOUSES;
+ i32 Threads = ETPCCWorkloadConstants::TPCC_THREADS;
+ i32 MinPartitions = ETPCCWorkloadConstants::TPCC_MIN_PARTITIONS;
+ i32 MaxPartitions = ETPCCWorkloadConstants::TPCC_MAX_PARTITIONS;
+ bool AutoPartitioning = ETPCCWorkloadConstants::TPCC_AUTO_PARTITIONING;
+ i32 LoadBatchSize = ETPCCWorkloadConstants::TPCC_LOAD_BATCH_SIZE;
+ i32 RetryCount = ETPCCWorkloadConstants::TPCC_RETRY_COUNT;
+ i32 Duration = ETPCCWorkloadConstants::TPCC_WORKLOAD_DURATION_SECONDS;
+ i32 OrderLineIdConst;
+};
+
+enum ETPCCProcedureType {
+ NewOrder,
+ Payment,
+ Delivery,
+ OrderStatus,
+ StockLevel
+};
+
+}
+}
diff --git a/ydb/library/workload/tpcc/tpcc_thread_resource.h b/ydb/library/workload/tpcc/tpcc_thread_resource.h
new file mode 100644
index 0000000000..7cd7e7cd50
--- /dev/null
+++ b/ydb/library/workload/tpcc/tpcc_thread_resource.h
@@ -0,0 +1,22 @@
+#pragma once
+
+#include <ydb/public/lib/ydb_cli/commands/ydb_common.h>
+#include <ydb/public/sdk/cpp/client/ydb_table/table.h>
+
+namespace NYdbWorkload {
+namespace NTPCC {
+
+struct TThreadResource {
+ std::shared_ptr<NYdb::NTable::TTableClient> Client;
+ std::shared_ptr<NYdb::NTable::TSession> Session;
+ TThreadResource(std::shared_ptr<NYdb::NTable::TTableClient>&& client)
+ : Client(std::move(client))
+ {
+ auto status = Client->CreateSession().GetValueSync();
+ NYdb::NConsoleClient::ThrowOnError(status);
+ Session = std::make_shared<NYdb::NTable::TSession>(status.GetSession());
+ }
+};
+
+}
+}
diff --git a/ydb/library/workload/tpcc/tpcc_util.h b/ydb/library/workload/tpcc/tpcc_util.h
new file mode 100644
index 0000000000..051a62a7ba
--- /dev/null
+++ b/ydb/library/workload/tpcc/tpcc_util.h
@@ -0,0 +1,39 @@
+#pragma once
+
+#include <ydb/library/workload/tpcc/tpcc_config.h>
+
+#include <util/random/fast.h>
+
+namespace NYdbWorkload {
+namespace NTPCC {
+
+inline ui32 UniformRandom32(ui32 min, ui32 max, TFastRng32& rng) {
+ return (rng.GenRand() % (max - min + 1) + min);
+}
+
+inline ui32 NonUniformRandom32(ui32 maxOr, ui32 delta, ui32 min, ui32 max, TFastRng32& rng) {
+ return ((UniformRandom32(0, maxOr, rng) | UniformRandom32(min, max, rng)) + delta) % (max - min + 1) + min;
+}
+
+inline std::string GetLastName(ui64 num) {
+ return TPCCNameTokens[num / 100] + TPCCNameTokens[(num / 10) % 10] + TPCCNameTokens[num % 10];
+}
+
+inline std::string GetNonUniformRandomLastNameForLoad(TFastRng32& rng) {
+ return GetLastName(NonUniformRandom32(255, ETPCCWorkloadConstants::TPCC_C_LAST_LOAD_C, 0, 999, rng));
+}
+
+inline std::string GetNonUniformRandomLastNameForRun(TFastRng32& rng) {
+ return GetLastName(NonUniformRandom32(255, ETPCCWorkloadConstants::TPCC_C_LAST_RUN_C, 0, 999, rng));
+}
+
+inline ui32 GetNonUniformCustomerId(ui32 delta, TFastRng32& rng) {
+ return NonUniformRandom32(1023, delta, 1, ETPCCWorkloadConstants::TPCC_CUST_PER_DIST, rng);
+}
+
+inline ui32 GetNonUniformItemId(ui32 delta, TFastRng32& rng) {
+ return NonUniformRandom32(8191, delta, 1, ETPCCWorkloadConstants::TPCC_CUST_PER_DIST, rng);
+}
+
+}
+}
diff --git a/ydb/library/workload/tpcc/tpcc_workload.cpp b/ydb/library/workload/tpcc/tpcc_workload.cpp
new file mode 100644
index 0000000000..991322c892
--- /dev/null
+++ b/ydb/library/workload/tpcc/tpcc_workload.cpp
@@ -0,0 +1,281 @@
+#include "tpcc_workload.h"
+
+#include <ydb/library/workload/tpcc/tpcc_util.h>
+#include <ydb/library/workload/tpcc/tpcc_thread_resource.h>
+#include <ydb/library/workload/tpcc/load_data/customer.h>
+#include <ydb/library/workload/tpcc/load_data/district.h>
+#include <ydb/library/workload/tpcc/load_data/history.h>
+#include <ydb/library/workload/tpcc/load_data/item.h>
+#include <ydb/library/workload/tpcc/load_data/new_order.h>
+#include <ydb/library/workload/tpcc/load_data/oorder.h>
+#include <ydb/library/workload/tpcc/load_data/order_line.h>
+#include <ydb/library/workload/tpcc/load_data/stock.h>
+#include <ydb/library/workload/tpcc/load_data/warehouse.h>
+
+#include <ydb/public/lib/ydb_cli/commands/ydb_common.h>
+#include <ydb/public/sdk/cpp/client/ydb_table/table.h>
+
+#include <util/string/printf.h>
+#include <library/cpp/logger/stream_creator.h>
+
+
+namespace NYdbWorkload {
+namespace NTPCC {
+
+TTPCCWorkload::TTPCCWorkload(std::shared_ptr<NYdb::TDriver>& driver, TTPCCWorkloadParams& params)
+ : Params(params)
+ , Driver(driver)
+ , Log(TCoutLogBackendCreator().CreateLogBackend())
+ , Seed(Now().MicroSeconds())
+ , Rng(Seed)
+{
+ std::string message = "Seed: " + std::to_string(Seed) + "\n";
+ Log.Write(message.c_str(), message.size());
+}
+
+TString TTPCCWorkload::CreateTablesQuery() {
+ TStringStream query;
+ query << "--!syntax_v1";
+ TString whParts = PartitionByKeys(Params.Warehouses, Params.Threads);
+ query << TWarehouseLoadQueryGenerator::GetCreateDDL(TString(Params.DbPath));
+ query << TItemLoadQueryGenerator::GetCreateDDL(TString(Params.DbPath));
+ query << TDistrictLoadQueryGenerator::GetCreateDDL(TString(Params.DbPath));
+ query << TStockLoadQueryGenerator::GetCreateDDL(TString(Params.DbPath), Params.Threads, whParts);
+ query << TCustomerLoadQueryGenerator::GetCreateDDL(TString(Params.DbPath), Params.Threads, whParts);
+ query << TOorderLoadQueryGenerator::GetCreateDDL(TString(Params.DbPath), Params.Threads, whParts);
+ query << TOrderLineLoadQueryGenerator::GetCreateDDL(TString(Params.DbPath), Params.Threads, whParts);
+ query << TNewOrderLoadQueryGenerator::GetCreateDDL(TString(Params.DbPath), Params.Threads, whParts);
+ query << THistoryLoadQueryGenerator::GetCreateDDL(TString(Params.DbPath), Params.Threads, whParts);
+ return query.Str();
+}
+
+TString TTPCCWorkload::CleanTablesQuery() {
+ TStringStream query;
+ query << THistoryLoadQueryGenerator::GetCleanDDL();
+ query << TNewOrderLoadQueryGenerator::GetCleanDDL();
+ query << TOrderLineLoadQueryGenerator::GetCleanDDL();
+ query << TOorderLoadQueryGenerator::GetCleanDDL();
+ query << TCustomerLoadQueryGenerator::GetCleanDDL();
+ query << TDistrictLoadQueryGenerator::GetCleanDDL();
+ query << TStockLoadQueryGenerator::GetCleanDDL();
+ query << TItemLoadQueryGenerator::GetCleanDDL();
+ query << TWarehouseLoadQueryGenerator::GetCleanDDL();
+ return query.Str();
+}
+
+int TTPCCWorkload::InitTables() {
+ try {
+ // Create Tables
+
+ std::string message = "Creating tables...\n";
+ Log.Write(message.c_str(), message.size());
+
+ LoadThreadPool = std::make_unique<TLoadThreadPool>(Driver);
+
+ auto tableClientSettings = NTable::TClientSettings()
+ .SessionPoolSettings(
+ NTable::TSessionPoolSettings()
+ .MaxActiveSessions(1));
+
+ auto db = std::make_unique<NTable::TTableClient>(*Driver, tableClientSettings);
+
+ auto sessionResult = db->GetSession().GetValueSync();
+ ThrowOnError(sessionResult);
+
+ auto result = sessionResult.GetSession().ExecuteSchemeQuery(CreateTablesQuery()).GetValueSync();
+ ThrowOnError(result);
+
+ message = "DONE.\nLoading data...\n";
+ Log.Write(message.c_str(), message.size());
+
+ auto started = Now();
+
+
+ // Load Data
+
+ auto genParams = Params;
+ LoadThreadPool->Start(Params.Threads);
+
+ genParams.DbPath = Params.DbPath + "/warehouse";
+ LoadThreadPool->SafeAddAndOwn(MakeHolder<TLoadTask>(
+ std::make_unique<TWarehouseLoadQueryGenerator>(genParams, Rng.GenRand()))
+ );
+
+ genParams.DbPath = Params.DbPath + "/item";
+ LoadThreadPool->SafeAddAndOwn(MakeHolder<TLoadTask>(
+ std::make_unique<TItemLoadQueryGenerator>(genParams, Rng.GenRand()))
+ );
+
+ genParams.DbPath = Params.DbPath + "/district";
+ LoadThreadPool->SafeAddAndOwn(MakeHolder<TLoadTask>(
+ std::make_unique<TDistrictLoadQueryGenerator>(genParams, Rng.GenRand()))
+ );
+
+ for (i32 threadNum = 1; threadNum <= Params.Threads; ++threadNum) {
+ genParams.DbPath = Params.DbPath + "/customer";
+ LoadThreadPool->SafeAddAndOwn(MakeHolder<TLoadTask>(
+ std::make_unique<TCustomerLoadQueryGenerator>(genParams, threadNum, Rng.GenRand()))
+ )
+;
+ }
+ for (i32 threadNum = 1; threadNum <= Params.Threads; ++threadNum) {
+ genParams.DbPath = Params.DbPath + "/history";
+ LoadThreadPool->SafeAddAndOwn(MakeHolder<TLoadTask>(
+ std::make_unique<THistoryLoadQueryGenerator>(genParams, threadNum, Rng.GenRand()))
+ )
+;
+ }
+ for (i32 threadNum = 1; threadNum <= Params.Threads; ++threadNum) {
+ genParams.DbPath = Params.DbPath + "/new_order";
+ LoadThreadPool->SafeAddAndOwn(MakeHolder<TLoadTask>(
+ std::make_unique<TNewOrderLoadQueryGenerator>(genParams, threadNum, Rng.GenRand()))
+ )
+;
+ }
+ for (i32 threadNum = 1; threadNum <= Params.Threads; ++threadNum) {
+ genParams.DbPath = Params.DbPath + "/oorder";
+ LoadThreadPool->SafeAddAndOwn(MakeHolder<TLoadTask>(
+ std::make_unique<TOorderLoadQueryGenerator>(genParams, threadNum, Rng.GenRand()))
+ )
+;
+ }
+ for (i32 threadNum = 1; threadNum <= Params.Threads; ++threadNum) {
+ genParams.DbPath = Params.DbPath + "/stock";
+ LoadThreadPool->SafeAddAndOwn(MakeHolder<TLoadTask>(
+ std::make_unique<TStockLoadQueryGenerator>(genParams, threadNum, Rng.GenRand()))
+ )
+;
+ }
+ for (i32 threadNum = 1; threadNum <= Params.Threads; ++threadNum) {
+ genParams.DbPath = Params.DbPath + "/order_line";
+ LoadThreadPool->SafeAddAndOwn(MakeHolder<TLoadTask>(
+ std::make_unique<TOrderLineLoadQueryGenerator>(genParams, threadNum, Rng.GenRand()))
+ )
+;
+ }
+ LoadThreadPool->Stop();
+
+ message = "Duration:" + (Now() - started).ToString() + "\nDONE\nConfiguring partitioning\n";
+ Log.Write(message.c_str(), message.size());
+
+ ConfigurePartitioning();
+
+ message = "DONE\n";
+ Log.Write(message.c_str(), message.size());
+
+ } catch (const TYdbErrorException& ex) {
+ Cerr << ex;
+ return EXIT_FAILURE;
+ } catch (...) {
+ return EXIT_FAILURE;
+ }
+
+ return EXIT_SUCCESS;
+}
+
+int TTPCCWorkload::CleanTables() {
+ try {
+ auto db = NTable::TTableClient(*Driver);
+
+ auto sessionResult = db.GetSession().GetValueSync();
+ ThrowOnError(sessionResult);
+
+ auto result = sessionResult.GetSession().ExecuteSchemeQuery(CleanTablesQuery()).GetValueSync();
+ ThrowOnError(result);
+ } catch (TYdbErrorException ex) {
+ Cerr << ex;
+ return EXIT_FAILURE;
+ } catch (...) {
+ return EXIT_FAILURE;
+ }
+
+ return EXIT_SUCCESS;
+}
+
+void TTPCCWorkload::ConfigurePartitioning() {
+ static const std::vector<TString> heavyTables {
+ "customer", "history", "new_order",
+ "oorder", "order_line", "stock"
+ };
+
+ auto tableClientSettings = NTable::TClientSettings()
+ .SessionPoolSettings(
+ NTable::TSessionPoolSettings()
+ .MaxActiveSessions(1));
+
+ TThreadResource rsc(std::make_shared<NTable::TTableClient>(*Driver, tableClientSettings));
+
+ TString query;
+ for (auto table: heavyTables) {
+ query += Sprintf(R"(
+ ALTER TABLE `%s` SET (
+ AUTO_PARTITIONING_BY_LOAD = %s,
+ AUTO_PARTITIONING_MIN_PARTITIONS_COUNT = %s,
+ AUTO_PARTITIONING_MAX_PARTITIONS_COUNT = %s
+ );
+ )", table.c_str(), Params.AutoPartitioning ? "ENABLED" : "DISABLED"
+ , ToString(Params.MinPartitions).c_str(), ToString(Params.MaxPartitions).c_str());
+ }
+
+ TStatus result = rsc.Session->ExecuteSchemeQuery(query.c_str()).GetValueSync();
+ if (!result.IsSuccess()) {
+ ythrow yexception() << "Configuring partitioning failed with issues: " << result.GetIssues().ToString();
+ }
+
+ std::vector<ui32> prevPartition(heavyTables.size());
+ TVector<NThreading::TFuture<void>> toWait;
+
+ while (true) {
+ toWait.clear();
+
+ std::atomic_bool done = true;
+ for (ui32 i = 0; i < heavyTables.size(); ++i) {
+ NThreading::TFuture<void> future = rsc.Session->DescribeTable(
+ Params.DbPath + "/" + heavyTables[i],
+ NTable::TDescribeTableSettings().WithTableStatistics(true)
+ ).Apply(
+ [&prevPartition, i, &done](const NTable::TAsyncDescribeTableResult& future) {
+ NTable::TDescribeTableResult result = future.GetValueSync();
+
+ if (!result.IsSuccess()) {
+ ythrow yexception() << "Getting describe table failed with issues: " << result.GetIssues().ToString();
+ }
+
+ ui32 partCount = result.GetTableDescription().GetPartitionsCount();
+ if (partCount != prevPartition[i]) {
+ prevPartition[i] = partCount;
+ done.store(false);
+ }
+ }
+ );
+ toWait.emplace_back(std::move(future));
+ }
+
+ NThreading::WaitExceptionOrAll(toWait).GetValueSync();
+
+ if (done.load()) {
+ break;
+ } else {
+ Sleep(TDuration::Seconds(1));
+ }
+ }
+}
+
+TString TTPCCWorkload::PartitionByKeys(i32 keysCount, i32 partsCount) {
+ TStringStream partition;
+ for (i32 i = 1; i < partsCount; i++) {
+ partition << i * keysCount / partsCount;
+ if (i != partsCount - 1) {
+ partition << ", ";
+ }
+ }
+ return partition.Str();
+}
+
+int TTPCCWorkload::RunWorkload() {
+ Params.OrderLineIdConst = Rng.GenRand() % 8192;
+ return EXIT_FAILURE;
+}
+
+}
+}
diff --git a/ydb/library/workload/tpcc/tpcc_workload.h b/ydb/library/workload/tpcc/tpcc_workload.h
new file mode 100644
index 0000000000..485e571bd2
--- /dev/null
+++ b/ydb/library/workload/tpcc/tpcc_workload.h
@@ -0,0 +1,50 @@
+#pragma once
+
+#include "tpcc_config.h"
+
+#include <ydb/library/workload/tpcc/load_data/load_thread_pool.h>
+
+#include <ydb/public/lib/ydb_cli/common/command.h>
+#include <ydb/public/sdk/cpp/client/ydb_driver/driver.h>
+
+#include <library/cpp/logger/log.h>
+
+
+namespace NYdbWorkload {
+namespace NTPCC {
+
+using namespace NYdb;
+using namespace NConsoleClient;
+
+class TTPCCWorkload {
+public:
+ TTPCCWorkload(std::shared_ptr<TDriver>& driver, TTPCCWorkloadParams& params);
+
+ ~TTPCCWorkload() = default;
+
+ int InitTables();
+
+ int CleanTables();
+
+ int RunWorkload();
+
+private:
+ void ConfigurePartitioning();
+
+ TString CreateTablesQuery();
+ TString CleanTablesQuery();
+
+ TString PartitionByKeys(i32 keysCount, i32 partsCount);
+
+ TTPCCWorkloadParams Params;
+ std::unique_ptr<TLoadThreadPool> LoadThreadPool;
+
+ std::shared_ptr<TDriver> Driver;
+
+ TLog Log;
+ ui64 Seed;
+ TFastRng64 Rng;
+};
+
+}
+}
diff --git a/ydb/library/workload/workload_factory.h b/ydb/library/workload/workload_factory.h
index a786fae2c4..42f42e7035 100644
--- a/ydb/library/workload/workload_factory.h
+++ b/ydb/library/workload/workload_factory.h
@@ -9,6 +9,7 @@ namespace NYdbWorkload {
enum class EWorkload {
STOCK,
KV,
+ TPCC,
};
class TWorkloadFactory {
diff --git a/ydb/library/workload/ya.make b/ydb/library/workload/ya.make
index 1e593d3d97..42b58b4059 100644
--- a/ydb/library/workload/ya.make
+++ b/ydb/library/workload/ya.make
@@ -4,8 +4,22 @@ SRCS(
stock_workload.cpp
kv_workload.cpp
workload_factory.cpp
+ tpcc/tpcc_workload.cpp
+ tpcc/load_data/customer.cpp
+ tpcc/load_data/district.cpp
+ tpcc/load_data/history.cpp
+ tpcc/load_data/item.cpp
+ tpcc/load_data/new_order.cpp
+ tpcc/load_data/oorder.cpp
+ tpcc/load_data/order_line.cpp
+ tpcc/load_data/query_generator.cpp
+ tpcc/load_data/stock.cpp
+ tpcc/load_data/warehouse.cpp
+ tpcc/load_data/load_thread_pool.cpp
)
+GENERATE_ENUM_SERIALIZATION(tpcc/tpcc_config.h)
+
PEERDIR(
ydb/public/api/protos
ydb/public/sdk/cpp/client/ydb_table
diff --git a/ydb/public/lib/ydb_cli/commands/CMakeLists.darwin-x86_64.txt b/ydb/public/lib/ydb_cli/commands/CMakeLists.darwin-x86_64.txt
index 2dedacd78f..7096310791 100644
--- a/ydb/public/lib/ydb_cli/commands/CMakeLists.darwin-x86_64.txt
+++ b/ydb/public/lib/ydb_cli/commands/CMakeLists.darwin-x86_64.txt
@@ -60,6 +60,7 @@ target_sources(clicommands PRIVATE
${CMAKE_SOURCE_DIR}/ydb/public/lib/ydb_cli/commands/topic_write_scenario.cpp
${CMAKE_SOURCE_DIR}/ydb/public/lib/ydb_cli/commands/topic_readwrite_scenario.cpp
${CMAKE_SOURCE_DIR}/ydb/public/lib/ydb_cli/commands/tpch.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/public/lib/ydb_cli/commands/tpcc_workload.cpp
${CMAKE_SOURCE_DIR}/ydb/public/lib/ydb_cli/commands/ydb_sdk_core_access.cpp
${CMAKE_SOURCE_DIR}/ydb/public/lib/ydb_cli/commands/ydb_command.cpp
${CMAKE_SOURCE_DIR}/ydb/public/lib/ydb_cli/commands/ydb_profile.cpp
diff --git a/ydb/public/lib/ydb_cli/commands/CMakeLists.linux-aarch64.txt b/ydb/public/lib/ydb_cli/commands/CMakeLists.linux-aarch64.txt
index 6ff42c7956..fb96913a4d 100644
--- a/ydb/public/lib/ydb_cli/commands/CMakeLists.linux-aarch64.txt
+++ b/ydb/public/lib/ydb_cli/commands/CMakeLists.linux-aarch64.txt
@@ -61,6 +61,7 @@ target_sources(clicommands PRIVATE
${CMAKE_SOURCE_DIR}/ydb/public/lib/ydb_cli/commands/topic_write_scenario.cpp
${CMAKE_SOURCE_DIR}/ydb/public/lib/ydb_cli/commands/topic_readwrite_scenario.cpp
${CMAKE_SOURCE_DIR}/ydb/public/lib/ydb_cli/commands/tpch.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/public/lib/ydb_cli/commands/tpcc_workload.cpp
${CMAKE_SOURCE_DIR}/ydb/public/lib/ydb_cli/commands/ydb_sdk_core_access.cpp
${CMAKE_SOURCE_DIR}/ydb/public/lib/ydb_cli/commands/ydb_command.cpp
${CMAKE_SOURCE_DIR}/ydb/public/lib/ydb_cli/commands/ydb_profile.cpp
diff --git a/ydb/public/lib/ydb_cli/commands/CMakeLists.linux-x86_64.txt b/ydb/public/lib/ydb_cli/commands/CMakeLists.linux-x86_64.txt
index 6ff42c7956..fb96913a4d 100644
--- a/ydb/public/lib/ydb_cli/commands/CMakeLists.linux-x86_64.txt
+++ b/ydb/public/lib/ydb_cli/commands/CMakeLists.linux-x86_64.txt
@@ -61,6 +61,7 @@ target_sources(clicommands PRIVATE
${CMAKE_SOURCE_DIR}/ydb/public/lib/ydb_cli/commands/topic_write_scenario.cpp
${CMAKE_SOURCE_DIR}/ydb/public/lib/ydb_cli/commands/topic_readwrite_scenario.cpp
${CMAKE_SOURCE_DIR}/ydb/public/lib/ydb_cli/commands/tpch.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/public/lib/ydb_cli/commands/tpcc_workload.cpp
${CMAKE_SOURCE_DIR}/ydb/public/lib/ydb_cli/commands/ydb_sdk_core_access.cpp
${CMAKE_SOURCE_DIR}/ydb/public/lib/ydb_cli/commands/ydb_command.cpp
${CMAKE_SOURCE_DIR}/ydb/public/lib/ydb_cli/commands/ydb_profile.cpp
diff --git a/ydb/public/lib/ydb_cli/commands/CMakeLists.windows-x86_64.txt b/ydb/public/lib/ydb_cli/commands/CMakeLists.windows-x86_64.txt
index 2dedacd78f..7096310791 100644
--- a/ydb/public/lib/ydb_cli/commands/CMakeLists.windows-x86_64.txt
+++ b/ydb/public/lib/ydb_cli/commands/CMakeLists.windows-x86_64.txt
@@ -60,6 +60,7 @@ target_sources(clicommands PRIVATE
${CMAKE_SOURCE_DIR}/ydb/public/lib/ydb_cli/commands/topic_write_scenario.cpp
${CMAKE_SOURCE_DIR}/ydb/public/lib/ydb_cli/commands/topic_readwrite_scenario.cpp
${CMAKE_SOURCE_DIR}/ydb/public/lib/ydb_cli/commands/tpch.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/public/lib/ydb_cli/commands/tpcc_workload.cpp
${CMAKE_SOURCE_DIR}/ydb/public/lib/ydb_cli/commands/ydb_sdk_core_access.cpp
${CMAKE_SOURCE_DIR}/ydb/public/lib/ydb_cli/commands/ydb_command.cpp
${CMAKE_SOURCE_DIR}/ydb/public/lib/ydb_cli/commands/ydb_profile.cpp
diff --git a/ydb/public/lib/ydb_cli/commands/tpcc_workload.cpp b/ydb/public/lib/ydb_cli/commands/tpcc_workload.cpp
new file mode 100644
index 0000000000..53f4b5e6d8
--- /dev/null
+++ b/ydb/public/lib/ydb_cli/commands/tpcc_workload.cpp
@@ -0,0 +1,121 @@
+#include "tpcc_workload.h"
+
+#include <ydb/library/workload/tpcc/tpcc_workload.h>
+
+namespace NYdb {
+namespace NConsoleClient {
+
+TCommandTPCCWorkload::TCommandTPCCWorkload()
+ : TClientCommandTree("tpcc", {}, "YDB TPCC workload")
+{
+ AddCommand(std::make_unique<TTPCCInitCommand>());
+ AddCommand(std::make_unique<TTPCCCleanCommand>());
+}
+
+TTPCCWorkloadCommand::TTPCCWorkloadCommand(const TString& name, const std::initializer_list<TString>& aliases, const TString& description)
+ : TYdbCommand(name, aliases, description)
+ , ClientTimeoutMs(0)
+ , OperationTimeoutMs(0)
+ , CancelAfterTimeoutMs(0)
+ , WindowDurationSec(0)
+ , Quiet(false)
+ , PrintTimestamp(false)
+ , WindowHist(60000, 2) // highestTrackableValue 60000ms = 60s, precision 2
+ , TotalHist(60000, 2)
+ , TotalRetries(0)
+ , WindowRetryCount(0)
+ , TotalErrors(0)
+ , WindowErrors(0) {}
+
+struct TTPCCWorkloadStats {
+ ui64 OpsCount = 0;
+ ui64 Percentile50 = 0;
+ ui64 Percentile95 = 0;
+ ui64 Percentile99 = 0;
+ ui64 Percentile100 = 0;
+};
+
+void TTPCCWorkloadCommand::Config(TConfig& config) {
+ TYdbCommand::Config(config);
+
+ config.Opts->AddLongOption("quiet", "Quiet mode. Doesn't print statistics each second.")
+ .StoreTrue(&Quiet);
+ config.Opts->AddLongOption("print-timestamp", "Print timestamp each second with statistics.")
+ .StoreTrue(&PrintTimestamp);
+ config.Opts->AddLongOption("client-timeout", "Client timeout in ms.")
+ .DefaultValue(1000).StoreResult(&ClientTimeoutMs);
+ config.Opts->AddLongOption("operation-timeout", "Operation timeout in ms.")
+ .DefaultValue(800).StoreResult(&OperationTimeoutMs);
+ config.Opts->AddLongOption("cancel-after", "Cancel after timeout in ms.")
+ .DefaultValue(800).StoreResult(&CancelAfterTimeoutMs);
+ config.Opts->AddLongOption("window", "Window duration in seconds.")
+ .DefaultValue(1).StoreResult(&WindowDurationSec);
+}
+
+void TTPCCWorkloadCommand::PrepareForRun(TConfig& config) {
+ auto driverConfig = TDriverConfig()
+ .SetEndpoint(config.Address)
+ .SetDatabase(config.Database)
+ .SetBalancingPolicy(EBalancingPolicy::UseAllNodes)
+ .SetCredentialsProviderFactory(config.CredentialsGetter(config));
+ Params.DbPath = config.Database;
+
+ if (config.EnableSsl) {
+ driverConfig.UseSecureConnection(config.CaCerts);
+ }
+ Driver = std::make_shared<NYdb::TDriver>(NYdb::TDriver(driverConfig));
+ Workload = std::unique_ptr<TTPCCWorkload>(new TTPCCWorkload(Driver, Params));
+}
+
+TTPCCInitCommand::TTPCCInitCommand()
+ : TTPCCWorkloadCommand("init", {}, "Create and initialize tables for TPCC workload") {}
+
+void TTPCCInitCommand::Config(TConfig& config) {
+ TTPCCWorkloadCommand::Config(config);
+
+ config.SetFreeArgsNum(0);
+
+ config.Opts->AddLongOption('w', "warehouses", "The number of TPC-C warehouses.")
+ .DefaultValue(static_cast<int>(ETPCCWorkloadConstants::TPCC_WAREHOUSES)).StoreResult(&Params.Warehouses);
+ config.Opts->AddLongOption('t', "threads", "Number of parallel threads in workload.")
+ .DefaultValue(static_cast<int>(ETPCCWorkloadConstants::TPCC_THREADS)).StoreResult(&Params.Threads);
+ config.Opts->AddLongOption("load-batch-size", "The size of the batch of queries with which data is loaded.")
+ .DefaultValue(static_cast<int>(ETPCCWorkloadConstants::TPCC_LOAD_BATCH_SIZE)).StoreResult(&Params.LoadBatchSize);
+ config.Opts->AddLongOption("min-partitions", "Minimum partitioning of tables.")
+ .DefaultValue(static_cast<int>(ETPCCWorkloadConstants::TPCC_MIN_PARTITIONS)).StoreResult(&Params.MinPartitions);
+ config.Opts->AddLongOption("max-partitions", "Maximum partitioning of tables.")
+ .DefaultValue(static_cast<int>(ETPCCWorkloadConstants::TPCC_MAX_PARTITIONS)).StoreResult(&Params.MaxPartitions);
+ config.Opts->AddLongOption("auto-partitioning", "Enable auto partitioning by load.")
+ .DefaultValue(static_cast<int>(ETPCCWorkloadConstants::TPCC_AUTO_PARTITIONING)).StoreResult(&Params.AutoPartitioning, true);
+}
+
+int TTPCCInitCommand::Run(TConfig& config) {
+ PrepareForRun(config);
+ Cout << "INIT COMMAND\n";
+ Cout << "Warehouses: " << Params.Warehouses << "\nThreads: " << Params.Threads << "\n";
+ return Workload->InitTables();
+}
+
+void TTPCCInitCommand::Parse(TConfig& config) {
+ TClientCommand::Parse(config);
+}
+
+TTPCCCleanCommand::TTPCCCleanCommand()
+ : TTPCCWorkloadCommand("clean", {}, "Drop the TPCC tables.") {}
+
+void TTPCCCleanCommand::Config(TConfig& config) {
+ TTPCCWorkloadCommand::Config(config);
+ config.SetFreeArgsNum(0);
+}
+
+int TTPCCCleanCommand::Run(TConfig& config) {
+ PrepareForRun(config);
+ return Workload->CleanTables();
+}
+
+void TTPCCCleanCommand::Parse(TConfig& config) {
+ TClientCommand::Parse(config);
+}
+
+}
+}
diff --git a/ydb/public/lib/ydb_cli/commands/tpcc_workload.h b/ydb/public/lib/ydb_cli/commands/tpcc_workload.h
new file mode 100644
index 0000000000..e8a73f4c7c
--- /dev/null
+++ b/ydb/public/lib/ydb_cli/commands/tpcc_workload.h
@@ -0,0 +1,84 @@
+#pragma once
+
+#include <ydb/library/workload/tpcc/tpcc_config.h>
+#include <ydb/library/workload/tpcc/tpcc_workload.h>
+#include <ydb/public/lib/ydb_cli/commands/ydb_command.h>
+
+#include <library/cpp/histogram/hdr/histogram.h>
+#include <util/datetime/base.h>
+#include <util/system/spinlock.h>
+
+#include <memory>
+#include <string>
+
+namespace NYdb {
+namespace NConsoleClient {
+
+using namespace NYdbWorkload;
+using namespace NTPCC;
+
+class TCommandTPCCWorkload : public TClientCommandTree {
+public:
+ TCommandTPCCWorkload();
+};
+
+class TTPCCWorkloadCommand : public TYdbCommand {
+public:
+ TTPCCWorkloadCommand(
+ const TString& name,
+ const std::initializer_list<TString>& aliases = std::initializer_list<TString>(),
+ const TString& description = TString()
+ );
+
+ void Config(TConfig& config) override;
+ NTable::TSession GetSession();
+
+protected:
+ void PrepareForRun(TConfig& config);
+
+ std::shared_ptr<NYdb::TDriver> Driver;
+ std::unique_ptr<NTable::TTableClient> TableClient;
+
+ TTPCCWorkloadParams Params;
+ std::unique_ptr<TTPCCWorkload> Workload;
+ unsigned int ClientTimeoutMs;
+ unsigned int OperationTimeoutMs;
+ unsigned int CancelAfterTimeoutMs;
+ unsigned int WindowDurationSec;
+ bool Quiet;
+ bool PrintTimestamp;
+
+ TInstant StartTime;
+ TInstant StopTime;
+
+ // Think about moving histograms to workload library.
+ // Histograms will also be useful in actor system workload.
+ TSpinLock HdrLock;
+ NHdr::THistogram WindowHist;
+ NHdr::THistogram TotalHist;
+
+ std::atomic_uint64_t TotalRetries;
+ std::atomic_uint64_t WindowRetryCount;
+ std::atomic_uint64_t TotalErrors;
+ std::atomic_uint64_t WindowErrors;
+};
+
+class TTPCCInitCommand : public TTPCCWorkloadCommand {
+public:
+ TTPCCInitCommand();
+ virtual void Config(TConfig& config) override;
+ virtual void Parse(TConfig& config) override;
+ virtual int Run(TConfig& config) override;
+};
+
+class TTPCCCleanCommand : public TTPCCWorkloadCommand {
+public:
+ TTPCCCleanCommand();
+ virtual void Config(TConfig& config) override;
+ virtual void Parse(TConfig& config) override;
+ virtual int Run(TConfig& config) override;
+};
+
+}
+
+}
diff --git a/ydb/public/lib/ydb_cli/commands/ya.make b/ydb/public/lib/ydb_cli/commands/ya.make
index 4626a01b41..f6d63294c0 100644
--- a/ydb/public/lib/ydb_cli/commands/ya.make
+++ b/ydb/public/lib/ydb_cli/commands/ya.make
@@ -12,6 +12,7 @@ SRCS(
topic_write_scenario.cpp
topic_readwrite_scenario.cpp
tpch.cpp
+ tpcc_workload.cpp
ydb_sdk_core_access.cpp
ydb_command.cpp
ydb_profile.cpp
diff --git a/ydb/public/lib/ydb_cli/commands/ydb_workload.cpp b/ydb/public/lib/ydb_cli/commands/ydb_workload.cpp
index baf6454146..9649ed13d9 100644
--- a/ydb/public/lib/ydb_cli/commands/ydb_workload.cpp
+++ b/ydb/public/lib/ydb_cli/commands/ydb_workload.cpp
@@ -4,14 +4,15 @@
#include "kv_workload.h"
#include "click_bench.h"
#include "tpch.h"
+#include "tpcc_workload.h"
#include "topic_workload/topic_workload.h"
#include "transfer_workload/transfer_workload.h"
-#include "util/random/random.h"
#include "ydb/library/yverify_stream/yverify_stream.h"
#include <ydb/library/workload/workload_factory.h>
#include <ydb/public/lib/ydb_cli/commands/ydb_common.h>
+#include <util/random/random.h>
#include <library/cpp/threading/local_executor/local_executor.h>
#include <atomic>
@@ -46,6 +47,7 @@ TCommandWorkload::TCommandWorkload()
AddCommand(std::make_unique<TCommandWorkloadTopic>());
AddCommand(std::make_unique<TCommandWorkloadTransfer>());
AddCommand(std::make_unique<TCommandTpch>());
+ AddCommand(std::make_unique<TCommandTPCCWorkload>());
}
TWorkloadCommand::TWorkloadCommand(const TString& name, const std::initializer_list<TString>& aliases, const TString& description)