diff options
author | Evgeniy Ivanov <eivanov89@ydb.tech> | 2025-06-18 10:43:55 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2025-06-18 11:43:55 +0300 |
commit | abeada2245d690448e23e82b049de75ebdfced38 (patch) | |
tree | a86255107897d6a20666719094b10ff666c82dcc | |
parent | 3335cfb9021ae7a96f69c9e1d8edbe774bcfb3ef (diff) | |
download | ydb-abeada2245d690448e23e82b049de75ebdfced38.tar.gz |
Tpcc cli and import cleanup (#17333) (#19771)
-rw-r--r-- | ydb/library/workload/tpcc/constants.h | 3 | ||||
-rw-r--r-- | ydb/library/workload/tpcc/import.cpp | 40 | ||||
-rw-r--r-- | ydb/library/workload/tpcc/runner.cpp | 85 | ||||
-rw-r--r-- | ydb/library/workload/tpcc/runner.h | 39 | ||||
-rw-r--r-- | ydb/public/lib/ydb_cli/commands/ydb_workload_tpcc.cpp | 136 |
5 files changed, 200 insertions, 103 deletions
diff --git a/ydb/library/workload/tpcc/constants.h b/ydb/library/workload/tpcc/constants.h index 9158c78f7b3..8659531f507 100644 --- a/ydb/library/workload/tpcc/constants.h +++ b/ydb/library/workload/tpcc/constants.h @@ -85,4 +85,7 @@ enum class ETransactionType { constexpr const size_t TUI_LOG_LINES = 10; +// lower limit, real number is higher +constexpr const size_t WAREHOUSES_PER_CPU_CORE = 1500; + } // namespace NYdb::NTPCC diff --git a/ydb/library/workload/tpcc/import.cpp b/ydb/library/workload/tpcc/import.cpp index 0428763a54b..50df64f3271 100644 --- a/ydb/library/workload/tpcc/import.cpp +++ b/ydb/library/workload/tpcc/import.cpp @@ -792,7 +792,12 @@ public: } void ImportSync() { - Config.SetDisplayUpdateInterval(); + if (Config.WarehouseCount == 0) { + std::cerr << "Specified zero warehouses" << std::endl; + std::exit(1); + } + + Config.SetDisplay(); CalculateApproximateDataSize(); // we want to switch buffers and draw UI ASAP to properly display logs @@ -801,12 +806,19 @@ public: UpdateDisplayIfNeeded(Clock::now()); } + // TODO: detect number of threads + if (Config.LoadThreadCount == 0) { + LOG_W("Automatic calculation of loading threads is not implemented, falling back to the default"); + Config.LoadThreadCount = DEFAULT_LOAD_THREAD_COUNT; + } + // in particular this log message LOG_I("Starting TPC-C data import for " << Config.WarehouseCount << " warehouses using " << - Config.ThreadCount << " threads. Approximate data size: " << GetFormattedSize(LoadState.ApproximateDataSize)); + Config.LoadThreadCount << " threads. Approximate data size: " + << GetFormattedSize(LoadState.ApproximateDataSize)); // TODO: detect number of threads - size_t threadCount = std::min(Config.WarehouseCount, Config.ThreadCount ); + size_t threadCount = std::min(Config.WarehouseCount, Config.LoadThreadCount); threadCount = std::max(threadCount, size_t(1)); // TODO: calculate optimal number of drivers (but per thread looks good) @@ -1071,7 +1083,7 @@ private: std::stringstream headerSs; headerSs << "TPC-C Import: " << Config.WarehouseCount << " warehouses, " - << Config.ThreadCount << " threads Estimated size: " + << Config.LoadThreadCount << " threads Estimated size: " << GetFormattedSize(LoadState.ApproximateDataSize); std::stringstream progressSs; @@ -1101,21 +1113,19 @@ private: text(speedSs.str()) }); - auto topRow = hbox({ - importDetails | flex, - separator() - }); + auto topRow = window(text("TPC-C data upload"), hbox({ + importDetails + })); // Index progress section (always shown) Elements indexElements; TString indexText; if (LoadState.IndexBuildStates.empty()) { - indexText = "Index Creation Progress didn't start"; + indexText = "Index Creation Didn't Start"; } else { - indexText = "Index Creation Progress:"; + indexText = "Index Creation"; } - indexElements.push_back(text(indexText)); if (LoadState.IndexBuildStates.empty()) { // Index building not started yet, need to leave enough space @@ -1152,11 +1162,13 @@ private: } } + auto indicesRow = window(text(indexText), vbox(indexElements)); + // Create scrollable logs panel Elements logElements; LogBackend->GetLogLines([&](const std::string& line) { - logElements.push_back(text(line)); + logElements.push_back(paragraph(line)); }); auto logsContent = vbox(logElements); @@ -1166,9 +1178,7 @@ private: auto layout = vbox({ topRow, - separator(), - vbox(indexElements), - separator(), + indicesRow, logsPanel | flex }); diff --git a/ydb/library/workload/tpcc/runner.cpp b/ydb/library/workload/tpcc/runner.cpp index 775c3fbd533..b606737286d 100644 --- a/ydb/library/workload/tpcc/runner.cpp +++ b/ydb/library/workload/tpcc/runner.cpp @@ -8,6 +8,7 @@ #include "transactions.h" #include <ydb/public/lib/ydb_cli/commands/ydb_command.h> +#include <ydb/public/lib/ydb_cli/common/interactive.h> #include <ydb/public/sdk/cpp/include/ydb-cpp-sdk/client/query/client.h> #include <library/cpp/logger/log.h> @@ -216,14 +217,27 @@ TPCCRunner::TPCCRunner(const NConsoleClient::TClientCommand::TConfig& connection std::exit(1); } - const size_t networkThreadCount = NConsoleClient::TYdbCommand::GetNetworkThreadNum(ConnectionConfig); - const size_t maxTerminalThreadCount = cpuCount > networkThreadCount ? cpuCount - networkThreadCount : 1; + if (Config.WarehouseCount == 0) { + std::cerr << "Specified zero warehouses" << std::endl; + std::exit(1); + } const size_t terminalsCount = Config.WarehouseCount * TERMINALS_PER_WAREHOUSE; - // we might consider using less than maxTerminalThreads - const size_t threadCount = Config.ThreadCount == 0 ? - std::min(maxTerminalThreadCount, terminalsCount) : Config.ThreadCount; + size_t threadCount = 0; + if (Config.ThreadCount == 0) { + // here we calculate max possible efficient thread number + const size_t networkThreadCount = NConsoleClient::TYdbCommand::GetNetworkThreadNum(ConnectionConfig); + const size_t maxTerminalThreadCount = cpuCount > networkThreadCount ? cpuCount - networkThreadCount : 1; + threadCount = std::min(maxTerminalThreadCount, terminalsCount); + + // usually this allows to lower number of threads + const size_t recommendedThreadCount = + (Config.WarehouseCount + WAREHOUSES_PER_CPU_CORE - 1) / WAREHOUSES_PER_CPU_CORE; + threadCount = std::min(threadCount, recommendedThreadCount); + } else { + threadCount = Config.ThreadCount; + } // The number of terminals might be hundreds of thousands. // For now, we don't have more than 32 network threads (check TYdbCommand::GetNetworkThreadNum()), @@ -351,7 +365,7 @@ void TPCCRunner::Join() { } void TPCCRunner::RunSync() { - Config.SetDisplayUpdateInterval(); + Config.SetDisplay(); Clock::time_point now = Clock::now(); @@ -363,14 +377,14 @@ void TPCCRunner::RunSync() { // We don't want to start all terminals at the same time, because then there will be // a huge queue of ready terminals, which we can't handle bool forcedWarmup = false; - int minWarmupSeconds = Terminals.size() * MinWarmupPerTerminal.count() / 1000 + 1; - int minWarmupMinutes = (minWarmupSeconds + 59) / 60; - int warmupMinutes; - if (Config.WarmupMinutes < minWarmupMinutes) { + uint32_t minWarmupSeconds = Terminals.size() * MinWarmupPerTerminal.count() / 1000 + 1; + uint32_t minWarmupMinutes = (minWarmupSeconds + 59) / 60; + uint32_t warmupMinutes; + if (Config.WarmupDuration.Minutes() < minWarmupMinutes) { forcedWarmup = true; // we must print log message later after display update warmupMinutes = minWarmupMinutes; } else { - warmupMinutes = Config.WarmupMinutes; + warmupMinutes = Config.WarmupDuration.Minutes(); } WarmupStartTs = Clock::now(); @@ -411,14 +425,14 @@ void TPCCRunner::RunSync() { StopWarmup.store(true, std::memory_order_relaxed); - LOG_I("Measuring during " << Config.RunMinutes << " minutes"); + LOG_I("Measuring during " << Config.RunDuration); MeasurementsStartTs = Clock::now(); // reset statistics LastStatisticsSnapshot = std::make_unique<TAllStatistics>(PerThreadTerminalStats.size(), MeasurementsStartTs); - StopDeadline = MeasurementsStartTs + std::chrono::minutes(Config.RunMinutes); + StopDeadline = MeasurementsStartTs + std::chrono::seconds(Config.RunDuration.Seconds()); while (!GetGlobalInterruptSource().stop_requested()) { if (now >= StopDeadline) { break; @@ -469,10 +483,12 @@ void TPCCRunner::UpdateDisplayTextMode(const TCalculatedStatusData& data) { ss << std::endl << "Efficiency: " << std::setprecision(1) << data.Efficiency << "% | " << "tpmC: " << std::setprecision(1) << data.Tpmc; - std::cout << ss.str(); + LOG_I(ss.str()); // Per thread statistics (two columns) - std::cout << "\nPer thread statistics:" << std::endl; + + std::stringstream debugSs; + debugSs << "\nPer thread statistics:" << std::endl; size_t threadCount = LastStatisticsSnapshot->StatVec.size(); size_t halfCount = (threadCount + 1) / 2; @@ -487,10 +503,10 @@ void TPCCRunner::UpdateDisplayTextMode(const TCalculatedStatusData& data) { << std::setw(15) << "queue p90, ms"; // Print headers side by side - std::cout << threadsHeader.str() << " | " << threadsHeader.str() << std::endl; + debugSs << threadsHeader.str() << " | " << threadsHeader.str() << std::endl; size_t totalWidth = threadsHeader.str().length() * 2 + 3; - std::cout << std::string(totalWidth, '-') << std::endl; + debugSs << std::string(totalWidth, '-') << std::endl; // Print thread data in two columns for (size_t i = 0; i < halfCount; ++i) { @@ -525,13 +541,15 @@ void TPCCRunner::UpdateDisplayTextMode(const TCalculatedStatusData& data) { rightLine << std::string(threadsHeader.str().length(), ' '); } - std::cout << leftLine.str() << " | " << rightLine.str() << std::endl; + debugSs << leftLine.str() << " | " << rightLine.str() << std::endl; } - std::cout << std::string(totalWidth, '-') << std::endl; + debugSs << std::string(totalWidth, '-') << std::endl; // Transaction statistics - std::cout << "\n\n"; - PrintTransactionStatisticsPretty(std::cout); + debugSs << "\n"; + PrintTransactionStatisticsPretty(debugSs); + + LOG_D(debugSs.str()); } void TPCCRunner::UpdateDisplayTuiMode(const TCalculatedStatusData& data) { @@ -903,6 +921,31 @@ void TPCCRunner::PrintFinalResultPretty() { //----------------------------------------------------------------------------- +void TRunConfig::SetDisplay() { + if (NoTui) { + DisplayMode = EDisplayMode::Text; + } else { + if (NConsoleClient::IsStdoutInteractive()) { + DisplayMode = EDisplayMode::Tui; + } else { + DisplayMode = EDisplayMode::Text; + } + } + + switch (DisplayMode) { + case EDisplayMode::None: + return; + case EDisplayMode::Text: + DisplayUpdateInterval = DisplayUpdateTextInterval; + return; + case EDisplayMode::Tui: + DisplayUpdateInterval = DisplayUpdateTuiInterval; + return; + } +} + +//----------------------------------------------------------------------------- + void RunSync(const NConsoleClient::TClientCommand::TConfig& connectionConfig, const TRunConfig& runConfig) { TPCCRunner runner(connectionConfig, runConfig); runner.RunSync(); diff --git a/ydb/library/workload/tpcc/runner.h b/ydb/library/workload/tpcc/runner.h index c4c0a8c2ad6..34cec46bda3 100644 --- a/ydb/library/workload/tpcc/runner.h +++ b/ydb/library/workload/tpcc/runner.h @@ -4,14 +4,20 @@ #include <library/cpp/logger/priority.h> +#include <util/datetime/base.h> + #include <stop_token> namespace NYdb::NTPCC { constexpr int DEFAULT_WAREHOUSE_COUNT = 1; -constexpr int DEFAULT_WARMUP_MINUTES = 1; // TODO -constexpr int DEFAULT_RUN_MINUTES = 2; // TODO +constexpr TDuration DEFAULT_WARMUP_DURATION = TDuration::Minutes(1); // TODO +constexpr TDuration DEFAULT_RUN_DURATION = TDuration::Minutes(2); // TODO + constexpr int DEFAULT_MAX_SESSIONS = 100; // TODO +constexpr int DEFAULT_THREAD_COUNT = 0; // autodetect +constexpr int DEFAULT_LOAD_THREAD_COUNT = 10; + constexpr int DEFAULT_LOG_LEVEL = 6; // TODO: properly use enum struct TRunConfig { @@ -21,6 +27,11 @@ struct TRunConfig { Tui, }; + enum class EFormat { + Pretty = 0, + Json, + }; + TRunConfig() = default; void SetFullPath(const NConsoleClient::TClientCommand::TConfig& connectionConfig) { if (Path.empty()) { @@ -35,37 +46,29 @@ struct TRunConfig { Path = connectionConfig.Database + '/' + Path; } - void SetDisplayUpdateInterval() { - switch (DisplayMode) { - case EDisplayMode::None: - return; - case EDisplayMode::Text: - DisplayUpdateInterval = DisplayUpdateTextInterval; - return; - case EDisplayMode::Tui: - DisplayUpdateInterval = DisplayUpdateTuiInterval; - return; - break; - } - } + void SetDisplay(); int WarehouseCount = DEFAULT_WAREHOUSE_COUNT; - int WarmupMinutes = DEFAULT_WARMUP_MINUTES; - int RunMinutes = DEFAULT_RUN_MINUTES; + TDuration WarmupDuration = DEFAULT_WARMUP_DURATION; + TDuration RunDuration = DEFAULT_RUN_DURATION; int MaxInflight = DEFAULT_MAX_SESSIONS; TString Path; + EFormat Format = EFormat::Pretty; + TString JsonResultPath; // advanced settings (normally, used by developer only) - int ThreadCount = 0; + int ThreadCount = DEFAULT_THREAD_COUNT; + int LoadThreadCount = DEFAULT_LOAD_THREAD_COUNT; int DriverCount = 0; ELogPriority LogPriority = static_cast<ELogPriority>(DEFAULT_LOG_LEVEL); bool NoDelays = false; bool ExtendedStats = false; + bool NoTui = false; EDisplayMode DisplayMode = EDisplayMode::None; // instead of actual transaction just async sleep and return SUCCESS diff --git a/ydb/public/lib/ydb_cli/commands/ydb_workload_tpcc.cpp b/ydb/public/lib/ydb_cli/commands/ydb_workload_tpcc.cpp index 58dd963de34..dc92c512958 100644 --- a/ydb/public/lib/ydb_cli/commands/ydb_workload_tpcc.cpp +++ b/ydb/public/lib/ydb_cli/commands/ydb_workload_tpcc.cpp @@ -77,6 +77,7 @@ public: TCommandTPCCImport(std::shared_ptr<NTPCC::TRunConfig> runConfig); ~TCommandTPCCImport() = default; + virtual void Config(TConfig& config) override; virtual int Run(TConfig& config) override; private: @@ -89,6 +90,41 @@ TCommandTPCCImport::TCommandTPCCImport(std::shared_ptr<NTPCC::TRunConfig> runCon { } +void TCommandTPCCImport::Config(TConfig& config) { + TYdbCommand::Config(config); + + config.Opts->AddLongOption( + 'w', "warehouses", TStringBuilder() << "Number of warehouses") + .OptionalArgument("INT").StoreResult(&RunConfig->WarehouseCount).DefaultValue(RunConfig->WarehouseCount); + + // TODO: detect automatically + config.Opts->AddLongOption( + "threads", TStringBuilder() << "Number of threads loading the data") + .RequiredArgument("INT").StoreResult(&RunConfig->LoadThreadCount).DefaultValue(RunConfig->LoadThreadCount); + + config.Opts->AddLongOption( + "no-tui", TStringBuilder() << "Disable TUI (by default autodected)") + .Optional().StoreTrue(&RunConfig->NoTui); + + // advanced hidden options (mainly for developers) + + auto logLevelOpt = config.Opts->AddLongOption( + "log-level", TStringBuilder() << "Log level from 0 to 8, default is 6 (INFO)") + .Optional().StoreMappedResult(&RunConfig->LogPriority, [](const TString& v) { + return FromString<ELogPriority>(v); + }).DefaultValue(RunConfig->LogPriority); + + auto connectionsOpt = config.Opts->AddLongOption( + "connections", TStringBuilder() << "Number of SDK driver/client instances (default: auto)") + .RequiredArgument("INT").StoreResult(&RunConfig->DriverCount).DefaultValue(0); + + // for now. Later might be "config.HelpCommandVerbosiltyLevel <= 1" or advanced section + if (true) { + logLevelOpt.Hidden(); + connectionsOpt.Hidden(); + } +} + int TCommandTPCCImport::Run(TConfig& connectionConfig) { RunConfig->SetFullPath(connectionConfig); NTPCC::ImportSync(connectionConfig, *RunConfig); @@ -120,41 +156,74 @@ TCommandTPCCRun::TCommandTPCCRun(std::shared_ptr<NTPCC::TRunConfig> runConfig) void TCommandTPCCRun::Config(TConfig& config) { TYdbCommand::Config(config); + config.Opts->AddLongOption( + 'w', "warehouses", TStringBuilder() << "Number of warehouses") + .OptionalArgument("INT").StoreResult(&RunConfig->WarehouseCount).DefaultValue(RunConfig->WarehouseCount); + // TODO: default value should be auto config.Opts->AddLongOption( - "warmup", TStringBuilder() << "Warmup time in minutes") - .RequiredArgument("INT").StoreResult(&RunConfig->WarmupMinutes).DefaultValue(RunConfig->WarmupMinutes); + "warmup", TStringBuilder() << "Warmup time. Example: 10s, 5m, 1h") + .RequiredArgument("DURATION").StoreResult(&RunConfig->WarmupDuration).DefaultValue(RunConfig->WarmupDuration); + config.Opts->AddLongOption( - 't', "time", TStringBuilder() << "Execution time in minutes") - .RequiredArgument("INT").StoreResult(&RunConfig->RunMinutes).DefaultValue(RunConfig->RunMinutes); + 't', "time", TStringBuilder() << "Execution time. Example: 10s, 5m, 1h") + .RequiredArgument("DURATION").StoreResult(&RunConfig->RunDuration).DefaultValue(RunConfig->RunDuration); // TODO: default value should be auto config.Opts->AddLongOption( 'm', "max-sessions", TStringBuilder() << "Soft limit on number of DB sessions") .RequiredArgument("INT").StoreResult(&RunConfig->MaxInflight).DefaultValue(RunConfig->MaxInflight); + // TODO: detect automatically config.Opts->AddLongOption( - "json-result-path", TStringBuilder() << "Store the result in JSON format at the specified path") - .OptionalArgument("STRING").StoreResult(&RunConfig->JsonResultPath).DefaultValue(""); + "threads", TStringBuilder() << "Number of threads executing queries (by default autodected)") + .RequiredArgument("INT").StoreResult(&RunConfig->ThreadCount); - // advanced options mainly for developers (all hidden) + config.Opts->AddLongOption( + 'f', "format", TStringBuilder() << "Output format: " << GetEnumAllNames<NTPCC::TRunConfig::EFormat>()) + .OptionalArgument("STRING").StoreResult(&RunConfig->Format).DefaultValue(RunConfig->Format); config.Opts->AddLongOption( + "no-tui", TStringBuilder() << "Disable TUI (by default autodected)") + .Optional().StoreTrue(&RunConfig->NoTui); + + // advanced hidden options (mainly for developers) + + auto extendedStatsOpt = config.Opts->AddLongOption( + "extended-stats", TStringBuilder() << "Print additional statistics") + .Optional().StoreTrue(&RunConfig->ExtendedStats); + + auto logLevelOpt = config.Opts->AddLongOption( + "log-level", TStringBuilder() << "Log level from 0 to 8, default is 6 (INFO)") + .Optional().StoreMappedResult(&RunConfig->LogPriority, [](const TString& v) { + return FromString<ELogPriority>(v); + }).DefaultValue(RunConfig->LogPriority); + + auto connectionsOpt = config.Opts->AddLongOption( "connections", TStringBuilder() << "Number of SDK driver/client instances (default: auto)") - .RequiredArgument("INT").StoreResult(&RunConfig->DriverCount).DefaultValue(0) - .Hidden(); - config.Opts->AddLongOption( + .RequiredArgument("INT").StoreResult(&RunConfig->DriverCount).DefaultValue(0); + + auto noDelaysOpt = config.Opts->AddLongOption( "no-delays", TStringBuilder() << "Disable TPC-C keying/thinking delays") - .Optional().StoreTrue(&RunConfig->NoDelays) - .Hidden(); - config.Opts->AddLongOption( + .Optional().StoreTrue(&RunConfig->NoDelays); + + auto simulateOpt = config.Opts->AddLongOption( "simulate", TStringBuilder() << "Simulate transaction execution (delay is simulated transaction latency ms)") - .OptionalArgument("INT").StoreResult(&RunConfig->SimulateTransactionMs).DefaultValue(0) - .Hidden(); - config.Opts->AddLongOption( + .OptionalArgument("INT").StoreResult(&RunConfig->SimulateTransactionMs).DefaultValue(0); + + auto simulateSelect1Opt = config.Opts->AddLongOption( "simulate-select1", TStringBuilder() << "Instead of real queries, execute specified number of SELECT 1 queries") - .OptionalArgument("INT").StoreResult(&RunConfig->SimulateTransactionSelect1Count).DefaultValue(0) - .Hidden(); + .OptionalArgument("INT").StoreResult(&RunConfig->SimulateTransactionSelect1Count).DefaultValue(0); + + // for now. Later might be "config.HelpCommandVerbosiltyLevel <= 1" or advanced section + if (true) { + extendedStatsOpt.Hidden(); + logLevelOpt.Hidden(); + connectionsOpt.Hidden(); + noDelaysOpt.Hidden(); + simulateOpt.Hidden(); + simulateSelect1Opt.Hidden(); + } } int TCommandTPCCRun::Run(TConfig& connectionConfig) { @@ -180,40 +249,9 @@ TCommandTPCC::TCommandTPCC() void TCommandTPCC::Config(TConfig& config) { TClientCommandTree::Config(config); - const TString& availableMonitoringModes = GetEnumAllNames<NTPCC::TRunConfig::EDisplayMode>(); - config.Opts->AddLongOption( 'p', "path", TStringBuilder() << "Database path where benchmark tables are located") .RequiredArgument("STRING").StoreResult(&RunConfig->Path); - - config.Opts->AddLongOption( - 'w', "warehouses", TStringBuilder() << "Number of warehouses") - .OptionalArgument("INT").StoreResult(&RunConfig->WarehouseCount).DefaultValue(RunConfig->WarehouseCount); - - config.Opts->AddLongOption( - "display-mode", TStringBuilder() << "Benchmark execution display mode: " << availableMonitoringModes) - .OptionalArgument("STRING") - .StoreResult(&RunConfig->DisplayMode).DefaultValue(NTPCC::TRunConfig::EDisplayMode::None); - - // advanced options mainly for developers (all hidden) - - // TODO: detect automatically - config.Opts->AddLongOption( - "threads", TStringBuilder() << "Number of threads executing queries (default: auto)") - .RequiredArgument("INT").StoreResult(&RunConfig->ThreadCount).DefaultValue(0) - .Hidden(); - - config.Opts->AddLongOption( - "extended-stats", TStringBuilder() << "Print additional statistics") - .Optional().StoreTrue(&RunConfig->ExtendedStats) - .Hidden(); - config.Opts->AddLongOption( - "log-level", TStringBuilder() << "Log level from 0 to 8, default is 6 (INFO)") - .Optional().StoreMappedResult(&RunConfig->LogPriority, [](const TString& v) { - int intValue = FromString<int>(v); - return static_cast<ELogPriority>(intValue); - }).DefaultValue(RunConfig->LogPriority) - .Hidden(); } } // namespace NYdb::NConsoleClient |