diff options
author | ns-vasilev <ns-vasilev@yandex-team.com> | 2023-10-23 18:51:58 +0300 |
---|---|---|
committer | ns-vasilev <ns-vasilev@yandex-team.com> | 2023-10-23 19:24:37 +0300 |
commit | e599669cb835689e7456f75778e2b304010e0961 (patch) | |
tree | 02d08ef2ddbe32403970da58423923825ef0428d | |
parent | dac2309e55d9e559c65b13c3a30a88d786805e2e (diff) | |
download | ydb-e599669cb835689e7456f75778e2b304010e0961.tar.gz |
KIKIMR-19702: generic query & scan benchmark (clickbench + tpc-h) + fix generic query for olap + fix memory limit in destructor
-rw-r--r-- | ydb/core/kqp/session_actor/kqp_query_state.h | 3 | ||||
-rw-r--r-- | ydb/core/kqp/ut/common/kqp_ut_common.cpp | 13 | ||||
-rw-r--r-- | ydb/core/kqp/ut/common/kqp_ut_common.h | 1 | ||||
-rw-r--r-- | ydb/core/kqp/ut/olap/kqp_olap_ut.cpp | 26 | ||||
-rw-r--r-- | ydb/library/yql/minikql/comp_nodes/mkql_wide_combine.cpp | 7 | ||||
-rw-r--r-- | ydb/public/lib/ydb_cli/commands/benchmark_utils.cpp | 35 | ||||
-rw-r--r-- | ydb/public/lib/ydb_cli/commands/benchmark_utils.h | 2 | ||||
-rw-r--r-- | ydb/public/lib/ydb_cli/commands/click_bench.cpp | 20 | ||||
-rw-r--r-- | ydb/public/lib/ydb_cli/commands/click_bench.h | 3 | ||||
-rw-r--r-- | ydb/public/lib/ydb_cli/commands/tpch.cpp | 20 | ||||
-rw-r--r-- | ydb/public/lib/ydb_cli/commands/tpch.h | 4 | ||||
-rw-r--r-- | ydb/tests/functional/clickbench/test.py | 5 |
12 files changed, 126 insertions, 13 deletions
diff --git a/ydb/core/kqp/session_actor/kqp_query_state.h b/ydb/core/kqp/session_actor/kqp_query_state.h index 53e348baf18..2b7b4f3d827 100644 --- a/ydb/core/kqp/session_actor/kqp_query_state.h +++ b/ydb/core/kqp/session_actor/kqp_query_state.h @@ -240,7 +240,8 @@ public: bool NeedPersistentSnapshot() const { auto type = GetType(); - if (type == NKikimrKqp::QUERY_TYPE_SQL_GENERIC_CONCURRENT_QUERY) { + if (type == NKikimrKqp::QUERY_TYPE_SQL_GENERIC_CONCURRENT_QUERY || + type == NKikimrKqp::QUERY_TYPE_SQL_GENERIC_QUERY) { return ::NKikimr::NKqp::HasOlapTableInTx(PreparedQuery->GetPhysicalQuery()); } return ( diff --git a/ydb/core/kqp/ut/common/kqp_ut_common.cpp b/ydb/core/kqp/ut/common/kqp_ut_common.cpp index 88a7dd8e18c..1c7130f8222 100644 --- a/ydb/core/kqp/ut/common/kqp_ut_common.cpp +++ b/ydb/core/kqp/ut/common/kqp_ut_common.cpp @@ -580,6 +580,15 @@ TDataQueryResult ExecQueryAndTestResult(TSession& session, const TString& query, return result; } +void FillProfile(NYdb::NQuery::TExecuteQueryPart& streamPart, NYson::TYsonWriter& writer, TVector<TString>* profiles, + ui32 profileIndex) +{ + Y_UNUSED(streamPart); + Y_UNUSED(writer); + Y_UNUSED(profiles); + Y_UNUSED(profileIndex); +} + void FillProfile(NYdb::NTable::TScanQueryPart& streamPart, NYson::TYsonWriter& writer, TVector<TString>* profiles, ui32 profileIndex) { @@ -646,6 +655,10 @@ TString StreamResultToYsonImpl(TIterator& it, TVector<TString>* profiles, bool t return out.Str(); } +TString StreamResultToYson(NYdb::NQuery::TExecuteQueryIterator& it, bool throwOnTimeout, const NYdb::EStatus& opStatus, const TString& issueMessageSubString) { + return StreamResultToYsonImpl(it, nullptr, throwOnTimeout, opStatus, issueMessageSubString); +} + TString StreamResultToYson(NYdb::NTable::TScanQueryPartIterator& it, bool throwOnTimeout, const NYdb::EStatus& opStatus, const TString& issueMessageSubString) { return StreamResultToYsonImpl(it, nullptr, throwOnTimeout, opStatus, issueMessageSubString); } diff --git 
a/ydb/core/kqp/ut/common/kqp_ut_common.h b/ydb/core/kqp/ut/common/kqp_ut_common.h index e739e2e96ea..1560d3913c1 100644 --- a/ydb/core/kqp/ut/common/kqp_ut_common.h +++ b/ydb/core/kqp/ut/common/kqp_ut_common.h @@ -261,6 +261,7 @@ public: NYdb::EStatus Status; }; +TString StreamResultToYson(NYdb::NQuery::TExecuteQueryIterator& it, bool throwOnTImeout = false, const NYdb::EStatus& opStatus = NYdb::EStatus::SUCCESS, const TString& issueMessageSubString = ""); TString StreamResultToYson(NYdb::NTable::TScanQueryPartIterator& it, bool throwOnTImeout = false, const NYdb::EStatus& opStatus = NYdb::EStatus::SUCCESS, const TString& issueMessageSubString = ""); TString StreamResultToYson(NYdb::NScripting::TYqlResultPartIterator& it, bool throwOnTImeout = false, const NYdb::EStatus& opStatus = NYdb::EStatus::SUCCESS); TString StreamResultToYson(NYdb::NTable::TTablePartIterator& it, bool throwOnTImeout = false, const NYdb::EStatus& opStatus = NYdb::EStatus::SUCCESS); diff --git a/ydb/core/kqp/ut/olap/kqp_olap_ut.cpp b/ydb/core/kqp/ut/olap/kqp_olap_ut.cpp index d5d788505a2..a090cb27a29 100644 --- a/ydb/core/kqp/ut/olap/kqp_olap_ut.cpp +++ b/ydb/core/kqp/ut/olap/kqp_olap_ut.cpp @@ -5103,6 +5103,32 @@ Y_UNIT_TEST_SUITE(KqpOlap) { CompareYson(output, R"([[10u;]])"); } + Y_UNIT_TEST(OlapRead_StreamGenericQuery) { + auto settings = TKikimrSettings() + .SetWithSampleTables(false) + .SetForceColumnTablesCompositeMarks(true); + TKikimrRunner kikimr(settings); + + Tests::NCommon::TLoggerInit(kikimr).Initialize(); + TTableWithNullsHelper(kikimr).CreateTableWithNulls(); + TLocalHelper(kikimr).CreateTestOlapTable(); + + { + WriteTestDataForTableWithNulls(kikimr, "/Root/tableWithNulls"); + } + + auto db = kikimr.GetQueryClient(); + + auto it = db.StreamExecuteQuery(R"( + SELECT COUNT(*) FROM `/Root/tableWithNulls`; + )", NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync(); + + UNIT_ASSERT_VALUES_EQUAL_C(it.GetStatus(), EStatus::SUCCESS, it.GetIssues().ToString()); + TString 
output = StreamResultToYson(it); + Cout << output << Endl; + CompareYson(output, R"([[10u;]])"); + } + Y_UNIT_TEST(OlapRead_ScanQuery) { auto settings = TKikimrSettings() .SetWithSampleTables(false) diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_wide_combine.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_wide_combine.cpp index fb087855a90..592e18b60bf 100644 --- a/ydb/library/yql/minikql/comp_nodes/mkql_wide_combine.cpp +++ b/ydb/library/yql/minikql/comp_nodes/mkql_wide_combine.cpp @@ -218,7 +218,12 @@ public: row[i].UnRef(); } } - IsEmpty(); + + ExtractIt.reset(); + Storage.clear(); + States.Clear(); + + CleanupCurrentContext(); } bool TasteIt() { diff --git a/ydb/public/lib/ydb_cli/commands/benchmark_utils.cpp b/ydb/public/lib/ydb_cli/commands/benchmark_utils.cpp index 2fe04dd7f0b..910da5b6db1 100644 --- a/ydb/public/lib/ydb_cli/commands/benchmark_utils.cpp +++ b/ydb/public/lib/ydb_cli/commands/benchmark_utils.cpp @@ -112,7 +112,8 @@ public: return ServerTiming; } - bool Scan(NTable::TScanQueryPartIterator& it) { + template <typename TIterator> + bool Scan(TIterator& it) { for (;;) { auto streamPart = it.ReadNext().GetValueSync(); if (!streamPart.IsSuccess()) { @@ -123,8 +124,15 @@ public: break; } - if (streamPart.HasQueryStats()) { - ServerTiming = streamPart.GetQueryStats().GetTotalDuration(); + if constexpr (std::is_same_v<TIterator, NTable::TScanQueryPartIterator>) { + if (streamPart.HasQueryStats()) { + ServerTiming = streamPart.GetQueryStats().GetTotalDuration(); + } + } else { + const auto& stats = streamPart.GetStats(); + if (stats) { + ServerTiming = stats->GetTotalDuration(); + } } if (streamPart.HasResultSet()) { @@ -250,6 +258,27 @@ TQueryBenchmarkResult Execute(const TString& query, NTable::TTableClient& client } } +TQueryBenchmarkResult Execute(const TString& query, NQuery::TQueryClient& client) { + NQuery::TExecuteQuerySettings settings; + settings.StatsMode(NQuery::EStatsMode::Full); + auto it = client.StreamExecuteQuery( + query, + 
NYdb::NQuery::TTxControl::BeginTx().CommitTx(), + settings).GetValueSync(); + ThrowOnError(it); + + std::shared_ptr<TYSONResultScanner> scannerYson = std::make_shared<TYSONResultScanner>(); + std::shared_ptr<TCSVResultScanner> scannerCSV = std::make_shared<TCSVResultScanner>(); + TQueryResultScannerComposite composite; + composite.AddScanner(scannerYson); + composite.AddScanner(scannerCSV); + if (!composite.Scan(it)) { + return TQueryBenchmarkResult::Error(composite.GetErrorInfo()); + } else { + return TQueryBenchmarkResult::Result(scannerYson->GetResult(), *scannerCSV, composite.GetServerTiming()); + } +} + NJson::TJsonValue GetQueryLabels(ui32 queryId) { NJson::TJsonValue labels(NJson::JSON_MAP); labels.InsertValue("query", Sprintf("Query%02u", queryId)); diff --git a/ydb/public/lib/ydb_cli/commands/benchmark_utils.h b/ydb/public/lib/ydb_cli/commands/benchmark_utils.h index bfcaaaf9fbf..80aeb5145de 100644 --- a/ydb/public/lib/ydb_cli/commands/benchmark_utils.h +++ b/ydb/public/lib/ydb_cli/commands/benchmark_utils.h @@ -2,6 +2,7 @@ #include <library/cpp/json/json_value.h> #include <ydb/public/sdk/cpp/client/ydb_table/table.h> +#include <ydb/public/sdk/cpp/client/ydb_query/client.h> #include <vector> @@ -87,6 +88,7 @@ TString FullTablePath(const TString& database, const TString& table); void ThrowOnError(const TStatus& status); bool HasCharsInString(const TString& str); TQueryBenchmarkResult Execute(const TString & query, NTable::TTableClient & client); +TQueryBenchmarkResult Execute(const TString & query, NQuery::TQueryClient & client); NJson::TJsonValue GetQueryLabels(ui32 queryId); NJson::TJsonValue GetSensorValue(TStringBuf sensor, TDuration& value, ui32 queryId); NJson::TJsonValue GetSensorValue(TStringBuf sensor, double value, ui32 queryId); diff --git a/ydb/public/lib/ydb_cli/commands/click_bench.cpp b/ydb/public/lib/ydb_cli/commands/click_bench.cpp index c796e6e0e12..16f36261121 100644 --- a/ydb/public/lib/ydb_cli/commands/click_bench.cpp +++ 
b/ydb/public/lib/ydb_cli/commands/click_bench.cpp @@ -210,12 +210,13 @@ TVector<TClickBenchCommandRun::TQueryFullInfo> TClickBenchCommandRun::GetQueries return result; } +template <typename TClient> bool TClickBenchCommandRun::RunBench(TConfig& config) { TOFStream outFStream{OutFilePath}; auto driver = CreateDriver(config); - auto client = NYdb::NTable::TTableClient(driver); + auto client = TClient(driver); TStringStream report; report << "Results for " << IterationsCount << " iterations" << Endl; @@ -573,12 +574,25 @@ void TClickBenchCommandRun::Config(TConfig& config) { }); config.Opts->MutuallyExclusiveOpt(includeOpt, excludeOpt); + + config.Opts->AddLongOption("executor", "Query executor type." + " Options: scan, generic\n" + "scan - use scan queries;\n" + "generic - use generic queries.") + .DefaultValue("scan").StoreResult(&QueryExecutorType); }; int TClickBenchCommandRun::Run(TConfig& config) { - const bool okay = RunBench(config); - return !okay; + if (QueryExecutorType == "scan") { + const bool okay = RunBench<NYdb::NTable::TTableClient>(config); + return !okay; + } else if (QueryExecutorType == "generic") { + const bool okay = RunBench<NYdb::NQuery::TQueryClient>(config); + return !okay; + } else { + ythrow yexception() << "Incorrect executor type. Available options: \"scan\", \"generic\"." 
<< Endl; + } }; TMap<ui32, TString> TClickBenchCommandRun::LoadExternalResults() const { diff --git a/ydb/public/lib/ydb_cli/commands/click_bench.h b/ydb/public/lib/ydb_cli/commands/click_bench.h index ee2939d708a..61b1eb57e2c 100644 --- a/ydb/public/lib/ydb_cli/commands/click_bench.h +++ b/ydb/public/lib/ydb_cli/commands/click_bench.h @@ -39,6 +39,7 @@ protected: TString ExternalQueriesDir; TString ExternalResultsDir; TString ExternalVariablesString; + TString QueryExecutorType; TMap<ui32, TString> LoadExternalResults() const; public: @@ -47,6 +48,8 @@ public: int Run(TConfig& config); TString PatchQuery(const TStringBuf& original) const; bool NeedRun(const ui32 queryIdx) const; + + template <typename TClient> bool RunBench(TConfig& config); class TQueryFullInfo { diff --git a/ydb/public/lib/ydb_cli/commands/tpch.cpp b/ydb/public/lib/ydb_cli/commands/tpch.cpp index 2c60b5006b4..1dc9b849ab6 100644 --- a/ydb/public/lib/ydb_cli/commands/tpch.cpp +++ b/ydb/public/lib/ydb_cli/commands/tpch.cpp @@ -49,12 +49,13 @@ TVector<TString> TTpchCommandRun::GetQueries() const { return queries; } +template <typename TClient> bool TTpchCommandRun::RunBench(TConfig& config) { TOFStream outFStream{OutFilePath}; auto driver = CreateDriver(config); - auto client = NYdb::NTable::TTableClient(driver); + auto client = TClient(driver); TStringStream report; report << "Results for " << IterationsCount << " iterations" << Endl; @@ -379,12 +380,25 @@ void TTpchCommandRun::Config(TConfig& config) { }); config.Opts->MutuallyExclusiveOpt(includeOpt, excludeOpt); + + config.Opts->AddLongOption("executor", "Query executor type." 
+ " Options: scan, generic\n" + "scan - use scan queries;\n" + "generic - use generic queries.") + .DefaultValue("scan").StoreResult(&QueryExecutorType); }; int TTpchCommandRun::Run(TConfig& config) { - const bool okay = RunBench(config); - return !okay; + if (QueryExecutorType == "scan") { + const bool okay = RunBench<NYdb::NTable::TTableClient>(config); + return !okay; + } else if (QueryExecutorType == "generic") { + const bool okay = RunBench<NYdb::NQuery::TQueryClient>(config); + return !okay; + } else { + ythrow yexception() << "Incorrect executor type. Available options: \"scan\", \"generic\"." << Endl; + } }; TCommandTpch::TCommandTpch() diff --git a/ydb/public/lib/ydb_cli/commands/tpch.h b/ydb/public/lib/ydb_cli/commands/tpch.h index b2f3fbbb63e..28f7da178b8 100644 --- a/ydb/public/lib/ydb_cli/commands/tpch.h +++ b/ydb/public/lib/ydb_cli/commands/tpch.h @@ -39,12 +39,16 @@ protected: TString ExternalQueriesFile; TString ExternalQueriesDir; TString ExternalVariablesString; + TString QueryExecutorType; + public: TTpchCommandRun(); void Config(TConfig& config); int Run(TConfig& config); TString PatchQuery(const TStringBuf& original) const; bool NeedRun(const ui32 queryIdx) const; + + template <typename TClient> bool RunBench(TConfig& config); TVector<TString> GetQueries() const; diff --git a/ydb/tests/functional/clickbench/test.py b/ydb/tests/functional/clickbench/test.py index 35fa42f46aa..cdccb2a6642 100644 --- a/ydb/tests/functional/clickbench/test.py +++ b/ydb/tests/functional/clickbench/test.py @@ -98,7 +98,8 @@ def save_canonical_data(data, fname): @pytest.mark.parametrize("store", ["row", "column"]) -def test_run_benchmark(store): +@pytest.mark.parametrize("executor", ["scan", "generic"]) +def test_run_benchmark(store, executor): path = "clickbench/benchmark/{}/hits".format(store) ret = run_cli(["workload", "clickbench", "init", "--store", store, "--path", path]) assert_that(ret.exit_code, is_(0)) @@ -114,7 +115,7 @@ def test_run_benchmark(store): # 
just validating that benchmark can be executed successfully on this data. out_fpath = os.path.join(yatest.common.output_path(), 'click_bench.{}.results'.format(store)) - ret = run_cli(["workload", "clickbench", "run", "--output", out_fpath, "--table", path]) + ret = run_cli(["workload", "clickbench", "run", "--output", out_fpath, "--table", path, "--executor", executor]) assert_that(ret.exit_code, is_(0)) |