diff options
author | aidarsamer <aidarsamer@ydb.tech> | 2022-12-29 15:15:47 +0300 |
---|---|---|
committer | aidarsamer <aidarsamer@ydb.tech> | 2022-12-29 15:15:47 +0300 |
commit | e718be7eba95480bf6a909f927e98a8c89fb2263 (patch) | |
tree | 3c09686f0b23958dfc6c8774f7abbd5906c7cc92 | |
parent | 80c09361cd3952574f1eec85bfd8057277252e90 (diff) | |
download | ydb-e718be7eba95480bf6a909f927e98a8c89fb2263.tar.gz |
Add SSA runtime version
Upgrade SSA runtime version.
If there are no result columns in the scan task meta, verify that this is not a ColumnShard OLAP read.
Add SSA runtime version
-rw-r--r-- | ydb/core/formats/ssa_runtime_version.h | 30 | ||||
-rw-r--r-- | ydb/core/kqp/opt/logical/kqp_opt_log_extract.cpp | 2 | ||||
-rw-r--r-- | ydb/core/kqp/opt/physical/kqp_opt_phy_olap_agg.cpp | 12 | ||||
-rw-r--r-- | ydb/core/kqp/query_compiler/kqp_olap_compiler.cpp | 14 | ||||
-rw-r--r-- | ydb/core/kqp/runtime/kqp_scan_data.cpp | 3 | ||||
-rw-r--r-- | ydb/core/kqp/ut/olap/kqp_olap_ut.cpp | 64 |
6 files changed, 114 insertions, 11 deletions
diff --git a/ydb/core/formats/ssa_runtime_version.h b/ydb/core/formats/ssa_runtime_version.h new file mode 100644 index 00000000000..1b6f4d9611e --- /dev/null +++ b/ydb/core/formats/ssa_runtime_version.h @@ -0,0 +1,30 @@ +#pragma once + +#include <util/system/types.h> + +namespace NKikimr::NSsa { +// Problem: rolling update of services based on the SSA runtime (example: YDB) +// requires careful management of SSA runtime versions. An SSA program +// that was built with YDB stable-23-1 must work correctly on YDB stable-22-4 +// (previous stable) if a YDB rollback happens. +// +// Solution: we support an SSA runtime version. Every incompatible change to +// the SSA runtime increments SSA_RUNTIME_VERSION. A user of the SSA runtime +// (YDB for example) manually chooses 'RuntimeVersion' to provide compatibility +// between major/minor releases. For instance, if YDB stable-22-4 supports version X, +// the owner of YDB stable-23-1 sets RuntimeVersion to version X to allow +// a graceful rolling update. When there is no chance to roll back to the previous +// version, for instance stable-22-4, the YDB owner can switch to the new +// version Y (Y > X). + +// Bump this version every time incompatible runtime functions are introduced. +#ifndef SSA_RUNTIME_VERSION +#define SSA_RUNTIME_VERSION 2U +#endif + +// History: +// v1 supports filter and cast(timestamp to uint64) pushdowns. +// v2 supports COUNT(col), COUNT(*), SUM(), MIN(), MAX(), SOME() aggregations. +constexpr ui32 RuntimeVersion = SSA_RUNTIME_VERSION; + +} diff --git a/ydb/core/kqp/opt/logical/kqp_opt_log_extract.cpp b/ydb/core/kqp/opt/logical/kqp_opt_log_extract.cpp index ba6f77981af..d5dbaf0f76e 100644 --- a/ydb/core/kqp/opt/logical/kqp_opt_log_extract.cpp +++ b/ydb/core/kqp/opt/logical/kqp_opt_log_extract.cpp @@ -136,8 +136,6 @@ TExprBase KqpApplyExtractMembersToReadOlapTable(TExprBase node, TExprContext& ct return node; } - // When process is set it may use columns in read.Columns() but those columns may not be present - // in the results. 
Thus do not apply extract members if process is not empty lambda if (read.Process().Body().Raw() != read.Process().Args().Arg(0).Raw()) { auto extractMembers = Build<TKqpOlapExtractMembers>(ctx, node.Pos()) .Input(read.Process().Args().Arg(0)) diff --git a/ydb/core/kqp/opt/physical/kqp_opt_phy_olap_agg.cpp b/ydb/core/kqp/opt/physical/kqp_opt_phy_olap_agg.cpp index 42e3475fa9d..08d99b41b71 100644 --- a/ydb/core/kqp/opt/physical/kqp_opt_phy_olap_agg.cpp +++ b/ydb/core/kqp/opt/physical/kqp_opt_phy_olap_agg.cpp @@ -2,6 +2,8 @@ #include <ydb/core/kqp/common/kqp_yql.h> +#include <ydb/core/formats/ssa_runtime_version.h> + #include <ydb/library/yql/core/yql_opt_utils.h> #include <vector> @@ -54,6 +56,11 @@ bool CanBePushedDown(const TExprBase& trait, TExprContext& ctx) TExprBase KqpPushOlapAggregate(TExprBase node, TExprContext& ctx, const TKqpOptimizeContext& kqpCtx) { + if (NKikimr::NSsa::RuntimeVersion < 2U) { + // We introduced aggregate pushdown in v2 of SSA program + return node; + } + if (!kqpCtx.Config->HasOptEnableOlapPushdown()) { return node; } @@ -135,6 +142,11 @@ TExprBase KqpPushOlapAggregate(TExprBase node, TExprContext& ctx, const TKqpOpti TExprBase KqpPushOlapLength(TExprBase node, TExprContext& ctx, const TKqpOptimizeContext& kqpCtx) { + if (NKikimr::NSsa::RuntimeVersion < 2U) { + // We introduced aggregate pushdown in v2 of SSA program + return node; + } + if (!kqpCtx.Config->HasOptEnableOlapPushdown()) { return node; } diff --git a/ydb/core/kqp/query_compiler/kqp_olap_compiler.cpp b/ydb/core/kqp/query_compiler/kqp_olap_compiler.cpp index 56fb4be5b6d..1989b965d48 100644 --- a/ydb/core/kqp/query_compiler/kqp_olap_compiler.cpp +++ b/ydb/core/kqp/query_compiler/kqp_olap_compiler.cpp @@ -1,6 +1,8 @@ #include "kqp_olap_compiler.h" #include <ydb/core/formats/arrow_helpers.h> +#include <ydb/core/formats/ssa_runtime_version.h> + #include <ydb/library/yql/core/yql_opt_utils.h> namespace NKikimr { @@ -12,8 +14,6 @@ using namespace NKikimrSSA; using 
EAggFunctionType = TProgram::TAggregateAssignment::EAggregateFunction; -constexpr ui32 OLAP_PROGRAM_VERSION = 1; - namespace { struct TAggColInfo { @@ -38,7 +38,7 @@ public: MaxColumnId = std::max(MaxColumnId, columnMeta.Id); } - Program.SetVersion(OLAP_PROGRAM_VERSION); + Program.SetVersion(NKikimr::NSsa::RuntimeVersion); } ui32 GetColumnId(const std::string& name) const { @@ -453,7 +453,9 @@ void CompileAggregates(const TKqpOlapAgg& aggNode, TKqpOlapCompileContext& ctx) void CompileFinalProjection(TKqpOlapCompileContext& ctx) { auto resultColNames = ctx.GetResultColNames(); - YQL_ENSURE(!resultColNames.empty()); + if (resultColNames.empty()) { + return; + } auto* projection = ctx.CreateProjection(); for (auto colName : resultColNames) { @@ -493,9 +495,7 @@ void CompileOlapProgram(const TCoLambda& lambda, const TKikimrTableMetadata& tab TKqpOlapCompileContext ctx(lambda.Args().Arg(0), tableMeta, readProto, resultColNames); CompileOlapProgramImpl(lambda.Body(), ctx); - if (!ctx.IsEmptyProgram()) { - CompileFinalProjection(ctx); - } + CompileFinalProjection(ctx); ctx.SerializeToProto(); } diff --git a/ydb/core/kqp/runtime/kqp_scan_data.cpp b/ydb/core/kqp/runtime/kqp_scan_data.cpp index ead01cb124d..e3c26ea5e7c 100644 --- a/ydb/core/kqp/runtime/kqp_scan_data.cpp +++ b/ydb/core/kqp/runtime/kqp_scan_data.cpp @@ -287,7 +287,8 @@ TKqpScanComputeContext::TScanData::TScanData(const NKikimrTxDataShard::TKqpTrans } } - if (meta.GetResultColumns().empty()) { + if (meta.GetResultColumns().empty() && !meta.HasOlapProgram()) { + // Currently we define ResultColumns just for Olap tables in TKqpQueryCompiler ResultColumns = Columns; } else { ResultColumns.reserve(meta.GetResultColumns().size()); diff --git a/ydb/core/kqp/ut/olap/kqp_olap_ut.cpp b/ydb/core/kqp/ut/olap/kqp_olap_ut.cpp index 24b562b3122..26a560fcc4a 100644 --- a/ydb/core/kqp/ut/olap/kqp_olap_ut.cpp +++ b/ydb/core/kqp/ut/olap/kqp_olap_ut.cpp @@ -8,6 +8,7 @@ #include 
<contrib/libs/apache/arrow/cpp/src/arrow/ipc/writer.h> #include <ydb/library/yql/dq/actors/compute/dq_compute_actor.h> +#include <ydb/core/formats/ssa_runtime_version.h> #include <ydb/core/kqp/executer_actor/kqp_executer.h> #include <ydb/core/tx/datashard/datashard.h> #include <ydb/core/tx/datashard/datashard_ut_common_kqp.h> @@ -412,7 +413,7 @@ Y_UNIT_TEST_SUITE(KqpOlap) { Cerr << ast << Endl; for (auto planNode : planNodes) { UNIT_ASSERT_C(ast.find(planNode) != std::string::npos, - TStringBuilder() << planNode << " was not pushed down. Query: " << query); + TStringBuilder() << planNode << " was not found. Query: " << query); } if (!readNodeType.empty()) { @@ -1206,7 +1207,11 @@ Y_UNIT_TEST_SUITE(KqpOlap) { CompareYson(result, R"([[23000u;]])"); // Check plan +#if SSA_RUNTIME_VERSION >= 2U CheckPlanForAggregatePushdown(query, tableClient, { "TKqpOlapAgg" }, "TableFullScan"); +#else + CheckPlanForAggregatePushdown(query, tableClient, { "CombineCore" }, ""); +#endif } } @@ -1247,7 +1252,11 @@ Y_UNIT_TEST_SUITE(KqpOlap) { CompareYson(result, R"([[[0];4600u];[[1];4600u];[[2];4600u];[[3];4600u];[[4];4600u]])"); // Check plan +#if SSA_RUNTIME_VERSION >= 2U CheckPlanForAggregatePushdown(query, tableClient, { "TKqpOlapAgg" }, "TableFullScan"); +#else + CheckPlanForAggregatePushdown(query, tableClient, { "CombineCore" }, ""); +#endif } } @@ -1288,7 +1297,11 @@ Y_UNIT_TEST_SUITE(KqpOlap) { CompareYson(result, R"([[23000u;]])"); // Check plan +#if SSA_RUNTIME_VERSION >= 2U CheckPlanForAggregatePushdown(query, tableClient, { "TKqpOlapAgg" }, "TableFullScan"); +#else + CheckPlanForAggregatePushdown(query, tableClient, { "Condense" }, ""); +#endif } } @@ -1583,9 +1596,14 @@ Y_UNIT_TEST_SUITE(KqpOlap) { WHERE level = 2 )") .SetExpectedReply("[[4600u;]]") +#if SSA_RUNTIME_VERSION >= 2U .AddExpectedPlanOptions("TKqpOlapAgg") .AddExpectedPlanOptions("KqpOlapFilter") .MutableLimitChecker().SetExpectedResultCount(1) +#else + .AddExpectedPlanOptions("KqpOlapFilter") + 
.AddExpectedPlanOptions("Condense") +#endif ; TestAggregations({ testCase }); @@ -1600,9 +1618,14 @@ Y_UNIT_TEST_SUITE(KqpOlap) { WHERE level = 2 )") .SetExpectedReply("[[4600u;]]") +#if SSA_RUNTIME_VERSION >= 2U .AddExpectedPlanOptions("TKqpOlapAgg") .AddExpectedPlanOptions("KqpOlapFilter") .MutableLimitChecker().SetExpectedResultCount(1) +#else + .AddExpectedPlanOptions("CombineCore") + .AddExpectedPlanOptions("KqpOlapFilter") +#endif ; TestAggregations({ testCase }); @@ -1617,9 +1640,14 @@ Y_UNIT_TEST_SUITE(KqpOlap) { WHERE level = 2 )") .SetExpectedReply("[[4600u;]]") +#if SSA_RUNTIME_VERSION >= 2U .AddExpectedPlanOptions("TKqpOlapAgg") .AddExpectedPlanOptions("KqpOlapFilter") .MutableLimitChecker().SetExpectedResultCount(1) +#else + .AddExpectedPlanOptions("CombineCore") + .AddExpectedPlanOptions("KqpOlapFilter") +#endif ; TestAggregations({ testCase }); @@ -1739,7 +1767,11 @@ Y_UNIT_TEST_SUITE(KqpOlap) { WHERE level = 2 )") .SetExpectedReply("[[4600u;]]") +#if SSA_RUNTIME_VERSION >= 2U .AddExpectedPlanOptions("TKqpOlapAgg") +#else + .AddExpectedPlanOptions("CombineCore") +#endif .AddExpectedPlanOptions("KqpOlapFilter"); TestAggregations({ testCase }); @@ -1753,7 +1785,11 @@ Y_UNIT_TEST_SUITE(KqpOlap) { FROM `/Root/olapStore/olapTable` )") .SetExpectedReply("[[[46000;]]]") +#if SSA_RUNTIME_VERSION >= 2U .AddExpectedPlanOptions("TKqpOlapAgg"); +#else + .AddExpectedPlanOptions("CombineCore"); +#endif TestAggregations({ testCase }); } @@ -1768,7 +1804,11 @@ Y_UNIT_TEST_SUITE(KqpOlap) { ORDER BY level )") .SetExpectedReply("[[[0];[0]];[[1];[4600]];[[2];[9200]];[[3];[13800]];[[4];[18400]]]") +#if SSA_RUNTIME_VERSION >= 2U .AddExpectedPlanOptions("TKqpOlapAgg"); +#else + .AddExpectedPlanOptions("CombineCore"); +#endif TestAggregations({ testCase }); } @@ -1781,7 +1821,11 @@ Y_UNIT_TEST_SUITE(KqpOlap) { FROM `/Root/olapStore/olapTable` )") .SetExpectedReply("[[[0]]]") +#if SSA_RUNTIME_VERSION >= 2U .AddExpectedPlanOptions("TKqpOlapAgg"); +#else + 
.AddExpectedPlanOptions("CombineCore"); +#endif TestAggregations({ testCase }); } @@ -1794,7 +1838,11 @@ Y_UNIT_TEST_SUITE(KqpOlap) { FROM `/Root/olapStore/olapTable` )") .SetExpectedReply("[[[4]]]") +#if SSA_RUNTIME_VERSION >= 2U .AddExpectedPlanOptions("TKqpOlapAgg"); +#else + .AddExpectedPlanOptions("CombineCore"); +#endif TestAggregations({ testCase }); } @@ -1809,7 +1857,11 @@ Y_UNIT_TEST_SUITE(KqpOlap) { ORDER BY level )") .SetExpectedReply("[[[0];[\"10000\"]];[[1];[\"10001\"]];[[2];[\"10002\"]];[[3];[\"10003\"]];[[4];[\"10004\"]]]") +#if SSA_RUNTIME_VERSION >= 2U .AddExpectedPlanOptions("TKqpOlapAgg"); +#else + .AddExpectedPlanOptions("CombineCore"); +#endif TestAggregations({ testCase }); } @@ -1824,7 +1876,11 @@ Y_UNIT_TEST_SUITE(KqpOlap) { ORDER BY level )") .SetExpectedReply("[[[0];[\"40995\"]];[[1];[\"40996\"]];[[2];[\"40997\"]];[[3];[\"40998\"]];[[4];[\"40999\"]]]") +#if SSA_RUNTIME_VERSION >= 2U .AddExpectedPlanOptions("TKqpOlapAgg"); +#else + .AddExpectedPlanOptions("CombineCore"); +#endif TestAggregations({ testCase }); } @@ -1839,8 +1895,12 @@ Y_UNIT_TEST_SUITE(KqpOlap) { ORDER BY c, resource_id DESC LIMIT 3 )") .SetExpectedReply("[[[\"40999\"];[4];1u];[[\"40998\"];[3];1u];[[\"40997\"];[2];1u]]") +#if SSA_RUNTIME_VERSION >= 2U .AddExpectedPlanOptions("TKqpOlapAgg") .SetExpectedReadNodeType("TableFullScan"); +#else + .AddExpectedPlanOptions("CombineCore"); +#endif TestAggregations({ testCase }); } @@ -2956,6 +3016,8 @@ Y_UNIT_TEST_SUITE(KqpOlap) { UNIT_ASSERT_C(parameter != parameters.end(), "No type " << checkType << " in parameters"); + Cerr << "Test query:\n" << query + predicate << Endl; + auto it = tableClient.StreamExecuteScanQuery(query + predicate, parameter->second).GetValueSync(); // Check for successful execution auto streamPart = it.ReadNext().GetValueSync(); |