aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorns-vasilev <ns-vasilev@yandex-team.com>2023-10-23 18:51:58 +0300
committerns-vasilev <ns-vasilev@yandex-team.com>2023-10-23 19:24:37 +0300
commite599669cb835689e7456f75778e2b304010e0961 (patch)
tree02d08ef2ddbe32403970da58423923825ef0428d
parentdac2309e55d9e559c65b13c3a30a88d786805e2e (diff)
downloadydb-e599669cb835689e7456f75778e2b304010e0961.tar.gz
KIKIMR-19702: generic query & scan benchmark (clickbench + tcp-h) + fix generic query for olap + fix memory limit in destructor
-rw-r--r--ydb/core/kqp/session_actor/kqp_query_state.h3
-rw-r--r--ydb/core/kqp/ut/common/kqp_ut_common.cpp13
-rw-r--r--ydb/core/kqp/ut/common/kqp_ut_common.h1
-rw-r--r--ydb/core/kqp/ut/olap/kqp_olap_ut.cpp26
-rw-r--r--ydb/library/yql/minikql/comp_nodes/mkql_wide_combine.cpp7
-rw-r--r--ydb/public/lib/ydb_cli/commands/benchmark_utils.cpp35
-rw-r--r--ydb/public/lib/ydb_cli/commands/benchmark_utils.h2
-rw-r--r--ydb/public/lib/ydb_cli/commands/click_bench.cpp20
-rw-r--r--ydb/public/lib/ydb_cli/commands/click_bench.h3
-rw-r--r--ydb/public/lib/ydb_cli/commands/tpch.cpp20
-rw-r--r--ydb/public/lib/ydb_cli/commands/tpch.h4
-rw-r--r--ydb/tests/functional/clickbench/test.py5
12 files changed, 126 insertions, 13 deletions
diff --git a/ydb/core/kqp/session_actor/kqp_query_state.h b/ydb/core/kqp/session_actor/kqp_query_state.h
index 53e348baf18..2b7b4f3d827 100644
--- a/ydb/core/kqp/session_actor/kqp_query_state.h
+++ b/ydb/core/kqp/session_actor/kqp_query_state.h
@@ -240,7 +240,8 @@ public:
bool NeedPersistentSnapshot() const {
auto type = GetType();
- if (type == NKikimrKqp::QUERY_TYPE_SQL_GENERIC_CONCURRENT_QUERY) {
+ if (type == NKikimrKqp::QUERY_TYPE_SQL_GENERIC_CONCURRENT_QUERY ||
+ type == NKikimrKqp::QUERY_TYPE_SQL_GENERIC_QUERY) {
return ::NKikimr::NKqp::HasOlapTableInTx(PreparedQuery->GetPhysicalQuery());
}
return (
diff --git a/ydb/core/kqp/ut/common/kqp_ut_common.cpp b/ydb/core/kqp/ut/common/kqp_ut_common.cpp
index 88a7dd8e18c..1c7130f8222 100644
--- a/ydb/core/kqp/ut/common/kqp_ut_common.cpp
+++ b/ydb/core/kqp/ut/common/kqp_ut_common.cpp
@@ -580,6 +580,15 @@ TDataQueryResult ExecQueryAndTestResult(TSession& session, const TString& query,
return result;
}
+void FillProfile(NYdb::NQuery::TExecuteQueryPart& streamPart, NYson::TYsonWriter& writer, TVector<TString>* profiles,
+ ui32 profileIndex)
+{
+ Y_UNUSED(streamPart);
+ Y_UNUSED(writer);
+ Y_UNUSED(profiles);
+ Y_UNUSED(profileIndex);
+}
+
void FillProfile(NYdb::NTable::TScanQueryPart& streamPart, NYson::TYsonWriter& writer, TVector<TString>* profiles,
ui32 profileIndex)
{
@@ -646,6 +655,10 @@ TString StreamResultToYsonImpl(TIterator& it, TVector<TString>* profiles, bool t
return out.Str();
}
+TString StreamResultToYson(NYdb::NQuery::TExecuteQueryIterator& it, bool throwOnTimeout, const NYdb::EStatus& opStatus, const TString& issueMessageSubString) {
+ return StreamResultToYsonImpl(it, nullptr, throwOnTimeout, opStatus, issueMessageSubString);
+}
+
TString StreamResultToYson(NYdb::NTable::TScanQueryPartIterator& it, bool throwOnTimeout, const NYdb::EStatus& opStatus, const TString& issueMessageSubString) {
return StreamResultToYsonImpl(it, nullptr, throwOnTimeout, opStatus, issueMessageSubString);
}
diff --git a/ydb/core/kqp/ut/common/kqp_ut_common.h b/ydb/core/kqp/ut/common/kqp_ut_common.h
index e739e2e96ea..1560d3913c1 100644
--- a/ydb/core/kqp/ut/common/kqp_ut_common.h
+++ b/ydb/core/kqp/ut/common/kqp_ut_common.h
@@ -261,6 +261,7 @@ public:
NYdb::EStatus Status;
};
+TString StreamResultToYson(NYdb::NQuery::TExecuteQueryIterator& it, bool throwOnTImeout = false, const NYdb::EStatus& opStatus = NYdb::EStatus::SUCCESS, const TString& issueMessageSubString = "");
TString StreamResultToYson(NYdb::NTable::TScanQueryPartIterator& it, bool throwOnTImeout = false, const NYdb::EStatus& opStatus = NYdb::EStatus::SUCCESS, const TString& issueMessageSubString = "");
TString StreamResultToYson(NYdb::NScripting::TYqlResultPartIterator& it, bool throwOnTImeout = false, const NYdb::EStatus& opStatus = NYdb::EStatus::SUCCESS);
TString StreamResultToYson(NYdb::NTable::TTablePartIterator& it, bool throwOnTImeout = false, const NYdb::EStatus& opStatus = NYdb::EStatus::SUCCESS);
diff --git a/ydb/core/kqp/ut/olap/kqp_olap_ut.cpp b/ydb/core/kqp/ut/olap/kqp_olap_ut.cpp
index d5d788505a2..a090cb27a29 100644
--- a/ydb/core/kqp/ut/olap/kqp_olap_ut.cpp
+++ b/ydb/core/kqp/ut/olap/kqp_olap_ut.cpp
@@ -5103,6 +5103,32 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
CompareYson(output, R"([[10u;]])");
}
+ Y_UNIT_TEST(OlapRead_StreamGenericQuery) {
+ auto settings = TKikimrSettings()
+ .SetWithSampleTables(false)
+ .SetForceColumnTablesCompositeMarks(true);
+ TKikimrRunner kikimr(settings);
+
+ Tests::NCommon::TLoggerInit(kikimr).Initialize();
+ TTableWithNullsHelper(kikimr).CreateTableWithNulls();
+ TLocalHelper(kikimr).CreateTestOlapTable();
+
+ {
+ WriteTestDataForTableWithNulls(kikimr, "/Root/tableWithNulls");
+ }
+
+ auto db = kikimr.GetQueryClient();
+
+ auto it = db.StreamExecuteQuery(R"(
+ SELECT COUNT(*) FROM `/Root/tableWithNulls`;
+ )", NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync();
+
+ UNIT_ASSERT_VALUES_EQUAL_C(it.GetStatus(), EStatus::SUCCESS, it.GetIssues().ToString());
+ TString output = StreamResultToYson(it);
+ Cout << output << Endl;
+ CompareYson(output, R"([[10u;]])");
+ }
+
Y_UNIT_TEST(OlapRead_ScanQuery) {
auto settings = TKikimrSettings()
.SetWithSampleTables(false)
diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_wide_combine.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_wide_combine.cpp
index fb087855a90..592e18b60bf 100644
--- a/ydb/library/yql/minikql/comp_nodes/mkql_wide_combine.cpp
+++ b/ydb/library/yql/minikql/comp_nodes/mkql_wide_combine.cpp
@@ -218,7 +218,12 @@ public:
row[i].UnRef();
}
}
- IsEmpty();
+
+ ExtractIt.reset();
+ Storage.clear();
+ States.Clear();
+
+ CleanupCurrentContext();
}
bool TasteIt() {
diff --git a/ydb/public/lib/ydb_cli/commands/benchmark_utils.cpp b/ydb/public/lib/ydb_cli/commands/benchmark_utils.cpp
index 2fe04dd7f0b..910da5b6db1 100644
--- a/ydb/public/lib/ydb_cli/commands/benchmark_utils.cpp
+++ b/ydb/public/lib/ydb_cli/commands/benchmark_utils.cpp
@@ -112,7 +112,8 @@ public:
return ServerTiming;
}
- bool Scan(NTable::TScanQueryPartIterator& it) {
+ template <typename TIterator>
+ bool Scan(TIterator& it) {
for (;;) {
auto streamPart = it.ReadNext().GetValueSync();
if (!streamPart.IsSuccess()) {
@@ -123,8 +124,15 @@ public:
break;
}
- if (streamPart.HasQueryStats()) {
- ServerTiming = streamPart.GetQueryStats().GetTotalDuration();
+ if constexpr (std::is_same_v<TIterator, NTable::TScanQueryPartIterator>) {
+ if (streamPart.HasQueryStats()) {
+ ServerTiming = streamPart.GetQueryStats().GetTotalDuration();
+ }
+ } else {
+ const auto& stats = streamPart.GetStats();
+ if (stats) {
+ ServerTiming = stats->GetTotalDuration();
+ }
}
if (streamPart.HasResultSet()) {
@@ -250,6 +258,27 @@ TQueryBenchmarkResult Execute(const TString& query, NTable::TTableClient& client
}
}
+TQueryBenchmarkResult Execute(const TString& query, NQuery::TQueryClient& client) {
+ NQuery::TExecuteQuerySettings settings;
+ settings.StatsMode(NQuery::EStatsMode::Full);
+ auto it = client.StreamExecuteQuery(
+ query,
+ NYdb::NQuery::TTxControl::BeginTx().CommitTx(),
+ settings).GetValueSync();
+ ThrowOnError(it);
+
+ std::shared_ptr<TYSONResultScanner> scannerYson = std::make_shared<TYSONResultScanner>();
+ std::shared_ptr<TCSVResultScanner> scannerCSV = std::make_shared<TCSVResultScanner>();
+ TQueryResultScannerComposite composite;
+ composite.AddScanner(scannerYson);
+ composite.AddScanner(scannerCSV);
+ if (!composite.Scan(it)) {
+ return TQueryBenchmarkResult::Error(composite.GetErrorInfo());
+ } else {
+ return TQueryBenchmarkResult::Result(scannerYson->GetResult(), *scannerCSV, composite.GetServerTiming());
+ }
+}
+
NJson::TJsonValue GetQueryLabels(ui32 queryId) {
NJson::TJsonValue labels(NJson::JSON_MAP);
labels.InsertValue("query", Sprintf("Query%02u", queryId));
diff --git a/ydb/public/lib/ydb_cli/commands/benchmark_utils.h b/ydb/public/lib/ydb_cli/commands/benchmark_utils.h
index bfcaaaf9fbf..80aeb5145de 100644
--- a/ydb/public/lib/ydb_cli/commands/benchmark_utils.h
+++ b/ydb/public/lib/ydb_cli/commands/benchmark_utils.h
@@ -2,6 +2,7 @@
#include <library/cpp/json/json_value.h>
#include <ydb/public/sdk/cpp/client/ydb_table/table.h>
+#include <ydb/public/sdk/cpp/client/ydb_query/client.h>
#include <vector>
@@ -87,6 +88,7 @@ TString FullTablePath(const TString& database, const TString& table);
void ThrowOnError(const TStatus& status);
bool HasCharsInString(const TString& str);
TQueryBenchmarkResult Execute(const TString & query, NTable::TTableClient & client);
+TQueryBenchmarkResult Execute(const TString & query, NQuery::TQueryClient & client);
NJson::TJsonValue GetQueryLabels(ui32 queryId);
NJson::TJsonValue GetSensorValue(TStringBuf sensor, TDuration& value, ui32 queryId);
NJson::TJsonValue GetSensorValue(TStringBuf sensor, double value, ui32 queryId);
diff --git a/ydb/public/lib/ydb_cli/commands/click_bench.cpp b/ydb/public/lib/ydb_cli/commands/click_bench.cpp
index c796e6e0e12..16f36261121 100644
--- a/ydb/public/lib/ydb_cli/commands/click_bench.cpp
+++ b/ydb/public/lib/ydb_cli/commands/click_bench.cpp
@@ -210,12 +210,13 @@ TVector<TClickBenchCommandRun::TQueryFullInfo> TClickBenchCommandRun::GetQueries
return result;
}
+template <typename TClient>
bool TClickBenchCommandRun::RunBench(TConfig& config)
{
TOFStream outFStream{OutFilePath};
auto driver = CreateDriver(config);
- auto client = NYdb::NTable::TTableClient(driver);
+ auto client = TClient(driver);
TStringStream report;
report << "Results for " << IterationsCount << " iterations" << Endl;
@@ -573,12 +574,25 @@ void TClickBenchCommandRun::Config(TConfig& config) {
});
config.Opts->MutuallyExclusiveOpt(includeOpt, excludeOpt);
+
+ config.Opts->AddLongOption("executor", "Query executor type."
+ " Options: scan, generic\n"
+ "scan - use scan queries;\n"
+ "generic - use generic queries.")
+ .DefaultValue("scan").StoreResult(&QueryExecutorType);
};
int TClickBenchCommandRun::Run(TConfig& config) {
- const bool okay = RunBench(config);
- return !okay;
+ if (QueryExecutorType == "scan") {
+ const bool okay = RunBench<NYdb::NTable::TTableClient>(config);
+ return !okay;
+ } else if (QueryExecutorType == "generic") {
+ const bool okay = RunBench<NYdb::NQuery::TQueryClient>(config);
+ return !okay;
+ } else {
+ ythrow yexception() << "Incorrect executor type. Available options: \"scan\", \"generic\"." << Endl;
+ }
};
TMap<ui32, TString> TClickBenchCommandRun::LoadExternalResults() const {
diff --git a/ydb/public/lib/ydb_cli/commands/click_bench.h b/ydb/public/lib/ydb_cli/commands/click_bench.h
index ee2939d708a..61b1eb57e2c 100644
--- a/ydb/public/lib/ydb_cli/commands/click_bench.h
+++ b/ydb/public/lib/ydb_cli/commands/click_bench.h
@@ -39,6 +39,7 @@ protected:
TString ExternalQueriesDir;
TString ExternalResultsDir;
TString ExternalVariablesString;
+ TString QueryExecutorType;
TMap<ui32, TString> LoadExternalResults() const;
public:
@@ -47,6 +48,8 @@ public:
int Run(TConfig& config);
TString PatchQuery(const TStringBuf& original) const;
bool NeedRun(const ui32 queryIdx) const;
+
+ template <typename TClient>
bool RunBench(TConfig& config);
class TQueryFullInfo {
diff --git a/ydb/public/lib/ydb_cli/commands/tpch.cpp b/ydb/public/lib/ydb_cli/commands/tpch.cpp
index 2c60b5006b4..1dc9b849ab6 100644
--- a/ydb/public/lib/ydb_cli/commands/tpch.cpp
+++ b/ydb/public/lib/ydb_cli/commands/tpch.cpp
@@ -49,12 +49,13 @@ TVector<TString> TTpchCommandRun::GetQueries() const {
return queries;
}
+template <typename TClient>
bool TTpchCommandRun::RunBench(TConfig& config)
{
TOFStream outFStream{OutFilePath};
auto driver = CreateDriver(config);
- auto client = NYdb::NTable::TTableClient(driver);
+ auto client = TClient(driver);
TStringStream report;
report << "Results for " << IterationsCount << " iterations" << Endl;
@@ -379,12 +380,25 @@ void TTpchCommandRun::Config(TConfig& config) {
});
config.Opts->MutuallyExclusiveOpt(includeOpt, excludeOpt);
+
+ config.Opts->AddLongOption("executor", "Query executor type."
+ " Options: scan, generic\n"
+ "scan - use scan queries;\n"
+ "generic - use generic queries.")
+ .DefaultValue("scan").StoreResult(&QueryExecutorType);
};
int TTpchCommandRun::Run(TConfig& config) {
- const bool okay = RunBench(config);
- return !okay;
+ if (QueryExecutorType == "scan") {
+ const bool okay = RunBench<NYdb::NTable::TTableClient>(config);
+ return !okay;
+ } else if (QueryExecutorType == "generic") {
+ const bool okay = RunBench<NYdb::NQuery::TQueryClient>(config);
+ return !okay;
+ } else {
+ ythrow yexception() << "Incorrect executor type. Available options: \"scan\", \"generic\"." << Endl;
+ }
};
TCommandTpch::TCommandTpch()
diff --git a/ydb/public/lib/ydb_cli/commands/tpch.h b/ydb/public/lib/ydb_cli/commands/tpch.h
index b2f3fbbb63e..28f7da178b8 100644
--- a/ydb/public/lib/ydb_cli/commands/tpch.h
+++ b/ydb/public/lib/ydb_cli/commands/tpch.h
@@ -39,12 +39,16 @@ protected:
TString ExternalQueriesFile;
TString ExternalQueriesDir;
TString ExternalVariablesString;
+ TString QueryExecutorType;
+
public:
TTpchCommandRun();
void Config(TConfig& config);
int Run(TConfig& config);
TString PatchQuery(const TStringBuf& original) const;
bool NeedRun(const ui32 queryIdx) const;
+
+ template <typename TClient>
bool RunBench(TConfig& config);
TVector<TString> GetQueries() const;
diff --git a/ydb/tests/functional/clickbench/test.py b/ydb/tests/functional/clickbench/test.py
index 35fa42f46aa..cdccb2a6642 100644
--- a/ydb/tests/functional/clickbench/test.py
+++ b/ydb/tests/functional/clickbench/test.py
@@ -98,7 +98,8 @@ def save_canonical_data(data, fname):
@pytest.mark.parametrize("store", ["row", "column"])
-def test_run_benchmark(store):
+@pytest.mark.parametrize("executor", ["scan", "generic"])
+def test_run_benchmark(store, executor):
path = "clickbench/benchmark/{}/hits".format(store)
ret = run_cli(["workload", "clickbench", "init", "--store", store, "--path", path])
assert_that(ret.exit_code, is_(0))
@@ -114,7 +115,7 @@ def test_run_benchmark(store):
# just validating that benchmark can be executed successfully on this data.
out_fpath = os.path.join(yatest.common.output_path(), 'click_bench.{}.results'.format(store))
- ret = run_cli(["workload", "clickbench", "run", "--output", out_fpath, "--table", path])
+ ret = run_cli(["workload", "clickbench", "run", "--output", out_fpath, "--table", path, "--executor", executor])
assert_that(ret.exit_code, is_(0))