summary refs log tree commit diff stats
diff options
context:
space:
mode:
author Олег <[email protected]> 2025-05-29 12:22:27 +0300
committer GitHub <[email protected]> 2025-05-29 09:22:27 +0000
commit 61ea77b1a877ab9cb011361696ddcc9c90896deb (patch)
tree 7fb61da9793153996dc19a224c4cfc66656a57bf
parent 4ab3a5e265c54c5462eb7311554eaded2f9e6f9c (diff)
Improve parallel clickbench (#18977)
-rw-r--r-- ydb/public/lib/ydb_cli/commands/ya.make | 1
-rw-r--r-- ydb/public/lib/ydb_cli/commands/ydb_benchmark.cpp | 22
-rw-r--r-- ydb/public/lib/ydb_cli/commands/ydb_benchmark.h | 7
-rw-r--r-- ydb/tests/functional/tpc/medium/test_clickbench.py | 2
-rw-r--r-- ydb/tests/olap/lib/ydb_cli.py | 15
-rw-r--r-- ydb/tests/olap/load/lib/clickbench.py | 26
-rw-r--r-- ydb/tests/olap/load/lib/conftest.py | 19
7 files changed, 60 insertions, 32 deletions
diff --git a/ydb/public/lib/ydb_cli/commands/ya.make b/ydb/public/lib/ydb_cli/commands/ya.make
index 5d17b732d6c..e63b14bd19f 100644
--- a/ydb/public/lib/ydb_cli/commands/ya.make
+++ b/ydb/public/lib/ydb_cli/commands/ya.make
@@ -79,7 +79,6 @@ PEERDIR(
yql/essentials/public/decimal
)
-GENERATE_ENUM_SERIALIZATION(ydb_benchmark.h)
GENERATE_ENUM_SERIALIZATION(ydb_ping.h)
GENERATE_ENUM_SERIALIZATION(ydb_latency.h)
diff --git a/ydb/public/lib/ydb_cli/commands/ydb_benchmark.cpp b/ydb/public/lib/ydb_cli/commands/ydb_benchmark.cpp
index fcd2d483e9d..b0032d0eb9a 100644
--- a/ydb/public/lib/ydb_cli/commands/ydb_benchmark.cpp
+++ b/ydb/public/lib/ydb_cli/commands/ydb_benchmark.cpp
@@ -142,6 +142,7 @@ TVector<TString> ColumnNames {
"RttMin",
"RttMax",
"RttAvg",
+ "GrossTime",
"SuccessCount",
"FailsCount",
"DiffsCount"
@@ -158,6 +159,8 @@ struct TTestInfoProduct {
double Median = 1;
double UnixBench = 1;
double Std = 0;
+ std::vector<TDuration> ClientTimings;
+
void operator *=(const BenchmarkUtils::TTestInfo& other) {
ColdTime *= other.ColdTime.MillisecondsFloat();
Min *= other.Min.MillisecondsFloat();
@@ -279,6 +282,11 @@ void CollectStats(TPrettyTable& table, IOutputStream* csv, NJson::TJsonValue* js
CollectField<true>(row, index++, csv, json, name, testInfo.RttMin);
CollectField<true>(row, index++, csv, json, name, testInfo.RttMax);
CollectField<true>(row, index++, csv, json, name, testInfo.RttMean);
+ auto grossTime = TDuration::Zero();
+ for (const auto& clientTime: testInfo.ClientTimings) {
+ grossTime += clientTime;
+ }
+ CollectField<true>(row, index++, csv, json, name, grossTime);
CollectField(row, index++, csv, json, name, sCount);
CollectField(row, index++, csv, json, name, fCount);
CollectField(row, index++, csv, json, name, dCount);
@@ -303,7 +311,7 @@ public:
void Process(void*) override {
bool execute = Iteration >= 0; // explain in other case
- if (Owner.Threads <= 1) {
+ if (Owner.Threads == 0) {
PrintQueryHeader();
}
auto t1 = TInstant::Now();
@@ -331,7 +339,7 @@ public:
}
ClientDuration = TInstant::Now() - t1;
Owner.SavePlans(Result, QueryName, execute ? ToString(Iteration) : "explain");
- if (Owner.Threads <= 1) {
+ if (Owner.Threads == 0) {
PrintResult();
}
}
@@ -396,7 +404,7 @@ int TWorkloadCommandBenchmark::RunBench(NYdbWorkload::IWorkloadQueryGenerator& w
iterations.emplace_back(MakeIntrusive<TIterationExecution>(*this, query, queryName, qInfo.ExpectedResult.c_str()));
}
}
- if (Threads > 1) {
+ if (Threads > 0) {
Shuffle(iterations.begin(), iterations.end());
}
TMap<TString, TIterations> queryExecByName;
@@ -408,11 +416,13 @@ int TWorkloadCommandBenchmark::RunBench(NYdbWorkload::IWorkloadQueryGenerator& w
GlobalDeadline = (GlobalTimeout != TDuration::Zero()) ? Now() + GlobalTimeout : TInstant::Max();
TThreadPool pool;
- pool.Start(Threads > 1 ? Threads : 0);
+ pool.Start(Threads);
+ const auto startTime = Now();
for (auto iter: iterations) {
pool.SafeAdd(iter.Get());
}
pool.Stop();
+ const auto grossTime = Now() - startTime;
ui32 queriesWithAllSuccess = 0;
ui32 queriesWithSomeFails = 0;
@@ -448,7 +458,7 @@ int TWorkloadCommandBenchmark::RunBench(NYdbWorkload::IWorkloadQueryGenerator& w
std::optional<TString> prevResult;
TOFStream outFStream(TStringBuilder() << OutFilePath << "." << queryName << ".out");
for (const auto& iterExec: queryExec) {
- if (Threads > 1) {
+ if (Threads > 0) {
iterExec->PrintQueryHeader();
iterExec->PrintResult();
}
@@ -506,8 +516,10 @@ int TWorkloadCommandBenchmark::RunBench(NYdbWorkload::IWorkloadQueryGenerator& w
}
if (queriesWithAllSuccess) {
+ sumInfo.ClientTimings.push_back(grossTime);
CollectStats(statTable, csvReport.Get(), jsonReport.Get(), "Sum", queriesWithAllSuccess, queriesWithSomeFails, queriesWithDiff, sumInfo);
sumInfo /= queriesWithAllSuccess;
+ sumInfo.ClientTimings.back() = grossTime / queriesWithAllSuccess;
CollectStats(statTable, csvReport.Get(), jsonReport.Get(), "Avg", queriesWithAllSuccess, queriesWithSomeFails, queriesWithDiff, sumInfo);
productInfo ^= queriesWithAllSuccess;
CollectStats(statTable, csvReport.Get(), jsonReport.Get(), "GAvg", queriesWithAllSuccess, queriesWithSomeFails, queriesWithDiff, productInfo);
diff --git a/ydb/public/lib/ydb_cli/commands/ydb_benchmark.h b/ydb/public/lib/ydb_cli/commands/ydb_benchmark.h
index d1c1857967d..3083824b176 100644
--- a/ydb/public/lib/ydb_cli/commands/ydb_benchmark.h
+++ b/ydb/public/lib/ydb_cli/commands/ydb_benchmark.h
@@ -11,11 +11,6 @@ namespace BenchmarkUtils {
class TWorkloadCommandBenchmark final: public TWorkloadCommandBase {
public:
- enum class EQueryExecutor {
- Scan /* "scan" */,
- Generic /* "generic" */
- };
-
TWorkloadCommandBenchmark(NYdbWorkload::TWorkloadParams& params, const NYdbWorkload::IWorkloadQueryGenerator::TWorkloadType& workload);
virtual void Config(TConfig& config) override;
@@ -47,7 +42,7 @@ private:
TDuration RequestTimeout = TDuration::Zero();
TInstant GlobalDeadline = TInstant::Max();
NYdb::NRetry::TRetryOperationSettings RetrySettings;
- ui32 Threads = 1;
+ ui32 Threads = 0;
};
} \ No newline at end of file
diff --git a/ydb/tests/functional/tpc/medium/test_clickbench.py b/ydb/tests/functional/tpc/medium/test_clickbench.py
index dae272467a0..f96bf129676 100644
--- a/ydb/tests/functional/tpc/medium/test_clickbench.py
+++ b/ydb/tests/functional/tpc/medium/test_clickbench.py
@@ -15,7 +15,7 @@ class TestClickbench(clickbench.TestClickbench, FunctionalTestBase):
super().setup_class()
-class TestClickbenchParallel(clickbench.TestClickbenchParallel, FunctionalTestBase):
+class TestClickbenchParallel(clickbench.TestClickbenchParallel8, FunctionalTestBase):
verify_data: bool = False
iterations: int = 2
diff --git a/ydb/tests/olap/lib/ydb_cli.py b/ydb/tests/olap/lib/ydb_cli.py
index 02b44ffa4e8..b591be224fa 100644
--- a/ydb/tests/olap/lib/ydb_cli.py
+++ b/ydb/tests/olap/lib/ydb_cli.py
@@ -136,7 +136,7 @@ class YdbCliHelper:
def __init__(self,
workload_type: WorkloadType,
db_path: str,
- query_names: set[str],
+ query_names: list[str],
iterations: int,
timeout: float,
check_canonical: CheckCanonicalPolicy,
@@ -160,7 +160,7 @@ class YdbCliHelper:
self.returncode = None
self.stderr = None
self.stdout = None
- self.__prefix = md5(','.join(query_names).encode()).hexdigest() if len(query_names) != 1 else [q for q in query_names][0]
+ self.__prefix = md5(','.join(query_names).encode()).hexdigest() if len(query_names) != 1 else query_names[0]
self.__plan_path = f'{self.__prefix}.plan'
self.__query_output_path = f'{self.__prefix}.result'
self.json_path = f'{self.__prefix}.stats.json'
@@ -196,7 +196,7 @@ class YdbCliHelper:
cmd += ['--syntax', self.query_syntax]
if self.scale is not None and self.scale > 0:
cmd += ['--scale', str(self.scale)]
- if self.threads > 1:
+ if self.threads > 0:
cmd += ['--threads', str(self.threads)]
return cmd
@@ -335,9 +335,9 @@ class YdbCliHelper:
self.__process_returncode()
@staticmethod
- def workload_run(workload_type: WorkloadType, path: str, query_names: str, iterations: int = 5,
+ def workload_run(workload_type: WorkloadType, path: str, query_names: list[str], iterations: int = 5,
timeout: float = 100., check_canonical: CheckCanonicalPolicy = CheckCanonicalPolicy.NO, query_syntax: str = '',
- scale: Optional[int] = None, query_prefix=None, external_path='', threads: int = 1) -> dict[str, YdbCliHelper.WorkloadRunResult]:
+ scale: Optional[int] = None, query_prefix=None, external_path='', threads: int = 0) -> dict[str, YdbCliHelper.WorkloadRunResult]:
runner = YdbCliHelper.WorkloadRunner(
workload_type,
path,
@@ -351,6 +351,7 @@ class YdbCliHelper:
external_path=external_path,
threads=threads
)
+ extended_query_names = query_names + ["Sum", "Avg", "GAvg"]
if runner.run():
- return {q: YdbCliHelper.WorkloadResultParser(runner, q).result for q in query_names}
- return {q: runner.result for q in query_names}
+ return {q: YdbCliHelper.WorkloadResultParser(runner, q).result for q in extended_query_names}
+ return {q: runner.result for q in extended_query_names}
diff --git a/ydb/tests/olap/load/lib/clickbench.py b/ydb/tests/olap/load/lib/clickbench.py
index 408b193ed4d..ce6ed0abfa8 100644
--- a/ydb/tests/olap/load/lib/clickbench.py
+++ b/ydb/tests/olap/load/lib/clickbench.py
@@ -24,7 +24,7 @@ class TestClickbench(LoadSuiteBase):
fail_count = 0
results = YdbCliHelper.workload_run(
path=cls.path,
- query_names=set(QUERY_NAMES),
+ query_names=QUERY_NAMES,
iterations=1,
workload_type=cls.workload_type,
timeout=cls._get_query_settings().timeout,
@@ -50,9 +50,9 @@ class TestClickbenchPg(TestClickbench):
query_syntax = 'pg'
-class TestClickbenchParallel(LoadSuiteParallel):
+class ClickbenchParallelBase(LoadSuiteParallel):
workload_type: WorkloadType = WorkloadType.Clickbench
- iterations: int = 10
+ iterations: int = 5
def get_query_list() -> list[str]:
return QUERY_NAMES
@@ -65,3 +65,23 @@ class TestClickbenchParallel(LoadSuiteParallel):
if cls.verify_data and getenv('NO_VERIFY_DATA', '0') != '1' and getenv('NO_VERIFY_DATA_CLICKBENCH', '0') != '1':
cls.check_tables_size(folder=None, tables={'clickbench/hits': 99997497})
super().do_setup_class()
+
+
+class TestClickbenchParallel1(ClickbenchParallelBase):
+ threads: int = 1
+
+
+class TestClickbenchParallel2(ClickbenchParallelBase):
+ threads: int = 2
+
+
+class TestClickbenchParallel4(ClickbenchParallelBase):
+ threads: int = 4
+
+
+class TestClickbenchParallel8(ClickbenchParallelBase):
+ threads: int = 8
+
+
+class TestClickbenchParallel16(ClickbenchParallelBase):
+ threads: int = 16
diff --git a/ydb/tests/olap/load/lib/conftest.py b/ydb/tests/olap/load/lib/conftest.py
index f8c78b584a1..12afe5088f6 100644
--- a/ydb/tests/olap/load/lib/conftest.py
+++ b/ydb/tests/olap/load/lib/conftest.py
@@ -1,10 +1,11 @@
from __future__ import annotations
-import pytest
import allure
import json
-import yatest
-import os
import logging
+import os
+import pytest
+import yatest
+
from allure_commons._core import plugin_manager
from allure_pytest.listener import AllureListener
from copy import deepcopy
@@ -12,11 +13,11 @@ from datetime import datetime
from pytz import timezone
from time import time
from typing import Optional
-from ydb.tests.olap.lib.ydb_cli import YdbCliHelper, WorkloadType, CheckCanonicalPolicy
-from ydb.tests.olap.lib.ydb_cluster import YdbCluster
from ydb.tests.olap.lib.allure_utils import allure_test_description, NodeErrors
from ydb.tests.olap.lib.results_processor import ResultsProcessor
from ydb.tests.olap.lib.utils import get_external_param
+from ydb.tests.olap.lib.ydb_cli import YdbCliHelper, WorkloadType, CheckCanonicalPolicy
+from ydb.tests.olap.lib.ydb_cluster import YdbCluster
from ydb.tests.olap.scenario.helpers.scenario_tests_helper import ScenarioTestHelper
@@ -374,7 +375,7 @@ class LoadSuiteBase:
self.save_nodes_state()
result = YdbCliHelper.workload_run(
path=path,
- query_names={query_name},
+ query_names=[query_name],
iterations=qparams.iterations,
workload_type=self.workload_type,
timeout=qparams.timeout,
@@ -388,7 +389,7 @@ class LoadSuiteBase:
class LoadSuiteParallel(LoadSuiteBase):
- threads: int = 8
+ threads: int = 0
def get_query_list() -> list[str]:
return []
@@ -404,7 +405,7 @@ class LoadSuiteParallel(LoadSuiteBase):
cls.save_nodes_state()
cls.__results = YdbCliHelper.workload_run(
path=cls.get_path(),
- query_names=set(cls.get_query_list()),
+ query_names=cls.get_query_list(),
iterations=qparams.iterations,
workload_type=cls.workload_type,
timeout=qparams.timeout,
@@ -422,4 +423,4 @@ class LoadSuiteParallel(LoadSuiteBase):
def pytest_generate_tests(metafunc):
if issubclass(metafunc.cls, LoadSuiteParallel):
- metafunc.parametrize("query_name", metafunc.cls.get_query_list())
+ metafunc.parametrize("query_name", metafunc.cls.get_query_list() + ["Sum"])