aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authoraidarsamer <aidarsamer@ydb.tech>2022-12-29 15:15:47 +0300
committeraidarsamer <aidarsamer@ydb.tech>2022-12-29 15:15:47 +0300
commite718be7eba95480bf6a909f927e98a8c89fb2263 (patch)
tree3c09686f0b23958dfc6c8774f7abbd5906c7cc92
parent80c09361cd3952574f1eec85bfd8057277252e90 (diff)
downloadydb-e718be7eba95480bf6a909f927e98a8c89fb2263.tar.gz
Add SSA runtime version
Upgrade SSA runtime version. If no result columns in scan task meta verify that this is not ColumnShard OLAP read. Add SSA runtime version
-rw-r--r--ydb/core/formats/ssa_runtime_version.h30
-rw-r--r--ydb/core/kqp/opt/logical/kqp_opt_log_extract.cpp2
-rw-r--r--ydb/core/kqp/opt/physical/kqp_opt_phy_olap_agg.cpp12
-rw-r--r--ydb/core/kqp/query_compiler/kqp_olap_compiler.cpp14
-rw-r--r--ydb/core/kqp/runtime/kqp_scan_data.cpp3
-rw-r--r--ydb/core/kqp/ut/olap/kqp_olap_ut.cpp64
6 files changed, 114 insertions, 11 deletions
diff --git a/ydb/core/formats/ssa_runtime_version.h b/ydb/core/formats/ssa_runtime_version.h
new file mode 100644
index 00000000000..1b6f4d9611e
--- /dev/null
+++ b/ydb/core/formats/ssa_runtime_version.h
@@ -0,0 +1,30 @@
+#pragma once
+
+#include <util/system/types.h>
+
+namespace NKikimr::NSsa {
+// Problem: rolling update of services based on SSA runtime (example: YDB)
+// requires careful management of SSA runtime versions. An SSA program
+// that was built with YDB stable-23-1 must work correctly on YDB stable-22-4
+// (previous stable) if a YDB rollback happens.
+//
+// Solution: we support an SSA runtime version. Every incompatible change to
+// SSA runtime increments SSA_RUNTIME_VERSION. A user of SSA runtime
+// (YDB for example) manually chooses 'RuntimeVersion' to provide major/minor
+// release compatibility. For instance, if YDB stable-22-4 supports version X,
+// the owner of YDB stable-23-1 sets RuntimeVersion to version X to allow
+// graceful rolling update. When there is no chance to roll back to the previous
+// version, for instance stable-22-4, the YDB owner can switch to the new
+// version Y (Y > X).
+
+// Bump this version every time incompatible runtime functions are introduced.
+#ifndef SSA_RUNTIME_VERSION
+#define SSA_RUNTIME_VERSION 2U
+#endif
+
+// History:
+// v1 supports filter and cast(timestamp to uint64) pushdowns.
+// v2 supports COUNT(col), COUNT(*), SUM(), MIN(), MAX(), SOME() aggregations.
+constexpr ui32 RuntimeVersion = SSA_RUNTIME_VERSION;
+
+}
diff --git a/ydb/core/kqp/opt/logical/kqp_opt_log_extract.cpp b/ydb/core/kqp/opt/logical/kqp_opt_log_extract.cpp
index ba6f77981af..d5dbaf0f76e 100644
--- a/ydb/core/kqp/opt/logical/kqp_opt_log_extract.cpp
+++ b/ydb/core/kqp/opt/logical/kqp_opt_log_extract.cpp
@@ -136,8 +136,6 @@ TExprBase KqpApplyExtractMembersToReadOlapTable(TExprBase node, TExprContext& ct
return node;
}
- // When process is set it may use columns in read.Columns() but those columns may not be present
- // in the results. Thus do not apply extract members if process is not empty lambda
if (read.Process().Body().Raw() != read.Process().Args().Arg(0).Raw()) {
auto extractMembers = Build<TKqpOlapExtractMembers>(ctx, node.Pos())
.Input(read.Process().Args().Arg(0))
diff --git a/ydb/core/kqp/opt/physical/kqp_opt_phy_olap_agg.cpp b/ydb/core/kqp/opt/physical/kqp_opt_phy_olap_agg.cpp
index 42e3475fa9d..08d99b41b71 100644
--- a/ydb/core/kqp/opt/physical/kqp_opt_phy_olap_agg.cpp
+++ b/ydb/core/kqp/opt/physical/kqp_opt_phy_olap_agg.cpp
@@ -2,6 +2,8 @@
#include <ydb/core/kqp/common/kqp_yql.h>
+#include <ydb/core/formats/ssa_runtime_version.h>
+
#include <ydb/library/yql/core/yql_opt_utils.h>
#include <vector>
@@ -54,6 +56,11 @@ bool CanBePushedDown(const TExprBase& trait, TExprContext& ctx)
TExprBase KqpPushOlapAggregate(TExprBase node, TExprContext& ctx, const TKqpOptimizeContext& kqpCtx)
{
+ if (NKikimr::NSsa::RuntimeVersion < 2U) {
+ // We introduced aggregate pushdown in v2 of SSA program
+ return node;
+ }
+
if (!kqpCtx.Config->HasOptEnableOlapPushdown()) {
return node;
}
@@ -135,6 +142,11 @@ TExprBase KqpPushOlapAggregate(TExprBase node, TExprContext& ctx, const TKqpOpti
TExprBase KqpPushOlapLength(TExprBase node, TExprContext& ctx, const TKqpOptimizeContext& kqpCtx)
{
+ if (NKikimr::NSsa::RuntimeVersion < 2U) {
+ // We introduced aggregate pushdown in v2 of SSA program
+ return node;
+ }
+
if (!kqpCtx.Config->HasOptEnableOlapPushdown()) {
return node;
}
diff --git a/ydb/core/kqp/query_compiler/kqp_olap_compiler.cpp b/ydb/core/kqp/query_compiler/kqp_olap_compiler.cpp
index 56fb4be5b6d..1989b965d48 100644
--- a/ydb/core/kqp/query_compiler/kqp_olap_compiler.cpp
+++ b/ydb/core/kqp/query_compiler/kqp_olap_compiler.cpp
@@ -1,6 +1,8 @@
#include "kqp_olap_compiler.h"
#include <ydb/core/formats/arrow_helpers.h>
+#include <ydb/core/formats/ssa_runtime_version.h>
+
#include <ydb/library/yql/core/yql_opt_utils.h>
namespace NKikimr {
@@ -12,8 +14,6 @@ using namespace NKikimrSSA;
using EAggFunctionType = TProgram::TAggregateAssignment::EAggregateFunction;
-constexpr ui32 OLAP_PROGRAM_VERSION = 1;
-
namespace {
struct TAggColInfo {
@@ -38,7 +38,7 @@ public:
MaxColumnId = std::max(MaxColumnId, columnMeta.Id);
}
- Program.SetVersion(OLAP_PROGRAM_VERSION);
+ Program.SetVersion(NKikimr::NSsa::RuntimeVersion);
}
ui32 GetColumnId(const std::string& name) const {
@@ -453,7 +453,9 @@ void CompileAggregates(const TKqpOlapAgg& aggNode, TKqpOlapCompileContext& ctx)
void CompileFinalProjection(TKqpOlapCompileContext& ctx) {
auto resultColNames = ctx.GetResultColNames();
- YQL_ENSURE(!resultColNames.empty());
+ if (resultColNames.empty()) {
+ return;
+ }
auto* projection = ctx.CreateProjection();
for (auto colName : resultColNames) {
@@ -493,9 +495,7 @@ void CompileOlapProgram(const TCoLambda& lambda, const TKikimrTableMetadata& tab
TKqpOlapCompileContext ctx(lambda.Args().Arg(0), tableMeta, readProto, resultColNames);
CompileOlapProgramImpl(lambda.Body(), ctx);
- if (!ctx.IsEmptyProgram()) {
- CompileFinalProjection(ctx);
- }
+ CompileFinalProjection(ctx);
ctx.SerializeToProto();
}
diff --git a/ydb/core/kqp/runtime/kqp_scan_data.cpp b/ydb/core/kqp/runtime/kqp_scan_data.cpp
index ead01cb124d..e3c26ea5e7c 100644
--- a/ydb/core/kqp/runtime/kqp_scan_data.cpp
+++ b/ydb/core/kqp/runtime/kqp_scan_data.cpp
@@ -287,7 +287,8 @@ TKqpScanComputeContext::TScanData::TScanData(const NKikimrTxDataShard::TKqpTrans
}
}
- if (meta.GetResultColumns().empty()) {
+ if (meta.GetResultColumns().empty() && !meta.HasOlapProgram()) {
+ // Currently we define ResultColumns just for Olap tables in TKqpQueryCompiler
ResultColumns = Columns;
} else {
ResultColumns.reserve(meta.GetResultColumns().size());
diff --git a/ydb/core/kqp/ut/olap/kqp_olap_ut.cpp b/ydb/core/kqp/ut/olap/kqp_olap_ut.cpp
index 24b562b3122..26a560fcc4a 100644
--- a/ydb/core/kqp/ut/olap/kqp_olap_ut.cpp
+++ b/ydb/core/kqp/ut/olap/kqp_olap_ut.cpp
@@ -8,6 +8,7 @@
#include <contrib/libs/apache/arrow/cpp/src/arrow/ipc/writer.h>
#include <ydb/library/yql/dq/actors/compute/dq_compute_actor.h>
+#include <ydb/core/formats/ssa_runtime_version.h>
#include <ydb/core/kqp/executer_actor/kqp_executer.h>
#include <ydb/core/tx/datashard/datashard.h>
#include <ydb/core/tx/datashard/datashard_ut_common_kqp.h>
@@ -412,7 +413,7 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
Cerr << ast << Endl;
for (auto planNode : planNodes) {
UNIT_ASSERT_C(ast.find(planNode) != std::string::npos,
- TStringBuilder() << planNode << " was not pushed down. Query: " << query);
+ TStringBuilder() << planNode << " was not found. Query: " << query);
}
if (!readNodeType.empty()) {
@@ -1206,7 +1207,11 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
CompareYson(result, R"([[23000u;]])");
// Check plan
+#if SSA_RUNTIME_VERSION >= 2U
CheckPlanForAggregatePushdown(query, tableClient, { "TKqpOlapAgg" }, "TableFullScan");
+#else
+ CheckPlanForAggregatePushdown(query, tableClient, { "CombineCore" }, "");
+#endif
}
}
@@ -1247,7 +1252,11 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
CompareYson(result, R"([[[0];4600u];[[1];4600u];[[2];4600u];[[3];4600u];[[4];4600u]])");
// Check plan
+#if SSA_RUNTIME_VERSION >= 2U
CheckPlanForAggregatePushdown(query, tableClient, { "TKqpOlapAgg" }, "TableFullScan");
+#else
+ CheckPlanForAggregatePushdown(query, tableClient, { "CombineCore" }, "");
+#endif
}
}
@@ -1288,7 +1297,11 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
CompareYson(result, R"([[23000u;]])");
// Check plan
+#if SSA_RUNTIME_VERSION >= 2U
CheckPlanForAggregatePushdown(query, tableClient, { "TKqpOlapAgg" }, "TableFullScan");
+#else
+ CheckPlanForAggregatePushdown(query, tableClient, { "Condense" }, "");
+#endif
}
}
@@ -1583,9 +1596,14 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
WHERE level = 2
)")
.SetExpectedReply("[[4600u;]]")
+#if SSA_RUNTIME_VERSION >= 2U
.AddExpectedPlanOptions("TKqpOlapAgg")
.AddExpectedPlanOptions("KqpOlapFilter")
.MutableLimitChecker().SetExpectedResultCount(1)
+#else
+ .AddExpectedPlanOptions("KqpOlapFilter")
+ .AddExpectedPlanOptions("Condense")
+#endif
;
TestAggregations({ testCase });
@@ -1600,9 +1618,14 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
WHERE level = 2
)")
.SetExpectedReply("[[4600u;]]")
+#if SSA_RUNTIME_VERSION >= 2U
.AddExpectedPlanOptions("TKqpOlapAgg")
.AddExpectedPlanOptions("KqpOlapFilter")
.MutableLimitChecker().SetExpectedResultCount(1)
+#else
+ .AddExpectedPlanOptions("CombineCore")
+ .AddExpectedPlanOptions("KqpOlapFilter")
+#endif
;
TestAggregations({ testCase });
@@ -1617,9 +1640,14 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
WHERE level = 2
)")
.SetExpectedReply("[[4600u;]]")
+#if SSA_RUNTIME_VERSION >= 2U
.AddExpectedPlanOptions("TKqpOlapAgg")
.AddExpectedPlanOptions("KqpOlapFilter")
.MutableLimitChecker().SetExpectedResultCount(1)
+#else
+ .AddExpectedPlanOptions("CombineCore")
+ .AddExpectedPlanOptions("KqpOlapFilter")
+#endif
;
TestAggregations({ testCase });
@@ -1739,7 +1767,11 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
WHERE level = 2
)")
.SetExpectedReply("[[4600u;]]")
+#if SSA_RUNTIME_VERSION >= 2U
.AddExpectedPlanOptions("TKqpOlapAgg")
+#else
+ .AddExpectedPlanOptions("CombineCore")
+#endif
.AddExpectedPlanOptions("KqpOlapFilter");
TestAggregations({ testCase });
@@ -1753,7 +1785,11 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
FROM `/Root/olapStore/olapTable`
)")
.SetExpectedReply("[[[46000;]]]")
+#if SSA_RUNTIME_VERSION >= 2U
.AddExpectedPlanOptions("TKqpOlapAgg");
+#else
+ .AddExpectedPlanOptions("CombineCore");
+#endif
TestAggregations({ testCase });
}
@@ -1768,7 +1804,11 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
ORDER BY level
)")
.SetExpectedReply("[[[0];[0]];[[1];[4600]];[[2];[9200]];[[3];[13800]];[[4];[18400]]]")
+#if SSA_RUNTIME_VERSION >= 2U
.AddExpectedPlanOptions("TKqpOlapAgg");
+#else
+ .AddExpectedPlanOptions("CombineCore");
+#endif
TestAggregations({ testCase });
}
@@ -1781,7 +1821,11 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
FROM `/Root/olapStore/olapTable`
)")
.SetExpectedReply("[[[0]]]")
+#if SSA_RUNTIME_VERSION >= 2U
.AddExpectedPlanOptions("TKqpOlapAgg");
+#else
+ .AddExpectedPlanOptions("CombineCore");
+#endif
TestAggregations({ testCase });
}
@@ -1794,7 +1838,11 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
FROM `/Root/olapStore/olapTable`
)")
.SetExpectedReply("[[[4]]]")
+#if SSA_RUNTIME_VERSION >= 2U
.AddExpectedPlanOptions("TKqpOlapAgg");
+#else
+ .AddExpectedPlanOptions("CombineCore");
+#endif
TestAggregations({ testCase });
}
@@ -1809,7 +1857,11 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
ORDER BY level
)")
.SetExpectedReply("[[[0];[\"10000\"]];[[1];[\"10001\"]];[[2];[\"10002\"]];[[3];[\"10003\"]];[[4];[\"10004\"]]]")
+#if SSA_RUNTIME_VERSION >= 2U
.AddExpectedPlanOptions("TKqpOlapAgg");
+#else
+ .AddExpectedPlanOptions("CombineCore");
+#endif
TestAggregations({ testCase });
}
@@ -1824,7 +1876,11 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
ORDER BY level
)")
.SetExpectedReply("[[[0];[\"40995\"]];[[1];[\"40996\"]];[[2];[\"40997\"]];[[3];[\"40998\"]];[[4];[\"40999\"]]]")
+#if SSA_RUNTIME_VERSION >= 2U
.AddExpectedPlanOptions("TKqpOlapAgg");
+#else
+ .AddExpectedPlanOptions("CombineCore");
+#endif
TestAggregations({ testCase });
}
@@ -1839,8 +1895,12 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
ORDER BY c, resource_id DESC LIMIT 3
)")
.SetExpectedReply("[[[\"40999\"];[4];1u];[[\"40998\"];[3];1u];[[\"40997\"];[2];1u]]")
+#if SSA_RUNTIME_VERSION >= 2U
.AddExpectedPlanOptions("TKqpOlapAgg")
.SetExpectedReadNodeType("TableFullScan");
+#else
+ .AddExpectedPlanOptions("CombineCore");
+#endif
TestAggregations({ testCase });
}
@@ -2956,6 +3016,8 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
UNIT_ASSERT_C(parameter != parameters.end(), "No type " << checkType << " in parameters");
+ Cerr << "Test query:\n" << query + predicate << Endl;
+
auto it = tableClient.StreamExecuteScanQuery(query + predicate, parameter->second).GetValueSync();
// Check for successful execution
auto streamPart = it.ReadNext().GetValueSync();