diff options
| author | Олег <[email protected]> | 2025-05-29 12:22:27 +0300 |
|---|---|---|
| committer | GitHub <[email protected]> | 2025-05-29 09:22:27 +0000 |
| commit | 61ea77b1a877ab9cb011361696ddcc9c90896deb (patch) | |
| tree | 7fb61da9793153996dc19a224c4cfc66656a57bf | |
| parent | 4ab3a5e265c54c5462eb7311554eaded2f9e6f9c (diff) | |
Improve parallel clickbench (#18977)
| -rw-r--r-- | ydb/public/lib/ydb_cli/commands/ya.make | 1 | ||||
| -rw-r--r-- | ydb/public/lib/ydb_cli/commands/ydb_benchmark.cpp | 22 | ||||
| -rw-r--r-- | ydb/public/lib/ydb_cli/commands/ydb_benchmark.h | 7 | ||||
| -rw-r--r-- | ydb/tests/functional/tpc/medium/test_clickbench.py | 2 | ||||
| -rw-r--r-- | ydb/tests/olap/lib/ydb_cli.py | 15 | ||||
| -rw-r--r-- | ydb/tests/olap/load/lib/clickbench.py | 26 | ||||
| -rw-r--r-- | ydb/tests/olap/load/lib/conftest.py | 19 |
7 files changed, 60 insertions, 32 deletions
diff --git a/ydb/public/lib/ydb_cli/commands/ya.make b/ydb/public/lib/ydb_cli/commands/ya.make index 5d17b732d6c..e63b14bd19f 100644 --- a/ydb/public/lib/ydb_cli/commands/ya.make +++ b/ydb/public/lib/ydb_cli/commands/ya.make @@ -79,7 +79,6 @@ PEERDIR( yql/essentials/public/decimal ) -GENERATE_ENUM_SERIALIZATION(ydb_benchmark.h) GENERATE_ENUM_SERIALIZATION(ydb_ping.h) GENERATE_ENUM_SERIALIZATION(ydb_latency.h) diff --git a/ydb/public/lib/ydb_cli/commands/ydb_benchmark.cpp b/ydb/public/lib/ydb_cli/commands/ydb_benchmark.cpp index fcd2d483e9d..b0032d0eb9a 100644 --- a/ydb/public/lib/ydb_cli/commands/ydb_benchmark.cpp +++ b/ydb/public/lib/ydb_cli/commands/ydb_benchmark.cpp @@ -142,6 +142,7 @@ TVector<TString> ColumnNames { "RttMin", "RttMax", "RttAvg", + "GrossTime", "SuccessCount", "FailsCount", "DiffsCount" @@ -158,6 +159,8 @@ struct TTestInfoProduct { double Median = 1; double UnixBench = 1; double Std = 0; + std::vector<TDuration> ClientTimings; + void operator *=(const BenchmarkUtils::TTestInfo& other) { ColdTime *= other.ColdTime.MillisecondsFloat(); Min *= other.Min.MillisecondsFloat(); @@ -279,6 +282,11 @@ void CollectStats(TPrettyTable& table, IOutputStream* csv, NJson::TJsonValue* js CollectField<true>(row, index++, csv, json, name, testInfo.RttMin); CollectField<true>(row, index++, csv, json, name, testInfo.RttMax); CollectField<true>(row, index++, csv, json, name, testInfo.RttMean); + auto grossTime = TDuration::Zero(); + for (const auto& clientTime: testInfo.ClientTimings) { + grossTime += clientTime; + } + CollectField<true>(row, index++, csv, json, name, grossTime); CollectField(row, index++, csv, json, name, sCount); CollectField(row, index++, csv, json, name, fCount); CollectField(row, index++, csv, json, name, dCount); @@ -303,7 +311,7 @@ public: void Process(void*) override { bool execute = Iteration >= 0; // explain in other case - if (Owner.Threads <= 1) { + if (Owner.Threads == 0) { PrintQueryHeader(); } auto t1 = TInstant::Now(); @@ -331,7 +339,7 @@ public: } ClientDuration = TInstant::Now() - t1; Owner.SavePlans(Result, QueryName, execute ? ToString(Iteration) : "explain"); - if (Owner.Threads <= 1) { + if (Owner.Threads == 0) { PrintResult(); } } @@ -396,7 +404,7 @@ int TWorkloadCommandBenchmark::RunBench(NYdbWorkload::IWorkloadQueryGenerator& w iterations.emplace_back(MakeIntrusive<TIterationExecution>(*this, query, queryName, qInfo.ExpectedResult.c_str())); } } - if (Threads > 1) { + if (Threads > 0) { Shuffle(iterations.begin(), iterations.end()); } TMap<TString, TIterations> queryExecByName; @@ -408,11 +416,13 @@ int TWorkloadCommandBenchmark::RunBench(NYdbWorkload::IWorkloadQueryGenerator& w GlobalDeadline = (GlobalTimeout != TDuration::Zero()) ? Now() + GlobalTimeout : TInstant::Max(); TThreadPool pool; - pool.Start(Threads > 1 ? Threads : 0); + pool.Start(Threads); + const auto startTime = Now(); for (auto iter: iterations) { pool.SafeAdd(iter.Get()); } pool.Stop(); + const auto grossTime = Now() - startTime; ui32 queriesWithAllSuccess = 0; ui32 queriesWithSomeFails = 0; @@ -448,7 +458,7 @@ int TWorkloadCommandBenchmark::RunBench(NYdbWorkload::IWorkloadQueryGenerator& w std::optional<TString> prevResult; TOFStream outFStream(TStringBuilder() << OutFilePath << "." << queryName << ".out"); for (const auto& iterExec: queryExec) { - if (Threads > 1) { + if (Threads > 0) { iterExec->PrintQueryHeader(); iterExec->PrintResult(); } @@ -506,8 +516,10 @@ int TWorkloadCommandBenchmark::RunBench(NYdbWorkload::IWorkloadQueryGenerator& w } if (queriesWithAllSuccess) { + sumInfo.ClientTimings.push_back(grossTime); CollectStats(statTable, csvReport.Get(), jsonReport.Get(), "Sum", queriesWithAllSuccess, queriesWithSomeFails, queriesWithDiff, sumInfo); sumInfo /= queriesWithAllSuccess; + sumInfo.ClientTimings.back() = grossTime / queriesWithAllSuccess; CollectStats(statTable, csvReport.Get(), jsonReport.Get(), "Avg", queriesWithAllSuccess, queriesWithSomeFails, queriesWithDiff, sumInfo); productInfo ^= queriesWithAllSuccess; CollectStats(statTable, csvReport.Get(), jsonReport.Get(), "GAvg", queriesWithAllSuccess, queriesWithSomeFails, queriesWithDiff, productInfo); diff --git a/ydb/public/lib/ydb_cli/commands/ydb_benchmark.h b/ydb/public/lib/ydb_cli/commands/ydb_benchmark.h index d1c1857967d..3083824b176 100644 --- a/ydb/public/lib/ydb_cli/commands/ydb_benchmark.h +++ b/ydb/public/lib/ydb_cli/commands/ydb_benchmark.h @@ -11,11 +11,6 @@ namespace BenchmarkUtils { class TWorkloadCommandBenchmark final: public TWorkloadCommandBase { public: - enum class EQueryExecutor { - Scan /* "scan" */, - Generic /* "generic" */ - }; - TWorkloadCommandBenchmark(NYdbWorkload::TWorkloadParams& params, const NYdbWorkload::IWorkloadQueryGenerator::TWorkloadType& workload); virtual void Config(TConfig& config) override; @@ -47,7 +42,7 @@ private: TDuration RequestTimeout = TDuration::Zero(); TInstant GlobalDeadline = TInstant::Max(); NYdb::NRetry::TRetryOperationSettings RetrySettings; - ui32 Threads = 1; + ui32 Threads = 0; }; }
\ No newline at end of file diff --git a/ydb/tests/functional/tpc/medium/test_clickbench.py b/ydb/tests/functional/tpc/medium/test_clickbench.py index dae272467a0..f96bf129676 100644 --- a/ydb/tests/functional/tpc/medium/test_clickbench.py +++ b/ydb/tests/functional/tpc/medium/test_clickbench.py @@ -15,7 +15,7 @@ class TestClickbench(clickbench.TestClickbench, FunctionalTestBase): super().setup_class() -class TestClickbenchParallel(clickbench.TestClickbenchParallel, FunctionalTestBase): +class TestClickbenchParallel(clickbench.TestClickbenchParallel8, FunctionalTestBase): verify_data: bool = False iterations: int = 2 diff --git a/ydb/tests/olap/lib/ydb_cli.py b/ydb/tests/olap/lib/ydb_cli.py index 02b44ffa4e8..b591be224fa 100644 --- a/ydb/tests/olap/lib/ydb_cli.py +++ b/ydb/tests/olap/lib/ydb_cli.py @@ -136,7 +136,7 @@ class YdbCliHelper: def __init__(self, workload_type: WorkloadType, db_path: str, - query_names: set[str], + query_names: list[str], iterations: int, timeout: float, check_canonical: CheckCanonicalPolicy, @@ -160,7 +160,7 @@ class YdbCliHelper: self.returncode = None self.stderr = None self.stdout = None - self.__prefix = md5(','.join(query_names).encode()).hexdigest() if len(query_names) != 1 else [q for q in query_names][0] + self.__prefix = md5(','.join(query_names).encode()).hexdigest() if len(query_names) != 1 else query_names[0] self.__plan_path = f'{self.__prefix}.plan' self.__query_output_path = f'{self.__prefix}.result' self.json_path = f'{self.__prefix}.stats.json' @@ -196,7 +196,7 @@ class YdbCliHelper: cmd += ['--syntax', self.query_syntax] if self.scale is not None and self.scale > 0: cmd += ['--scale', str(self.scale)] - if self.threads > 1: + if self.threads > 0: cmd += ['--threads', str(self.threads)] return cmd @@ -335,9 +335,9 @@ class YdbCliHelper: self.__process_returncode() @staticmethod - def workload_run(workload_type: WorkloadType, path: str, query_names: str, iterations: int = 5, + def workload_run(workload_type: WorkloadType, path: str, query_names: list[str], iterations: int = 5, timeout: float = 100., check_canonical: CheckCanonicalPolicy = CheckCanonicalPolicy.NO, query_syntax: str = '', - scale: Optional[int] = None, query_prefix=None, external_path='', threads: int = 1) -> dict[str, YdbCliHelper.WorkloadRunResult]: + scale: Optional[int] = None, query_prefix=None, external_path='', threads: int = 0) -> dict[str, YdbCliHelper.WorkloadRunResult]: runner = YdbCliHelper.WorkloadRunner( workload_type, path, @@ -351,6 +351,7 @@ class YdbCliHelper: external_path=external_path, threads=threads ) + extended_query_names = query_names + ["Sum", "Avg", "GAvg"] if runner.run(): - return {q: YdbCliHelper.WorkloadResultParser(runner, q).result for q in query_names} - return {q: runner.result for q in query_names} + return {q: YdbCliHelper.WorkloadResultParser(runner, q).result for q in extended_query_names} + return {q: runner.result for q in extended_query_names} diff --git a/ydb/tests/olap/load/lib/clickbench.py b/ydb/tests/olap/load/lib/clickbench.py index 408b193ed4d..ce6ed0abfa8 100644 --- a/ydb/tests/olap/load/lib/clickbench.py +++ b/ydb/tests/olap/load/lib/clickbench.py @@ -24,7 +24,7 @@ class TestClickbench(LoadSuiteBase): fail_count = 0 results = YdbCliHelper.workload_run( path=cls.path, - query_names=set(QUERY_NAMES), + query_names=QUERY_NAMES, iterations=1, workload_type=cls.workload_type, timeout=cls._get_query_settings().timeout, @@ -50,9 +50,9 @@ class TestClickbenchPg(TestClickbench): query_syntax = 'pg' -class TestClickbenchParallel(LoadSuiteParallel): +class ClickbenchParallelBase(LoadSuiteParallel): workload_type: WorkloadType = WorkloadType.Clickbench - iterations: int = 10 + iterations: int = 5 def get_query_list() -> list[str]: return QUERY_NAMES @@ -65,3 +65,23 @@ class TestClickbenchParallel(LoadSuiteParallel): if cls.verify_data and getenv('NO_VERIFY_DATA', '0') != '1' and getenv('NO_VERIFY_DATA_CLICKBENCH', '0') != '1': cls.check_tables_size(folder=None, tables={'clickbench/hits': 99997497}) super().do_setup_class() + + +class TestClickbenchParallel1(ClickbenchParallelBase): + threads: int = 1 + + +class TestClickbenchParallel2(ClickbenchParallelBase): + threads: int = 2 + + +class TestClickbenchParallel4(ClickbenchParallelBase): + threads: int = 4 + + +class TestClickbenchParallel8(ClickbenchParallelBase): + threads: int = 8 + + +class TestClickbenchParallel16(ClickbenchParallelBase): + threads: int = 16 diff --git a/ydb/tests/olap/load/lib/conftest.py b/ydb/tests/olap/load/lib/conftest.py index f8c78b584a1..12afe5088f6 100644 --- a/ydb/tests/olap/load/lib/conftest.py +++ b/ydb/tests/olap/load/lib/conftest.py @@ -1,10 +1,11 @@ from __future__ import annotations -import pytest import allure import json -import yatest -import os import logging +import os +import pytest +import yatest + from allure_commons._core import plugin_manager from allure_pytest.listener import AllureListener from copy import deepcopy @@ -12,11 +13,11 @@ from datetime import datetime from pytz import timezone from time import time from typing import Optional -from ydb.tests.olap.lib.ydb_cli import YdbCliHelper, WorkloadType, CheckCanonicalPolicy -from ydb.tests.olap.lib.ydb_cluster import YdbCluster from ydb.tests.olap.lib.allure_utils import allure_test_description, NodeErrors from ydb.tests.olap.lib.results_processor import ResultsProcessor from ydb.tests.olap.lib.utils import get_external_param +from ydb.tests.olap.lib.ydb_cli import YdbCliHelper, WorkloadType, CheckCanonicalPolicy +from ydb.tests.olap.lib.ydb_cluster import YdbCluster from ydb.tests.olap.scenario.helpers.scenario_tests_helper import ScenarioTestHelper @@ -374,7 +375,7 @@ class LoadSuiteBase: self.save_nodes_state() result = YdbCliHelper.workload_run( path=path, - query_names={query_name}, + query_names=[query_name], iterations=qparams.iterations, workload_type=self.workload_type, timeout=qparams.timeout, @@ -388,7 +389,7 @@ class LoadSuiteBase: class LoadSuiteParallel(LoadSuiteBase): - threads: int = 8 + threads: int = 0 def get_query_list() -> list[str]: return [] @@ -404,7 +405,7 @@ class LoadSuiteParallel(LoadSuiteBase): cls.save_nodes_state() cls.__results = YdbCliHelper.workload_run( path=cls.get_path(), - query_names=set(cls.get_query_list()), + query_names=cls.get_query_list(), iterations=qparams.iterations, workload_type=cls.workload_type, timeout=qparams.timeout, @@ -422,4 +423,4 @@ class LoadSuiteParallel(LoadSuiteBase): def pytest_generate_tests(metafunc): if issubclass(metafunc.cls, LoadSuiteParallel): - metafunc.parametrize("query_name", metafunc.cls.get_query_list()) + metafunc.parametrize("query_name", metafunc.cls.get_query_list() + ["Sum"]) |
