diff options
author | spuchin <spuchin@ydb.tech> | 2022-11-07 20:46:42 +0300 |
---|---|---|
committer | spuchin <spuchin@ydb.tech> | 2022-11-07 20:46:42 +0300 |
commit | 6861509fb1a82c5cfcc57f994592c8dd66b56975 (patch) | |
tree | 4772d4d4c541c7ad1bdab98e6e14bec04507a591 | |
parent | 50f76e264c70a223a34b24aa59e97bff97128f4c (diff) | |
download | ydb-6861509fb1a82c5cfcc57f994592c8dd66b56975.tar.gz |
Remove OldEngine optimizers code. ()
47 files changed, 783 insertions, 5608 deletions
diff --git a/ydb/core/kqp/host/CMakeLists.txt b/ydb/core/kqp/host/CMakeLists.txt index d0e18d46391..192853c90f9 100644 --- a/ydb/core/kqp/host/CMakeLists.txt +++ b/ydb/core/kqp/host/CMakeLists.txt @@ -31,7 +31,6 @@ target_link_libraries(core-kqp-host PUBLIC ) target_sources(core-kqp-host PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/kqp/host/kqp_host.cpp - ${CMAKE_SOURCE_DIR}/ydb/core/kqp/host/kqp_ne_helper.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/host/kqp_run_data.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/host/kqp_explain_prepared.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/host/kqp_run_physical.cpp diff --git a/ydb/core/kqp/host/kqp_host.cpp b/ydb/core/kqp/host/kqp_host.cpp index 85b0f0d8804..3b8c052879f 100644 --- a/ydb/core/kqp/host/kqp_host.cpp +++ b/ydb/core/kqp/host/kqp_host.cpp @@ -352,8 +352,6 @@ public: } else { FillAstAndPlan(prepareResult, *prepareResult.PreparingQuery); } - - prepareResult.QueryTraits = QueryCtx->QueryTraits; } private: diff --git a/ydb/core/kqp/host/kqp_ne_helper.cpp b/ydb/core/kqp/host/kqp_ne_helper.cpp deleted file mode 100644 index 030140717c7..00000000000 --- a/ydb/core/kqp/host/kqp_ne_helper.cpp +++ /dev/null @@ -1,63 +0,0 @@ -#include "kqp_ne_helper.h" - -#include <ydb/core/kqp/common/kqp_yql.h> - -#include <ydb/library/yql/core/yql_expr_optimize.h> -#include <ydb/library/yql/utils/log/log.h> - -namespace NKikimr { -namespace NKqp { - -using namespace NYql; -using namespace NYql::NNodes; - -bool CanExecuteWithNewEngine(const TKiProgram& program, TExprContext& ctx) { - bool allow = true; - TStringBuf forbiddenCallable; - - VisitExpr( - program.Ptr(), - [&allow](const TExprNode::TPtr&) { - return allow; - }, - [&allow, &forbiddenCallable](const TExprNode::TPtr& node) { - -#define FORBID_CALLABLE(x) if (x::Match(node.Get())) { allow = false; forbiddenCallable = x::CallableName(); return true; } - - FORBID_CALLABLE(TKiUpdateRow); - FORBID_CALLABLE(TKiEraseRow); - FORBID_CALLABLE(TKiDeleteTable); - FORBID_CALLABLE(TKiEffects); - FORBID_CALLABLE(TKiSelectIndexRange); - FORBID_CALLABLE(TCoEquiJoin); - FORBID_CALLABLE(TCoJoin); - FORBID_CALLABLE(TCoMapJoinCore); - FORBID_CALLABLE(TCoJoinDict); - FORBID_CALLABLE(TCoSqlIn); - -#undef FORBID_CALLABLE - - if (auto selectRange = TMaybeNode<TKiSelectRange>(node)) { - if (selectRange.Cast().Table().Path().Value().ends_with("/indexImplTable")) { - forbiddenCallable = "KiSelectIndexRange"; - allow = false; - return true; - } - } - - return true; - }); - - if (allow) { - YQL_CLOG(NOTICE, ProviderKqp) << "Query " << KqpExprToPrettyString(program.Ref(), ctx) - << " can be executed with NewEngine"; - } else { - YQL_CLOG(NOTICE, ProviderKqp) << "Query " << KqpExprToPrettyString(program.Ref(), ctx) - << " cannot be executed with NewEngine (blocked by '" << forbiddenCallable << "')"; - } - - return allow; -} - -} // namespace NKqp -} // namespace NKikimr diff --git a/ydb/core/kqp/host/kqp_ne_helper.h b/ydb/core/kqp/host/kqp_ne_helper.h deleted file mode 100644 index 64b495afcc1..00000000000 --- a/ydb/core/kqp/host/kqp_ne_helper.h +++ /dev/null @@ -1,13 +0,0 @@ -#pragma once - -#include <ydb/core/kqp/provider/yql_kikimr_expr_nodes.h> - -#include <ydb/library/yql/ast/yql_expr.h> - -namespace NKikimr { -namespace NKqp { - -bool CanExecuteWithNewEngine(const NYql::NNodes::TKiProgram& program, NYql::TExprContext& ctx); - -} // namespace NKqp -} // namespace NKikimr diff --git a/ydb/core/kqp/host/kqp_runner.cpp b/ydb/core/kqp/host/kqp_runner.cpp index 0eb7a61a13d..4f7465ca2d2 100644 --- a/ydb/core/kqp/host/kqp_runner.cpp +++ b/ydb/core/kqp/host/kqp_runner.cpp @@ -1,5 +1,4 @@ #include "kqp_host_impl.h" -#include "kqp_ne_helper.h" #include <ydb/core/kqp/common/kqp_yql.h> #include <ydb/core/kqp/compile/kqp_compile.h> @@ -44,7 +43,6 @@ public: queryResult.ProtobufArenaPtr.get()); } - queryResult.QueryTraits = TransformCtx.QueryCtx->QueryTraits; queryResult.QueryStats.CopyFrom(TransformCtx.QueryStats); } @@ -71,7 +69,6 @@ public: results.push_back(result); } - queryResult.QueryTraits = TransformCtx.QueryCtx->QueryTraits; queryResult.QueryStats.CopyFrom(TransformCtx.QueryStats); queryResult.Results = std::move(results); } diff --git a/ydb/core/kqp/kqp.h b/ydb/core/kqp/kqp.h index 47063b67d29..8aa8e8eb151 100644 --- a/ydb/core/kqp/kqp.h +++ b/ydb/core/kqp/kqp.h @@ -4,7 +4,6 @@ #include <library/cpp/lwtrace/shuttle.h> #include <ydb/core/kqp/common/kqp_common.h> #include <ydb/core/kqp/counters/kqp_counters.h> -#include <ydb/core/kqp/provider/yql_kikimr_query_traits.h> #include <ydb/public/api/protos/ydb_status_codes.pb.h> #include <ydb/library/yql/dq/actors/dq.h> @@ -225,7 +224,6 @@ struct TKqpCompileResult { ETableReadType MaxReadType; TPreparedQueryConstPtr PreparedQuery; - std::optional<TQueryTraits> QueryTraits; }; struct TEvKqp { diff --git a/ydb/core/kqp/kqp_compile_actor.cpp b/ydb/core/kqp/kqp_compile_actor.cpp index 407a7eec910..a83b0f60e31 100644 --- a/ydb/core/kqp/kqp_compile_actor.cpp +++ b/ydb/core/kqp/kqp_compile_actor.cpp @@ -254,17 +254,14 @@ private: if (status == Ydb::StatusIds::SUCCESS) { YQL_ENSURE(kqpResult.PreparingQuery); KqpCompileResult->PreparedQuery.reset(kqpResult.PreparingQuery.release()); - KqpCompileResult->QueryTraits = kqpResult.QueryTraits; auto now = TInstant::Now(); auto duration = now - StartTime; Counters->ReportCompileDurations(DbCounters, duration, CompileCpuTime); - const auto& queryTraits = KqpCompileResult->QueryTraits; LOG_DEBUG_S(ctx, NKikimrServices::KQP_COMPILE_ACTOR, "Compilation successful" << ", self: " << ctx.SelfID - << ", duration: " << duration - << ", traits: " << (queryTraits ? queryTraits->ToString() : "<none>")); + << ", duration: " << duration); } else { LOG_ERROR_S(ctx, NKikimrServices::KQP_COMPILE_ACTOR, "Compilation failed" << ", self: " << ctx.SelfID diff --git a/ydb/core/kqp/opt/CMakeLists.txt b/ydb/core/kqp/opt/CMakeLists.txt index 52ec5e53da1..c4f7da7040d 100644 --- a/ydb/core/kqp/opt/CMakeLists.txt +++ b/ydb/core/kqp/opt/CMakeLists.txt @@ -32,4 +32,5 @@ target_sources(core-kqp-opt PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/kqp/opt/kqp_opt_phase.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/opt/kqp_opt_phy_check.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/opt/kqp_opt_phy_finalize.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/kqp/opt/kqp_opt_range_legacy.cpp ) diff --git a/ydb/core/kqp/opt/kqp_opt_impl.h b/ydb/core/kqp/opt/kqp_opt_impl.h index 88e49dc8a17..17d90672f24 100644 --- a/ydb/core/kqp/opt/kqp_opt_impl.h +++ b/ydb/core/kqp/opt/kqp_opt_impl.h @@ -6,6 +6,7 @@ #include <ydb/library/yql/core/yql_expr_optimize.h> #include <ydb/library/yql/dq/opt/dq_opt.h> +#include <ydb/library/yql/providers/common/provider/yql_table_lookup.h> namespace NKikimr::NKqp::NOpt { @@ -49,4 +50,16 @@ NYql::NNodes::TKqpTable BuildTableMeta(const NYql::TKikimrTableMetadata& tableMe TIntrusivePtr<NYql::TKikimrTableMetadata> GetIndexMetadata(const NYql::NNodes::TKqlReadTableIndex& index, const NYql::TKikimrTablesData& tables, TStringBuf cluster); +bool KqpTableLookupCanCompare(NYql::NNodes::TExprBase node); +NYql::NNodes::TMaybeNode<NYql::NNodes::TExprBase> KqpTableLookupGetValue(NYql::NNodes::TExprBase node, + const NYql::TTypeAnnotationNode* type, NYql::TExprContext& ctx); +NYql::NCommon::TTableLookup::TCompareResult KqpTableLookupCompare(NYql::NNodes::TExprBase left, + NYql::NNodes::TExprBase right); + +TVector<std::pair<NYql::TExprNode::TPtr, const NYql::TIndexDescription*>> BuildSecondaryIndexVector( + const NYql::TKikimrTableDescription& table, NYql::TPositionHandle pos, NYql::TExprContext& ctx, + const THashSet<TStringBuf>* filter, + const std::function<NYql::NNodes::TExprBase (const NYql::TKikimrTableMetadata&, + NYql::TPositionHandle, NYql::TExprContext&)>& tableBuilder); + } // namespace NKikimr::NKqp::NOpt diff --git a/ydb/core/kqp/opt/kqp_opt_kql.cpp b/ydb/core/kqp/opt/kqp_opt_kql.cpp index c9f1c846974..93c7bea5af6 100644 --- a/ydb/core/kqp/opt/kqp_opt_kql.cpp +++ b/ydb/core/kqp/opt/kqp_opt_kql.cpp @@ -1,7 +1,6 @@ #include "kqp_opt_impl.h" #include <ydb/core/kqp/provider/yql_kikimr_provider_impl.h> -#include <ydb/core/kqp/provider/kqp_opt_helpers.h> #include <ydb/library/yql/core/yql_opt_utils.h> @@ -12,6 +11,107 @@ using namespace NYql::NNodes; namespace { +TExprBase UnwrapReadTableValues(TExprBase input, const TKikimrTableDescription& tableDesc, + const TCoAtomList columns, TExprContext& ctx) +{ + TCoArgument itemArg = Build<TCoArgument>(ctx, input.Pos()) + .Name("item") + .Done(); + + TVector<TExprBase> structItems; + for (auto atom : columns) { + auto columnType = tableDesc.GetColumnType(TString(atom.Value())); + YQL_ENSURE(columnType); + + auto item = Build<TCoNameValueTuple>(ctx, input.Pos()) + .Name(atom) + .Value<TCoCoalesce>() + .Predicate<TCoMember>() + .Struct(itemArg) + .Name(atom) + .Build() + .Value<TCoDefault>() + .Type(ExpandType(atom.Pos(), *columnType->Cast<TOptionalExprType>()->GetItemType(), ctx)) + .Build() + .Build() + .Done(); + + structItems.push_back(item); + } + + return Build<TCoMap>(ctx, input.Pos()) + .Input(input) + .Lambda() + .Args({itemArg}) + .Body<TCoAsStruct>() + .Add(structItems) + .Build() + .Build() + .Done(); +} + +// Replace absent input columns to NULL to perform REPLACE via UPSERT +std::pair<TExprBase, TCoAtomList> CreateRowsToReplace(const TExprBase& input, + const TCoAtomList& inputColumns, const TKikimrTableDescription& tableDesc, + TPositionHandle pos, TExprContext& ctx) +{ + THashSet<TStringBuf> inputColumnsSet; + for (const auto& name : inputColumns) { + inputColumnsSet.insert(name.Value()); + } + + auto rowArg = Build<TCoArgument>(ctx, pos) + .Name("row") + .Done(); + + TVector<TCoAtom> writeColumns; + TVector<TExprBase> writeMembers; + + for (const auto& [name, _] : tableDesc.Metadata->Columns) { + TMaybeNode<TExprBase> memberValue; + if (tableDesc.GetKeyColumnIndex(name) || inputColumnsSet.contains(name)) { + memberValue = Build<TCoMember>(ctx, pos) + .Struct(rowArg) + .Name().Build(name) + .Done(); + } else { + auto type = tableDesc.GetColumnType(name); + YQL_ENSURE(type); + + memberValue = Build<TCoNothing>(ctx, pos) + .OptionalType(NCommon::BuildTypeExpr(pos, *type, ctx)) + .Done(); + } + + auto nameAtom = TCoAtom(ctx.NewAtom(pos, name)); + + YQL_ENSURE(memberValue); + auto memberTuple = Build<TCoNameValueTuple>(ctx, pos) + .Name(nameAtom) + .Value(memberValue.Cast()) + .Done(); + + writeColumns.emplace_back(std::move(nameAtom)); + writeMembers.emplace_back(std::move(memberTuple)); + } + + auto writeData = Build<TCoMap>(ctx, pos) + .Input(input) + .Lambda() + .Args({rowArg}) + .Body<TCoAsStruct>() + .Add(writeMembers) + .Build() + .Build() + .Done(); + + auto columnList = Build<TCoAtomList>(ctx, pos) + .Add(writeColumns) + .Done(); + + return {writeData, columnList}; +} + bool UseReadTableRanges(const TKikimrTableDescription& tableData, const TIntrusivePtr<TKqpOptimizeContext>& kqpCtx) { /* * OLAP tables can not work with ordinary ReadTable in case there is no support in physical @@ -97,7 +197,7 @@ TExprBase BuildReadTable(const TKiReadTable& read, const TKikimrTableDescription auto readNode = BuildReadTable(columns, read.Pos(), tableData, ctx, kqpCtx); return unwrapValues - ? UnwrapKiReadTableValues(readNode, tableData, columns, ctx) + ? UnwrapReadTableValues(readNode, tableData, columns, ctx) : readNode; } @@ -121,7 +221,7 @@ TExprBase BuildReadTableIndex(const TKiReadTable& read, const TKikimrTableDescri .Done(); return unwrapValues - ? UnwrapKiReadTableValues(kqlReadTable, tableData, kqlReadTable.Columns(), ctx) + ? UnwrapReadTableValues(kqlReadTable, tableData, kqlReadTable.Columns(), ctx) : kqlReadTable; } diff --git a/ydb/core/kqp/opt/kqp_opt_range_legacy.cpp b/ydb/core/kqp/opt/kqp_opt_range_legacy.cpp new file mode 100644 index 00000000000..163bce1e4bc --- /dev/null +++ b/ydb/core/kqp/opt/kqp_opt_range_legacy.cpp @@ -0,0 +1,256 @@ +#include "kqp_opt_impl.h" + +#include <ydb/library/yql/utils/utf8.h> + +namespace NKikimr::NKqp::NOpt { + +using namespace NYql; +using namespace NYql::NNodes; +using namespace NYql::NCommon; + +namespace { + +template<typename T> +TTableLookup::TCompareResult::TResult CompareValues(const T& left, const T& right) { + if (left == right) { + return TTableLookup::TCompareResult::Equal; + } else { + return left > right + ? TTableLookup::TCompareResult::Greater + : TTableLookup::TCompareResult::Less; + } +} + +template<typename T> +TTableLookup::TCompareResult CompareIntegralNodes(TCoAtom left, TCoAtom right, NKikimr::NUdf::EDataSlot slot) { + T leftValue = FromString<T>(left.Ref(), slot); + T rightValue = FromString<T>(right.Ref(), slot); + auto compareResult = CompareValues(leftValue, rightValue); + + TMaybe<bool> adjacent; + switch (compareResult) { + case TTableLookup::TCompareResult::Equal: + break; + + case TTableLookup::TCompareResult::Greater: + adjacent = leftValue == rightValue + 1; + break; + + case TTableLookup::TCompareResult::Less: + adjacent = rightValue == leftValue + 1; + break; + } + + return TTableLookup::TCompareResult(compareResult, adjacent); +} + +template<typename T> +TTableLookup::TCompareResult CompareNodes(TCoAtom left, TCoAtom right, NKikimr::NUdf::EDataSlot slot) { + T leftValue = FromString<T>(left.Ref(), slot); + T rightValue = FromString<T>(right.Ref(), slot); + return TTableLookup::TCompareResult(CompareValues(leftValue, rightValue)); +} + +template<> +TTableLookup::TCompareResult CompareNodes<bool>(TCoAtom left, TCoAtom right, NKikimr::NUdf::EDataSlot slot) { + bool leftValue = FromString<bool>(left.Ref(), slot); + bool rightValue = FromString<bool>(right.Ref(), slot); + auto compareResult = CompareValues(leftValue, rightValue); + + return TTableLookup::TCompareResult(compareResult); +} + +template<> +TTableLookup::TCompareResult CompareNodes<ui64>(TCoAtom left, TCoAtom right, NKikimr::NUdf::EDataSlot slot) { + return CompareIntegralNodes<ui64>(left, right, slot); +} + +template<> +TTableLookup::TCompareResult CompareNodes<i64>(TCoAtom left, TCoAtom right, NKikimr::NUdf::EDataSlot slot) { + return CompareIntegralNodes<i64>(left, right, slot); +} + +template<> +TTableLookup::TCompareResult CompareNodes<TString>(TCoAtom left, TCoAtom right, NKikimr::NUdf::EDataSlot slot) { + Y_UNUSED(slot); + + const auto& leftValue = left.Value(); + const auto& rightValue = right.Value(); + return TTableLookup::TCompareResult(CompareValues(leftValue, rightValue)); +} + +} // namespace + +bool KqpTableLookupCanCompare(TExprBase node) { + if (node.Maybe<TCoBool>()) { + return true; + } + + if (node.Maybe<TCoIntegralCtor>()) { + return true; + } + + if (node.Maybe<TCoString>()) { + return true; + } + + if (node.Maybe<TCoUtf8>()) { + return true; + } + + return false; +} + +TMaybeNode<TExprBase> KqpTableLookupGetValue(TExprBase node, const TTypeAnnotationNode* type, + TExprContext& ctx) +{ + const TTypeAnnotationNode* targetType = type; + bool isTargetOptional = false; + if (type->GetKind() == ETypeAnnotationKind::Optional) { + targetType = type->Cast<TOptionalExprType>()->GetItemType(); + isTargetOptional = true; + } + + if (targetType->GetKind() != ETypeAnnotationKind::Data) { + return TMaybeNode<TExprBase>(); + } + + THashSet<const TExprNode*> knownArgs; + bool canPush = true; + VisitExpr(node.Ptr(), [&knownArgs, &canPush] (const TExprNode::TPtr& exprNode) { + auto node = TExprBase(exprNode); + + if (!canPush) { + return false; + } + + if (auto maybeLambda = node.Maybe<TCoLambda>()) { + for (const auto& arg : maybeLambda.Cast().Args()) { + knownArgs.emplace(arg.Raw()); + } + } + + if (auto maybeArg = node.Maybe<TCoArgument>()) { + if (!knownArgs.contains(maybeArg.Cast().Raw())) { + canPush = false; + return false; + } + } + + return true; + }); + + if (!canPush) { + return TMaybeNode<TExprBase>(); + } + + const auto& dataTypeName = targetType->Cast<TDataExprType>()->GetName(); + + TExprBase valueNode = node; + if (isTargetOptional) { + if (auto maybeJust = node.Maybe<TCoJust>()) { + valueNode = maybeJust.Cast().Input(); + } + + if (node.Maybe<TCoNothing>()) { + return Build<TCoNothing>(ctx, node.Pos()) + .OptionalType(ExpandType(node.Pos(), *type, ctx)) + .Done() + .Ptr(); + } + } + + TExprNode::TPtr literal; + if (auto maybeInt = valueNode.Maybe<TCoIntegralCtor>()) { + if (maybeInt.Cast().CallableName() == dataTypeName) { + return valueNode; + } + + if (AllowIntegralConversion(maybeInt.Cast(), false, NKikimr::NUdf::GetDataSlot(dataTypeName))) { + literal = maybeInt.Cast().Literal().Ptr(); + } + } + + if (auto maybeString = valueNode.Maybe<TCoString>()) { + if (dataTypeName == "String") { + return valueNode; + } + + if (dataTypeName == "Utf8") { + auto atom = maybeString.Cast().Literal(); + auto value = atom.Value(); + if (!IsUtf8(value)) { + return {}; + } + + literal = atom.Ptr(); + } + } + + if (auto maybeUtf8 = valueNode.Maybe<TCoUtf8>()) { + if (dataTypeName == "String" || dataTypeName == "Utf8") { + literal = maybeUtf8.Cast().Literal().Ptr(); + } + } + + if (auto maybeBool = valueNode.Maybe<TCoBool>()) { + if (dataTypeName == "Bool") { + literal = maybeBool.Cast().Literal().Ptr(); + } + } + + if (literal) { + auto ret = ctx.Builder(valueNode.Pos()) + .Callable(dataTypeName) + .Add(0, literal) + .Seal() + .Build(); + + return ret; + } + + auto valueType = valueNode.Ref().GetTypeAnn(); + if (isTargetOptional && valueType->GetKind() == ETypeAnnotationKind::Optional) { + valueType = valueType->Cast<TOptionalExprType>()->GetItemType(); + } + + if (valueType->GetKind() == ETypeAnnotationKind::Data && + valueType->Cast<TDataExprType>()->GetName() == dataTypeName) + { + return node; + } + + return Build<TCoConvert>(ctx, node.Pos()) + .Input(node) + .Type().Build(dataTypeName) + .Done(); +} + +TTableLookup::TCompareResult KqpTableLookupCompare(TExprBase left, TExprBase right) { + if (left.Maybe<TCoBool>() && right.Maybe<TCoBool>()) { + return CompareNodes<bool>(left.Cast<TCoBool>().Literal(), + right.Cast<TCoBool>().Literal(), NKikimr::NUdf::EDataSlot::Bool); + } + + if (left.Maybe<TCoUint64>() && right.Maybe<TCoUint64>()) { + return CompareNodes<ui64>(left.Cast<TCoUint64>().Literal(), + right.Cast<TCoUint64>().Literal(), NKikimr::NUdf::EDataSlot::Uint64); + } + + if (left.Maybe<TCoIntegralCtor>() && right.Maybe<TCoIntegralCtor>()) { + return CompareNodes<i64>(left.Cast<TCoIntegralCtor>().Literal(), + right.Cast<TCoIntegralCtor>().Literal(), NKikimr::NUdf::EDataSlot::Int64); + } + + if (left.Maybe<TCoString>() && right.Maybe<TCoString>() || + left.Maybe<TCoUtf8>() && right.Maybe<TCoUtf8>()) + { + return CompareNodes<TString>(left.Cast<TCoDataCtor>().Literal(), + right.Cast<TCoDataCtor>().Literal(), NKikimr::NUdf::EDataSlot::String); + } + + YQL_ENSURE(false, "Unexpected nodes in Kikimr TableLookupCompare: (" << left.Ref().Content() + << ", " << right.Ref().Content() << ")"); +} + +} // namespace NKikimr::NKqp::NOpt diff --git a/ydb/core/kqp/opt/logical/kqp_opt_log_indexes.cpp b/ydb/core/kqp/opt/logical/kqp_opt_log_indexes.cpp index ed3f4576acf..f08f1be6c3b 100644 --- a/ydb/core/kqp/opt/logical/kqp_opt_log_indexes.cpp +++ b/ydb/core/kqp/opt/logical/kqp_opt_log_indexes.cpp @@ -12,6 +12,88 @@ using namespace NYql::NNodes; namespace { +TCoAtomList BuildKeyColumnsList(const TKikimrTableDescription& table, TPositionHandle pos, TExprContext& ctx) { + TVector<TExprBase> columnsToSelect; + columnsToSelect.reserve(table.Metadata->KeyColumnNames.size()); + for (auto key : table.Metadata->KeyColumnNames) { + auto value = table.Metadata->Columns.at(key); + auto atom = Build<TCoAtom>(ctx, pos) + .Value(value.Name) + .Done(); + + columnsToSelect.push_back(atom); + } + + return Build<TCoAtomList>(ctx, pos) + .Add(columnsToSelect) + .Done(); +} + +TCoAtomList MergeColumns(const NNodes::TCoAtomList& col1, const TVector<TString>& col2, TExprContext& ctx) { + TVector<TCoAtom> columns; + THashSet<TString> uniqColumns; + columns.reserve(col1.Size() + col2.size()); + + for (const auto& c : col1) { + YQL_ENSURE(uniqColumns.emplace(c.StringValue()).second); + columns.push_back(c); + } + + for (const auto& c : col2) { + if (uniqColumns.emplace(c).second) { + auto atom = Build<TCoAtom>(ctx, col1.Pos()) + .Value(c) + .Done(); + columns.push_back(atom); + } + } + + return Build<TCoAtomList>(ctx, col1.Pos()) + .Add(columns) + .Done(); +} + +bool IsKeySelectorPkPrefix(NNodes::TCoLambda keySelector, const TKikimrTableDescription& tableDesc, TVector<TString>* columns) { + auto checkKey = [keySelector, &tableDesc, columns] (const TExprBase& key, ui32 index) { + if (!key.Maybe<TCoMember>()) { + return false; + } + + auto member = key.Cast<TCoMember>(); + if (member.Struct().Raw() != keySelector.Args().Arg(0).Raw()) { + return false; + } + + auto column = member.Name().StringValue(); + auto columnIndex = tableDesc.GetKeyColumnIndex(column); + if (!columnIndex || *columnIndex != index) { + return false; + } + + if (columns) { + columns->emplace_back(std::move(column)); + } + + return true; + }; + + auto lambdaBody = keySelector.Body(); + if (auto maybeTuple = lambdaBody.Maybe<TExprList>()) { + auto tuple = maybeTuple.Cast(); + for (size_t i = 0; i < tuple.Size(); ++i) { + if (!checkKey(tuple.Item(i), i)) { + return false; + } + } + } else { + if (!checkKey(lambdaBody, 0)) { + return false; + } + } + + return true; +} + bool CanPushTopSort(const TCoTopSort& node, const TKikimrTableDescription& tableDesc, TVector<TString>* columns) { return IsKeySelectorPkPrefix(node.KeySelectorLambda(), tableDesc, columns); } diff --git a/ydb/core/kqp/opt/logical/kqp_opt_log_join.cpp b/ydb/core/kqp/opt/logical/kqp_opt_log_join.cpp index 4856df9704d..aa6315b0d89 100644 --- a/ydb/core/kqp/opt/logical/kqp_opt_log_join.cpp +++ b/ydb/core/kqp/opt/logical/kqp_opt_log_join.cpp @@ -4,7 +4,6 @@ #include <ydb/core/kqp/opt/kqp_opt_impl.h> #include <ydb/core/kqp/common/kqp_yql.h> #include <ydb/core/kqp/provider/yql_kikimr_provider_impl.h> -#include <ydb/core/kqp/provider/yql_kikimr_opt_utils.h> #include <ydb/library/yql/core/yql_opt_utils.h> @@ -17,6 +16,100 @@ using namespace NYql::NNodes; namespace { +bool GetEquiJoinKeyTypes(TExprBase leftInput, const TString& leftColumnName, const TKikimrTableDescription& rightTable, + const TString& rightColumnName, const TDataExprType*& leftData, const TDataExprType*& rightData) +{ + auto rightType = rightTable.GetColumnType(rightColumnName); + YQL_ENSURE(rightType); + if (rightType->GetKind() == ETypeAnnotationKind::Optional) { + rightType = rightType->Cast<TOptionalExprType>()->GetItemType(); + } + + YQL_ENSURE(rightType->GetKind() == ETypeAnnotationKind::Data); + rightData = rightType->Cast<TDataExprType>(); + + auto leftInputType = leftInput.Ref().GetTypeAnn(); + YQL_ENSURE(leftInputType); + YQL_ENSURE(leftInputType->GetKind() == ETypeAnnotationKind::List); + auto itemType = leftInputType->Cast<TListExprType>()->GetItemType(); + YQL_ENSURE(itemType->GetKind() == ETypeAnnotationKind::Struct); + auto structType = itemType->Cast<TStructExprType>(); + auto memberIndex = structType->FindItem(leftColumnName); + YQL_ENSURE(memberIndex, "Column '" << leftColumnName << "' not found in " << *((TTypeAnnotationNode*) structType)); + + auto leftType = structType->GetItems()[*memberIndex]->GetItemType(); + if (leftType->GetKind() == ETypeAnnotationKind::Optional) { + leftType = leftType->Cast<TOptionalExprType>()->GetItemType(); + } + + if (leftType->GetKind() != ETypeAnnotationKind::Data) { + return false; + } + + leftData = leftType->Cast<TDataExprType>(); + return true; +} + +TExprBase ConvertToTuples(const TSet<TString>& columns, const TCoArgument& structArg, TExprContext& ctx, + TPositionHandle pos) +{ + TVector<TExprBase> tuples{Reserve(columns.size())}; + + for (const auto& key : columns) { + tuples.emplace_back(Build<TCoMember>(ctx, pos) + .Struct(structArg) + .Name().Build(key) + .Done()); + } + + if (tuples.size() == 1) { + return tuples[0]; + } + + return Build<TExprList>(ctx, pos) + .Add(tuples) + .Done(); +} + +TExprBase DeduplicateByMembers(const TExprBase& expr, const TSet<TString>& members, TExprContext& ctx, + TPositionHandle pos) +{ + auto structArg = Build<TCoArgument>(ctx, pos) + .Name("struct") + .Done(); + + return Build<TCoPartitionByKey>(ctx, pos) + .Input(expr) + .KeySelectorLambda() + .Args(structArg) + .Body(ConvertToTuples(members, structArg, ctx, pos)) + .Build() + .SortDirections<TCoVoid>() + .Build() + .SortKeySelectorLambda<TCoVoid>() + .Build() + .ListHandlerLambda() + .Args({"stream"}) + .Body<TCoFlatMap>() + .Input("stream") + .Lambda() + .Args({"tuple"}) + .Body<TCoTake>() + .Input<TCoNth>() + .Tuple("tuple") + .Index().Value("1").Build() + .Build() + .Count<TCoUint64>() + .Literal().Value("1").Build() + .Build() + .Build() + .Build() + .Build() + .Build() + .Done(); +} + + [[maybe_unused]] bool IsKqlPureExpr(const TExprBase& expr) { auto node = FindNode(expr.Ptr(), [](const TExprNode::TPtr& node) { diff --git a/ydb/core/kqp/opt/logical/kqp_opt_log_ranges.cpp b/ydb/core/kqp/opt/logical/kqp_opt_log_ranges.cpp index 51b3ae98174..5d62ee43596 100644 --- a/ydb/core/kqp/opt/logical/kqp_opt_log_ranges.cpp +++ b/ydb/core/kqp/opt/logical/kqp_opt_log_ranges.cpp @@ -207,7 +207,7 @@ TExprBase KqpPushPredicateToReadTable(TExprBase node, TExprContext& ctx, const T auto row = flatmap.Lambda().Args().Arg(0); auto predicate = TExprBase(flatmap.Lambda().Body().Ref().ChildPtr(0)); TTableLookup lookup = ExtractTableLookup(row, predicate, tableDesc.Metadata->KeyColumnNames, - &KiTableLookupGetValue, &KiTableLookupCanCompare, &KiTableLookupCompare, ctx, + &KqpTableLookupGetValue, &KqpTableLookupCanCompare, &KqpTableLookupCompare, ctx, kqpCtx.Config->HasAllowNullCompareInIndex()); if (lookup.IsFullScan()) { diff --git a/ydb/core/kqp/opt/logical/kqp_opt_log_ranges_predext.cpp b/ydb/core/kqp/opt/logical/kqp_opt_log_ranges_predext.cpp index 987ba08d2d6..150e2c6a492 100644 --- a/ydb/core/kqp/opt/logical/kqp_opt_log_ranges_predext.cpp +++ b/ydb/core/kqp/opt/logical/kqp_opt_log_ranges_predext.cpp @@ -35,7 +35,7 @@ TMaybeNode<TExprBase> TryBuildTrivialReadTable(TCoFlatMap& flatmap, TKqlReadTabl auto row = flatmap.Lambda().Args().Arg(0); auto predicate = TExprBase(flatmap.Lambda().Body().Ref().ChildPtr(0)); TTableLookup lookup = ExtractTableLookup(row, predicate, tableDesc.Metadata->KeyColumnNames, - &KiTableLookupGetValue, &KiTableLookupCanCompare, &KiTableLookupCompare, ctx, + &KqpTableLookupGetValue, &KqpTableLookupCanCompare, &KqpTableLookupCompare, ctx, kqpCtx.Config->HasAllowNullCompareInIndex()); if (lookup.IsFullScan()) { diff --git a/ydb/core/kqp/opt/logical/kqp_opt_log_sqlin.cpp b/ydb/core/kqp/opt/logical/kqp_opt_log_sqlin.cpp index 118442443db..0155d45134a 100644 --- a/ydb/core/kqp/opt/logical/kqp_opt_log_sqlin.cpp +++ b/ydb/core/kqp/opt/logical/kqp_opt_log_sqlin.cpp @@ -4,7 +4,6 @@ #include <ydb/core/kqp/opt/kqp_opt_impl.h> #include <ydb/core/kqp/common/kqp_yql.h> #include <ydb/core/kqp/provider/yql_kikimr_provider_impl.h> -#include <ydb/core/kqp/provider/yql_kikimr_opt_utils.h> #include <ydb/library/yql/core/common_opt/yql_co_sqlin.h> @@ -14,6 +13,38 @@ using namespace NYql; using namespace NYql::NDq; using namespace NYql::NNodes; +namespace { + +bool CanRewriteSqlInToEquiJoin(const TTypeAnnotationNode* lookupType, const TTypeAnnotationNode* collectionType) { + // SqlIn in Dict + if (collectionType->GetKind() == ETypeAnnotationKind::Dict) { + return IsDataOrOptionalOfData(lookupType); + } + + // SqlIn in List<DataType> or List<Tuple<DataType...>> + if (collectionType->GetKind() == ETypeAnnotationKind::List) { + auto collectionItemType = collectionType->Cast<TListExprType>()->GetItemType(); + + if (collectionItemType->GetKind() == ETypeAnnotationKind::Tuple) { + if (lookupType->GetKind() != ETypeAnnotationKind::Tuple) { + return false; + } + auto lookupItems = lookupType->Cast<TTupleExprType>()->GetItems(); + auto collectionItems = collectionItemType->Cast<TTupleExprType>()->GetItems(); + if (lookupItems.size() != collectionItems.size()) { + return false; + } + return AllOf(collectionItems, [](const auto& item) { return IsDataOrOptionalOfData(item); }); + } + + return IsDataOrOptionalOfData(collectionItemType); + } + + return false; +} + +} // namespace + TExprBase KqpRewriteSqlInToEquiJoin(const TExprBase& node, TExprContext& ctx, const TKqpOptimizeContext& kqpCtx, const TKikimrConfiguration::TPtr& config) { diff --git a/ydb/core/kqp/opt/logical/kqp_opt_log_sqlin_compact.cpp b/ydb/core/kqp/opt/logical/kqp_opt_log_sqlin_compact.cpp index f3ffaa1ec0f..bc57164a0b7 100644 --- a/ydb/core/kqp/opt/logical/kqp_opt_log_sqlin_compact.cpp +++ b/ydb/core/kqp/opt/logical/kqp_opt_log_sqlin_compact.cpp @@ -3,7 +3,6 @@ #include <ydb/core/kqp/opt/kqp_opt_impl.h> #include <ydb/core/kqp/common/kqp_yql.h> #include <ydb/core/kqp/provider/yql_kikimr_provider_impl.h> -#include <ydb/core/kqp/provider/yql_kikimr_opt_utils.h> #include <ydb/library/yql/core/common_opt/yql_co_sqlin.h> #include <ydb/library/yql/core/yql_opt_utils.h> diff --git a/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_effects.cpp b/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_effects.cpp index ac8a95c6f98..eb31d985fb0 100644 --- a/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_effects.cpp +++ b/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_effects.cpp @@ -6,6 +6,72 @@ using namespace NYql; using namespace NYql::NDq; using namespace NYql::NNodes; +namespace { + +TExprBase ExtractKeys(TCoArgument itemArg, const TKikimrTableDescription& tableDesc, + TExprContext& ctx) +{ + TVector<TExprBase> keys; + for (TString& keyColumnName : tableDesc.Metadata->KeyColumnNames) { + auto key = Build<TCoMember>(ctx, itemArg.Pos()) + .Struct(itemArg) + .Name().Build(keyColumnName) + .Done(); + + keys.emplace_back(std::move(key)); + } + + if (keys.size() == 1) { + return keys[0]; + } + + return Build<TExprList>(ctx, itemArg.Pos()) + .Add(keys) + .Done(); +} + +TExprBase RemoveDuplicateKeyFromInput(const TExprBase& input, const TKikimrTableDescription& tableDesc, + TPositionHandle pos, TExprContext& ctx) +{ + const auto& keySelectorArg = Build<TCoArgument>(ctx, pos) + .Name("item") + .Done(); + + const auto& streamArg = Build<TCoArgument>(ctx, pos) + .Name("streamArg") + .Done(); + + return Build<TCoPartitionByKey>(ctx, pos) + .Input(input) + .KeySelectorLambda() + .Args(keySelectorArg) + .Body(ExtractKeys(keySelectorArg, tableDesc, ctx)) + .Build() + .SortDirections<TCoVoid>().Build() + .SortKeySelectorLambda<TCoVoid>().Build() + .ListHandlerLambda() + .Args({TStringBuf("stream")}) + .Body<TCoFlatMap>() + .Input(TStringBuf("stream")) + .Lambda<TCoLambda>() + .Args(streamArg) + .Body<TCoLast>() + .Input<TCoForwardList>() + .Stream<TCoNth>() + .Tuple(streamArg) + .Index().Value(ToString(1)) + .Build() + .Build() + .Build() + .Build() + .Build() + .Build() + .Build() + .Done(); +} + +} // namespace + TExprNode::TPtr MakeMessage(TStringBuf message, TPositionHandle pos, TExprContext& ctx) { return ctx.NewCallable(pos, "Utf8", { ctx.NewAtom(pos, message) }); } diff --git a/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_effects_impl.h b/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_effects_impl.h index 6712d94a193..d744f731b78 100644 --- a/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_effects_impl.h +++ b/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_effects_impl.h @@ -2,7 +2,6 @@ #include <ydb/core/kqp/opt/kqp_opt_impl.h> #include <ydb/core/kqp/opt/physical/kqp_opt_phy_impl.h> -#include <ydb/core/kqp/provider/kqp_opt_helpers.h> #include <ydb/library/yql/core/yql_opt_utils.h> #include <ydb/library/yql/dq/opt/dq_opt.h> diff --git a/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_indexes.cpp b/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_indexes.cpp index e0315a4c990..f1f0804b27b 100644 --- a/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_indexes.cpp +++ b/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_indexes.cpp @@ -8,6 +8,48 @@ using namespace NYql::NNodes; namespace { +TVector<TExprBase> CreateColumnsToSelectToUpdateIndex( + const TVector<std::pair<TExprNode::TPtr, const TIndexDescription*>> indexes, + const TVector<TString>& pk, + const THashSet<TString>& dataColumns, + TPositionHandle pos, + TExprContext& ctx) +{ + TVector<TExprBase> columnsToSelect; + TSet<TString> columns; + + for (const auto& pair : indexes) { + for (const auto& col : pair.second->KeyColumns) { + if (columns.insert(col).second) { + auto atom = Build<TCoAtom>(ctx, pos) + .Value(col) + .Done(); + columnsToSelect.emplace_back(std::move(atom)); + } + } + + for (const auto& col : dataColumns) { + if (columns.insert(col).second) { + auto atom = Build<TCoAtom>(ctx, pos) + .Value(col) + .Done(); + columnsToSelect.emplace_back(std::move(atom)); + } + } + } + + for (const auto& p : pk) { + const auto& atom = Build<TCoAtom>(ctx, pos) + .Value(p) + .Done(); + if (columns.insert(p).second) { + columnsToSelect.push_back(atom); + } + } + + return columnsToSelect; +} + TDqPhyPrecompute PrecomputeDict(const TCondenseInputResult& condenseResult, TPositionHandle pos, TExprContext& ctx) { auto computeDictStage = Build<TDqStage>(ctx, pos) .Inputs() @@ -32,6 +74,50 @@ TDqPhyPrecompute PrecomputeDict(const TCondenseInputResult& condenseResult, TPos } // namespace +TVector<std::pair<TExprNode::TPtr, const TIndexDescription*>> BuildSecondaryIndexVector( + const TKikimrTableDescription& table, + TPositionHandle pos, + TExprContext& ctx, + const THashSet<TStringBuf>* filter, + const std::function<TExprBase (const TKikimrTableMetadata&, TPositionHandle, TExprContext&)>& tableBuilder) +{ + TVector<std::pair<TExprNode::TPtr, const TIndexDescription*>> secondaryIndexes; + secondaryIndexes.reserve(table.Metadata->Indexes.size()); + YQL_ENSURE(table.Metadata->Indexes.size() == table.Metadata->SecondaryGlobalIndexMetadata.size()); + for (size_t i = 0; i < table.Metadata->Indexes.size(); i++) { + const auto& indexMeta = table.Metadata->Indexes[i]; + + if (!indexMeta.ItUsedForWrite()) { + continue; + } + + // Add index if filter absent + bool addIndex = filter ? false : true; + + for (const auto& col : indexMeta.KeyColumns) { + + if (filter) { + // Add index if filter and at least one column present in the filter + addIndex |= filter->contains(TStringBuf(col)); + } + } + + for (const auto& col : indexMeta.DataColumns) { + + if (filter) { + // Add index if filter and at least one column present in the filter + addIndex |= filter->contains(TStringBuf(col)); + } + } + + if (indexMeta.KeyColumns && addIndex) { + auto indexTable = tableBuilder(*table.Metadata->SecondaryGlobalIndexMetadata[i], pos, ctx).Ptr(); + secondaryIndexes.emplace_back(std::make_pair(indexTable, &indexMeta)); + } + } + return secondaryIndexes; +} + TSecondaryIndexes BuildSecondaryIndexVector(const TKikimrTableDescription& table, TPositionHandle pos, TExprContext& ctx, const THashSet<TStringBuf>* filter) { @@ -39,7 +125,7 @@ TSecondaryIndexes BuildSecondaryIndexVector(const TKikimrTableDescription& table return BuildTableMeta(meta, pos, ctx); }; - return ::NKikimr::NKqp::BuildSecondaryIndexVector(table, pos, ctx, filter, cb); + return BuildSecondaryIndexVector(table, pos, ctx, filter, cb); } TMaybeNode<TDqPhyPrecompute> PrecomputeTableLookupDict(const TDqPhyPrecompute& lookupKeys, diff --git a/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_upsert_index.cpp b/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_upsert_index.cpp index 9117dcbf7bc..b8742ac6387 100644 --- a/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_upsert_index.cpp +++ b/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_upsert_index.cpp @@ -89,6 +89,24 @@ TRowsAndKeysResult PrecomputeRowsAndKeys(const TCondenseInputResult& condenseRes }; } +// Return set of data columns need to be save during index update +THashSet<TString> CreateDataColumnSetToRead( + const TVector<std::pair<TExprNode::TPtr, const TIndexDescription*>>& indexes, + const THashSet<TStringBuf>& inputColumns) +{ + THashSet<TString> res; + + for (const auto& index : indexes) { + for (const auto& col : index.second->DataColumns) { + if (!inputColumns.contains(col)) { + res.emplace(col); + } + } + } + + return res; +} + TExprBase MakeNonexistingRowsFilter(const TDqPhyPrecompute& inputRows, const TDqPhyPrecompute& lookupDict, const TVector<TString>& dictKeys, TPositionHandle pos, TExprContext& ctx) { diff --git a/ydb/core/kqp/opt/physical/kqp_opt_phy_build_stage.cpp b/ydb/core/kqp/opt/physical/kqp_opt_phy_build_stage.cpp index 88f103fa05c..a0e28b2ab21 100644 --- a/ydb/core/kqp/opt/physical/kqp_opt_phy_build_stage.cpp +++ b/ydb/core/kqp/opt/physical/kqp_opt_phy_build_stage.cpp @@ -3,7 +3,6 @@ #include <ydb/core/kqp/common/kqp_yql.h> #include <ydb/core/kqp/opt/kqp_opt_impl.h> #include <ydb/core/kqp/opt/physical/kqp_opt_phy_impl.h> -#include <ydb/core/kqp/provider/kqp_opt_helpers.h> #include <ydb/core/tx/schemeshard/schemeshard_utils.h> #include <ydb/public/lib/scheme_types/scheme_type_id.h> @@ -228,7 +227,7 @@ TExprBase KqpBuildReadTableRangesStage(TExprBase node, TExprContext& ctx, if (!input.Maybe<TDqCnUnionAll>()) { return node; } - + if (!IsSingleConsumerConnection(input, parents, false)) { continue; } diff --git a/ydb/core/kqp/prepare/kqp_query_plan.cpp b/ydb/core/kqp/prepare/kqp_query_plan.cpp index 068f352dbfb..0fe84452d21 100644 --- a/ydb/core/kqp/prepare/kqp_query_plan.cpp +++ b/ydb/core/kqp/prepare/kqp_query_plan.cpp @@ -109,197 +109,6 @@ TString GetExprStr(const TExprBase& scalar, bool quoteStr = true) { return "expr"; } -void FillTablesInfo(const TExprNode::TPtr& query, TMap<TString, TTableInfo>& tables) { - TNodeSet visitedNodes; - TQueue<TMaybe<TExprScope>> scopesQueue; - scopesQueue.push(TMaybe<TExprScope>()); - - while (!scopesQueue.empty()) { - auto scope = scopesQueue.front(); - scopesQueue.pop(); - - auto scopeRoot = scope - ? scope->Lambda.Body() - : TExprBase(query); - - VisitExpr(scopeRoot.Ptr(), [scope, &scopesQueue, &tables] (const TExprNode::TPtr& exprNode) { - auto node = TExprBase(exprNode); - auto mapDepth = scope ? scope->Depth : 0; - - if (node.Maybe<TCoLambda>()) { - return false; - } - - if (auto callable = node.Maybe<TCallable>()) { - bool isMap = callable.Maybe<TCoMapBase>().IsValid() || - callable.Maybe<TCoFlatMapBase>().IsValid() || - callable.Maybe<TCoFilterBase>().IsValid(); - - for (const auto& arg : node.Cast<TVarArgCallable<TExprBase>>()) { - if (auto lambda = arg.Maybe<TCoLambda>()) { - TExprScope newScope(callable.Cast(), lambda.Cast(), isMap ? mapDepth + 1 : mapDepth); - scopesQueue.push(newScope); - } - } - } - - if (auto maybeSelectRange = node.Maybe<TKiSelectRange>()) { - auto selectRange = maybeSelectRange.Cast(); - - TTableRead read; - - auto isBoundDefined = [](const TExprBase& bound) -> bool { - return !bound.Maybe<TCoNothing>() && !bound.Maybe<TCoVoid>(); - }; - - auto getBoundStr = [](const TExprBase& bound) -> TString { - if (bound.Maybe<TCoNothing>()) { - return "-inf"; - } - - if (bound.Maybe<TCoVoid>()) { - return "+inf"; - } - - return GetExprStr(bound); - }; - - bool incFrom = false; - bool incTo = false; - for (const auto& node : selectRange.Range()) { - if (auto maybeAtom = node.Maybe<TCoAtom>()) { - auto value = maybeAtom.Cast().Value(); - - if (value == "IncFrom") { - incFrom = true; - } - - if (value == "IncTo") { - incTo = true; - } - } - } - - bool isScan = false; - for (const auto& node : selectRange.Range()) { - if (auto maybeColumnRange = node.Maybe<TKiColumnRangeTuple>()) { - auto columnRange = maybeColumnRange.Cast(); - - if (!isScan && columnRange.From().Raw() == columnRange.To().Raw()) { - auto lookup = TStringBuilder() << columnRange.Column().Value() - << " (" << GetExprStr(columnRange.From()) << ")"; - read.LookupBy.push_back(lookup); - } else { - isScan = true; - - TStringBuilder scanBuilder; - scanBuilder << columnRange.Column().Value(); - - if (isBoundDefined(columnRange.From()) || isBoundDefined(columnRange.To())) { - scanBuilder - << (incFrom ? " [" : " (") - << getBoundStr(columnRange.From()) << ", " - << getBoundStr(columnRange.To()) - << (incTo ? "]" : ")"); - } - - read.ScanBy.push_back(scanBuilder); - } - } - } - - auto limitSetting = GetSetting(selectRange.Settings().Ref(), "ItemsLimit"); - if (limitSetting && TMaybeNode<TCoNameValueTuple>(limitSetting)) { - read.Limit = GetExprStr(TCoNameValueTuple(limitSetting).Value().Cast()); - } - - auto reverseSettings = GetSetting(selectRange.Settings().Ref(), "Reverse"); - if (reverseSettings) { - read.Reverse = true; - } - - for (const auto& column : selectRange.Select()) { - read.Columns.emplace_back(column); - } - - if (read.LookupBy.empty()) { - read.Type = TKikimrKeyRange::IsFull(selectRange.Range()) - ? EPlanTableReadType::FullScan - : EPlanTableReadType::Scan; - } else { - read.Type = mapDepth > 0 - ? EPlanTableReadType::MultiLookup - : EPlanTableReadType::Lookup; - } - - tables[TString(selectRange.Table().Path())].Reads.push_back(read); - } - - if (auto maybeSelectRow = node.Maybe<TKiSelectRow>()) { - auto selectRow = maybeSelectRow.Cast(); - - TTableRead read; - read.Type = mapDepth > 0 - ? EPlanTableReadType::MultiLookup - : EPlanTableReadType::Lookup; - - for (const auto& key : selectRow.Key()) { - auto lookup = TStringBuilder() << key.Name().Value() - << " (" << GetExprStr(key.Value().Cast()) << ")"; - read.LookupBy.push_back(lookup); - } - - for (const auto& column : selectRow.Select()) { - read.Columns.emplace_back(column); - } - - tables[TString(selectRow.Table().Path())].Reads.push_back(read); - } - - if (auto maybeUpdateRow = node.Maybe<TKiUpdateRow>()) { - auto updateRow = maybeUpdateRow.Cast(); - - TTableWrite write; - write.Type = mapDepth > 0 - ? EPlanTableWriteType::MultiUpsert - : EPlanTableWriteType::Upsert; - - - for (const auto& tuple : updateRow.Key()) { - auto key = TStringBuilder() << tuple.Name().Value() - << " (" << GetExprStr(tuple.Value().Cast()) << ")"; - write.Keys.push_back(key); - } - - for (const auto& tuple : updateRow.Update()) { - write.Columns.emplace_back(tuple.Name()); - } - - tables[TString(updateRow.Table().Path())].Writes.push_back(write); - } - - if (auto maybeEraseRow = node.Maybe<TKiEraseRow>()) { - auto eraseRow = maybeEraseRow.Cast(); - - TTableWrite write; - write.Type = mapDepth > 0 - ? EPlanTableWriteType::MultiErase - : EPlanTableWriteType::Erase; - - for (const auto& tuple : eraseRow.Key()) { - auto key = TStringBuilder() << tuple.Name().Value() - << " (" << GetExprStr(tuple.Value().Cast()) << ")"; - write.Keys.push_back(key); - } - - tables[TString(eraseRow.Table().Path())].Writes.push_back(write); - } - - return true; - }, visitedNodes); - } -}; - class TxPlanSerializer { public: TxPlanSerializer(TSerializerCtx& serializerCtx, ui32 txId, const TKqpPhysicalTx& tx) : SerializerCtx(serializerCtx), TxId(txId), Tx(tx) {} @@ -1450,24 +1259,6 @@ void SetNonZero(NJson::TJsonValue& node, const TStringBuf& name, T value) { } // namespace -void WriteKqlPlan(NJsonWriter::TBuf& writer, const TExprNode::TPtr& query) { - TMap<TString, TTableInfo> tables; - - writer.BeginObject(); - writer.WriteKey("meta"); - - writer.BeginObject(); - writer.WriteKey("version").WriteString("0.1"); - writer.WriteKey("type").WriteString("query"); - writer.EndObject(); - - writer.WriteKey("tables"); - FillTablesInfo(query, tables); - WriteCommonTablesInfo(writer, tables); - - writer.EndObject(); -} - // TODO(sk): check prepared statements params in read ranges // TODO(sk): check params from correlated subqueries // lookup join void PhyQuerySetTxPlans(NKqpProto::TKqpPhyQuery& queryProto, const TKqpPhysicalQuery& query, diff --git a/ydb/core/kqp/prepare/kqp_query_plan.h b/ydb/core/kqp/prepare/kqp_query_plan.h index 828cb5ce5c6..ad5b0045044 100644 --- a/ydb/core/kqp/prepare/kqp_query_plan.h +++ b/ydb/core/kqp/prepare/kqp_query_plan.h @@ -28,8 +28,6 @@ enum class EPlanTableWriteType { MultiErase, }; -void WriteKqlPlan(NJsonWriter::TBuf& writer, const NYql::TExprNode::TPtr& query); - /* * Set dqPlan in each physical transaction (TKqpPhyQuery.Transactions[].Plan). Common query plan with all * table accesses is stored in top-level TKqpPhyQuery.QueryPlan. diff --git a/ydb/core/kqp/provider/CMakeLists.txt b/ydb/core/kqp/provider/CMakeLists.txt index fccd83767a0..78868c23b22 100644 --- a/ydb/core/kqp/provider/CMakeLists.txt +++ b/ydb/core/kqp/provider/CMakeLists.txt @@ -7,7 +7,6 @@ find_package(Python3 REQUIRED) -add_subdirectory(mkql) add_subdirectory(ut) add_library(core-kqp-provider) @@ -18,7 +17,6 @@ target_link_libraries(core-kqp-provider PUBLIC contrib-libs-cxxsupp yutil ydb-core-base - kqp-provider-mkql ydb-core-protos ydb-library-aclib library-aclib-protos @@ -41,21 +39,14 @@ target_link_libraries(core-kqp-provider PUBLIC tools-enum_parser-enum_serialization_runtime ) target_sources(core-kqp-provider PRIVATE - ${CMAKE_SOURCE_DIR}/ydb/core/kqp/provider/kqp_opt_helpers.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/provider/yql_kikimr_datasink.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/provider/yql_kikimr_datasource.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/provider/yql_kikimr_exec.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/provider/yql_kikimr_expr_nodes.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/provider/yql_kikimr_gateway.cpp - ${CMAKE_SOURCE_DIR}/ydb/core/kqp/provider/yql_kikimr_kql.cpp - ${CMAKE_SOURCE_DIR}/ydb/core/kqp/provider/yql_kikimr_mkql.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/provider/yql_kikimr_opt_build.cpp - ${CMAKE_SOURCE_DIR}/ydb/core/kqp/provider/yql_kikimr_opt_join.cpp - ${CMAKE_SOURCE_DIR}/ydb/core/kqp/provider/yql_kikimr_opt_range.cpp - ${CMAKE_SOURCE_DIR}/ydb/core/kqp/provider/yql_kikimr_opt_utils.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/provider/yql_kikimr_opt.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/provider/yql_kikimr_provider.cpp - ${CMAKE_SOURCE_DIR}/ydb/core/kqp/provider/yql_kikimr_query_traits.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/provider/yql_kikimr_results.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/provider/yql_kikimr_settings.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/provider/yql_kikimr_type_ann.cpp diff --git a/ydb/core/kqp/provider/kqp_opt_helpers.cpp b/ydb/core/kqp/provider/kqp_opt_helpers.cpp deleted file mode 100644 index 0f9d77214f6..00000000000 --- a/ydb/core/kqp/provider/kqp_opt_helpers.cpp +++ /dev/null @@ -1,241 +0,0 @@ -#include "kqp_opt_helpers.h" - -#include <ydb/core/kqp/provider/yql_kikimr_provider_impl.h> - -namespace NKikimr { -namespace NKqp { - -using namespace NYql; -using namespace NYql::NNodes; - -TExprBase ExtractKeys(TCoArgument itemArg, const TKikimrTableDescription& tableDesc, - TExprContext& ctx) -{ - TVector<TExprBase> keys; - for (TString& keyColumnName : tableDesc.Metadata->KeyColumnNames) { - auto key = Build<TCoMember>(ctx, itemArg.Pos()) - .Struct(itemArg) - .Name().Build(keyColumnName) - .Done(); - - keys.emplace_back(std::move(key)); - } - - if (keys.size() == 1) { - return keys[0]; - } - - return Build<TExprList>(ctx, itemArg.Pos()) - .Add(keys) - .Done(); -} - -TVector<std::pair<TExprNode::TPtr, const TIndexDescription*>> BuildSecondaryIndexVector( - const TKikimrTableDescription& table, - TPositionHandle pos, - TExprContext& ctx, - const THashSet<TStringBuf>* filter, - const std::function<TExprBase (const TKikimrTableMetadata&, TPositionHandle, TExprContext&)>& tableBuilder) -{ - TVector<std::pair<TExprNode::TPtr, const TIndexDescription*>> secondaryIndexes; - secondaryIndexes.reserve(table.Metadata->Indexes.size()); - YQL_ENSURE(table.Metadata->Indexes.size() == table.Metadata->SecondaryGlobalIndexMetadata.size()); - for (size_t i = 0; i < table.Metadata->Indexes.size(); i++) { - const auto& indexMeta = table.Metadata->Indexes[i]; - - if (!indexMeta.ItUsedForWrite()) { - continue; - } - - // Add index if filter absent - bool addIndex = filter ? false : true; - - for (const auto& col : indexMeta.KeyColumns) { - - if (filter) { - // Add index if filter and at least one column present in the filter - addIndex |= filter->contains(TStringBuf(col)); - } - } - - for (const auto& col : indexMeta.DataColumns) { - - if (filter) { - // Add index if filter and at least one column present in the filter - addIndex |= filter->contains(TStringBuf(col)); - } - } - - if (indexMeta.KeyColumns && addIndex) { - auto indexTable = tableBuilder(*table.Metadata->SecondaryGlobalIndexMetadata[i], pos, ctx).Ptr(); - secondaryIndexes.emplace_back(std::make_pair(indexTable, &indexMeta)); - } - } - return secondaryIndexes; -} - -TVector<TExprBase> CreateColumnsToSelectToUpdateIndex( - const TVector<std::pair<TExprNode::TPtr, const TIndexDescription*>> indexes, - const TVector<TString>& pk, - const THashSet<TString>& dataColumns, - TPositionHandle pos, - TExprContext& ctx) -{ - TVector<TExprBase> columnsToSelect; - TSet<TString> columns; - - for (const auto& pair : indexes) { - for (const auto& col : pair.second->KeyColumns) { - if (columns.insert(col).second) { - auto atom = Build<TCoAtom>(ctx, pos) - .Value(col) - .Done(); - columnsToSelect.emplace_back(std::move(atom)); - } - } - - for (const auto& col : dataColumns) { - if (columns.insert(col).second) { - auto atom = Build<TCoAtom>(ctx, pos) - .Value(col) - .Done(); - columnsToSelect.emplace_back(std::move(atom)); - } - } - } - - for (const auto& p : pk) { - const auto& atom = Build<TCoAtom>(ctx, pos) - .Value(p) - .Done(); - if (columns.insert(p).second) { - columnsToSelect.push_back(atom); - } - } - - return columnsToSelect; -} - -// Return set of data columns need to be save during index update -THashSet<TString> CreateDataColumnSetToRead( - const TVector<std::pair<TExprNode::TPtr, const TIndexDescription*>>& indexes, - const THashSet<TStringBuf>& inputColumns) -{ - THashSet<TString> res; - - for (const auto& index : indexes) { - for (const auto& col : index.second->DataColumns) { - if (!inputColumns.contains(col)) { - res.emplace(col); - } - } - } - - return res; -} - -TExprBase RemoveDuplicateKeyFromInput(const TExprBase& input, const TKikimrTableDescription& tableDesc, - TPositionHandle pos, TExprContext& ctx) -{ - const auto& keySelectorArg = Build<TCoArgument>(ctx, pos) - .Name("item") - .Done(); - - const auto& streamArg = Build<TCoArgument>(ctx, pos) - .Name("streamArg") - .Done(); - - return Build<TCoPartitionByKey>(ctx, pos) - .Input(input) - .KeySelectorLambda() - .Args(keySelectorArg) - .Body(ExtractKeys(keySelectorArg, tableDesc, ctx)) - .Build() - .SortDirections<TCoVoid>().Build() - .SortKeySelectorLambda<TCoVoid>().Build() - .ListHandlerLambda() - .Args({TStringBuf("stream")}) - .Body<TCoFlatMap>() - .Input(TStringBuf("stream")) - .Lambda<TCoLambda>() - .Args(streamArg) - .Body<TCoLast>() - .Input<TCoForwardList>() - .Stream<TCoNth>() - .Tuple(streamArg) - .Index().Value(ToString(1)) - .Build() - .Build() - .Build() - .Build() - .Build() - .Build() - .Build() - .Done(); -} - -// Replace absent input columns to NULL to perform REPLACE via UPSERT -std::pair<TExprBase, TCoAtomList> CreateRowsToReplace(const TExprBase& input, - const TCoAtomList& inputColumns, const TKikimrTableDescription& tableDesc, - TPositionHandle pos, TExprContext& ctx) -{ - THashSet<TStringBuf> inputColumnsSet; - for (const auto& name : inputColumns) { - inputColumnsSet.insert(name.Value()); - } - - auto rowArg = Build<TCoArgument>(ctx, pos) - .Name("row") - .Done(); - - TVector<TCoAtom> writeColumns; - TVector<TExprBase> writeMembers; - - for (const auto& [name, _] : tableDesc.Metadata->Columns) { - TMaybeNode<TExprBase> memberValue; - if (tableDesc.GetKeyColumnIndex(name) || inputColumnsSet.contains(name)) { - memberValue = Build<TCoMember>(ctx, pos) - .Struct(rowArg) - .Name().Build(name) - .Done(); - } else { - auto type = tableDesc.GetColumnType(name); - YQL_ENSURE(type); - - memberValue = Build<TCoNothing>(ctx, pos) - .OptionalType(NCommon::BuildTypeExpr(pos, *type, ctx)) - .Done(); - } - - auto nameAtom = TCoAtom(ctx.NewAtom(pos, name)); - - YQL_ENSURE(memberValue); - auto memberTuple = Build<TCoNameValueTuple>(ctx, pos) - .Name(nameAtom) - .Value(memberValue.Cast()) - .Done(); - - writeColumns.emplace_back(std::move(nameAtom)); - writeMembers.emplace_back(std::move(memberTuple)); - } - - auto writeData = Build<TCoMap>(ctx, pos) - .Input(input) - .Lambda() - .Args({rowArg}) - .Body<TCoAsStruct>() - .Add(writeMembers) - .Build() - .Build() - .Done(); - - auto columnList = Build<TCoAtomList>(ctx, pos) - .Add(writeColumns) - .Done(); - - return {writeData, columnList}; -} - -} // namespace NKqp -} // namespace NKikimr - diff --git a/ydb/core/kqp/provider/kqp_opt_helpers.h b/ydb/core/kqp/provider/kqp_opt_helpers.h deleted file mode 100644 index db7507cf55d..00000000000 --- a/ydb/core/kqp/provider/kqp_opt_helpers.h +++ /dev/null @@ -1,45 +0,0 @@ -#pragma once - -#include <ydb/core/kqp/provider/yql_kikimr_gateway.h> - -// Common opt helpers for "Old" and "New" engine. -// Should be in core/kqp/opt directory, but also use by Old engine from core/kqp/provider. - -namespace NYql { - class TKikimrTableDescription; -} - -namespace NKikimr { -namespace NKqp { - -NYql::NNodes::TExprBase ExtractKeys(NYql::NNodes::TCoArgument itemArg, const NYql::TKikimrTableDescription& tableDesc, - NYql::TExprContext& ctx); - -TVector<std::pair<NYql::TExprNode::TPtr, const NYql::TIndexDescription*>> BuildSecondaryIndexVector( - const NYql::TKikimrTableDescription& table, - NYql::TPositionHandle pos, - NYql::TExprContext& ctx, - const THashSet<TStringBuf>* filter, - const std::function<NYql::NNodes::TExprBase (const NYql::TKikimrTableMetadata&, NYql::TPositionHandle, NYql::TExprContext&)>& tableBuilder); - -TVector<NYql::NNodes::TExprBase> CreateColumnsToSelectToUpdateIndex( - const TVector<std::pair<NYql::TExprNode::TPtr, const NYql::TIndexDescription*>> indexes, - const TVector<TString>& pk, - const THashSet<TString>& dataColumns, - NYql::TPositionHandle pos, - NYql::TExprContext& ctx); - -THashSet<TString> CreateDataColumnSetToRead( - const TVector<std::pair<NYql::TExprNode::TPtr, const NYql::TIndexDescription*>>& indexes, - const THashSet<TStringBuf>& inputColumns); - -NYql::NNodes::TExprBase RemoveDuplicateKeyFromInput( - const NYql::NNodes::TExprBase& input, const NYql::TKikimrTableDescription& tableDesc, - NYql::TPositionHandle pos, NYql::TExprContext& ctx); - -std::pair<NYql::NNodes::TExprBase, NYql::NNodes::TCoAtomList> CreateRowsToReplace(const NYql::NNodes::TExprBase& input, - const NYql::NNodes::TCoAtomList& inputColumns, const NYql::TKikimrTableDescription& tableDesc, - NYql::TPositionHandle pos, NYql::TExprContext& ctx); - -} // namespace NKqp -} // namespace NKikimr diff --git a/ydb/core/kqp/provider/mkql/CMakeLists.txt b/ydb/core/kqp/provider/mkql/CMakeLists.txt deleted file mode 100644 index 386664aace8..00000000000 --- a/ydb/core/kqp/provider/mkql/CMakeLists.txt +++ /dev/null @@ -1,41 +0,0 @@ - -# This file was gererated by the build system used internally in the Yandex monorepo. -# Only simple modifications are allowed (adding source-files to targets, adding simple properties -# like target_include_directories). These modifications will be ported to original -# ya.make files by maintainers. Any complex modifications which can't be ported back to the -# original buildsystem will not be accepted. - - -find_package(Python3 REQUIRED) - -add_library(kqp-provider-mkql STATIC) -set_property(TARGET kqp-provider-mkql PROPERTY - LINKER_LANGUAGE CXX -) -target_link_libraries(kqp-provider-mkql PUBLIC - contrib-libs-cxxsupp - yutil -) -target_sources(kqp-provider-mkql PRIVATE - ${CMAKE_BINARY_DIR}/ydb/core/kqp/provider/mkql/yql_kikimr_mkql_expr_nodes.gen.h - ${CMAKE_BINARY_DIR}/ydb/core/kqp/provider/mkql/yql_kikimr_mkql_expr_nodes.decl.inl.h - ${CMAKE_BINARY_DIR}/ydb/core/kqp/provider/mkql/yql_kikimr_mkql_expr_nodes.defs.inl.h -) -add_custom_command( - OUTPUT - ${CMAKE_BINARY_DIR}/ydb/core/kqp/provider/mkql/yql_kikimr_mkql_expr_nodes.gen.h - ${CMAKE_BINARY_DIR}/ydb/core/kqp/provider/mkql/yql_kikimr_mkql_expr_nodes.decl.inl.h - ${CMAKE_BINARY_DIR}/ydb/core/kqp/provider/mkql/yql_kikimr_mkql_expr_nodes.defs.inl.h - DEPENDS - ${CMAKE_SOURCE_DIR}/ydb/library/yql/core/expr_nodes_gen/yql_expr_nodes_gen.jnj - ${CMAKE_SOURCE_DIR}/ydb/core/kqp/provider/mkql/yql_kikimr_mkql_expr_nodes.json - ${CMAKE_SOURCE_DIR}/ydb/library/yql/core/expr_nodes_gen/gen/__main__.py - COMMAND - Python3::Interpreter - ${CMAKE_SOURCE_DIR}/ydb/library/yql/core/expr_nodes_gen/gen/__main__.py - ${CMAKE_SOURCE_DIR}/ydb/library/yql/core/expr_nodes_gen/yql_expr_nodes_gen.jnj - ${CMAKE_SOURCE_DIR}/ydb/core/kqp/provider/mkql/yql_kikimr_mkql_expr_nodes.json - ${CMAKE_BINARY_DIR}/ydb/core/kqp/provider/mkql/yql_kikimr_mkql_expr_nodes.gen.h - ${CMAKE_BINARY_DIR}/ydb/core/kqp/provider/mkql/yql_kikimr_mkql_expr_nodes.decl.inl.h - ${CMAKE_BINARY_DIR}/ydb/core/kqp/provider/mkql/yql_kikimr_mkql_expr_nodes.defs.inl.h -) diff --git a/ydb/core/kqp/provider/mkql/yql_kikimr_mkql_expr_nodes.h b/ydb/core/kqp/provider/mkql/yql_kikimr_mkql_expr_nodes.h deleted file mode 100644 index 83c1637d19d..00000000000 --- a/ydb/core/kqp/provider/mkql/yql_kikimr_mkql_expr_nodes.h +++ /dev/null @@ -1,15 +0,0 @@ -#pragma once - -#include <ydb/library/yql/core/expr_nodes/yql_expr_nodes.gen.h> - -#include <ydb/core/kqp/provider/mkql/yql_kikimr_mkql_expr_nodes.gen.h> - -namespace NYql { -namespace NNodes { - -#include <ydb/core/kqp/provider/mkql/yql_kikimr_mkql_expr_nodes.decl.inl.h> - -#include <ydb/core/kqp/provider/mkql/yql_kikimr_mkql_expr_nodes.defs.inl.h> - -} // namespace NNodes -} // namespace NYql diff --git a/ydb/core/kqp/provider/mkql/yql_kikimr_mkql_expr_nodes.json b/ydb/core/kqp/provider/mkql/yql_kikimr_mkql_expr_nodes.json deleted file mode 100644 index 1051c124b8d..00000000000 --- a/ydb/core/kqp/provider/mkql/yql_kikimr_mkql_expr_nodes.json +++ /dev/null @@ -1,38 +0,0 @@ -{ - "NodeRootType": "TExprBase", - "NodeBuilderBase": "TNodeBuilderBase", - "ListBuilderBase": "TListBuilderBase", - "FreeArgCallableBase": "TFreeArgCallable", - "FreeArgBuilderBase": "TFreeArgCallableBuilderBase", - "Nodes": [ - { - "Name": "TMkqlVersionedTable", - "Base": "TExprBase", - "Match": {"Type": "Tuple"}, - "Children": [ - {"Index": 0, "Name": "Table", "Type": "TCoAtom"}, - {"Index": 1, "Name": "SchemaVersion", "Type": "TCoAtom"}, - {"Index": 2, "Name": "PathId", "Type": "TCoAtom"} - ] - }, - { - "Name": "TMkqlUpdateRow", - "Base": "TCallable", - "Match": {"Type": "Callable", "Name": "UpdateRow"}, - "Children": [ - {"Index": 0, "Name": "Table", "Type": "TMkqlVersionedTable"}, - {"Index": 1, "Name": "Key", "Type": "TCoNameValueTupleList"}, - {"Index": 2, "Name": "Update", "Type": "TCoNameValueTupleList"} - ] - }, - { - "Name": "TMkqlEraseRow", - "Base": "TCallable", - "Match": {"Type": "Callable", "Name": "EraseRow"}, - "Children": [ - {"Index": 0, "Name": "Table", "Type": "TMkqlVersionedTable"}, - {"Index": 1, "Name": "Key", "Type": "TCoNameValueTupleList"} - ] - } - ] -} diff --git a/ydb/core/kqp/provider/yql_kikimr_datasink.cpp b/ydb/core/kqp/provider/yql_kikimr_datasink.cpp index eb869ee0b45..67bdcda3bf6 100644 --- a/ydb/core/kqp/provider/yql_kikimr_datasink.cpp +++ b/ydb/core/kqp/provider/yql_kikimr_datasink.cpp @@ -17,13 +17,6 @@ public: : SessionCtx(sessionCtx) {} private: - TStatus HandleClusterConfig(TKiClusterConfig node, TExprContext& ctx) override { - Y_UNUSED(node); - Y_UNUSED(ctx); - - return TStatus::Ok; - } - TStatus HandleWriteTable(TKiWriteTable node, TExprContext& ctx) override { Y_UNUSED(ctx); @@ -264,41 +257,9 @@ private: } TStatus HandleKql(TCallable node, TExprContext& ctx) override { + Y_UNUSED(node); Y_UNUSED(ctx); - if (auto call = node.Maybe<TKiSelectRow>()) { - auto cluster = call.Cast().Cluster().Value(); - auto table = call.Cast().Table(); - - SessionCtx->Tables().GetOrAddTable(TString(cluster), SessionCtx->GetDatabase(), TString(table.Path())); - return TStatus::Ok; - } - - if (auto call = node.Maybe<TKiSelectRangeBase>()) { - auto cluster = call.Cast().Cluster().Value(); - auto table = call.Cast().Table(); - - SessionCtx->Tables().GetOrAddTable(TString(cluster), SessionCtx->GetDatabase(), TString(table.Path())); - return TStatus::Ok; - } - - if (auto call = node.Maybe<TKiUpdateRow>()) { - - auto cluster = call.Cast().Cluster().Value(); - auto table = call.Cast().Table(); - - SessionCtx->Tables().GetOrAddTable(TString(cluster), SessionCtx->GetDatabase(), TString(table.Path())); - return TStatus::Ok; - } - - if (auto call = node.Maybe<TKiEraseRow>()) { - auto cluster = call.Cast().Cluster().Value(); - auto table = call.Cast().Table(); - - SessionCtx->Tables().GetOrAddTable(TString(cluster), SessionCtx->GetDatabase(), TString(table.Path())); - return TStatus::Ok; - } - return TStatus::Ok; } @@ -341,32 +302,9 @@ public: } TExprNode::TPtr GetClusterInfo(const TString& cluster, TExprContext& ctx) override { - auto config = Gateway->GetClusterConfig(cluster); - if (!config) { - return {}; - } - - TPositionHandle pos; - - TVector<TExprBase> locators; - auto& grpcData = config->GetGrpc(); - for (size_t index = 0; index < grpcData.LocatorsSize(); ++index) { - locators.push_back(Build<TCoAtom>(ctx, pos) - .Value(grpcData.GetLocators(index)) - .Done()); - } - - return Build<TKiClusterConfig>(ctx, pos) - .GrpcData<TKiGrpcData>() - .Locators<TCoAtomList>() - .Add(locators) - .Build() - .TimeoutMs<TCoAtom>().Build(ToString(grpcData.GetTimeoutMs())) - .MaxMessageSizeBytes<TCoAtom>().Build(ToString(grpcData.GetMaxMessageSizeBytes())) - .MaxInFlight<TCoAtom>().Build(ToString(grpcData.GetMaxInFlight())) - .Build() - .TvmId<TCoAtom>().Build(ToString(config->GetTvmId())) - .Done().Ptr(); + Y_UNUSED(cluster); + Y_UNUSED(ctx); + return {}; } IGraphTransformer& GetIntentDeterminationTransformer() override { @@ -719,10 +657,6 @@ IGraphTransformer::TStatus TKiSinkVisitorTransformer::DoTransform(TExprNode::TPt auto callable = TCallable(input); - if (auto node = callable.Maybe<TKiClusterConfig>()) { - return HandleClusterConfig(node.Cast(), ctx); - } - if (auto node = callable.Maybe<TKiWriteTable>()) { return HandleWriteTable(node.Cast(), ctx); } diff --git a/ydb/core/kqp/provider/yql_kikimr_expr_nodes.h b/ydb/core/kqp/provider/yql_kikimr_expr_nodes.h index e91fd7635ab..c352ed1487a 100644 --- a/ydb/core/kqp/provider/yql_kikimr_expr_nodes.h +++ b/ydb/core/kqp/provider/yql_kikimr_expr_nodes.h @@ -70,20 +70,5 @@ public: #include <ydb/core/kqp/provider/yql_kikimr_expr_nodes.defs.inl.h> -template<typename TParent> -class TNodeBuilder<TParent, TKiColumnRangeTuple> : public NGenerated::TKiColumnRangeTupleBuilder<TParent> -{ -public: - TNodeBuilder(TExprContext& ctx, TPositionHandle pos, - typename NGenerated::TKiColumnRangeTupleBuilder<TParent>::BuildFuncType buildFunc, - typename NGenerated::TKiColumnRangeTupleBuilder<TParent>::GetArgFuncType getArgFunc) - : NGenerated::TKiColumnRangeTupleBuilder<TParent>(ctx, pos, buildFunc, getArgFunc) {} - - TKiColumnRangeTuple DoBuild() { - auto node = this->Ctx.NewList(this->Pos, { this->ColumnHolder.Cast().Ptr(), this->FromHolder.Cast().Ptr(), this->ToHolder.Cast().Ptr() }); - return TKiColumnRangeTuple(node); - } -}; - } // namespace NNodes } // namespace NYql diff --git a/ydb/core/kqp/provider/yql_kikimr_expr_nodes.json b/ydb/core/kqp/provider/yql_kikimr_expr_nodes.json index d801df3a317..d96428ed0c6 100644 --- a/ydb/core/kqp/provider/yql_kikimr_expr_nodes.json +++ b/ydb/core/kqp/provider/yql_kikimr_expr_nodes.json @@ -265,188 +265,6 @@ {"Index": 3, "Name": "Settings", "Type": "TCoNameValueTupleList"}, {"Index": 4, "Name": "Ast", "Type": "TExprBase"} ] - }, - { - "Name": "TKiColumnRangeTuple", - "Base": "TExprBase", - "Match": {"Type": "NodeType", "TypeName": "List"}, - "Builder": {"Generate": "Custom"}, - "Children": [ - {"Index": 0, "Name": "Column", "Type": "TCoAtom"}, - {"Index": 1, "Name": "From", "Type": "TExprBase"}, - {"Index": 2, "Name": "To", "Type": "TExprBase"} - ] - }, - { - "Name": "TKiSelectRow", - "Base": "TCallable", - "Match": {"Type": "Callable", "Name": "KiSelectRow"}, - "Children": [ - {"Index": 0, "Name": "Cluster", "Type": "TCoAtom"}, - {"Index": 1, "Name": "Table", "Type": "TKiVersionedTable"}, - {"Index": 2, "Name": "Key", "Type": "TCoNameValueTupleList"}, - {"Index": 3, "Name": "Select", "Type": "TCoAtomList"} - ] - }, - { - "Name": "TKiUpdateRow", - "Base": "TCallable", - "Match": {"Type": "Callable", "Name": "KiUpdateRow"}, - "Children": [ - {"Index": 0, "Name": "Cluster", "Type": "TCoAtom"}, - {"Index": 1, "Name": "Table", "Type": "TKiVersionedTable"}, - {"Index": 2, "Name": "Key", "Type": "TCoNameValueTupleList"}, - {"Index": 3, "Name": "Update", "Type": "TCoNameValueTupleList"} - ] - }, - { - "Name": "TKiEraseRow", - "Base": "TCallable", - "Match": {"Type": "Callable", "Name": "KiEraseRow"}, - "Children": [ - {"Index": 0, "Name": "Cluster", "Type": "TCoAtom"}, - {"Index": 1, "Name": "Table", "Type": "TKiVersionedTable"}, - {"Index": 2, "Name": "Key", "Type": "TCoNameValueTupleList"} - ] - }, - { - "Name": "TKiSelectRangeBase", - "Base": "TCallable", - "Match": {"Type": "CallableBase"}, - "Builder": {"Generate": "None"}, - "Children": [ - {"Index": 0, "Name": "Cluster", "Type": "TCoAtom"}, - {"Index": 1, "Name": "Table", "Type": "TKiVersionedTable"}, - {"Index": 2, "Name": "Range", "Type": "TExprList"}, - {"Index": 3, "Name": "Select", "Type": "TCoAtomList"}, - {"Index": 4, "Name": "Settings", "Type": "TCoNameValueTupleList"} - ] - }, - { - "Name": "TKiSelectRange", - "Base": "TKiSelectRangeBase", - "Match": {"Type": "Callable", "Name": "KiSelectRange"} - }, - { - "Name": "TKiSelectIndexRange", - "Base": "TKiSelectRangeBase", - "Match": {"Type": "Callable", "Name": "KiSelectIndexRange"}, - "Children": [ - {"Index": 5, "Name": "IndexName", "Type": "TCoAtom"} - ] - }, - { - "Name": "TKiSetResult", - "Base": "TCallable", - "Match": {"Type": "Callable", "Name": "KiSetResult"}, - "Children": [ - {"Index": 0, "Name": "Name", "Type": "TCoAtom"}, - {"Index": 1, "Name": "Data", "Type": "TExprBase"} - ] - }, - { - "Name": "TKiAcquireLocks", - "Base": "TCallable", - "Match": {"Type": "Callable", "Name": "KiAcquireLocks"}, - "Children": [ - {"Index": 0, "Name": "LockTxId", "Type": "TExprBase"} - ] - }, - { - "Name": "TKiMapParameter", - "Base": "TCallable", - "Match": {"Type": "Callable", "Name": "KiMapParameter"}, - "Children": [ - {"Index": 0, "Name": "Input", "Type": "TExprBase"}, - {"Index": 1, "Name": "Lambda", "Type": "TCoLambda"} - ] - }, - { - "Name": "TMkqlMapParameter", - "Base": "TCallable", - "Match": {"Type": "Callable", "Name": "MapParameter"}, - "Children": [ - {"Index": 0, "Name": "Input", "Type": "TExprBase"}, - {"Index": 1, "Name": "Lambda", "Type": "TCoLambda"} - ] - }, - { - "Name": "TKiFlatMapParameter", - "Base": "TCallable", - "Match": {"Type": "Callable", "Name": "KiFlatMapParameter"}, - "Children": [ - {"Index": 0, "Name": "Input", "Type": "TExprBase"}, - {"Index": 1, "Name": "Lambda", "Type": "TCoLambda"} - ] - }, - { - "Name": "TKiConditionalEffect", - "Base": "TCallable", - "Match": {"Type": "CallableBase"}, - "Builder": {"Generate": "None"}, - "Children": [ - {"Index": 0, "Name": "Predicate", "Type": "TExprBase"}, - {"Index": 1, "Name": "Effect", "Type": "TExprBase"}, - {"Index": 2, "Name": "Constraint", "Type": "TCoAtom"} - ] - }, - { - "Name": "TKiAbortIf", - "Base": "TKiConditionalEffect", - "Match": {"Type": "Callable", "Name": "KiAbortIf"} - }, - { - "Name": "TKiRevertIf", - "Base": "TKiConditionalEffect", - "Match": {"Type": "Callable", "Name": "KiRevertIf"} - }, - { - "Name": "TKiProgram", - "Base": "TExprBase", - "Match": {"Type": "Tuple"}, - "Children": [ - {"Index": 0, "Name": "Results", "Type": "TExprList"}, - {"Index": 1, "Name": "Effects", "Type": "TExprBase"} - ] - }, - { - "Name": "TKiPartialSort", - "Base": "TCallable", - "Match": {"Type": "Callable", "Name": "KiPartialSort"}, - "Children": [ - {"Index": 0, "Name": "Input", "Type": "TExprBase"}, - {"Index": 1, "Name": "SortDirections", "Type": "TExprBase"}, - {"Index": 2, "Name": "KeySelectorLambda", "Type": "TCoLambda"} - ] - }, - { - "Name": "TKiPartialTake", - "Base": "TCallable", - "Match": {"Type": "Callable", "Name": "KiPartialTake"}, - "Children": [ - {"Index": 0, "Name": "Input", "Type": "TExprBase"}, - {"Index": 1, "Name": "Count", "Type": "TExprBase"} - ] - }, - { - "Name": "TKiGrpcData", - "Base": "TExprBase", - "Match": {"Type": "Tuple"}, - "Children": [ - {"Index": 0, "Name": "Locators", "Type": "TCoAtomList"}, - {"Index": 1, "Name": "TimeoutMs", "Type": "TCoAtom"}, - {"Index": 2, "Name": "MaxMessageSizeBytes", "Type": "TCoAtom"}, - {"Index": 3, "Name": "MaxInFlight", "Type": "TCoAtom"} - ] - }, - { - "Name": "TKiClusterConfig", - "Base": "TCallable", - "Match": {"Type": "Callable", "Name": "KiClusterConfig"}, - "Children": [ - {"Index": 0, "Name": "GrpcData", "Type": "TKiGrpcData"}, - {"Index": 1, "Name": "TvmId", "Type": "TCoAtom"} - ] } ] } diff --git a/ydb/core/kqp/provider/yql_kikimr_gateway.h b/ydb/core/kqp/provider/yql_kikimr_gateway.h index 6791ae8f58f..d902c1bd589 100644 --- a/ydb/core/kqp/provider/yql_kikimr_gateway.h +++ b/ydb/core/kqp/provider/yql_kikimr_gateway.h @@ -1,7 +1,5 @@ #pragma once -#include "yql_kikimr_query_traits.h" - #include <ydb/library/yql/public/udf/udf_data_type.h> #include <ydb/library/yql/utils/resetable_setting.h> #include <ydb/library/yql/utils/yql_panic.h> @@ -592,7 +590,6 @@ public: NKqpProto::TKqpStatsQuery QueryStats; std::unique_ptr<NKikimrKqp::TPreparedQuery> PreparingQuery; std::shared_ptr<const NKikimrKqp::TPreparedQuery> PreparedQuery; - std::optional<NKikimr::NKqp::TQueryTraits> QueryTraits; TString QueryAst; TString QueryPlan; std::shared_ptr<google::protobuf::Arena> ProtobufArenaPtr; diff --git a/ydb/core/kqp/provider/yql_kikimr_kql.cpp b/ydb/core/kqp/provider/yql_kikimr_kql.cpp deleted file mode 100644 index 95f5915a57d..00000000000 --- a/ydb/core/kqp/provider/yql_kikimr_kql.cpp +++ /dev/null @@ -1,1248 +0,0 @@ -#include "yql_kikimr_provider_impl.h" -#include "kqp_opt_helpers.h" - -#include <ydb/library/yql/core/yql_opt_utils.h> - -namespace NYql { -namespace { - -using namespace NNodes; -using namespace NKikimr::NKqp; - -TCoNameValueTupleList CreateSecondaryIndexKeyTuples(TCoArgument itemArg, const TVector<TString>& keyColumnNames, - const THashSet<TStringBuf>& inputColumns, const TKikimrTableDescription& table, - const TExprBase& fetch, bool nullExtension, TExprContext& ctx) -{ - TVector<TExprBase> keyTuples; - THashSet<TString> uniqColumns; - for (const TString& name : keyColumnNames) { - // skip already added columns - if (!uniqColumns.insert(name).second) { - continue; - } - if (inputColumns.contains(name)) { - const auto& tuple = Build<TCoNameValueTuple>(ctx, itemArg.Pos()) - .Name().Build(name) - .Value<TCoMember>() - .Struct(itemArg) - .Name().Build(name) - .Build() - .Done(); - - keyTuples.push_back(tuple); - } else { - if (nullExtension) { - const auto& type = table.GetColumnType(name); - const auto& member = Build<TCoNameValueTuple>(ctx, itemArg.Pos()) - .Name().Build(name) - .template Value<TCoNothing>() - .OptionalType(NCommon::BuildTypeExpr(itemArg.Pos(), *type, ctx)) - .Build() - .Done(); - keyTuples.emplace_back(TExprBase(member)); - } else { - const auto& member = Build<TCoNameValueTuple>(ctx, itemArg.Pos()) - .Name().Build(name) - .Value<TCoMember>() - .Struct(fetch) - .Name().Build(name) - .Build() - .Done(); - keyTuples.emplace_back(TExprBase(member)); - } - } - } - return Build<TCoNameValueTupleList>(ctx, itemArg.Pos()) - .Add(keyTuples) - .Done(); -} - -TVector<std::pair<TExprNode::TPtr, const TIndexDescription*>> BuildSecondaryIndexVector( - const TKikimrTableDescription& table, - TPositionHandle pos, - TExprContext& ctx, - const THashSet<TStringBuf>* filter = nullptr) -{ - return NKikimr::NKqp::BuildSecondaryIndexVector(table, pos, ctx, filter, &BuildVersionedTable); -} - -TExprBase BuildConditionalErase( - const TExprBase& condition, - const TCoArgument& itemArg, - const TVector<TExprBase>& keyToErase, - TExprNode::TPtr table, - const TKiWriteTable& node, - TExprContext& ctx) -{ - const auto& erase = Build<TKiEraseRow>(ctx, node.Pos()) - .Cluster(node.DataSink().Cluster()) - .Table(table) - .Key<TCoNameValueTupleList>() - .Add(keyToErase) - .Build() - .Done(); - - return Build<TCoIfPresent>(ctx, node.Pos()) - .Optional(condition) - .PresentHandler<TCoLambda>() - .Args(itemArg) - .Body(erase) - .Build() - .MissingValue<TCoVoid>().Build() - .Done(); -} - -TExprBase CreateUpdateRowWithSecondaryIndex( - const TKiUpdateRow& updateTable, - const TVector<std::pair<TExprNode::TPtr, const TIndexDescription*>> indexes, - const TKikimrTableDescription& table, - const TCoNameValueTupleList& tablePk, - const THashSet<TStringBuf>& inputColumns, - const TCoArgument& itemArg, - const TKiWriteTable& node, TExprContext& ctx, bool replace, bool skipErase) -{ - TVector<TExprBase> updates; - updates.push_back(updateTable); - - const TVector<TString>& pk = table.Metadata->KeyColumnNames; - // in case of replace mode no need to read old data columns - const auto dataColumnSet = replace ? THashSet<TString>() : CreateDataColumnSetToRead(indexes, inputColumns); - const auto columnsToSelect = NKikimr::NKqp::CreateColumnsToSelectToUpdateIndex(indexes, pk, dataColumnSet, node.Pos(), ctx); - - const TExprBase& fetch = Build<TKiSelectRow>(ctx, node.Pos()) - .Cluster(node.DataSink().Cluster()) - .Table(updateTable.Table()) - .Key(tablePk) - .template Select<TCoAtomList>() - .Add(columnsToSelect) - .Build() - .Done(); - - for (const auto& pair : indexes) { - TVector<TString> indexTablePk; - TVector<TCoNameValueTuple> dataColumnUpdates; - - // skip update of index table if indexed column not present in request - // and upserted rows already present - bool mayBeSkipIndexUpdate = !replace; - indexTablePk.reserve(pair.second->KeyColumns.size() + pk.size()); - for (const auto& col : pair.second->KeyColumns) { - indexTablePk.push_back(col); - if (inputColumns.contains(col)) { - mayBeSkipIndexUpdate = false; - } - } - indexTablePk.insert(indexTablePk.end(), pk.begin(), pk.end()); - - // We can`t skip index update if data column present in query - // but in this case we may be need to skip erace - bool mustUpdateDataColumn = false; - - for (const auto& col : pair.second->DataColumns) { - if (inputColumns.contains(col)) { - mustUpdateDataColumn = true; - auto tuple = Build<TCoNameValueTuple>(ctx, node.Pos()) - .Name().Build(col) - .Value<TCoMember>() - .Struct(itemArg) - .Name().Build(col) - .Build() - .Done(); - dataColumnUpdates.push_back(tuple); - } else if (!replace) { - // Index table has data column, but data column has not been - // specifyed in request - const auto& tuple = Build<TCoNameValueTuple>(ctx, itemArg.Pos()) - .Name().Build(col) - .Value<TCoMember>() - .Struct(fetch) - .Name().Build(col) - .Build() - .Done(); - dataColumnUpdates.push_back(tuple); - } - } - - TExprBase updateRowSecondaryIndex = Build<TKiUpdateRow>(ctx, node.Pos()) - .Cluster(node.DataSink().Cluster()) - .Table(pair.first) - .Key(CreateSecondaryIndexKeyTuples( - itemArg, - indexTablePk, - inputColumns, - table, - fetch, - replace || skipErase, - ctx)) - .Update<TCoNameValueTupleList>() - .Add(dataColumnUpdates) - .Build() - .Done(); - - if (!mustUpdateDataColumn && mayBeSkipIndexUpdate) { - updateRowSecondaryIndex = Build<TCoIf>(ctx, node.Pos()) - .Predicate<TCoHasItems>() - .List<TCoToList>() - .Optional(fetch) - .Build() - .Build() - .ThenValue<TCoVoid>() - .Build() - .ElseValue(updateRowSecondaryIndex) - .Done(); - } - - TVector<TExprBase> keyToErase; - keyToErase.reserve(pair.second->KeyColumns.size() + pk.size()); - if (!skipErase && !mayBeSkipIndexUpdate) { - THashSet<TString> uniqColumns; - for (const auto& col : pair.second->KeyColumns) { - if (uniqColumns.insert(col).second) { - const auto& member = Build<TCoNameValueTuple>(ctx, node.Pos()) - .Name().Build(col) - .Value<TCoMember>() - .Struct(fetch) - .Name().Build(col) - .Build() - .Done(); - keyToErase.emplace_back(TExprBase(member)); - } - } - - for (const auto& k : pk) { - if (uniqColumns.insert(k).second) { - const auto& member = Build<TCoNameValueTuple>(ctx, node.Pos()) - .Name().Build(k) - .Value<TCoMember>() - .Struct(fetch) - .Name().Build(k) - .Build() - .Done(); - keyToErase.emplace_back(TExprBase(member)); - } - } - - const auto& conditionalErase = BuildConditionalErase( - fetch, itemArg, keyToErase, pair.first, node, ctx); - updates.push_back(conditionalErase); - } - updates.push_back(updateRowSecondaryIndex); - } - - return Build<TCoAsList>(ctx, node.Pos()) - .Add(updates) - .Done(); -} - -TExprNode::TPtr KiUpsertTableToKql(const TKiWriteTable& node, TExprContext& ctx, const TKikimrTableDescription& table, - bool replace, bool skipErase, TExprNode::TPtr& effect) -{ - auto itemArg = Build<TCoArgument>(ctx, node.Pos()) - .Name("item") - .Done(); - - auto inputColumnsSetting = GetSetting(node.Settings().Ref(), "input_columns"); - YQL_ENSURE(inputColumnsSetting); - - THashSet<TStringBuf> inputColumns; - for (const auto& atom : TCoNameValueTuple(inputColumnsSetting).Value().Cast<TCoAtomList>()) { - inputColumns.insert(atom.Value()); - } - - const auto& secondaryIndexes = BuildSecondaryIndexVector(table, node.Pos(), ctx); - - TVector<TCoNameValueTuple> valueTuples; - - const auto versionedTable = BuildVersionedTable(*table.Metadata, node.Pos(), ctx); - YQL_ENSURE(versionedTable.Path() == node.Table()); - - for (auto& pair : table.Metadata->Columns) { - const TString& name = pair.first; - - if (table.GetKeyColumnIndex(name)) { - continue; - } - - if (inputColumns.contains(name)) { - auto tuple = Build<TCoNameValueTuple>(ctx, node.Pos()) - .Name().Build(name) - .Value<TCoMember>() - .Struct(itemArg) - .Name().Build(name) - .Build() - .Done(); - valueTuples.push_back(tuple); - } else if (replace) { - auto type = table.GetColumnType(name); - YQL_ENSURE(type, "No such column: " << name); - auto tuple = Build<TCoNameValueTuple>(ctx, node.Pos()) - .Name().Build(name) - .Value<TCoNothing>() - .OptionalType(NCommon::BuildTypeExpr(node.Pos(), *type, ctx)) - .Build() - .Done(); - valueTuples.push_back(tuple); - } - } - - const auto& tablePk = ExtractNamedKeyTuples(itemArg, table, ctx); - - // Update for main table - const auto& tableUpdate = Build<TKiUpdateRow>(ctx, node.Pos()) - .Cluster(node.DataSink().Cluster()) - .Table(versionedTable) - .Key(tablePk) - .Update<TCoNameValueTupleList>() - .Add(valueTuples) - .Build() - .Done(); - - const auto& input = (skipErase || secondaryIndexes.empty()) ? node.Input() : RemoveDuplicateKeyFromInput(node.Input(), table, node.Pos(), ctx); - - effect = Build<TCoFlatMap>(ctx, node.Pos()) - .Input(input) - .Lambda<TCoLambda>() - .Args({itemArg}) - .Body(CreateUpdateRowWithSecondaryIndex( - tableUpdate, - secondaryIndexes, - table, - tablePk, - inputColumns, - itemArg, - node, - ctx, - replace, - skipErase) - ) - .Build() - .Done() - .Ptr(); - - return Build<TCoWorld>(ctx, node.Pos()) - .Done() - .Ptr(); -} - -TExprNode::TPtr KiInsertTableToKql(const TKiWriteTable& node, TExprContext& ctx, const TKikimrTableDescription& table, - const TYdbOperation& op, TExprNode::TPtr& effect) -{ - auto fetchItemArg = Build<TCoArgument>(ctx, node.Pos()) - .Name("fetchItem") - .Done(); - - const auto versionedTable = BuildVersionedTable(*table.Metadata, node.Pos(), ctx); - YQL_ENSURE(versionedTable.Path() == node.Table()); - - auto fetchLambda = Build<TCoLambda>(ctx, node.Pos()) - .Args(fetchItemArg) - .Body<TKiSelectRow>() - .Cluster(node.DataSink().Cluster()) - .Table(versionedTable) - .Key(ExtractNamedKeyTuples(fetchItemArg, table, ctx)) - .Select() - .Build() - .Build() - .Done(); - - auto getKeyListItemArg = Build<TCoArgument>(ctx, node.Pos()) - .Name("getKeyListItem") - .Done(); - - auto keyList = Build<TCoMap>(ctx, node.Pos()) - .Input(node.Input()) - .Lambda() - .Args(getKeyListItemArg) - .Body(ExtractKeys(getKeyListItemArg, table, ctx)) - .Build() - .Done(); - - auto duplicatesPredicate = Build<TCoCmpNotEqual>(ctx, node.Pos()) - .Left<TCoLength>() - .List(keyList) - .Build() - .Right<TCoLength>() - .List<TCoToDict>() - .List(keyList) - .KeySelector() - .Args({"item"}) - .Body("item") - .Build() - .PayloadSelector() - .Args({"item"}) - .Body<TCoVoid>().Build() - .Build() - .Settings() - .Add().Build("One") - .Add().Build("Hashed") - .Build() - .Build() - .Build() - .Done(); - - auto fetchPredicate = Build<TCoHasItems>(ctx, node.Pos()) - .List<TCoFlatMap>() - .Input(node.Input()) - .Lambda(fetchLambda) - .Build() - .Done(); - - auto predicate = Build<TCoOr>(ctx, node.Pos()) - .Add({duplicatesPredicate, fetchPredicate}) - .Done(); - - TExprNode::TPtr insertEffect; - auto insertKql = KiUpsertTableToKql(node, ctx, table, false, true, insertEffect); - - if (op == TYdbOperation::InsertAbort) { - effect = Build<TKiAbortIf>(ctx, node.Pos()) - .Predicate(predicate) - .Effect(insertEffect) - .Constraint().Build("insert_pk") - .Done() - .Ptr(); - } else if (op == TYdbOperation::InsertRevert) { - effect = Build<TKiRevertIf>(ctx, node.Pos()) - .Predicate(predicate) - .Effect(insertEffect) - .Constraint().Build("insert_pk") - .Done() - .Ptr(); - } else { - YQL_ENSURE(false, "Unexpected table operation"); - } - - return insertKql; -} - -TExprNode::TPtr KiDeleteOnTableToKql(const TKiWriteTable& node, TExprContext& ctx, - const TKikimrTableDescription& table, TExprNode::TPtr& effect) -{ - auto itemArg = Build<TCoArgument>(ctx, node.Pos()) - .Name("item") - .Done(); - - TVector<TExprBase> updates; - - const auto& tablePk = ExtractNamedKeyTuples(itemArg, table, ctx); - const TVector<TString>& pk = table.Metadata->KeyColumnNames; - - const auto versionedTable = BuildVersionedTable(*table.Metadata, node.Pos(), ctx); - YQL_ENSURE(versionedTable.Path() == node.Table()); - - updates.emplace_back(Build<TKiEraseRow>(ctx, node.Pos()) - .Cluster(node.DataSink().Cluster()) - .Table(versionedTable) - .Key(tablePk) - .Done() - ); - - const auto& indexes = BuildSecondaryIndexVector(table, node.Pos(), ctx); - - if (indexes) { - // No need to read dataColumn - we just going to remove row - const THashSet<TString> dummyDataColumns; - const auto& columnsToSelect = NKikimr::NKqp::CreateColumnsToSelectToUpdateIndex(indexes, pk, dummyDataColumns, node.Pos(), ctx); - const TExprBase& fetch = Build<TKiSelectRow>(ctx, node.Pos()) - .Cluster(node.DataSink().Cluster()) - .Table(versionedTable) - .Key(tablePk) - .template Select<TCoAtomList>() - .Add(columnsToSelect) - .Build() - .Done(); - - for (const auto& pair : indexes) { - - TVector<TExprBase> keyToErase; - keyToErase.reserve(pair.second->KeyColumns.size() + pk.size()); - - THashSet<TString> uniqColumns; - for (const auto& col : pair.second->KeyColumns) { - if (!uniqColumns.insert(col).second) { - continue; - } - const auto& member = Build<TCoNameValueTuple>(ctx, node.Pos()) - .Name().Build(col) - .Value<TCoMember>() - .Struct(fetch) - .Name().Build(col) - .Build() - .Done(); - keyToErase.emplace_back(TExprBase(member)); - } - - for (const auto& k : pk) { - if (!uniqColumns.insert(k).second) { - continue; - } - const auto& member = Build<TCoNameValueTuple>(ctx, node.Pos()) - .Name().Build(k) - .Value<TCoMember>() - .Struct(fetch) - .Name().Build(k) - .Build() - .Done(); - keyToErase.emplace_back(TExprBase(member)); - } - - const auto& erase = Build<TKiEraseRow>(ctx, node.Pos()) - .Cluster(node.DataSink().Cluster()) - .Table(pair.first) - .Key<TCoNameValueTupleList>() - .Add(keyToErase) - .Build() - .Done(); - - updates.push_back(erase); - } - - } - - effect = Build<TCoFlatMap>(ctx, node.Pos()) - .Input(node.Input()) - .Lambda<TCoLambda>() - .Args({itemArg}) - .Body<TCoAsList>() - .Add(updates) - .Build() - .Build() - .Done() - .Ptr(); - - return Build<TCoWorld>(ctx, node.Pos()) - .Done() - .Ptr(); -} - -TExprNode::TPtr KiReadTableToKql(TCoRight right, TExprContext& ctx, const TKikimrTablesData& tablesData, bool withSystemColumns) { - const auto& read = right.Input().Cast<TKiReadTable>(); - bool unwrapValues = HasSetting(read.Settings().Ref(), "unwrap_values"); - - TKikimrKey key(ctx); - YQL_ENSURE(key.Extract(read.TableKey().Ref())); - YQL_ENSURE(key.GetKeyType() == TKikimrKey::Type::Table); - const auto& cluster = read.DataSource().Cluster(); - const auto& table = key.GetTablePath(); - - const auto& tableDesc = tablesData.ExistingTable(TString(cluster), TString(table)); - - const auto versionedTable = BuildVersionedTable(*tableDesc.Metadata, read.Pos(), ctx); - YQL_ENSURE(versionedTable.Path().Value() == table); - - TMaybe<TString> secondaryIndex; - - if (const auto& view = key.GetView()) { - YQL_ENSURE(tableDesc.Metadata); - if (!ValidateTableHasIndex(tableDesc.Metadata, ctx, read.Pos())) { - return nullptr; - } - auto [metadata, state] = tableDesc.Metadata->GetIndexMetadata(view.GetRef()); - YQL_ENSURE(metadata, "unable to find metadta for index: " << view.GetRef()); - YQL_ENSURE(state == TIndexDescription::EIndexState::Ready - || state == TIndexDescription::EIndexState::WriteOnly); - if (state != TIndexDescription::EIndexState::Ready) { - auto err = TStringBuilder() - << "Requested index: " << view.GetRef() - << " is not ready to use"; - ctx.AddError(YqlIssue(ctx.GetPosition(read.Pos()), TIssuesIds::KIKIMR_INDEX_IS_NOT_READY, err)); - return nullptr; - } - secondaryIndex = metadata->Name; - } - - if (secondaryIndex) { - const auto& indexTableName = secondaryIndex.GetRef(); - const auto& keyTableDesc = tablesData.ExistingTable(TString(cluster), TString(indexTableName)); - TKikimrKeyRange range(ctx, keyTableDesc); - - const auto& selectRange = Build<TKiSelectIndexRange>(ctx, read.Pos()) - .Cluster(cluster) - .Table(versionedTable) - .Range(range.ToRangeExpr(read, ctx)) - .Select(read.GetSelectColumns(ctx, tablesData, withSystemColumns)) - .Settings() - .Build() - .IndexName() - .Value(secondaryIndex.GetRef()) - .Build() - .Done(); - - if (unwrapValues) { - return UnwrapKiReadTableValues(selectRange, tableDesc, selectRange.Select(), ctx).Ptr(); - } else { - return TExprBase(selectRange).Ptr(); - } - } else { - TKikimrKeyRange range(ctx, tableDesc); - - const auto& selectRange = Build<TKiSelectRange>(ctx, read.Pos()) - .Cluster(cluster) - .Table(versionedTable) - .Range(range.ToRangeExpr(read, ctx)) - .Select(read.GetSelectColumns(ctx, tablesData, withSystemColumns)) - .Settings() - .Build() - .Done(); - - if (unwrapValues) { - return UnwrapKiReadTableValues(selectRange, tableDesc, selectRange.Select(), ctx).Ptr(); - } else { - return TExprBase(selectRange).Ptr(); - } - } -} - -TExprNode::TPtr KiUpdateOnTableToKql(const TKiWriteTable& node, TExprContext& ctx, - const TKikimrTableDescription& tableDesc, TExprNode::TPtr& effect) -{ - // TODO: KIKIMR-3206 - // This function should be rewriten - - const auto& cluster = node.DataSink().Cluster(); - - const auto& itemArg = Build<TCoArgument>(ctx, node.Pos()) - .Name("item") - .Done(); - - const auto& inputColumnsSetting = GetSetting(node.Settings().Ref(), "input_columns"); - YQL_ENSURE(inputColumnsSetting); - - TVector<TCoNameValueTuple> valueTuples; - THashSet<TStringBuf> updatedColumns; - for (const auto& atom : TCoNameValueTuple(inputColumnsSetting).Value().Cast<TCoAtomList>()) { - if (tableDesc.GetKeyColumnIndex(TString(atom.Value()))) { - continue; - } - - auto tuple = Build<TCoNameValueTuple>(ctx, node.Pos()) - .Name(atom) - .Value<TCoMember>() - .Struct(itemArg) - .Name(atom) - .Build() - .Done(); - valueTuples.push_back(tuple); - updatedColumns.insert(atom.Value()); - } - - // Returns index only if at least one of indexed columns for coresponding index has been updated. - const auto& indexes = BuildSecondaryIndexVector(tableDesc, node.Pos(), ctx, &updatedColumns); - const TVector<TString>& pk = tableDesc.Metadata->KeyColumnNames; - - const auto versionedTable = BuildVersionedTable(*tableDesc.Metadata, node.Pos(), ctx); - YQL_ENSURE(versionedTable.Path() == node.Table()); - - const auto dataColumnSet = CreateDataColumnSetToRead(indexes, updatedColumns); - const TVector<TExprBase> columnsToSelect = NKikimr::NKqp::CreateColumnsToSelectToUpdateIndex(indexes, pk, dataColumnSet, node.Pos(), ctx); - const auto& fetch = Build<TKiSelectRow>(ctx, node.Pos()) - .Cluster(cluster) - .Table(versionedTable) - .Key(ExtractNamedKeyTuples(itemArg, tableDesc, ctx)) - .Select<TCoAtomList>() - .Add(columnsToSelect) - .Build() - .Done(); - - TVector<TExprBase> updates; - - updates.emplace_back(Build<TKiUpdateRow>(ctx, node.Pos()) - .Cluster(cluster) - .Table(versionedTable) - .Key(ExtractNamedKeyTuples(itemArg, tableDesc, ctx)) - .Update<TCoNameValueTupleList>() - .Add(valueTuples) - .Build() - .Done() - ); - - if (indexes) { - for (const auto& pair : indexes) { - TVector<TString> indexTablePk; - indexTablePk.reserve(pair.second->KeyColumns.size() + pk.size()); - for (const auto& col : pair.second->KeyColumns) { - indexTablePk.push_back(col); - } - indexTablePk.insert(indexTablePk.end(), pk.begin(), pk.end()); - - TVector<TExprBase> keyToAdd; - TVector<TExprBase> keyToErase; - TVector<TCoNameValueTuple> dataColumnUpdates; - - keyToAdd.reserve(indexTablePk.size()); - keyToErase.reserve(indexTablePk.size()); - dataColumnUpdates.reserve(pair.second->DataColumns.size()); - - for (const TString& name : pair.second->DataColumns) { - // Data column was specified in request - update it - if (updatedColumns.contains(name)) { - const auto& tuple = Build<TCoNameValueTuple>(ctx, itemArg.Pos()) - .Name().Build(name) - .Value<TCoMember>() - .Struct(itemArg) - .Name().Build(name) - .Build() - .Done(); - dataColumnUpdates.push_back(tuple); - } else { - const auto& tuple = Build<TCoNameValueTuple>(ctx, itemArg.Pos()) - .Name().Build(name) - .Value<TCoMember>() - .Struct(fetch) - .Name().Build(name) - .Build() - .Done(); - dataColumnUpdates.push_back(tuple); - } - } - - THashSet<TString> uniqColumns; - for (const TString& name : indexTablePk) { - if (!uniqColumns.insert(name).second) { - continue; - } - const auto& oldTuple = Build<TCoNameValueTuple>(ctx, itemArg.Pos()) - .Name().Build(name) - .Value<TCoMember>() - .Struct(fetch) - .Name().Build(name) - .Build() - .Done(); - keyToErase.push_back(oldTuple); - if (updatedColumns.contains(name)) { - const auto& tuple = Build<TCoNameValueTuple>(ctx, itemArg.Pos()) - .Name().Build(name) - .Value<TCoMember>() - .Struct(itemArg) - .Name().Build(name) - .Build() - .Done(); - keyToAdd.push_back(tuple); - } else { - keyToAdd.push_back(oldTuple); - } - } - - const auto& erase = Build<TKiEraseRow>(ctx, node.Pos()) - .Cluster(cluster) - .Table(pair.first) - .Key<TCoNameValueTupleList>() - .Add(keyToErase) - .Build() - .Done(); - - updates.push_back(erase); - - const auto& updateRowSecondaryIndex = Build<TKiUpdateRow>(ctx, node.Pos()) - .Cluster(cluster) - .Table(pair.first) - .Key<TCoNameValueTupleList>() - .Add(keyToAdd) - .Build() - .Update<TCoNameValueTupleList>() - .Add(dataColumnUpdates) - .Build() - .Done(); - - updates.push_back(updateRowSecondaryIndex); - - } - } - - const auto& updateLambda = Build<TCoLambda>(ctx, node.Pos()) - .Args(itemArg) - .Body<TCoIf>() - .Predicate<TCoHasItems>() - .List<TCoToList>() - .Optional(fetch) - .Build() - .Build() - .ThenValue<TCoAsList>() - .Add(updates) - .Build() - .ElseValue<TCoList>() - .ListType<TCoListType>() - .ItemType<TCoVoidType>() - .Build() - .Build() - .Build() - .Build() - .Done(); - - effect = Build<TCoFlatMap>(ctx, node.Pos()) - .Input(node.Input()) - .Lambda(updateLambda) - .Done() - .Ptr(); - - return Build<TCoWorld>(ctx, node.Pos()) - .Done() - .Ptr(); -} - -TExprNode::TPtr KiWriteTableToKql(TKiWriteTable write, TExprContext& ctx, - const TKikimrTablesData& tablesData, TExprNode::TPtr& effect) -{ - auto op = GetTableOp(write); - - auto cluster = write.DataSink().Cluster().Value(); - auto table = write.Table().Value(); - auto& tableDesc = tablesData.ExistingTable(TString(cluster), TString(table)); - - switch (op) { - case TYdbOperation::Upsert: - case TYdbOperation::Replace: - return KiUpsertTableToKql(write, ctx, tableDesc, op == TYdbOperation::Replace, false, effect); - case TYdbOperation::InsertRevert: - case TYdbOperation::InsertAbort: - return KiInsertTableToKql(write, ctx, tableDesc, op, effect); - case TYdbOperation::DeleteOn: - return KiDeleteOnTableToKql(write, ctx, tableDesc, effect); - case TYdbOperation::UpdateOn: - return KiUpdateOnTableToKql(write, ctx, tableDesc, effect); - default: - return nullptr; - } -} - -TExprNode::TPtr KiUpdateTableToKql(TKiUpdateTable update, TExprContext& ctx, - const TKikimrTablesData& tablesData, TExprNode::TPtr& effect, bool withSystemColumns) -{ - YQL_ENSURE(update.Update().Ref().GetTypeAnn()); - - const auto& cluster = update.DataSink().Cluster(); - const auto& table = update.Table(); - const auto& tableDesc = tablesData.ExistingTable(TString(cluster.Value()), TString(table.Value())); - const TVector<TString>& pk = tableDesc.Metadata->KeyColumnNames; - - const auto versionedTable = BuildVersionedTable(*tableDesc.Metadata, update.Pos(), ctx); - YQL_ENSURE(versionedTable.Path() == update.Table()); - - auto schemaVersion = TStringBuilder() << tableDesc.Metadata->SchemaVersion; - - TKikimrKeyRange range(ctx, tableDesc); - const auto& selectRange = Build<TKiSelectRange>(ctx, update.Pos()) - .Cluster(cluster) - .Table(versionedTable) - .Range(range.ToRangeExpr(update, ctx)) - .Select(BuildColumnsList(tableDesc, update.Pos(), ctx, withSystemColumns)) - .Settings().Build() - .Done(); - - const auto& filter = Build<TCoFilter>(ctx, update.Pos()) - .Input(selectRange) - .Lambda(update.Filter()) - .Done(); - - const auto& itemArg = Build<TCoArgument>(ctx, update.Pos()) - .Name("item") - .Done(); - - TVector<TCoNameValueTuple> valueTuples; - const auto& updateResultType = update.Update().Ref().GetTypeAnn()->Cast<TStructExprType>(); - THashSet<TStringBuf> updatedColumns; - - for (const auto& item : updateResultType->GetItems()) { - const auto& name = item->GetName(); - updatedColumns.insert(name); - - const auto& tuple = Build<TCoNameValueTuple>(ctx, update.Pos()) - .Name().Build(name) - .Value<TCoMember>() - .Struct<TExprApplier>() - .Apply(update.Update()) - .With(0, itemArg) - .Build() - .Name().Build(name) - .Build() - .Done(); - valueTuples.push_back(tuple); - } - - const auto& indexes = BuildSecondaryIndexVector(tableDesc, update.Pos(), ctx, &updatedColumns); - - TVector<TExprBase> updates; - updates.emplace_back(Build<TKiUpdateRow>(ctx, update.Pos()) - .Cluster(cluster) - .Table(versionedTable) - .Key(ExtractNamedKeyTuples(itemArg, tableDesc, ctx)) - .Update<TCoNameValueTupleList>() - .Add(valueTuples) - .Build() - .Done() - ); - - if (indexes) { - for (const auto& pair : indexes) { - TVector<TString> indexTablePk; - indexTablePk.reserve(pair.second->KeyColumns.size() + pk.size()); - for (const auto& col : pair.second->KeyColumns) { - indexTablePk.push_back(col); - } - indexTablePk.insert(indexTablePk.end(), pk.begin(), pk.end()); - - TVector<TExprBase> keyToAdd; - TVector<TExprBase> keyToErase; - TVector<TCoNameValueTuple> dataColumnsUpdates; - - keyToAdd.reserve(indexTablePk.size()); - keyToErase.reserve(indexTablePk.size()); - dataColumnsUpdates.reserve(pair.second->DataColumns.size()); - - for (const TString& name : pair.second->DataColumns) { - if (updatedColumns.contains(name)) { - const auto& tuple = Build<TCoNameValueTuple>(ctx, itemArg.Pos()) - .Name().Build(name) - .Value<TCoMember>() - .Struct<TExprApplier>() - .Apply(update.Update()) - .With(0, itemArg) - .Build() - .Name().Build(name) - .Build() - .Done(); - dataColumnsUpdates.push_back(tuple); - } else { - const auto& oldTuple = Build<TCoNameValueTuple>(ctx, itemArg.Pos()) - .Name().Build(name) - .Value<TCoMember>() - .Struct(itemArg) - .Name().Build(name) - .Build() - .Done(); - dataColumnsUpdates.push_back(oldTuple); - } - } - - THashSet<TString> uniqColumns; - for (const TString& name : indexTablePk) { - if (!uniqColumns.insert(name).second) { - continue; - } - const auto& oldTuple = Build<TCoNameValueTuple>(ctx, itemArg.Pos()) - .Name().Build(name) - .Value<TCoMember>() - .Struct(itemArg) - .Name().Build(name) - .Build() - .Done(); - - keyToErase.push_back(oldTuple); - - if (updatedColumns.contains(name)) { - const auto& tuple = Build<TCoNameValueTuple>(ctx, itemArg.Pos()) - .Name().Build(name) - .Value<TCoMember>() - .Struct<TExprApplier>() - .Apply(update.Update()) - .With(0, itemArg) - .Build() - .Name().Build(name) - .Build() - .Done(); - keyToAdd.push_back(tuple); - } else { - keyToAdd.push_back(oldTuple); - } - } - - const auto& erase = Build<TKiEraseRow>(ctx, update.Pos()) - .Cluster(cluster) - .Table(pair.first) - .Key<TCoNameValueTupleList>() - .Add(keyToErase) - .Build() - .Done(); - updates.push_back(erase); - - const auto& updateRowSecondaryIndex = Build<TKiUpdateRow>(ctx, update.Pos()) - .Cluster(cluster) - .Table(pair.first) - .Key<TCoNameValueTupleList>() - .Add(keyToAdd) - .Build() - .Update<TCoNameValueTupleList>() - .Add(dataColumnsUpdates) - .Build() - .Done(); - - updates.push_back(updateRowSecondaryIndex); - } - } - - effect = Build<TCoFlatMap>(ctx, update.Pos()) - .Input(filter) - .Lambda<TCoLambda>() - .Args({itemArg}) - .Body<TCoAsList>() - .Add(updates) - .Build() - .Build() - .Done() - .Ptr(); - - return Build<TCoWorld>(ctx, update.Pos()) - .Done() - .Ptr(); -} - -TExprNode::TPtr KiDeleteTableToKql(TKiDeleteTable del, TExprContext& ctx, - const TKikimrTablesData& tablesData, TExprNode::TPtr& effect, bool withSystemColumns) -{ - const auto& cluster = del.DataSink().Cluster(); - const auto& table = del.Table(); - const auto& tableDesc = tablesData.ExistingTable(TString(cluster.Value()), TString(table.Value())); - - const auto versionedTable = BuildVersionedTable(*tableDesc.Metadata, del.Pos(), ctx); - YQL_ENSURE(versionedTable.Path() == table); - - TKikimrKeyRange range(ctx, tableDesc); - const auto& selectRange = Build<TKiSelectRange>(ctx, del.Pos()) - .Cluster(cluster) - .Table(versionedTable) - .Range(range.ToRangeExpr(del, ctx)) - .Select(BuildColumnsList(tableDesc, del.Pos(), ctx, withSystemColumns)) - .Settings().Build() - .Done(); - - const auto& filter = Build<TCoFilter>(ctx, del.Pos()) - .Input(selectRange) - .Lambda(del.Filter()) - .Done(); - - const auto& itemArg = Build<TCoArgument>(ctx, del.Pos()) - .Name("item") - .Done(); - - const auto& indexes = BuildSecondaryIndexVector(tableDesc, del.Pos(), ctx); - - TVector<TExprBase> updates; - updates.reserve(indexes.size() + 1); - - const auto& tablePk = ExtractNamedKeyTuples(itemArg, tableDesc, ctx); - updates.emplace_back(Build<TKiEraseRow>(ctx, del.Pos()) - .Cluster(cluster) - .Table(versionedTable) - .Key(tablePk) - .Done() - ); - - const TVector<TString>& pk = tableDesc.Metadata->KeyColumnNames; - if (indexes) { - TVector<TExprBase> keyToErase; - for (const auto& pair : indexes) { - keyToErase.reserve(pair.second->KeyColumns.size() + pk.size()); - - THashSet<TString> uniqColumns; - for (const auto& col : pair.second->KeyColumns) { - if (!uniqColumns.insert(col).second) { - continue; - } - const auto& member = Build<TCoNameValueTuple>(ctx, del.Pos()) - .Name().Build(col) - .Value<TCoMember>() - .Struct(itemArg) - .Name().Build(col) - .Build() - .Done(); - keyToErase.emplace_back(TExprBase(member)); - } - - for (const auto& k : pk) { - if (!uniqColumns.insert(k).second) { - continue; - } - const auto& member = Build<TCoNameValueTuple>(ctx, del.Pos()) - .Name().Build(k) - .Value<TCoMember>() - .Struct(itemArg) - .Name().Build(k) - .Build() - .Done(); - keyToErase.emplace_back(TExprBase(member)); - } - - const auto& erase = Build<TKiEraseRow>(ctx, del.Pos()) - .Cluster(cluster) - .Table(pair.first) - .Key<TCoNameValueTupleList>() - .Add(keyToErase) - .Build() - .Done(); - - keyToErase.clear(); - updates.push_back(erase); - } - } - - effect = Build<TCoFlatMap>(ctx, del.Pos()) - .Input(filter) - .template Lambda<TCoLambda>() - .Args({itemArg}) - .template Body<TCoAsList>() - .Add(updates) - .Build() - .Build() - .Done() - .Ptr(); - - return Build<TCoWorld>(ctx, del.Pos()) - .Done() - .Ptr(); -} - -} // namespace - -TKiProgram BuildKiProgram(TKiDataQuery query, const TKikimrTablesData& tablesData, - TExprContext& ctx, bool withSystemColumns) -{ - TExprNode::TPtr optResult; - TOptimizeExprSettings optSettings(nullptr); - optSettings.VisitChanges = true; - IGraphTransformer::TStatus status(IGraphTransformer::TStatus::Ok); - status = OptimizeExpr(query.Effects().Ptr(), optResult, - [&tablesData, withSystemColumns](const TExprNode::TPtr& input, TExprContext& ctx) { - auto node = TExprBase(input); - auto ret = input; - - if (auto maybeWrite = node.Maybe<TKiWriteTable>()) { - KiWriteTableToKql(maybeWrite.Cast(), ctx, tablesData, ret); - } else if (auto maybeUpdate = node.Maybe<TKiUpdateTable>()) { - KiUpdateTableToKql(maybeUpdate.Cast(), ctx, tablesData, ret, withSystemColumns); - } else if (auto maybeDelete = node.Maybe<TKiDeleteTable>()) { - KiDeleteTableToKql(maybeDelete.Cast(), ctx, tablesData, ret, withSystemColumns); - } - - return ret; - }, ctx, optSettings); - - YQL_ENSURE(status == IGraphTransformer::TStatus::Ok); - - YQL_ENSURE(TMaybeNode<TKiEffects>(optResult)); - TVector<TExprBase> effectsList(TKiEffects(optResult).begin(), TKiEffects(optResult).end()); - - TMaybeNode<TExprBase> effects; - if (effectsList.empty()) { - effects = Build<TCoList>(ctx, query.Pos()) - .ListType<TCoListType>() - .ItemType<TCoVoidType>() - .Build() - .Build() - .Done(); - } else { - effects = Build<TCoExtend>(ctx, query.Pos()) - .Add(effectsList) - .Done(); - } - - TVector<TExprBase> results; - for (const auto& kiResult : query.Results()) { - results.push_back(kiResult.Value()); - } - - auto program = Build<TKiProgram>(ctx, query.Pos()) - .Results() - .Add(results) - .Build() - .Effects(effects.Cast()) - .Done(); - - TExprNode::TPtr newProgram; - status = OptimizeExpr(program.Ptr(), newProgram, - [&tablesData, withSystemColumns](const TExprNode::TPtr& input, TExprContext& ctx) { - auto node = TExprBase(input); - - if (node.Maybe<TCoRight>().Input().Maybe<TKiReadTable>()) { - return KiReadTableToKql(node.Cast<TCoRight>(), ctx, tablesData, withSystemColumns); - } - - return input; - }, ctx, optSettings); - - YQL_ENSURE(status == IGraphTransformer::TStatus::Ok); - YQL_ENSURE(TMaybeNode<TKiProgram>(newProgram)); - - return TKiProgram(newProgram); -} - -TExprBase UnwrapKiReadTableValues(TExprBase input, const TKikimrTableDescription& tableDesc, - const TCoAtomList columns, TExprContext& ctx) -{ - TCoArgument itemArg = Build<TCoArgument>(ctx, input.Pos()) - .Name("item") - .Done(); - - TVector<TExprBase> structItems; - for (auto atom : columns) { - auto columnType = tableDesc.GetColumnType(TString(atom.Value())); - YQL_ENSURE(columnType); - - auto item = Build<TCoNameValueTuple>(ctx, input.Pos()) - .Name(atom) - .Value<TCoCoalesce>() - .Predicate<TCoMember>() - .Struct(itemArg) - .Name(atom) - .Build() - .Value<TCoDefault>() - .Type(ExpandType(atom.Pos(), *columnType->Cast<TOptionalExprType>()->GetItemType(), ctx)) - .Build() - .Build() - .Done(); - - structItems.push_back(item); - } - - return Build<TCoMap>(ctx, input.Pos()) - .Input(input) - .Lambda() - .Args({itemArg}) - .Body<TCoAsStruct>() - .Add(structItems) - .Build() - .Build() - .Done(); -} - -bool IsKeySelectorPkPrefix(NNodes::TCoLambda keySelector, const TKikimrTableDescription& tableDesc, TVector<TString>* columns) { - auto checkKey = [keySelector, &tableDesc, columns] (const TExprBase& key, ui32 index) { - if (!key.Maybe<TCoMember>()) { - return false; - } - - auto member = key.Cast<TCoMember>(); - if (member.Struct().Raw() != keySelector.Args().Arg(0).Raw()) { - return false; - } - - auto column = member.Name().StringValue(); - auto columnIndex = tableDesc.GetKeyColumnIndex(column); - if (!columnIndex || *columnIndex != index) { - return false; - } - - if (columns) { - columns->emplace_back(std::move(column)); - } - - return true; - }; - - auto lambdaBody = keySelector.Body(); - if (auto maybeTuple = lambdaBody.Maybe<TExprList>()) { - auto tuple = maybeTuple.Cast(); - for (size_t i = 0; i < tuple.Size(); ++i) { - if (!checkKey(tuple.Item(i), i)) { - return false; - } - } - } else { - if (!checkKey(lambdaBody, 0)) { - return false; - } - } - - return true; -} - -} // namespace NYql diff --git a/ydb/core/kqp/provider/yql_kikimr_mkql.cpp b/ydb/core/kqp/provider/yql_kikimr_mkql.cpp deleted file mode 100644 index 4cd98892e44..00000000000 --- a/ydb/core/kqp/provider/yql_kikimr_mkql.cpp +++ /dev/null @@ -1,206 +0,0 @@ -#include "yql_kikimr_provider_impl.h" -#include <ydb/core/kqp/provider/mkql/yql_kikimr_mkql_expr_nodes.h> - -#include <ydb/library/yql/core/yql_expr_optimize.h> -#include <ydb/library/yql/core/yql_expr_type_annotation.h> -#include <ydb/library/yql/core/yql_join.h> - - -namespace NYql { -namespace { - -using namespace NNodes; - -TExprNode::TPtr BuildMkqlVersionedTable(const TCoAtom& tableName, const TCoAtom& schemaVersion, - const TCoAtom& pathId, TExprContext& ctx, TPositionHandle pos) -{ - return Build<TMkqlVersionedTable>(ctx, pos) - .Table(tableName) - .SchemaVersion(schemaVersion) - .PathId(pathId) - .Done() - .Ptr(); -} - -TExprNode::TPtr MkqlRewriteCallables(TCallable callable, TExprContext& ctx, const TMaybe<TString>& rtParamName) { - TMaybeNode<TCoParameter> readTarget; - - if (rtParamName) { - readTarget = Build<TCoParameter>(ctx, callable.Pos()) - .Name().Build(*rtParamName) - .Type<TCoDataType>() - .Type().Build("Uint32") - .Build() - .Done(); - } - - if (auto maybeSelectRow = callable.Maybe<TKiSelectRow>()) { - auto selectRow = maybeSelectRow.Cast(); - auto vt = selectRow.Table(); - TExprNode::TListType children = { - BuildMkqlVersionedTable(vt.Path(), vt.SchemaVersion(), vt.PathId(), ctx, selectRow.Pos()), - selectRow.Key().Ptr(), - selectRow.Select().Ptr() - }; - - if (readTarget) { - children.push_back(readTarget.Cast().Ptr()); - } - - return ctx.NewCallable(selectRow.Pos(), "SelectRow", std::move(children)); - } - - if (auto maybeSelectRange = callable.Maybe<TKiSelectRangeBase>()) { - auto selectRange = maybeSelectRange.Cast(); - auto vt = selectRange.Table(); - - TExprNode::TListType children = { - BuildMkqlVersionedTable(vt.Path(), vt.SchemaVersion(), vt.PathId(), ctx, selectRange.Pos()), - selectRange.Range().Ptr(), - selectRange.Select().Ptr(), - selectRange.Settings().Ptr() - }; - - if (readTarget) { - children.push_back(readTarget.Cast().Ptr()); - } - - auto selectRangeMkql = ctx.NewCallable(selectRange.Pos(), "SelectRange", std::move(children)); - - if (selectRange.Maybe<TKiSelectRange>()) { - return ctx.Builder(selectRange.Pos()) - .Callable("Member") - .Add(0, selectRangeMkql) - .Atom(1, "List") - .Seal() - .Build(); - } else { - ctx.AddError(TIssue(ctx.GetPosition(selectRange.Pos()), TStringBuilder() << "Got unsupported callable: " - << selectRange.CallableName())); - return nullptr; - } - } - - if (auto maybeUpdateRow = callable.Maybe<TKiUpdateRow>()) { - auto updateRow = maybeUpdateRow.Cast(); - auto vt = updateRow.Table(); - return Build<TMkqlUpdateRow>(ctx, updateRow.Pos()) - .Table(BuildMkqlVersionedTable(vt.Path(), vt.SchemaVersion(), vt.PathId(), ctx, updateRow.Pos())) - .Key(updateRow.Key()) - .Update(updateRow.Update()) - .Done() - .Ptr(); - } - - if (auto maybeEraseRow = callable.Maybe<TKiEraseRow>()) { - auto eraseRow = maybeEraseRow.Cast(); - auto vt = eraseRow.Table(); - return Build<TMkqlEraseRow>(ctx, eraseRow.Pos()) - .Table(BuildMkqlVersionedTable(vt.Path(), vt.SchemaVersion(), vt.PathId(), ctx, eraseRow.Pos())) - .Key(eraseRow.Key()) - .Done() - .Ptr(); - } - - if (auto setResult = callable.Maybe<TKiSetResult>()) { - return ctx.Builder(setResult.Cast().Pos()) - .Callable("SetResult") - .Add(0, setResult.Cast().Name().Ptr()) - .Add(1, setResult.Cast().Data().Ptr()) - .Seal() - .Build(); - } - - if (auto acquireLocks = callable.Maybe<TKiAcquireLocks>()) { - return ctx.Builder(acquireLocks.Cast().Pos()) - .Callable("AcquireLocks") - .Add(0, acquireLocks.Cast().LockTxId().Ptr()) - .Seal() - .Build(); - } - - if (auto map = callable.Maybe<TKiMapParameter>()) { - return Build<TMkqlMapParameter>(ctx, map.Cast().Pos()) - .Input(map.Cast().Input()) - .Lambda(map.Cast().Lambda()) - .Done() - .Ptr(); - } - - if (auto map = callable.Maybe<TKiFlatMapParameter>()) { - return ctx.Builder(map.Cast().Pos()) - .Callable("FlatMapParameter") - .Add(0, map.Cast().Input().Ptr()) - .Add(1, map.Cast().Lambda().Ptr()) - .Seal() - .Build(); - } - if (auto maybePartialSort = callable.Maybe<TKiPartialSort>()) { - return ctx.Builder(maybePartialSort.Cast().Pos()) - .Callable("PartialSort") - .Add(0, maybePartialSort.Cast().Input().Ptr()) - .Add(1, maybePartialSort.Cast().SortDirections().Ptr()) - .Add(2, maybePartialSort.Cast().KeySelectorLambda().Ptr()) - .Seal() - .Build(); - } - if (auto maybePartialTake = callable.Maybe<TKiPartialTake>()) { - return ctx.Builder(maybePartialTake.Cast().Pos()) - .Callable("PartialTake") - .Add(0, maybePartialTake.Cast().Input().Ptr()) - .Add(1, maybePartialTake.Cast().Count().Ptr()) - .Seal() - .Build(); - } - - return callable.Ptr(); -} - -} // namespace - -TMaybeNode<TExprBase> TranslateToMkql(TExprBase node, TExprContext& ctx, const TMaybe<TString>& rtParamName) { - auto maybeProgram = node.Maybe<TKiProgram>(); - YQL_ENSURE(maybeProgram); - - auto program = maybeProgram.Cast(); - bool hasResult = !program.Results().Empty(); - - if (hasResult) { - node = Build<TCoAppend>(ctx, node.Pos()) - .List(program.Effects()) - .Item<TKiSetResult>() - .Name().Build("Result") - .Data(program.Results()) - .Build() - .Done(); - } else { - node = program.Effects(); - } - - auto current = node.Ptr(); - TExprNode::TPtr output; - TOptimizeExprSettings optSettings(nullptr); - optSettings.VisitChanges = true; - auto status = OptimizeExpr(current, output, - [rtParamName](const TExprNode::TPtr& input, TExprContext& ctx) { - auto ret = input; - auto node = TExprBase(input); - - if (auto call = node.Maybe<TCallable>()) { - ret = MkqlRewriteCallables(call.Cast(), ctx, rtParamName); - if (ret != input) { - return ret; - } - } - - return ret; - }, ctx, optSettings); - - if (status != IGraphTransformer::TStatus::Ok) { - return TExprNode::TPtr(); - } - - return TExprBase(output); -} - -} // namespace NYql diff --git a/ydb/core/kqp/provider/yql_kikimr_opt.cpp b/ydb/core/kqp/provider/yql_kikimr_opt.cpp index c90b5dbdf25..f38393e64ae 100644 --- a/ydb/core/kqp/provider/yql_kikimr_opt.cpp +++ b/ydb/core/kqp/provider/yql_kikimr_opt.cpp @@ -11,10 +11,6 @@ namespace { using namespace NNodes; using namespace NCommon; -bool CanPushPartialSort(const TKiPartialSort& node, const TKikimrTableDescription& tableDesc, TVector<TString>* columns) { - return IsKeySelectorPkPrefix(node.KeySelectorLambda(), tableDesc, columns); -} - TExprNode::TPtr KiTrimReadTableWorld(TExprBase node) { if (auto maybeRead = node.Maybe<TCoLeft>().Input().Maybe<TKiReadTable>()) { YQL_CLOG(INFO, ProviderKikimr) << "KiTrimReadTableWorld"; @@ -42,699 +38,22 @@ TExprNode::TPtr KiEmptyCommit(TExprBase node) { return innerCommit.Ptr(); } -TExprNode::TPtr KiEraseOverSelectRow(TExprBase node, TExprContext& ctx) { - if (!node.Maybe<TCoFlatMap>().Input().Maybe<TKiSelectRow>()) { - return node.Ptr(); - } - - auto map = node.Cast<TCoFlatMap>(); - - if (auto maybeErase = map.Lambda().Body().Maybe<TCoJust>().Input().Maybe<TKiEraseRow>()) { - auto selectRow = map.Input().Cast<TKiSelectRow>(); - auto eraseRow = maybeErase.Cast(); - - YQL_ENSURE(selectRow.Cluster().Raw() == eraseRow.Cluster().Raw()); - - if (selectRow.Table().Raw() == eraseRow.Table().Raw()) { - auto ret = Build<TCoJust>(ctx, node.Pos()) - .Input<TKiEraseRow>() - .Cluster(selectRow.Cluster()) - .Table(selectRow.Table()) - .Key(selectRow.Key()) - .Build() - .Done(); - - YQL_CLOG(INFO, ProviderKikimr) << "KiEraseOverSelectRow"; - return ret.Ptr(); - } - } - - if (auto maybeAsList = map.Lambda().Body().Maybe<TCoAsList>()) { - auto asList = maybeAsList.Cast(); - if (asList.ArgCount() != 1) { - return node.Ptr(); - } - - if (auto maybeErase = asList.Arg(0).Maybe<TKiEraseRow>()) { - auto selectRow = map.Input().Cast<TKiSelectRow>(); - auto eraseRow = maybeErase.Cast(); - - YQL_ENSURE(selectRow.Cluster().Raw() == eraseRow.Cluster().Raw()); - - if (selectRow.Table().Raw() == eraseRow.Table().Raw()) { - - auto ret = Build<TCoAsList>(ctx, node.Pos()) - .Add<TKiEraseRow>() - .Cluster(selectRow.Cluster()) - .Table(selectRow.Table()) - .Key(selectRow.Key()) - .Build() - .Done(); - - YQL_CLOG(INFO, ProviderKikimr) << "KiEraseOverSelectRow"; - return ret.Ptr(); - } - } - } - - return node.Ptr(); -} - -TExprNode::TPtr KiRewriteAggregate(TExprBase node, TExprContext& ctx, TTypeAnnotationContext& typesCtx) { - if (!node.Maybe<TCoAggregate>()) { - return node.Ptr(); - } - - auto agg = node.Cast<TCoAggregate>(); - if (!agg.Input().Maybe<TKiSelectRange>() && !agg.Input().Maybe<TCoFlatMap>().Input().Maybe<TKiSelectRange>()) { - return node.Ptr(); - } - - for (size_t i = 0; i < agg.Handlers().Size(); ++i) { - auto aggHandler = agg.Handlers().Item(i); - - // Need to get rid of Unwraps in AggregateToPartitionByKeyWithCombine for DISTINCT case - if (aggHandler.DistinctName().IsValid()) { - return node.Ptr(); - } - } - - YQL_CLOG(INFO, ProviderKikimr) << "KiRewriteAggregate"; - TAggregateExpander aggExpander(true, node.Ptr(), ctx, typesCtx); - return aggExpander.ExpandAggregate(); -} - -TExprNode::TPtr KiRedundantSortByPk(TExprBase node, TExprContext& ctx, - const TKikimrTablesData& tablesData, const TKikimrConfiguration& config) -{ - auto maybeSort = node.Maybe<TCoSort>(); - auto maybePartialSort = node.Maybe<TKiPartialSort>(); - - if (!maybeSort && !maybePartialSort) { - return node.Ptr(); - } - - auto input = maybeSort ? maybeSort.Cast().Input() : maybePartialSort.Cast().Input(); - auto sortDirections = maybeSort ? maybeSort.Cast().SortDirections() : maybePartialSort.Cast().SortDirections(); - auto keySelector = maybeSort ? maybeSort.Cast().KeySelectorLambda() : maybePartialSort.Cast().KeySelectorLambda(); - - auto read = input; - TMaybe<THashSet<TStringBuf>> passthroughFields; - if (maybePartialSort && input.Maybe<TCoFlatMap>()) { - auto flatmap = input.Cast<TCoFlatMap>(); - - if (!IsPassthroughFlatMap(flatmap, &passthroughFields)) { - return node.Ptr(); - } - - read = flatmap.Input(); - } - - if (!read.Maybe<TKiSelectRange>()) { - return node.Ptr(); - } - - auto selectRange = read.Cast<TKiSelectRange>(); - - if (HasSetting(selectRange.Settings().Ref(), "Reverse")) { - // N.B. when SelectRange has a Reverse option we cannot optimize - // sort without complex analysis of how it interacts with sorting - return node.Ptr(); - } - - enum : ui32 { - SortDirectionNone = 0, - SortDirectionForward = 1, - SortDirectionReverse = 2, - SortDirectionUnknown = 4, - }; - - auto getDirection = [] (TExprBase expr) -> ui32 { - if (!expr.Maybe<TCoBool>()) { - return SortDirectionUnknown; - } - - if (!FromString<bool>(expr.Cast<TCoBool>().Literal().Value())) { - return SortDirectionReverse; - } - - return SortDirectionForward; - }; - - ui32 direction = SortDirectionNone; - - if (auto maybeList = sortDirections.Maybe<TExprList>()) { - for (const auto& expr : maybeList.Cast()) { - direction |= getDirection(expr); - if (direction != SortDirectionForward && direction != SortDirectionReverse) { - return node.Ptr(); - } - } - } else { - direction |= getDirection(sortDirections); - if (direction != SortDirectionForward && direction != SortDirectionReverse) { - return node.Ptr(); - } - } - - auto& tableData = tablesData.ExistingTable(selectRange.Cluster().StringValue(), selectRange.Table().Path().StringValue()); - - auto checkKey = [keySelector, &tableData, &passthroughFields] (TExprBase key, ui32 index) { - if (!key.Maybe<TCoMember>()) { - return false; - } - - auto member = key.Cast<TCoMember>(); - if (member.Struct().Raw() != keySelector.Args().Arg(0).Raw()) { - return false; - } - - auto column = TString(member.Name().Value()); - auto columnIndex = tableData.GetKeyColumnIndex(column); - if (!columnIndex || *columnIndex != index) { - return false; - } - - if (passthroughFields && !passthroughFields->contains(column)) { - return false; - } - - return true; - }; - - auto lambdaBody = keySelector.Body(); - if (auto maybeTuple = lambdaBody.Maybe<TExprList>()) { - auto tuple = maybeTuple.Cast(); - for (size_t i = 0; i < tuple.Size(); ++i) { - if (!checkKey(tuple.Item(i), i)) { - return node.Ptr(); - } - } - } else { - if (!checkKey(lambdaBody, 0)) { - return node.Ptr(); - } - } - - if (direction == SortDirectionReverse) { - if (!config.AllowReverseRange()) { - return node.Ptr(); - } - - auto reverseValue = Build<TCoBool>(ctx, node.Pos()) - .Literal().Build("true") - .Done(); - - auto newSettings = Build<TCoNameValueTupleList>(ctx, selectRange.Settings().Pos()) - .Add(TVector<TExprBase>(selectRange.Settings().begin(), selectRange.Settings().end())) - .Add<TCoNameValueTuple>() - .Name().Build("Reverse") - .Value(reverseValue) - .Build() - .Done(); - - TExprNode::TPtr newSelect = Build<TKiSelectRange>(ctx, selectRange.Pos()) - .Cluster(selectRange.Cluster()) - .Table(selectRange.Table()) - .Range(selectRange.Range()) - .Select(selectRange.Select()) - .Settings(newSettings) - .Done() - .Ptr(); - - YQL_CLOG(INFO, ProviderKikimr) << "KiRedundantSortByPkReverse"; - - if (input.Maybe<TCoFlatMap>()) { - auto flatmap = input.Cast<TCoFlatMap>(); - - return Build<TCoFlatMap>(ctx, flatmap.Pos()) - .Input(newSelect) - .Lambda(flatmap.Lambda()) - .Done().Ptr(); - } else { - return newSelect; - } - } - - YQL_CLOG(INFO, ProviderKikimr) << "KiRedundantSortByPk"; - return input.Ptr(); -} - -TExprNode::TPtr KiTopSort(TExprBase node, TExprContext& ctx, const TOptimizeContext& optCtx, - const TKikimrConfiguration& config) -{ - if (config.HasOptDisableTopSort()) { - return node.Ptr(); - } - - if (!node.Maybe<TCoTake>()) { - return node.Ptr(); - } - - auto take = node.Cast<TCoTake>(); - TCoInputBase top = take; - if (!optCtx.IsSingleUsage(take.Input().Ref())) { - return node.Ptr(); - } - - if (!IsKqlPureExpr(take.Count())) { - return node.Ptr(); - } - - auto skip = take.Input().Maybe<TCoSkip>(); - if (skip) { - if (!optCtx.IsSingleUsage(skip.Cast().Input().Ref())) { - return node.Ptr(); - } - - if (!IsKqlPureExpr(skip.Cast().Count())) { - return node.Ptr(); - } - - top = skip.Cast(); - } - - auto sort = top.Input().Maybe<TCoSort>(); - if (sort) { - if (!optCtx.IsSingleUsage(sort.Cast().Input().Ref())) { - return node.Ptr(); - } - - top = sort.Cast(); - } - - bool hasExtend = false; - TVector<TExprBase> inputs; - if (auto maybeExtend = top.Input().Maybe<TCoExtend>()) { - for (const auto& input : maybeExtend.Cast()) { - inputs.push_back(input); - } - hasExtend = true; - } else { - inputs.push_back(top.Input()); - } - - TExprBase takeCount = take.Count(); - if (skip) { - takeCount = Build<TCoPlus>(ctx, node.Pos()) - .Left(take.Count()) - .Right(skip.Cast().Count()) - .Done(); - } - - bool hasFlatMaps = false; - TVector<TExprBase> partialOutputs; - for (auto& input : inputs) { - auto read = input; - if (auto maybeFlatmap = input.Maybe<TCoFlatMap>()) { - auto flatmap = maybeFlatmap.Cast(); - if (!optCtx.IsSingleUsage(flatmap.Input().Ref())) { - return node.Ptr(); - } - - if (!IsKqlPureLambda(flatmap.Lambda())) { - return node.Ptr(); - } - - hasFlatMaps = true; - read = flatmap.Input(); - } - - if (!read.Maybe<TKiSelectRangeBase>()) { - return node.Ptr(); - } - - auto takeInput = input; - if (sort) { - takeInput = Build<TKiPartialSort>(ctx, node.Pos()) - .Input(input) - .SortDirections(sort.Cast().SortDirections()) - .KeySelectorLambda(sort.Cast().KeySelectorLambda()) - .Done(); - } - - auto output = Build<TKiPartialTake>(ctx, node.Pos()) - .Input(takeInput) - .Count(takeCount) - .Done(); - - partialOutputs.push_back(output); - } - - TExprBase merged = Build<TCoExtend>(ctx, node.Pos()) - .Add(partialOutputs) - .Done(); - - if (sort) { - merged = Build<TCoSort>(ctx, node.Pos()) - .Input(merged) - .SortDirections(sort.Cast().SortDirections()) - .KeySelectorLambda(sort.Cast().KeySelectorLambda()) - .Done(); - } else { - auto canRewrite = hasExtend || hasFlatMaps; - if (!canRewrite || take.Count().Maybe<TCoUint64>()) { - return node.Ptr(); - } - } - - YQL_CLOG(INFO, ProviderKikimr) << "KiTopSort"; - - if (skip) { - return Build<TCoTake>(ctx, node.Pos()) - .Input<TCoSkip>() - .Input(merged) - .Count(skip.Count().Cast()) - .Build() - .Count(take.Count()) - .Done() - .Ptr(); - } else { - return Build<TCoTake>(ctx, node.Pos()) - .Input(merged) - .Count(take.Count()) - .Done() - .Ptr(); - } -} - -TMaybeNode<TCoNameValueTupleList> SimplifyKeyTuples(TCoNameValueTupleList keyTupleList, TExprContext& ctx) { - TVector<TCoNameValueTuple> newTuples(keyTupleList.begin(), keyTupleList.end()); - - bool hasChanges = false; - for (ui32 i = 0; i < keyTupleList.Size(); ++i) { - const auto& tuple = keyTupleList.Item(i); - YQL_ENSURE(tuple.Value().IsValid()); - if (auto maybeJust = tuple.Value().Maybe<TCoJust>()) { - hasChanges = true; - newTuples[i] = Build<TCoNameValueTuple>(ctx, maybeJust.Cast().Pos()) - .Name(tuple.Name()) - .Value(maybeJust.Cast().Input()) - .Done(); - } - } - - if (hasChanges) { - return Build<TCoNameValueTupleList>(ctx, keyTupleList.Pos()) - .Add(newTuples) - .Done(); - } - - return TMaybeNode<TCoNameValueTupleList>(); -} - -TExprNode::TPtr KiSimplifyRowKey(TExprBase node, TExprContext& ctx) { - if (auto maybeSelectRow = node.Maybe<TKiSelectRow>()) { - auto selectRow = maybeSelectRow.Cast(); - - if (auto newKey = SimplifyKeyTuples(selectRow.Key(), ctx)) { - return Build<TKiSelectRow>(ctx, node.Pos()) - .Cluster(selectRow.Cluster()) - .Table(selectRow.Table()) - .Key(newKey.Cast()) - .Select(selectRow.Select()) - .Done() - .Ptr(); - } - } - - if (auto maybeEraseRow = node.Maybe<TKiEraseRow>()) { - auto eraseRow = maybeEraseRow.Cast(); - - if (auto newKey = SimplifyKeyTuples(eraseRow.Key(), ctx)) { - return Build<TKiEraseRow>(ctx, node.Pos()) - .Cluster(eraseRow.Cluster()) - .Table(eraseRow.Table()) - .Key(newKey.Cast()) - .Done() - .Ptr(); - } - } - - if (auto maybeUpdateRow = node.Maybe<TKiUpdateRow>()) { - auto updateRow = maybeUpdateRow.Cast(); - - if (auto newKey = SimplifyKeyTuples(updateRow.Key(), ctx)) { - return Build<TKiUpdateRow>(ctx, node.Pos()) - .Cluster(updateRow.Cluster()) - .Table(updateRow.Table()) - .Key(newKey.Cast()) - .Update(updateRow.Update()) - .Done() - .Ptr(); - } - } - - return node.Ptr(); -} +} // namespace -TExprNode::TPtr DoRewriteSelectIndexRange(const TKiSelectIndexRange& selectIndexRange, - const TKikimrTablesData& tablesData, TExprContext& ctx, - const TVector<TString>& extraColumns, const std::function<TExprBase(const TExprBase&)>& middleFilter = {}) +TAutoPtr<IGraphTransformer> CreateKiLogicalOptProposalTransformer(TIntrusivePtr<TKikimrSessionContext> sessionCtx, + TTypeAnnotationContext& types) { - const auto pos = selectIndexRange.Pos(); - - const auto& cluster = selectIndexRange.Cluster().Value(); - const auto& versionedTable = selectIndexRange.Table(); - const auto& indexTableName = selectIndexRange.IndexName().Value(); - - const auto& tableDesc = tablesData.ExistingTable(TString(cluster), TString(versionedTable.Path())); - const auto& indexTableDesc = tablesData.ExistingTable(TString(cluster), TString(indexTableName)); - - const auto& fetchItemArg = Build<TCoArgument>(ctx, pos) - .Name("fetchItem") - .Done(); - - bool needDataRead = false; - for (const auto& col : selectIndexRange.Select()) { - if (!indexTableDesc.Metadata->Columns.contains(TString(col.Value()))) { - needDataRead = true; - break; - } - } - - const bool indexFullScan = TKikimrKeyRange::IsFull(selectIndexRange.Range()); - // Fullscan from index table but without reading data from main is OK - if (indexFullScan && needDataRead) { - auto issue = TIssue(ctx.GetPosition(pos), "Given predicate is not suitable for used index"); - SetIssueCode(EYqlIssueCode::TIssuesIds_EIssueCode_KIKIMR_WRONG_INDEX_USAGE, issue); - if (!ctx.AddWarning(issue)) { - return nullptr; - } - } - - auto keyColumnsList = needDataRead ? BuildKeyColumnsList(tableDesc, pos, ctx) : selectIndexRange.Select(); - auto columns = MergeColumns(keyColumnsList, extraColumns, ctx); - - TExprBase selectKeyRange = Build<TKiSelectRange>(ctx, pos) - .Cluster(selectIndexRange.Cluster()) - .Table(BuildVersionedTable(*indexTableDesc.Metadata, pos, ctx)) - .Range(selectIndexRange.Range()) - .Select(columns) - .Settings(selectIndexRange.Settings()) - .Done(); - - if (middleFilter) { - selectKeyRange = middleFilter(selectKeyRange); - } - - if (!needDataRead) { - return TExprBase(selectKeyRange).Ptr(); - } - - const auto& fetchLambda = Build<TCoLambda>(ctx, pos) - .Args(fetchItemArg) - .Body<TKiSelectRow>() - .Cluster() - .Value(cluster) - .Build() - .Table(versionedTable) - .Key(ExtractNamedKeyTuples(fetchItemArg, tableDesc, ctx)) - .Select(selectIndexRange.Select()) - .Build() - .Done(); - - const auto& flatMap = Build<TCoFlatMap>(ctx, pos) - .Input(selectKeyRange) - .Lambda(fetchLambda) - .Done(); - - YQL_CLOG(INFO, ProviderKikimr) << "KiRewriteSelectIndexRange"; - return TExprBase(flatMap).Ptr(); -} - -TExprNode::TPtr KiRewritePartialTakeSortOverSelectIndexRange(TExprBase node, const TKikimrTablesData& tablesData, TExprContext& ctx) { - auto maybePartialTake = node.Maybe<TKiPartialTake>(); - if (!maybePartialTake) { - return node.Ptr(); - } - - auto partialTake = maybePartialTake.Cast(); - - auto maybePartialSort = partialTake.Input().Maybe<TKiPartialSort>(); - if (!maybePartialSort) { - return node.Ptr(); - } - - auto partialSort = maybePartialSort.Cast(); - - auto maybeSelectIndexRange = partialSort.Input().Maybe<TKiSelectIndexRange>(); - if (!maybeSelectIndexRange) { - return node.Ptr(); - } - - auto selectIndexRange = maybeSelectIndexRange.Cast(); - - const auto cluster = selectIndexRange.Cluster().StringValue(); - const auto indexTableName = selectIndexRange.IndexName().StringValue(); - - const auto& indexDesc = tablesData.ExistingTable(cluster, indexTableName); - - TVector<TString> sortByColumns; - if (!CanPushPartialSort(maybePartialSort.Cast(), indexDesc, &sortByColumns)) { - return node.Ptr(); - } - - auto filter = [&ctx, &node, &partialSort, &partialTake](const TExprBase& in) mutable { - auto out = Build<TKiPartialTake>(ctx, node.Pos()) - .Input<TKiPartialSort>() - .Input(in) - .SortDirections(partialSort.SortDirections()) - .KeySelectorLambda(ctx.DeepCopyLambda(partialSort.KeySelectorLambda().Ref())) - .Build() - .Count(partialTake.Count()) - .Done(); - return TExprBase(out); - }; - - return DoRewriteSelectIndexRange(selectIndexRange, tablesData, ctx, sortByColumns, filter); -} - -TExprNode::TPtr KiRewriteSelectIndexRange(TExprBase node, const TKikimrTablesData& tablesData, TExprContext& ctx) { - if (auto maybeSelectIndexRange = node.Maybe<TKiSelectIndexRange>()) { - return DoRewriteSelectIndexRange(maybeSelectIndexRange.Cast(), tablesData, ctx, {}); - } - - return node.Ptr(); -} - -TExprNode::TPtr KiApplyExtractMembersToSelectRange(TExprBase node, TExprContext& ctx) { - if (!node.Maybe<TCoExtractMembers>().Input().Maybe<TKiSelectRangeBase>()) { - return node.Ptr(); - } - - auto extract = node.Cast<TCoExtractMembers>(); - - if (node.Maybe<TCoExtractMembers>().Input().Maybe<TKiSelectRange>()) { - auto range = extract.Input().Cast<TKiSelectRange>(); - - YQL_CLOG(INFO, ProviderKikimr) << "KiApplyExtractMembersToSelectRange"; - return Build<TKiSelectRange>(ctx, node.Pos()) - .Cluster(range.Cluster()) - .Table(range.Table()) - .Range(range.Range()) - .Select(extract.Members()) - .Settings(range.Settings()) - .Done().Ptr(); - } else if (node.Maybe<TCoExtractMembers>().Input().Maybe<TKiSelectIndexRange>()) { - auto range = extract.Input().Cast<TKiSelectIndexRange>(); - - YQL_CLOG(INFO, ProviderKikimr) << "KiApplyExtractMembersToSelectRange"; - return Build<TKiSelectIndexRange>(ctx, node.Pos()) - .Cluster(range.Cluster()) - .Table(range.Table()) - .Range(range.Range()) - .Select(extract.Members()) - .Settings(range.Settings()) - .IndexName(range.IndexName()) - .Done().Ptr(); - } else { - ctx.AddError(TIssue(ctx.GetPosition(node.Pos()), TStringBuilder() << "Unexpected callable")); - return nullptr; - } -} + Y_UNUSED(sessionCtx); + Y_UNUSED(types); -} // namespace - -TAutoPtr<IGraphTransformer> CreateKiLogicalOptProposalTransformer(TIntrusivePtr<TKikimrSessionContext> sessionCtx, TTypeAnnotationContext& types) { - return CreateFunctorTransformer([sessionCtx, &types](const TExprNode::TPtr& input, TExprNode::TPtr& output, + return CreateFunctorTransformer([](const TExprNode::TPtr& input, TExprNode::TPtr& output, TExprContext& ctx) { - typedef IGraphTransformer::TStatus TStatus; - - TParentsMap parentsMap; - TOptimizeContext optCtx; - GatherParents(*input, parentsMap); - optCtx.ParentsMap = &parentsMap; - - TStatus status = OptimizeExpr(input, output, [sessionCtx, &optCtx, &types](const TExprNode::TPtr& inputNode, TExprContext& ctx) { - auto ret = inputNode; - TExprBase node(inputNode); - - ret = KiSqlInToEquiJoin(node, sessionCtx->Tables(), sessionCtx->Config(), ctx); - if (ret != inputNode) { - return ret; - } - - ret = KiApplyExtractMembersToSelectRange(node, ctx); - if (ret != inputNode) { - return ret; - } - - ret = KiApplyExtractMembersToSelectRow(node, ctx); - if (ret != inputNode) { - return ret; - } + Y_UNUSED(input); + Y_UNUSED(output); + Y_UNUSED(ctx); - ret = KiRedundantSortByPk(node, ctx, sessionCtx->Tables(), sessionCtx->Config()); - if (ret != inputNode) { - return ret; - } - - ret = KiApplyLimitToSelectRange(node, ctx); - if (ret != inputNode) { - return ret; - } - - ret = KiPushPredicateToSelectRange(node, ctx, sessionCtx->Tables(), sessionCtx->Config()); - if (ret != inputNode) { - return ret; - } - - ret = KiEraseOverSelectRow(node, ctx); - if (ret != inputNode) { - return ret; - } - - ret = KiRewriteEquiJoin(node, sessionCtx->Tables(), sessionCtx->Config(), ctx); - if (ret != inputNode) { - return ret; - } - - ret = KiRewriteAggregate(node, ctx, types); - if (ret != inputNode) { - return ret; - } - - ret = KiTopSort(node, ctx, optCtx, sessionCtx->Config()); - if (ret != inputNode) { - return ret; - } - - ret = KiSimplifyRowKey(node, ctx); - if (ret != inputNode) { - return ret; - } - - ret = KiRewritePartialTakeSortOverSelectIndexRange(node, sessionCtx->Tables(), ctx); - if (ret != inputNode) { - return ret; - } - - return ret; - }, ctx, TOptimizeExprSettings(nullptr)); - - return status; + return IGraphTransformer::TStatus::Ok; }); } @@ -789,11 +108,6 @@ TAutoPtr<IGraphTransformer> CreateKiPhysicalOptProposalTransformer(TIntrusivePtr return ret; } - ret = KiRewriteSelectIndexRange(node, sessionCtx->Tables(), ctx); - if (ret != inputNode) { - return ret; - } - return ret; }, ctx, TOptimizeExprSettings(nullptr)); @@ -801,24 +115,4 @@ TAutoPtr<IGraphTransformer> CreateKiPhysicalOptProposalTransformer(TIntrusivePtr }); } -bool IsKqlPureExpr(NNodes::TExprBase expr) { - auto node = FindNode(expr.Ptr(), [] (const TExprNode::TPtr& node) { - if (!node->IsCallable()) { - return false; - } - - if (!KikimrKqlFunctions().contains(node->Content())) { - return false; - } - - return true; - }); - - return !node; -} - -bool IsKqlPureLambda(TCoLambda lambda) { - return IsKqlPureExpr(lambda.Body()); -} - } // namespace NYql diff --git a/ydb/core/kqp/provider/yql_kikimr_opt_join.cpp b/ydb/core/kqp/provider/yql_kikimr_opt_join.cpp deleted file mode 100644 index 15c78510d88..00000000000 --- a/ydb/core/kqp/provider/yql_kikimr_opt_join.cpp +++ /dev/null @@ -1,936 +0,0 @@ -#include "yql_kikimr_provider_impl.h" -#include "yql_kikimr_opt_utils.h" - -#include <ydb/library/yql/core/yql_join.h> -#include <ydb/library/yql/core/yql_opt_utils.h> -#include <ydb/library/yql/utils/log/log.h> -#include <util/string/cast.h> - -namespace NYql { -namespace { - -using namespace NNodes; -using namespace NCommon; - -using TGetExprFunc = std::function<TExprBase()>; - -TExprBase GetEquiJoinInputList(TCoAtom scope, const TJoinLabels& joinLabels, - const TVector<TCoEquiJoinInput>& joinInputs) -{ - auto inputIndex = joinLabels.FindInputIndex(scope.Value()); - YQL_ENSURE(inputIndex); - - return joinInputs[*inputIndex].List(); -} - -void GatherEquiJoinLables(TExprBase joinScope, TVector<TString>& labels) { - if (joinScope.Maybe<TCoAtom>()) { - labels.emplace_back(TString(joinScope.Cast<TCoAtom>())); - return; - } - - auto joinTuple = joinScope.Cast<TCoEquiJoinTuple>(); - GatherEquiJoinLables(joinTuple.LeftScope(), labels); - GatherEquiJoinLables(joinTuple.RightScope(), labels); -} - -TSet<TString> GetEquiJoinLabelReps(TExprBase joinScope) { - TVector<TString> labels; - GatherEquiJoinLables(joinScope, labels); - return TSet<TString>(labels.begin(), labels.end()); -} - -TExprBase ConvertToTuples(const TSet<TString>& columns, const TCoArgument& structArg, TExprContext& ctx, - TPositionHandle pos) -{ - TVector<TExprBase> tuples{Reserve(columns.size())}; - - for (const auto& key : columns) { - tuples.emplace_back(Build<TCoMember>(ctx, pos) - .Struct(structArg) - .Name().Build(key) - .Done()); - } - - if (tuples.size() == 1) { - return tuples[0]; - } - - return Build<TExprList>(ctx, pos) - .Add(tuples) - .Done(); -} - -TVector<TExprBase> ConvertToAtoms(const TSet<TString>& columns, TExprContext& ctx, TPositionHandle pos) { - TVector<TExprBase> list{Reserve(columns.size())}; - for (const auto& column : columns) { - list.emplace_back(TCoAtom(ctx.NewAtom(pos, column))); - } - return list; -}; - - -TMaybeNode<TExprBase> EquiJoinGetIdxLookupValue(const TStringBuf& leftDataName, const TStringBuf& rightDataName, - TExprBase leftRow, const TString& leftMemberName, TPositionHandle pos, TExprContext& ctx) -{ - auto leftMember = Build<TCoMember>(ctx, pos) - .Struct(leftRow) - .Name().Build(leftMemberName) - .Done(); - - TExprBase value = leftMember; - if (leftDataName != rightDataName) { - bool canConvert = false; - - if (IsDataTypeNumeric(NKikimr::NUdf::GetDataSlot(leftDataName)) && IsDataTypeNumeric(NKikimr::NUdf::GetDataSlot(rightDataName))) { - canConvert = GetNumericDataTypeLevel(NKikimr::NUdf::GetDataSlot(rightDataName)) - >= GetNumericDataTypeLevel(NKikimr::NUdf::GetDataSlot(leftDataName)); - } - - if (leftDataName == "Utf8" && rightDataName == "String") { - canConvert = true; - } - - if (!canConvert) { - return TMaybeNode<TExprBase>(); - } - - value = Build<TCoConvert>(ctx, pos) - .Input(value) - .Type().Build(rightDataName) - .Done(); - } - - return value; -} - -bool EquiJoinToIdxLookup(TGetExprFunc getLeftExpr, TCoEquiJoinTuple joinTuple, const TJoinLabels& joinLabels, - const TVector<TCoEquiJoinInput>& joinInputs, const TKikimrTablesData& tablesData, TExprContext& ctx, - TMaybeNode<TExprBase>& idxLookupExpr) -{ - if (!joinTuple.RightScope().Maybe<TCoAtom>()) { - // Can't lookup in join result - return false; - } - - static const struct { - const TStringBuf Left = "Left"; - const TStringBuf Inner = "Inner"; - const TStringBuf LeftSemi = "LeftSemi"; - const TStringBuf LeftOnly = "LeftOnly"; - const TStringBuf RightSemi = "RightSemi"; - } joinNames; - - static const TVector<TStringBuf> allowedJoins { - joinNames.Left, - joinNames.Inner, - joinNames.LeftSemi, - joinNames.LeftOnly, - joinNames.RightSemi - }; - - const TStringBuf joinType = joinTuple.Type().Value(); - if (Find(allowedJoins, joinType) == allowedJoins.cend()) { - return false; - } - - auto linkSettings = GetEquiJoinLinkSettings(joinTuple.Options().Ref()); - if (linkSettings.LeftHints.contains("any") || linkSettings.RightHints.contains("any")) { - return false; - } - - auto rightExpr = GetEquiJoinInputList(joinTuple.RightScope().Cast<TCoAtom>(), joinLabels, joinInputs); - - TMaybeNode<TKiSelectRangeBase> rightSelect; - TMaybeNode<TCoFlatMap> rightFlatmap; - TMaybeNode<TCoFilterNullMembers> rightFilterNull; - TMaybeNode<TCoSkipNullMembers> rightSkipNull; - TMaybeNode<TCoAtom> indexTable; - - if (auto select = rightExpr.Maybe<TKiSelectRangeBase>()) { - rightSelect = select; - } - - if (auto select = rightExpr.Maybe<TCoFlatMap>().Input().Maybe<TKiSelectRangeBase>()) { - rightSelect = select; - rightFlatmap = rightExpr.Cast<TCoFlatMap>(); - } - - if (auto select = rightExpr.Maybe<TCoFlatMap>().Input().Maybe<TCoFilterNullMembers>() - .Input().Maybe<TKiSelectRangeBase>()) - { - rightSelect = select; - rightFlatmap = rightExpr.Cast<TCoFlatMap>(); - rightFilterNull = rightFlatmap.Input().Cast<TCoFilterNullMembers>(); - } - - if (auto select = rightExpr.Maybe<TCoFlatMap>().Input().Maybe<TCoSkipNullMembers>() - .Input().Maybe<TKiSelectRangeBase>()) - { - rightSelect = select; - rightFlatmap = rightExpr.Cast<TCoFlatMap>(); - rightSkipNull = rightFlatmap.Input().Cast<TCoSkipNullMembers>(); - } - - if (auto indexSelect = rightSelect.Maybe<TKiSelectIndexRange>()) { - indexTable = indexSelect.Cast().IndexName(); - } - - if (!rightSelect) { - return false; - } - - if (rightFlatmap && !IsPassthroughFlatMap(rightFlatmap.Cast(), nullptr)) { - // Can't lookup in modified table - return false; - } - - const auto selectRange = rightSelect.Cast(); - const TStringBuf cluster = selectRange.Cluster().Value(); - const TStringBuf lookupTable = indexTable ? indexTable.Cast().Value() : selectRange.Table().Path().Value(); - const TKikimrTableDescription& lookupTableDesc = tablesData.ExistingTable(cluster, lookupTable); - - auto rightKeyRange = TKikimrKeyRange::GetPointKeyRange(ctx, lookupTableDesc, selectRange.Range()); - if (!rightKeyRange) { - // Don't rewrite join with arbitrary range - return false; - } - - for (size_t i = 0; i < rightKeyRange->GetColumnRangesCount(); ++i) { - auto columnRange = rightKeyRange->GetColumnRange(i); - if (columnRange.IsDefined()) { - YQL_ENSURE(columnRange.IsPoint()); - auto argument = FindNode(columnRange.GetFrom().GetValue().Ptr(), [] (const TExprNode::TPtr& node) { - return node->IsArgument(); - }); - - if (argument) { - // Can't lookup in dependent range - return false; - } - } - } - - auto leftLabelReps = GetEquiJoinLabelReps(joinTuple.LeftScope()); - YQL_ENSURE(!leftLabelReps.empty()); - auto rightLabelReps = GetEquiJoinLabelReps(joinTuple.RightScope()); - YQL_ENSURE(rightLabelReps.size() == 1); - YQL_ENSURE(rightLabelReps.contains(joinTuple.RightScope().Cast<TCoAtom>().Value())); - - auto joinKeyCount = joinTuple.RightKeys().Size() / 2; - - bool prefixLeftMembers = !joinTuple.LeftScope().Maybe<TCoAtom>(); - - TCoArgument leftRowArg = Build<TCoArgument>(ctx, joinTuple.Pos()) - .Name("leftRow") - .Done(); - - TSet<TString> leftKeyColumnsSet; - TSet<TString> rightKeyColumnsSet; - TVector<TColumnRange> keyColumnRanges(lookupTableDesc.Metadata->KeyColumnNames.size()); - - for (size_t i = 0; i < joinKeyCount; ++i) { - auto joinLeftLabel = joinTuple.LeftKeys().Item(i * 2); - auto joinLeftColumn = joinTuple.LeftKeys().Item(i * 2 + 1); - auto joinRightLabel = joinTuple.RightKeys().Item(i * 2); - auto joinRightColumn = joinTuple.RightKeys().Item(i * 2 + 1); - - if (!rightLabelReps.contains(joinRightLabel) || !leftLabelReps.contains(joinLeftLabel)) { - // Join already rewritten, skip opt - return false; - } - - TString leftColumnName = ToString(joinLeftColumn.Value()); - TString leftMemberName = prefixLeftMembers - ? FullColumnName(joinLeftLabel.Value(), joinLeftColumn.Value()) - : leftColumnName; - - TString rightColumnName = ToString(joinRightColumn.Value()); - - auto keyColumnIdx = lookupTableDesc.GetKeyColumnIndex(rightColumnName); - if (!keyColumnIdx) { - // Non-key columns in join key currently not supported - return false; - } - - if (rightKeyColumnsSet.contains(rightColumnName)) { - // Can't lookup same column twice - return false; - } - - leftKeyColumnsSet.insert(leftMemberName); - rightKeyColumnsSet.insert(rightColumnName); - - auto leftInput = GetEquiJoinInputList(joinLeftLabel, joinLabels, joinInputs); - const TDataExprType* leftData; - const TDataExprType* rightData; - if (!GetEquiJoinKeyTypes(leftInput, leftColumnName, lookupTableDesc, rightColumnName, leftData, rightData)) { - return false; - } - - auto value = EquiJoinGetIdxLookupValue(leftData->GetName(), rightData->GetName(), leftRowArg, - leftMemberName, joinTuple.Pos(), ctx); - if (!value) { - return false; - } - - keyColumnRanges[*keyColumnIdx] = TColumnRange::MakePoint(value.Cast()); - } - - for (size_t i = 0; i < keyColumnRanges.size(); ++i) { - bool leftColumnDefined = keyColumnRanges[i].IsDefined(); - if (leftColumnDefined && rightKeyRange->GetColumnRange(i).IsDefined()) { - return false; - } - if (!leftColumnDefined) { - keyColumnRanges[i] = rightKeyRange->GetColumnRange(i); - } - - YQL_ENSURE(keyColumnRanges[i].IsPoint() || !keyColumnRanges[i].IsDefined()); - } - - for (size_t i = 0; i < keyColumnRanges.size() - 1; ++i) { - if (!keyColumnRanges[i].IsDefined() && keyColumnRanges[i + 1].IsDefined()) { - // Invalid lookup key - // TODO: Move part of lookup key to residual predicate - return false; - } - } - - TCoAtomList lookupColumns = selectRange.Select(); - bool requireIndexValues = false; // 'true' means that some requested columns are not presented in the index-table, - // so read from data-table is required - if (indexTable) { - // In this case lookupTableDesc == indexTable, - // so check whether index-table contains all lookup-columns - for (const auto& lookupColumn : lookupColumns) { - if (!lookupTableDesc.Metadata->Columns.contains(TString(lookupColumn.Value()))) { - requireIndexValues = true; - break; - } - } - } - - auto selectedColumns = (indexTable && requireIndexValues) - ? BuildKeyColumnsList(lookupTableDesc, selectRange.Pos(), ctx) - : lookupColumns; - - auto lookup = TKikimrKeyRange::BuildReadRangeExpr(lookupTableDesc, TKeyRange(ctx, keyColumnRanges, {}), - selectedColumns, false /* allowNulls */, ctx); - - // Skip null keys in lookup part as for equijoin semantics null != null, - // so we can't have nulls in lookup part - lookup = Build<TCoSkipNullMembers>(ctx, joinTuple.Pos()) - .Input(lookup) - .Members() - .Add(ConvertToAtoms(rightKeyColumnsSet, ctx, joinTuple.Pos())) - .Build() - .Done(); - - if (rightFilterNull) { - lookup = Build<TCoFilterNullMembers>(ctx, joinTuple.Pos()) - .Input(lookup) - .Members(rightFilterNull.Cast().Members()) - .Done(); - } - - if (rightSkipNull) { - lookup = Build<TCoSkipNullMembers>(ctx, joinTuple.Pos()) - .Input(lookup) - .Members(rightSkipNull.Cast().Members()) - .Done(); - } - - // If we have index table AND need data from main we cand add this flat map here - // because lookup is index table and does not have all columns - if (rightFlatmap && !requireIndexValues) { - lookup = Build<TCoFlatMap>(ctx, joinTuple.Pos()) - .Input(lookup) - .Lambda(rightFlatmap.Cast().Lambda()) - .Done(); - } - - auto addJoinResults = [&joinLabels, &ctx] - (const TExprBase& joinTuple, const TExprBase& rowArg, TVector<TExprBase>& resultTuples, - TVector<TString>* resultColumns) - { - TVector<std::pair<TString, TString>> joinColumns; - if (auto maybeAtom = joinTuple.Maybe<TCoAtom>()) { - auto maybeInput = joinLabels.FindInput(maybeAtom.Cast().Value()); - YQL_ENSURE(maybeInput); - - auto input = *maybeInput; - - for (auto item: input->InputType->GetItems()) { - joinColumns.emplace_back(item->GetName(), input->FullName(item->GetName())); - } - } else { - auto columns = GetJoinColumnTypes(joinTuple.Ref(), joinLabels, ctx); - for (auto it: joinLabels.Inputs) { - for (auto item: it.InputType->GetItems()) { - auto member = item->GetName(); - auto column = it.FullName(member); - - if (columns.FindPtr(column)) { - joinColumns.emplace_back(member, column); - } - } - } - } - - for (auto& pair : joinColumns) { - auto& member = pair.first; - auto& column = pair.second; - - if (resultColumns) { - resultColumns->push_back(column); - } - - auto tuple = Build<TCoNameValueTuple>(ctx, joinTuple.Pos()) - .Name().Build(column) - .Value<TCoMember>() - .Struct(rowArg) - .Name().Build(joinTuple.Maybe<TCoAtom>() ? member : column) - .Build() - .Done(); - - resultTuples.emplace_back(std::move(tuple)); - } - }; - - auto injectRightDataKey = [&tablesData, &selectRange, &ctx] - (TMaybeNode<TExprBase>& rightRow, TVector<TExprBase>& joinResultTuples) - { - const TStringBuf cluster = selectRange.Cluster(); - const TStringBuf table = selectRange.Table().Path(); - const auto& desc = tablesData.ExistingTable(cluster, table); - for (const auto& col : desc.Metadata->KeyColumnNames) { - auto tuple = Build<TCoNameValueTuple>(ctx, selectRange.Pos()) - .Name().Build(FullColumnName(desc.Metadata->Name, col)) - .Value<TCoMember>() - .Struct(rightRow.Cast()) - .Name().Build(col) - .Build() - .Done(); - joinResultTuples.push_back(tuple); - } - }; - - auto getJoinResultExpr = [requireIndexValues, &indexTable, &addJoinResults, - &injectRightDataKey, &ctx, &joinTuple] - (TMaybeNode<TExprBase> leftRowArg, TMaybeNode<TExprBase> rightRowArg) - { - TVector<TString> resultColumns; - TVector<TExprBase> joinResultTuples; - - if (leftRowArg.IsValid()) { - addJoinResults(joinTuple.LeftScope(), leftRowArg.Cast(), joinResultTuples, &resultColumns); - } - - if (rightRowArg.IsValid()) { - if (!requireIndexValues) { - addJoinResults(joinTuple.RightScope(), rightRowArg.Cast(), joinResultTuples, &resultColumns); - } else { - // we need data from the right table that not contains in the lookup-table (not indexed columns), - // so we add reads from the right table - YQL_ENSURE(indexTable.IsValid()); - //TODO: May be it'a a good idea to get some collumns data from the index-read? - injectRightDataKey(rightRowArg, joinResultTuples); - } - } else { - YQL_ENSURE(!requireIndexValues); - } - - auto expr = Build<TCoAsStruct>(ctx, joinTuple.Pos()) - .Add(joinResultTuples) - .Done(); - - return std::make_pair(expr, resultColumns); - }; - - auto finalizeJoinResultExpr = [&joinTuple, &tablesData, &ctx, - &selectRange, &addJoinResults, &rightFlatmap, &joinType] - (const TExprBase& input, const TVector<TString>& finishedColumns, bool needExtraRead) - -> NNodes::TExprBase - { - if (!needExtraRead) { - return input; - } - - TCoArgument joinResult = Build<TCoArgument>(ctx, joinTuple.Pos()) - .Name("joinResult") - .Done(); - - TVector<TExprBase> joinResultTuples; - for (const auto& col : finishedColumns) { - auto tuple = Build<TCoNameValueTuple>(ctx, joinTuple.Pos()) - .Name().Build(col) - .Value<TCoMember>() - .Struct(joinResult) - .Name().Build(col) - .Build() - .Done(); - joinResultTuples.push_back(tuple); - } - - const TStringBuf cluster = selectRange.Cluster(); - const TStringBuf table = selectRange.Table().Path(); - const auto& tableDesc = tablesData.ExistingTable(cluster, table); - TExprBase select = Build<TKiSelectRow>(ctx, joinTuple.Pos()) - .Cluster(selectRange.Cluster()) - .Table(selectRange.Table()) - .Key(ExtractNamedKeyTuples(joinResult, tableDesc, ctx, tableDesc.Metadata->Name)) - .Select(selectRange.Select()) - .Done(); - - if (rightFlatmap) { - select = Build<TCoFlatMap>(ctx, joinTuple.Pos()) - .Input(select) - .Lambda(rightFlatmap.Cast().Lambda()) - .Done(); - } - - addJoinResults(joinTuple.RightScope(), select, joinResultTuples, nullptr); - - if (joinType == joinNames.Inner || joinType == joinNames.RightSemi) { - return Build<TCoFlatMap>(ctx, joinTuple.Pos()) - .Input(input) - .Lambda() - .Args({joinResult}) - .Body<TCoOptionalIf>() - .Predicate<TCoHasItems>() - .List<TCoToList>() - .Optional(select) - .Build() - .Build() - .Value<TCoAsStruct>() - .Add(joinResultTuples) - .Build() - .Build() - .Build() - .Done(); - } else if (joinType == joinNames.Left){ - return Build<TCoMap>(ctx, joinTuple.Pos()) - .Input(input) - .Lambda() - .Args({joinResult}) - .Body<TCoAsStruct>() - .Add(joinResultTuples) - .Build() - .Build() - .Done(); - } else { - YQL_ENSURE(false, "unknown join type to call finalizeJoinResultExpr " << joinType); - } - }; - - auto leftExpr = getLeftExpr(); - TCoArgument rightRowArg = Build<TCoArgument>(ctx, joinTuple.Pos()) - .Name("rightRow") - .Done(); - - if (joinType == joinNames.Left) { - TExprBase rightNothing = Build<TCoNothing>(ctx, joinTuple.Pos()) - .OptionalType<TCoTypeOf>() - .Value<TCoToOptional>() - .List(lookup) - .Build() - .Build() - .Done(); - - auto joinResultExpr = getJoinResultExpr(leftRowArg, rightRowArg); - - auto joinMap = Build<TCoFlatMap>(ctx, joinTuple.Pos()) - .Input(leftExpr) - .Lambda() - .Args({leftRowArg}) - .Body<TCoIf>() - .Predicate<TCoHasItems>() - .List(lookup) - .Build() - .ThenValue<TCoMap>() - .Input(lookup) - .Lambda() - .Args({rightRowArg}) - .Body(joinResultExpr.first) - .Build() - .Build() - .ElseValue<TCoAsList>() - .Add(getJoinResultExpr(leftRowArg, rightNothing).first) - .Build() - .Build() - .Build() - .Done(); - idxLookupExpr = finalizeJoinResultExpr(joinMap, joinResultExpr.second, requireIndexValues); - return true; - } - - if (joinType == joinNames.Inner) { - const auto joinResultExpr = getJoinResultExpr(leftRowArg, rightRowArg); - - auto joinMap = Build<TCoFlatMap>(ctx, joinTuple.Pos()) - .Input<TCoSkipNullMembers>() - .Input(leftExpr) - .Members() - .Add(ConvertToAtoms(leftKeyColumnsSet, ctx, joinTuple.Pos())) - .Build() - .Build() - .Lambda() - .Args({leftRowArg}) - .Body<TCoMap>() - .Input(lookup) - .Lambda() - .Args({rightRowArg}) - .Body(joinResultExpr.first) - .Build() - .Build() - .Build() - .Done(); - idxLookupExpr = finalizeJoinResultExpr(joinMap, joinResultExpr.second, requireIndexValues); - return true; - } - - if (joinType == joinNames.LeftSemi) { - idxLookupExpr = Build<TCoFlatMap>(ctx, joinTuple.Pos()) - .Input<TCoSkipNullMembers>() - .Input(leftExpr) - .Members() - .Add(ConvertToAtoms(leftKeyColumnsSet, ctx, joinTuple.Pos())) - .Build() - .Build() - .Lambda() - .Args({leftRowArg}) - .Body<TCoOptionalIf>() - .Predicate<TCoHasItems>() - .List(lookup) - .Build() - .Value(getJoinResultExpr(leftRowArg, {}).first) - .Build() - .Build() - .Done(); - return true; - } - - if (joinType == joinNames.LeftOnly) { - idxLookupExpr = Build<TCoFlatMap>(ctx, joinTuple.Pos()) - .Input(leftExpr) - .Lambda() - .Args({leftRowArg}) - .Body<TCoOptionalIf>() - .Predicate<TCoNot>() - .Value<TCoHasItems>() - .List(lookup) - .Build() - .Build() - .Value(getJoinResultExpr(leftRowArg, {}).first) - .Build() - .Build() - .Done(); - return true; - } - - if (joinType == joinNames.RightSemi) { - // In this case we iterate over left table (with deduplication) - // and do index-lookup in the right one. - - auto joinResultExpr = getJoinResultExpr({}, rightRowArg); - - // drop nulls - leftExpr = Build<TCoSkipNullMembers>(ctx, joinTuple.Pos()) - .Input(leftExpr) - .Members() - .Add(ConvertToAtoms(leftKeyColumnsSet, ctx, joinTuple.Pos())) - .Build() - .Done(); - - // deduplicate keys in the left table - leftExpr = DeduplicateByMembers(leftExpr, leftKeyColumnsSet, ctx, joinTuple.Pos()); - - auto joinMap = Build<TCoFlatMap>(ctx, joinTuple.Pos()) - .Input(leftExpr) - .Lambda() - .Args({leftRowArg}) - .Body<TCoMap>() - .Input(lookup) - .Lambda() - .Args({rightRowArg}) - .Body(joinResultExpr.first) - .Build() - .Build() - .Build() - .Done(); - - // add extra reads if required - idxLookupExpr = finalizeJoinResultExpr(joinMap, joinResultExpr.second, requireIndexValues); - return true; - } - - YQL_ENSURE(false, "Unexpected join type " << joinType); - return false; -} - -TExprBase GetEquiJoinLabelsNode(const TVector<TString>& labels, TPositionHandle pos, TExprContext& ctx) { - TVector<TExprBase> labelAtoms; - for (auto& label : labels) { - auto atom = Build<TCoAtom>(ctx, pos) - .Value(label) - .Done(); - labelAtoms.push_back(atom); - } - - if (labelAtoms.size() == 1) { - return labelAtoms.front(); - } - - return Build<TCoAtomList>(ctx, pos) - .Add(labelAtoms) - .Done(); -} - -TCoEquiJoin BuildPairEquiJoin(TCoEquiJoinTuple joinTuple, TExprBase leftList, TExprBase rightList, - TExprContext& ctx) -{ - TVector<TString> leftLabels; - GatherEquiJoinLables(joinTuple.LeftScope(), leftLabels); - YQL_ENSURE(!leftLabels.empty()); - - TVector<TString> rightLabels; - GatherEquiJoinLables(joinTuple.RightScope(), rightLabels); - YQL_ENSURE(!rightLabels.empty()); - - auto join = Build<TCoEquiJoin>(ctx, joinTuple.Pos()) - .Add<TCoEquiJoinInput>() - .List(leftList) - .Scope(GetEquiJoinLabelsNode(leftLabels, joinTuple.Pos(), ctx)) - .Build() - .Add<TCoEquiJoinInput>() - .List(rightList) - .Scope(GetEquiJoinLabelsNode(rightLabels, joinTuple.Pos(), ctx)) - .Build() - .Add<TCoEquiJoinTuple>() - .Type(joinTuple.Type()) - .LeftScope<TCoAtom>().Build(leftLabels.front()) - .RightScope<TCoAtom>().Build(rightLabels.front()) - .LeftKeys(joinTuple.LeftKeys()) - .RightKeys(joinTuple.RightKeys()) - .Options(joinTuple.Options()) - .Build() - .Add<TExprList>().Build() - .Done(); - - return join; -} - -TExprBase GetEquiJoinTreeExpr(const TExprBase& joinScope, const TVector<TCoEquiJoinInput>& joinInputs, - const TJoinLabels& joinLabels, TExprContext& ctx) -{ - if (joinScope.Maybe<TCoAtom>()) { - return GetEquiJoinInputList(joinScope.Cast<TCoAtom>(), joinLabels, joinInputs); - } - - auto joinTree = joinScope.Cast<TCoEquiJoinTuple>(); - - TVector<TString> labels; - GatherEquiJoinLables(joinTree, labels); - - TVector<TExprBase> newJoinInputs; - - THashSet<ui32> usedIndices; - for (auto& label : labels) { - auto index = joinLabels.FindInputIndex(label); - YQL_ENSURE(index); - - if (!usedIndices.contains(*index)) { - newJoinInputs.push_back(joinInputs[*index]); - usedIndices.insert(*index); - } - } - - auto join = Build<TCoEquiJoin>(ctx, joinScope.Pos()) - .Add(newJoinInputs) - .Add(joinTree) - .Add<TExprList>().Build() - .Done(); - - return join; -} - -TMaybe<TStringBuf> TryFlipJoinType(TStringBuf joinType) { - if (joinType == TStringBuf("Inner")) { - return TStringBuf("Inner"); - } - if (joinType == TStringBuf("LeftSemi")) { - return TStringBuf("RightSemi"); - } - if (joinType == TStringBuf("RightSemi")) { - return TStringBuf("LeftSemi"); - } - if (joinType == TStringBuf("Right")) { - return TStringBuf("Left"); - } - if (joinType == TStringBuf("RightOnly")) { - return TStringBuf("LeftOnly"); - } - return Nothing(); -} - -bool RewriteEquiJoinInternal(const TCoEquiJoinTuple& joinTree, const TVector<TCoEquiJoinInput>& joinInputs, - const TJoinLabels& joinLabels, const TKikimrTablesData& tablesData, TExprContext& ctx, - const TKikimrConfiguration& config, TMaybeNode<TExprBase>& rewrittenExpr) -{ - bool leftRewritten = false; - TMaybeNode<TExprBase> leftExpr; - if (!joinTree.LeftScope().Maybe<TCoAtom>()) { - leftRewritten = RewriteEquiJoinInternal(joinTree.LeftScope().Cast<TCoEquiJoinTuple>(), joinInputs, - joinLabels, tablesData, ctx, config, leftExpr); - } - - bool rightRewritten = false; - TMaybeNode<TExprBase> rightExpr; - if (!joinTree.RightScope().Maybe<TCoAtom>()) { - rightRewritten = RewriteEquiJoinInternal(joinTree.RightScope().Cast<TCoEquiJoinTuple>(), joinInputs, - joinLabels, tablesData, ctx, config, rightExpr); - } - - auto getLeftExpr = [leftRewritten, leftExpr, joinTree, &joinInputs, &joinLabels, &ctx] () { - return leftRewritten - ? leftExpr.Cast() - : GetEquiJoinTreeExpr(joinTree.LeftScope(), joinInputs, joinLabels, ctx); - }; - - auto getRightExpr = [rightRewritten, rightExpr, joinTree, &joinInputs, &joinLabels, &ctx] () { - return rightRewritten - ? rightExpr.Cast() - : GetEquiJoinTreeExpr(joinTree.RightScope(), joinInputs, joinLabels, ctx); - }; - - if (!config.HasOptDisableJoinTableLookup()) { - if (EquiJoinToIdxLookup(getLeftExpr, joinTree, joinLabels, joinInputs, tablesData, ctx, rewrittenExpr)) { - return true; - } - - bool tryFlip = joinTree.Type().Value() == TStringBuf("LeftSemi") - ? !config.HasOptDisableJoinReverseTableLookupLeftSemi() - : !config.HasOptDisableJoinReverseTableLookup(); - - if (tryFlip) { - // try to switch left and right subtrees and do rewrite one more time - if (auto flipJoinType = TryFlipJoinType(joinTree.Type().Value())) { - auto flipJoinTree = Build<TCoEquiJoinTuple>(ctx, joinTree.Pos()) - .Type().Build(*flipJoinType) - .LeftScope(joinTree.RightScope()) - .LeftKeys(joinTree.RightKeys()) - .RightScope(joinTree.LeftScope()) - .RightKeys(joinTree.LeftKeys()) - .Options(joinTree.Options()) - .Done(); - - if (EquiJoinToIdxLookup(getRightExpr, flipJoinTree, joinLabels, joinInputs, tablesData, ctx, - rewrittenExpr)) - { - return true; - } - } - } - } - - if (leftRewritten || rightRewritten) { - rewrittenExpr = BuildPairEquiJoin(joinTree, getLeftExpr(), getRightExpr(), ctx); - return true; - } - - return false; -} - -} // namespace - -TExprBase DeduplicateByMembers(const TExprBase& expr, const TSet<TString>& members, TExprContext& ctx, - TPositionHandle pos) -{ - auto structArg = Build<TCoArgument>(ctx, pos) - .Name("struct") - .Done(); - - return Build<TCoPartitionByKey>(ctx, pos) - .Input(expr) - .KeySelectorLambda() - .Args(structArg) - .Body(ConvertToTuples(members, structArg, ctx, pos)) - .Build() - .SortDirections<TCoVoid>() - .Build() - .SortKeySelectorLambda<TCoVoid>() - .Build() - .ListHandlerLambda() - .Args({"stream"}) - .Body<TCoFlatMap>() - .Input("stream") - .Lambda() - .Args({"tuple"}) - .Body<TCoTake>() - .Input<TCoNth>() - .Tuple("tuple") - .Index().Value("1").Build() - .Build() - .Count<TCoUint64>() - .Literal().Value("1").Build() - .Build() - .Build() - .Build() - .Build() - .Build() - .Done(); -} - -TExprNode::TPtr KiRewriteEquiJoin(TExprBase node, const TKikimrTablesData& tablesData, - const TKikimrConfiguration& config, TExprContext& ctx) -{ - if (!node.Maybe<TCoEquiJoin>()) { - return node.Ptr(); - } - - if (config.HasOptDisableJoinRewrite()) { - return node.Ptr(); - } - - auto join = node.Cast<TCoEquiJoin>(); - auto joinTree = join.Arg(join.ArgCount() - 2).Cast<TCoEquiJoinTuple>(); - - TVector<TCoEquiJoinInput> joinInputs; - TJoinLabels joinLabels; - for (size_t i = 0; i < join.ArgCount() - 2; ++i) { - auto input = join.Arg(i).Cast<TCoEquiJoinInput>(); - joinInputs.push_back(input); - - auto itemType = input.List().Ptr()->GetTypeAnn()->Cast<TListExprType>()->GetItemType(); - joinLabels.Add(ctx, *input.Scope().Ptr(), itemType->Cast<TStructExprType>()); - } - - TMaybeNode<TExprBase> rewrittenJoin; - if (!RewriteEquiJoinInternal(joinTree, joinInputs, joinLabels, tablesData, ctx, config, rewrittenJoin)) { - return node.Ptr(); - } - - YQL_ENSURE(rewrittenJoin.IsValid()); - YQL_CLOG(INFO, ProviderKikimr) << "KiRewriteEquiJoin"; - - auto joinOptions = join.Arg(join.ArgCount() - 1).Cast<TExprList>(); - - if (rewrittenJoin.Maybe<TCoEquiJoin>()) { - auto equiJoin = rewrittenJoin.Cast<TCoEquiJoin>(); - return ctx.ChangeChild(*equiJoin.Ptr(), equiJoin.ArgCount() - 1, joinOptions.Ptr()); - } - - TExprBase joinExpr = rewrittenJoin.Cast(); - auto resultType = node.Ptr()->GetTypeAnn()->Cast<TListExprType>()->GetItemType()->Cast<TStructExprType>(); - auto renameMap = LoadJoinRenameMap(*joinOptions.Ptr()); - TCoLambda renamingLambda = BuildJoinRenameLambda(joinExpr.Pos(), renameMap, *resultType, ctx); - - return Build<TCoMap>(ctx, joinExpr.Pos()) - .Input(joinExpr) - .Lambda(renamingLambda) - .Done() - .Ptr(); -} - -} // namespace NYql diff --git a/ydb/core/kqp/provider/yql_kikimr_opt_range.cpp b/ydb/core/kqp/provider/yql_kikimr_opt_range.cpp deleted file mode 100644 index cce81efc425..00000000000 --- a/ydb/core/kqp/provider/yql_kikimr_opt_range.cpp +++ /dev/null @@ -1,1046 +0,0 @@ -#include "yql_kikimr_provider_impl.h" -#include "yql_kikimr_opt_utils.h" - -#include <ydb/library/yql/core/common_opt/yql_co_sqlin.h> -#include <ydb/library/yql/core/yql_opt_utils.h> -#include <ydb/library/yql/utils/log/log.h> -#include <ydb/library/yql/utils/utf8.h> - -namespace NYql { -namespace { - -using namespace NNodes; -using namespace NCommon; - -struct TTakeNode { - const TExprBase Input; - const TExprBase Count; - const bool IsPartial; -}; - -TTakeNode GetTakeChildren(TExprBase node) { - if (auto maybeCoTake = node.Maybe<TCoTake>()) { - auto coTake = maybeCoTake.Cast(); - return TTakeNode {coTake.Input(), coTake.Count(), false}; - } else { - auto kiPartialTake = node.Maybe<TKiPartialTake>().Cast(); - return TTakeNode {kiPartialTake.Input(), kiPartialTake.Count(), true}; - } -} - -template<typename T> -TTableLookup::TCompareResult::TResult CompareValues(const T& left, const T& right) { - if (left == right) { - return TTableLookup::TCompareResult::Equal; - } else { - return left > right - ? TTableLookup::TCompareResult::Greater - : TTableLookup::TCompareResult::Less; - } -} - -template<typename T> -TTableLookup::TCompareResult CompareIntegralNodes(TCoAtom left, TCoAtom right, NKikimr::NUdf::EDataSlot slot) { - T leftValue = FromString<T>(left.Ref(), slot); - T rightValue = FromString<T>(right.Ref(), slot); - auto compareResult = CompareValues(leftValue, rightValue); - - TMaybe<bool> adjacent; - switch (compareResult) { - case TTableLookup::TCompareResult::Equal: - break; - - case TTableLookup::TCompareResult::Greater: - adjacent = leftValue == rightValue + 1; - break; - - case TTableLookup::TCompareResult::Less: - adjacent = rightValue == leftValue + 1; - break; - } - - return TTableLookup::TCompareResult(compareResult, adjacent); -} - -template<typename T> -TTableLookup::TCompareResult CompareNodes(TCoAtom left, TCoAtom right, NKikimr::NUdf::EDataSlot slot) { - T leftValue = FromString<T>(left.Ref(), slot); - T rightValue = FromString<T>(right.Ref(), slot); - return TTableLookup::TCompareResult(CompareValues(leftValue, rightValue)); -} - -template<> -TTableLookup::TCompareResult CompareNodes<bool>(TCoAtom left, TCoAtom right, NKikimr::NUdf::EDataSlot slot) { - bool leftValue = FromString<bool>(left.Ref(), slot); - bool rightValue = FromString<bool>(right.Ref(), slot); - auto compareResult = CompareValues(leftValue, rightValue); - - return TTableLookup::TCompareResult(compareResult); -} - -template<> -TTableLookup::TCompareResult CompareNodes<ui64>(TCoAtom left, TCoAtom right, NKikimr::NUdf::EDataSlot slot) { - return CompareIntegralNodes<ui64>(left, right, slot); -} - -template<> -TTableLookup::TCompareResult CompareNodes<i64>(TCoAtom left, TCoAtom right, NKikimr::NUdf::EDataSlot slot) { - return CompareIntegralNodes<i64>(left, right, slot); -} - -template<> -TTableLookup::TCompareResult CompareNodes<TString>(TCoAtom left, TCoAtom right, NKikimr::NUdf::EDataSlot slot) { - Y_UNUSED(slot); - - const auto& leftValue = left.Value(); - const auto& rightValue = right.Value(); - return TTableLookup::TCompareResult(CompareValues(leftValue, rightValue)); -} - -} // namespace - -bool KiTableLookupCanCompare(TExprBase node) { - if (node.Maybe<TCoBool>()) { - return true; - } - - if (node.Maybe<TCoIntegralCtor>()) { - return true; - } - - if (node.Maybe<TCoString>()) { - return true; - } - - if (node.Maybe<TCoUtf8>()) { - return true; - } - - return false; -} - -TMaybeNode<TExprBase> KiTableLookupGetValue(TExprBase node, const TTypeAnnotationNode* type, - TExprContext& ctx) -{ - const TTypeAnnotationNode* targetType = type; - bool isTargetOptional = false; - if (type->GetKind() == ETypeAnnotationKind::Optional) { - targetType = type->Cast<TOptionalExprType>()->GetItemType(); - isTargetOptional = true; - } - - if (targetType->GetKind() != ETypeAnnotationKind::Data) { - return TMaybeNode<TExprBase>(); - } - - THashSet<const TExprNode*> knownArgs; - bool canPush = true; - VisitExpr(node.Ptr(), [&knownArgs, &canPush] (const TExprNode::TPtr& exprNode) { - auto node = TExprBase(exprNode); - - if (!canPush) { - return false; - } - - if (auto maybeLambda = node.Maybe<TCoLambda>()) { - for (const auto& arg : maybeLambda.Cast().Args()) { - knownArgs.emplace(arg.Raw()); - } - } - - if (auto maybeArg = node.Maybe<TCoArgument>()) { - if (!knownArgs.contains(maybeArg.Cast().Raw())) { - canPush = false; - return false; - } - } - - if (auto maybeCallable = node.Maybe<TCallable>()) { - auto callable = maybeCallable.Cast(); - - if (KikimrKqlFunctions().contains(callable.CallableName())) { - if (callable.Maybe<TKiSelectRow>()) { - return true; - } - - if (callable.Maybe<TKiSelectRange>()) { - return true; - } - - canPush = false; - return false; - } - } - - return true; - }); - - if (!canPush) { - return TMaybeNode<TExprBase>(); - } - - const auto& dataTypeName = targetType->Cast<TDataExprType>()->GetName(); - - TExprBase valueNode = node; - if (isTargetOptional) { - if (auto maybeJust = node.Maybe<TCoJust>()) { - valueNode = maybeJust.Cast().Input(); - } - - if (node.Maybe<TCoNothing>()) { - return Build<TCoNothing>(ctx, node.Pos()) - .OptionalType(ExpandType(node.Pos(), *type, ctx)) - .Done() - .Ptr(); - } - } - - TExprNode::TPtr literal; - if (auto maybeInt = valueNode.Maybe<TCoIntegralCtor>()) { - if (maybeInt.Cast().CallableName() == dataTypeName) { - return valueNode; - } - - if (AllowIntegralConversion(maybeInt.Cast(), false, NKikimr::NUdf::GetDataSlot(dataTypeName))) { - literal = maybeInt.Cast().Literal().Ptr(); - } - } - - if (auto maybeString = valueNode.Maybe<TCoString>()) { - if (dataTypeName == "String") { - return valueNode; - } - - if (dataTypeName == "Utf8") { - auto atom = maybeString.Cast().Literal(); - auto value = atom.Value(); - if (!IsUtf8(value)) { - return {}; - } - - literal = atom.Ptr(); - } - } - - if (auto maybeUtf8 = valueNode.Maybe<TCoUtf8>()) { - if (dataTypeName == "String" || dataTypeName == "Utf8") { - literal = maybeUtf8.Cast().Literal().Ptr(); - } - } - - if (auto maybeBool = valueNode.Maybe<TCoBool>()) { - if (dataTypeName == "Bool") { - literal = maybeBool.Cast().Literal().Ptr(); - } - } - - if (literal) { - auto ret = ctx.Builder(valueNode.Pos()) - .Callable(dataTypeName) - .Add(0, literal) - .Seal() - .Build(); - - return ret; - } - - auto valueType = valueNode.Ref().GetTypeAnn(); - if (isTargetOptional && valueType->GetKind() == ETypeAnnotationKind::Optional) { - valueType = valueType->Cast<TOptionalExprType>()->GetItemType(); - } - - if (valueType->GetKind() == ETypeAnnotationKind::Data && - valueType->Cast<TDataExprType>()->GetName() == dataTypeName) - { - return node; - } - - return Build<TCoConvert>(ctx, node.Pos()) - .Input(node) - .Type().Build(dataTypeName) - .Done(); -} - -TTableLookup::TCompareResult KiTableLookupCompare(TExprBase left, TExprBase right) { - if (left.Maybe<TCoBool>() && right.Maybe<TCoBool>()) { - return CompareNodes<bool>(left.Cast<TCoBool>().Literal(), - right.Cast<TCoBool>().Literal(), NKikimr::NUdf::EDataSlot::Bool); - } - - if (left.Maybe<TCoUint64>() && right.Maybe<TCoUint64>()) { - return CompareNodes<ui64>(left.Cast<TCoUint64>().Literal(), - right.Cast<TCoUint64>().Literal(), NKikimr::NUdf::EDataSlot::Uint64); - } - - if (left.Maybe<TCoIntegralCtor>() && right.Maybe<TCoIntegralCtor>()) { - return CompareNodes<i64>(left.Cast<TCoIntegralCtor>().Literal(), - right.Cast<TCoIntegralCtor>().Literal(), NKikimr::NUdf::EDataSlot::Int64); - } - - if (left.Maybe<TCoString>() && right.Maybe<TCoString>() || - left.Maybe<TCoUtf8>() && right.Maybe<TCoUtf8>()) - { - return CompareNodes<TString>(left.Cast<TCoDataCtor>().Literal(), - right.Cast<TCoDataCtor>().Literal(), NKikimr::NUdf::EDataSlot::String); - } - - YQL_ENSURE(false, "Unexpected nodes in Kikimr TableLookupCompare: (" << left.Ref().Content() - << ", " << right.Ref().Content() << ")"); -} - - -TExprNode::TPtr KiApplyLimitToSelectRange(TExprBase node, TExprContext& ctx) { - if (!node.Maybe<TCoTake>() && !node.Maybe<TKiPartialTake>()) { - return node.Ptr(); - } - - auto takeNode = GetTakeChildren(node); - - if (!takeNode.Input.Maybe<TCoSkip>().Input().Maybe<TKiSelectRangeBase>() && - !takeNode.Input.Maybe<TKiSelectRangeBase>()) - { - return node.Ptr(); - } - - auto maybeSkip = takeNode.Input.Maybe<TCoSkip>(); - TMaybeNode<TExprBase> limitValue; - - if (auto maybeTakeCount = takeNode.Count.Maybe<TCoUint64>()) { - ui64 totalLimit; - auto takeValue = FromString<ui64>(maybeTakeCount.Cast().Literal().Value()); - - if (maybeSkip) { - if (auto maybeSkipCount = maybeSkip.Count().Maybe<TCoUint64>()) { - auto skipValue = FromString<ui64>(maybeSkipCount.Cast().Literal().Value()); - totalLimit = takeValue + skipValue; - } else { - return node.Ptr(); - } - } else { - totalLimit = takeValue; - } - - limitValue = Build<TCoUint64>(ctx, node.Pos()) - .Literal<TCoAtom>() - .Value(ToString(totalLimit)) - .Build() - .Done(); - } else { - limitValue = takeNode.Count; - if (maybeSkip) { - limitValue = Build<TCoPlus>(ctx, node.Pos()) - .Left(limitValue.Cast()) - .Right(maybeSkip.Cast().Count()) - .Done(); - } - } - - YQL_ENSURE(limitValue); - - auto input = maybeSkip - ? takeNode.Input.Cast<TCoSkip>().Input() - : takeNode.Input; - - bool isIndexRange = input.Maybe<TKiSelectIndexRange>().IsValid(); - - auto select = input.Cast<TKiSelectRangeBase>(); - - if (HasSetting(select.Settings().Ref(), "ItemsLimit")) { - return node.Ptr(); - } - - auto newSettings = Build<TCoNameValueTupleList>(ctx, select.Settings().Pos()) - .Add(TVector<TExprBase>(select.Settings().begin(), select.Settings().end())) - .Add<TCoNameValueTuple>() - .Name().Build("ItemsLimit") - .Value(limitValue) - .Build() - .Done(); - - TExprNode::TPtr newSelect; - if (isIndexRange) { - newSelect = Build<TKiSelectIndexRange>(ctx, select.Pos()) - .Cluster(select.Cluster()) - .Table(select.Table()) - .Range(select.Range()) - .Select(select.Select()) - .IndexName(input.Cast<TKiSelectIndexRange>().IndexName()) - .Settings(newSettings) - .Done() - .Ptr(); - } else { - newSelect = Build<TKiSelectRange>(ctx, select.Pos()) - .Cluster(select.Cluster()) - .Table(select.Table()) - .Range(select.Range()) - .Select(select.Select()) - .Settings(newSettings) - .Done() - .Ptr(); - } - - YQL_CLOG(INFO, ProviderKikimr) << "KiApplyLimitToSelectRange"; - - if (maybeSkip) { - if (takeNode.IsPartial) { - return Build<TKiPartialTake>(ctx, node.Pos()) - .Input<TCoSkip>() - .Input(newSelect) - .Count(maybeSkip.Cast().Count()) - .Build() - .Count(takeNode.Count) - .Done().Ptr(); - } else { - return Build<TCoTake>(ctx, node.Pos()) - .Input<TCoSkip>() - .Input(newSelect) - .Count(maybeSkip.Cast().Count()) - .Build() - .Count(takeNode.Count) - .Done().Ptr(); - } - } else { - if (takeNode.IsPartial) { - return Build<TKiPartialTake>(ctx, node.Pos()) - .Input(newSelect) - .Count(takeNode.Count) - .Done().Ptr(); - - } else { - return Build<TCoTake>(ctx, node.Pos()) - .Input(newSelect) - .Count(takeNode.Count) - .Done().Ptr(); - } - } -} - -TExprNode::TPtr BuildFallbackSelectRange(TCoFlatMap oldFlatMap, - TKiSelectRangeBase select, TMaybeNode<TCoFilterNullMembers> filterNull, - TMaybeNode<TCoSkipNullMembers> skipNull, const TKikimrTableDescription& tableDesc, - TExprContext& ctx) -{ - TKikimrKeyRange range(ctx, tableDesc); - TExprNode::TPtr input; - - input = Build<TKiSelectRange>(ctx, select.Pos()) - .Cluster(select.Cluster()) - .Table(select.Table()) - .Range(range.ToRangeExpr(select, ctx)) - .Select(select.Select()) - .Settings(select.Settings()) - .Done() - .Ptr(); - - if (filterNull) { - input = Build<TCoFilterNullMembers>(ctx, oldFlatMap.Pos()) - .Input(input) - .Members(filterNull.Cast().Members()) - .Done() - .Ptr(); - } - - if (skipNull) { - input = Build<TCoSkipNullMembers>(ctx, oldFlatMap.Pos()) - .Input(input) - .Members(skipNull.Cast().Members()) - .Done() - .Ptr(); - } - - return Build<TCoFlatMap>(ctx, oldFlatMap.Pos()) - .Input(input) - .Lambda(oldFlatMap.Lambda()) - .Done() - .Ptr(); -} - -TExprNode::TPtr KiPushPredicateToSelectRange(TExprBase node, TExprContext& ctx, - const TKikimrTablesData& tablesData, const TKikimrConfiguration& config) -{ - if (!node.Maybe<TCoFlatMap>()) { - return node.Ptr(); - } - auto flatmap = node.Cast<TCoFlatMap>(); - - if (!IsPredicateFlatMap(flatmap.Lambda().Body().Ref())) { - return node.Ptr(); - } - - TMaybeNode<TKiSelectRangeBase> select; - TMaybeNode<TCoFilterNullMembers> filterNull; - TMaybeNode<TCoSkipNullMembers> skipNull; - TMaybeNode<TCoAtom> indexTable; - - if (auto maybeRange = flatmap.Input().Maybe<TKiSelectRangeBase>()) { - select = maybeRange.Cast(); - } - - if (auto maybeRange = flatmap.Input().Maybe<TCoFilterNullMembers>().Input().Maybe<TKiSelectRangeBase>()) { - select = maybeRange.Cast(); - filterNull = flatmap.Input().Cast<TCoFilterNullMembers>(); - } - - if (auto maybeRange = flatmap.Input().Maybe<TCoSkipNullMembers>().Input().Maybe<TKiSelectRangeBase>()) { - select = maybeRange.Cast(); - skipNull = flatmap.Input().Cast<TCoSkipNullMembers>(); - } - - if (!select) { - return node.Ptr(); - } - - if (auto indexSelect = select.Maybe<TKiSelectIndexRange>()) { - indexTable = indexSelect.Cast().IndexName(); - } - - auto selectRange = select.Cast(); - const auto& cluster = TString(selectRange.Cluster()); - - const auto& lookupTableName = indexTable ? TString(indexTable.Cast()) : TString(selectRange.Table().Path()); - auto& lookupTableDesc = tablesData.ExistingTable(cluster, lookupTableName); - - if (!TKikimrKeyRange::IsFull(selectRange.Range())) { - return node.Ptr(); - } - - if (indexTable) { - bool needDataRead = false; - for (const auto& col : selectRange.Select()) { - if (!lookupTableDesc.Metadata->Columns.contains(TString(col.Value()))) { - needDataRead = true; - break; - } - } - - if (!needDataRead) { - // All selected data present in index table - // apply this optimizer again after TKiSelectIndexRange -> TKiSelectRange rewriting - return node.Ptr(); - } - } - - auto row = flatmap.Lambda().Args().Arg(0); - auto predicate = TExprBase(flatmap.Lambda().Body().Ref().ChildPtr(0)); - TTableLookup lookup = ExtractTableLookup(row, predicate, lookupTableDesc.Metadata->KeyColumnNames, - &KiTableLookupGetValue, &KiTableLookupCanCompare, &KiTableLookupCompare, ctx, - config.HasAllowNullCompareInIndex()); - - auto& dataTableDesc = tablesData.ExistingTable(cluster, TString(selectRange.Table().Path())); - - if (lookup.IsFullScan()) { - // WARNING: Sometimes we want to check index table usage here. - // Ok, but we must be aware about other optimizations. - // Example SqlIn using secondary index rewriten to EquiJoin on - // KiSelectIndexRange. The problem is if predicate contains also - // "AND non_index_fiels" clause - // (WHERE secondary_key IN (...) AND non_index == ) - // check for lookup.IsFullScan() returns true here because - // EquiJoin optimization has bot been made yet. - // See KIKIMR-11041 for more examples and consider KiRewriteSelectIndexRange method - - return node.Ptr(); - } - - TVector<TExprBase> fetches; - - for (auto& keyRange : lookup.GetKeyRanges()) { - TExprBase predicate = keyRange.GetResidualPredicate() - ? keyRange.GetResidualPredicate().Cast() - : Build<TCoBool>(ctx, node.Pos()) - .Literal().Build("true") - .Done(); - - auto newBody = ctx.ChangeChild(flatmap.Lambda().Body().Ref(), 0, predicate.Ptr()); - - TExprNode::TPtr input; - - if (indexTable) { - if (!keyRange.IsEquiRange()) { - input = TKikimrKeyRange::BuildIndexReadRangeExpr(lookupTableDesc, keyRange, selectRange.Select(), - config.HasAllowNullCompareInIndex(), dataTableDesc, ctx).Ptr(); - } else { - TCoAtomList columnsToSelect = BuildKeyColumnsList(dataTableDesc, node.Pos(), ctx); - - input = TKikimrKeyRange::BuildReadRangeExpr(lookupTableDesc, keyRange, columnsToSelect, - config.HasAllowNullCompareInIndex(), ctx).Ptr(); - const auto& fetchItemArg = Build<TCoArgument>(ctx, node.Pos()) - .Name("fetchItem") - .Done(); - - input = Build<TCoFlatMap>(ctx, node.Pos()) - .Input(input) - .Lambda() - .Args(fetchItemArg) - .Body<TKiSelectRow>() - .Cluster(selectRange.Cluster()) - .Table(selectRange.Table()) - .Key(ExtractNamedKeyTuples(fetchItemArg, dataTableDesc, ctx)) - .Select(selectRange.Select()) - .Build() - .Build() - .Done() - .Ptr(); - } - } else { - input = TKikimrKeyRange::BuildReadRangeExpr(lookupTableDesc, keyRange, selectRange.Select(), - config.HasAllowNullCompareInIndex(), ctx).Ptr(); - } - - if (filterNull) { - input = Build<TCoFilterNullMembers>(ctx, node.Pos()) - .Input(input) - .Members(filterNull.Cast().Members()) - .Done() - .Ptr(); - } - - if (skipNull) { - input = Build<TCoSkipNullMembers>(ctx, node.Pos()) - .Input(input) - .Members(skipNull.Cast().Members()) - .Done() - .Ptr(); - } - - auto fetch = Build<TCoFlatMap>(ctx, node.Pos()) - .Input(input) - .Lambda() - .Args({"item"}) - .Body<TExprApplier>() - .Apply(TExprBase(newBody)) - .With(flatmap.Lambda().Args().Arg(0), "item") - .Build() - .Build() - .Done(); - - fetches.push_back(fetch); - } - - YQL_CLOG(INFO, ProviderKikimr) << "KiPushPredicateToSelectRange"; - return Build<TCoExtend>(ctx, node.Pos()) - .Add(fetches) - .Done() - .Ptr(); -} - -TExprNode::TPtr KiApplyExtractMembersToSelectRow(TExprBase node, TExprContext& ctx) { - if (!node.Maybe<TCoExtractMembers>().Input().Maybe<TKiSelectRow>()) { - return node.Ptr(); - } - - auto extract = node.Cast<TCoExtractMembers>(); - auto selectRow = extract.Input().Cast<TKiSelectRow>(); - auto input = extract.Input(); - - YQL_CLOG(INFO, ProviderKikimr) << "KiApplyExtractMembersToSelectRow"; - return Build<TKiSelectRow>(ctx, node.Pos()) - .Cluster(selectRow.Cluster()) - .Table(selectRow.Table()) - .Key(selectRow.Key()) - .Select(extract.Members()) - .Done().Ptr(); -} - -TKikimrKeyRange::TKikimrKeyRange(TExprContext& ctx, const TKikimrTableDescription& table) - : Table(table) - , KeyRange(ctx, Table.Metadata->KeyColumnNames.size(), TMaybeNode<TExprBase>()) -{} - -TKikimrKeyRange::TKikimrKeyRange(const TKikimrTableDescription& table, const TKeyRange& keyRange) - : Table(table) - , KeyRange(keyRange) -{ - YQL_ENSURE(Table.Metadata->KeyColumnNames.size() == KeyRange.GetColumnRangesCount()); -} - -bool TKikimrKeyRange::IsFull(TExprList list) { - for (auto node : list) { - if (auto maybeColumnRange = node.Maybe<TKiColumnRangeTuple>()) { - auto columnRange = maybeColumnRange.Cast(); - - if (!columnRange.From().Maybe<TCoNothing>()) { - return false; - } - - if (!columnRange.To().Maybe<TCoVoid>()) { - return false; - } - } - - if (auto maybeAtom = node.Maybe<TCoAtom>()) { - auto atom = maybeAtom.Cast(); - - if (atom.Value() != "IncFrom" && atom.Value() != "IncTo") { - return false; - } - } - } - - return true; -} - -TMaybe<TKeyRange> TKikimrKeyRange::GetPointKeyRange(TExprContext& ctx, const TKikimrTableDescription& table, TExprList range) { - size_t keyColumnsCount = table.Metadata->KeyColumnNames.size(); - TVector<TMaybeNode<TExprBase>> fromValues(keyColumnsCount); - TVector<TMaybeNode<TExprBase>> toValues(keyColumnsCount); - bool incFrom = false; - bool incTo = false; - - for (auto node : range) { - if (auto maybeColumnRange = node.Maybe<TKiColumnRangeTuple>()) { - auto columnRange = maybeColumnRange.Cast(); - - auto idx = table.GetKeyColumnIndex(TString(columnRange.Column())); - YQL_ENSURE(idx); - - fromValues[*idx] = columnRange.From(); - toValues[*idx] = columnRange.To(); - } - - if (auto maybeAtom = node.Maybe<TCoAtom>()) { - auto value = maybeAtom.Cast().Value(); - - if (value == "IncFrom") { - incFrom = true; - } - - if (value == "IncTo") { - incTo = true; - } - } - } - - if (!incFrom || !incTo) { - return {}; - } - - TVector<TColumnRange> columnRanges(keyColumnsCount); - for (size_t i = 0; i < keyColumnsCount; ++i) { - YQL_ENSURE(fromValues[i]); - YQL_ENSURE(toValues[i]); - - if (!fromValues[i].Maybe<TCoVoid>() && !toValues[i].Maybe<TCoVoid>()) { - if (fromValues[i].Cast().Raw() != toValues[i].Cast().Raw()) { - return {}; - } - - columnRanges[i] = TColumnRange::MakePoint(fromValues[i].Cast()); - YQL_ENSURE(i == 0 || columnRanges[i - 1].IsPoint()); - } else { - if (!fromValues[i].Maybe<TCoNothing>() || !toValues[i].Maybe<TCoVoid>()) { - return {}; - } - } - } - - return TKeyRange(ctx, columnRanges, {}); -} - -TExprList TKikimrKeyRange::ToRangeExpr(TExprBase owner, TExprContext& ctx) { - auto pInf = [owner, &ctx] () -> TExprBase { - return Build<TCoVoid>(ctx, owner.Pos()).Done(); - }; - - auto nInf = [owner, &ctx] (const TTypeAnnotationNode* type) -> TExprBase { - /* required optional type for TCoNothing */ - if (type->GetKind() != ETypeAnnotationKind::Optional) { - type = ctx.MakeType<TOptionalExprType>(type); - } - return Build<TCoNothing>(ctx, owner.Pos()) - .OptionalType(BuildTypeExpr(owner.Pos(), *type, ctx)) - .Done(); - }; - - bool fromInclusive = true; - bool toInclusive = true; - TVector<TExprBase> columnRanges; - - for (size_t i = 0; i < KeyRange.GetColumnRangesCount(); ++i) { - const auto& column = Table.Metadata->KeyColumnNames[i]; - const auto& range = KeyRange.GetColumnRange(i); - - auto type = Table.GetColumnType(column); - YQL_ENSURE(type); - - TMaybeNode<TExprBase> from; - TMaybeNode<TExprBase> to; - - if (range.GetFrom().IsDefined()) { - fromInclusive = range.GetFrom().IsInclusive(); - from = range.GetFrom().GetValue(); - } else { - from = fromInclusive ? nInf(type) : pInf(); - } - - if (range.GetTo().IsDefined()) { - toInclusive = range.GetTo().IsInclusive(); - to = range.GetTo().GetValue(); - } else { - to = toInclusive ? pInf() : nInf(type); - } - - auto rangeExpr = Build<TKiColumnRangeTuple>(ctx, owner.Pos()) - .Column().Build(column) - .From(from.Cast()) - .To(to.Cast()) - .Done(); - - columnRanges.push_back(rangeExpr); - } - - return Build<TExprList>(ctx, owner.Pos()) - .Add<TCoAtom>() - .Value(fromInclusive ? "IncFrom" : "ExcFrom") - .Build() - .Add<TCoAtom>() - .Value(toInclusive ? "IncTo" : "ExcTo") - .Build() - .Add(columnRanges) - .Done(); -} - -static TVector<TExprNodePtr> GetSkipNullKeys(const NCommon::TKeyRange& keyRange, - const TKikimrTableDescription& tableDesc, const NYql::TPositionHandle& pos, TExprContext& ctx) -{ - TVector<TExprNodePtr> skipNullKeys; - for (size_t i = 0; i < keyRange.GetColumnRangesCount(); ++i) { - const auto& column = tableDesc.Metadata->KeyColumnNames[i]; - auto& range = keyRange.GetColumnRange(i); - if (range.IsDefined() && !range.IsNull()) { - skipNullKeys.push_back(ctx.NewAtom(pos, column)); - } - } - return skipNullKeys; -} - -NNodes::TExprBase TKikimrKeyRange::BuildReadRangeExpr(const TKikimrTableDescription& tableDesc, - const NCommon::TKeyRange& keyRange, NNodes::TCoAtomList select, bool allowNulls, - TExprContext& ctx) -{ - YQL_ENSURE(tableDesc.Metadata); - TString cluster = tableDesc.Metadata->Cluster; - - const auto versionedTable = BuildVersionedTable(*tableDesc.Metadata, select.Pos(), ctx); - - if (keyRange.IsEquiRange()) { - TVector<TExprBase> columnTuples; - for (size_t i = 0; i < keyRange.GetColumnRangesCount(); ++i) { - const auto& column = tableDesc.Metadata->KeyColumnNames[i]; - auto& range = keyRange.GetColumnRange(i); - - auto tuple = Build<TCoNameValueTuple>(ctx, select.Pos()) - .Name().Build(column) - .Value(range.GetFrom().GetValue()) - .Done(); - - columnTuples.push_back(tuple); - } - - return Build<TCoToList>(ctx, select.Pos()) - .Optional<TKiSelectRow>() - .Cluster().Build(cluster) - .Table(versionedTable) - .Key<TCoNameValueTupleList>() - .Add(columnTuples) - .Build() - .Select(select) - .Build() - .Done(); - } else { - TVector<TExprNodePtr> skipNullKeys; - if (!allowNulls) { - skipNullKeys = GetSkipNullKeys(keyRange, tableDesc, select.Pos(), ctx); - } - - TVector<TCoNameValueTuple> settings; - if (!skipNullKeys.empty()) { - auto setting = Build<TCoNameValueTuple>(ctx, select.Pos()) - .Name().Build("SkipNullKeys") - .Value<TCoAtomList>() - .Add(skipNullKeys) - .Build() - .Done(); - - settings.push_back(setting); - } - - TKikimrKeyRange tableKeyRange(tableDesc, keyRange); - - return Build<TKiSelectRange>(ctx, select.Pos()) - .Cluster().Build(cluster) - .Table(versionedTable) - .Range(tableKeyRange.ToRangeExpr(select, ctx)) - .Select(select) - .Settings() - .Add(settings) - .Build() - .Done(); - } -} - -NNodes::TExprBase TKikimrKeyRange::BuildIndexReadRangeExpr(const TKikimrTableDescription& lookupTableDesc, - const NCommon::TKeyRange& keyRange, NNodes::TCoAtomList select, bool allowNulls, - const TKikimrTableDescription& dataTableDesc, TExprContext& ctx) -{ - YQL_ENSURE(lookupTableDesc.Metadata); - YQL_ENSURE(dataTableDesc.Metadata); - TString cluster = lookupTableDesc.Metadata->Cluster; - - const auto versionedTable = BuildVersionedTable(*dataTableDesc.Metadata, select.Pos(), ctx); - - YQL_ENSURE(!keyRange.IsEquiRange()); - - TVector<TExprNodePtr> skipNullKeys; - if (!allowNulls) { - skipNullKeys = GetSkipNullKeys(keyRange, lookupTableDesc, select.Pos(), ctx); - } - - TVector<TCoNameValueTuple> settings; - if (!skipNullKeys.empty()) { - auto setting = Build<TCoNameValueTuple>(ctx, select.Pos()) - .Name().Build("SkipNullKeys") - .Value<TCoAtomList>() - .Add(skipNullKeys) - .Build() - .Done(); - - settings.push_back(setting); - } - - TKikimrKeyRange tableKeyRange(lookupTableDesc, keyRange); - - return Build<TKiSelectIndexRange>(ctx, select.Pos()) - .Cluster().Build(cluster) - .Table(versionedTable) - .Range(tableKeyRange.ToRangeExpr(select, ctx)) - .Select(select) - .Settings() - .Add(settings) - .Build() - .IndexName() - .Value(lookupTableDesc.Metadata->Name) - .Build() - .Done(); -} - -TExprNode::TPtr KiSqlInToEquiJoin(NNodes::TExprBase node, const TKikimrTablesData& tablesData, - const TKikimrConfiguration& config, TExprContext& ctx) -{ - if (config.HasOptDisableSqlInToJoin()) { - return node.Ptr(); - } - - if (!node.Maybe<TCoFlatMap>()) { - return node.Ptr(); - } - auto flatMap = node.Cast<TCoFlatMap>(); - - // SqlIn expected to be rewritten to (FlatMap <collection> (OptionalIf ...)) - // or (FlatMap <collection> (FlatListIf ...)) - if (!flatMap.Lambda().Body().Maybe<TCoOptionalIf>() && !flatMap.Lambda().Body().Maybe<TCoFlatListIf>()) { - return node.Ptr(); - } - - if (!flatMap.Input().Maybe<TKiSelectRangeBase>()) { - return node.Ptr(); - } - - auto selectRange = flatMap.Input().Cast<TKiSelectRangeBase>(); - - TMaybeNode<TCoAtom> indexTable; - if (auto indexSelect = selectRange.Maybe<TKiSelectIndexRange>()) { - indexTable = indexSelect.Cast().IndexName(); - } - - // retrieve selected ranges - const TStringBuf lookupTable = indexTable ? indexTable.Cast().Value() : selectRange.Table().Path().Value(); - const TKikimrTableDescription& tableDesc = tablesData.ExistingTable(selectRange.Cluster().Value(), lookupTable); - auto selectKeyRange = TKikimrKeyRange::GetPointKeyRange(ctx, tableDesc, selectRange.Range()); - if (!selectKeyRange) { - return node.Ptr(); - } - - // check which key prefixes are used (and only with points) - TVector<TStringBuf> keys; // remaining key parts, that can be used in SqlIn (only in asc order) - for (size_t idx = 0; idx < selectKeyRange->GetColumnRanges().size(); ++idx) { - const auto& columnRange = selectKeyRange->GetColumnRange(idx); - if (columnRange.IsDefined()) { - if (!keys.empty()) { - return node.Ptr(); - } - if (columnRange.IsPoint()) { - continue; - } - return node.Ptr(); - } - keys.emplace_back(tableDesc.Metadata->KeyColumnNames[idx]); - } - if (keys.empty()) { - return node.Ptr(); - } - - auto flatMapLambdaArg = flatMap.Lambda().Args().Arg(0); - - auto findMemberIndexInKeys = [&keys](const TCoArgument& flatMapLambdaArg, const TCoMember& member) { - if (member.Struct().Raw() != flatMapLambdaArg.Raw()) { - return -1; - } - for (size_t i = 0; i < keys.size(); ++i) { - if (member.Name().Value() == keys[i]) { - return (int) i; - } - } - return -1; - }; - - auto shouldConvertSqlInToJoin = [&flatMapLambdaArg, &findMemberIndexInKeys](const TCoSqlIn& sqlIn, bool negated) { - if (negated) { - // negated can't be rewritten to the index-lookup, so skip it - return false; - } - - // validate key prefix - if (sqlIn.Lookup().Maybe<TCoMember>()) { - if (findMemberIndexInKeys(flatMapLambdaArg, sqlIn.Lookup().Cast<TCoMember>()) != 0) { - return false; - } - } else if (sqlIn.Lookup().Ref().GetTypeAnn()->GetKind() == ETypeAnnotationKind::Tuple) { - auto children = sqlIn.Lookup().Ref().ChildrenList(); - TVector<int> usedKeyIndexes{Reserve(children.size())}; - for (const auto& itemPtr : children) { - TExprBase item{itemPtr}; - if (!item.Maybe<TCoMember>()) { - return false; - } - int keyIndex = findMemberIndexInKeys(flatMapLambdaArg, item.Cast<TCoMember>()); - if (keyIndex >= 0) { - usedKeyIndexes.push_back(keyIndex); - } - } - if (usedKeyIndexes.empty()) { - return false; - } - ::Sort(usedKeyIndexes); - for (size_t i = 0; i < usedKeyIndexes.size(); ++i) { - if (usedKeyIndexes[i] != (int) i) { - return false; - } - } - } else { - return false; - } - - return CanRewriteSqlInToEquiJoin(sqlIn.Lookup().Ref().GetTypeAnn(), sqlIn.Collection().Ref().GetTypeAnn()); - }; - - const bool prefixOnly = true; - if (auto ret = TryConvertSqlInPredicatesToJoins(flatMap, shouldConvertSqlInToJoin, ctx, prefixOnly)) { - YQL_CLOG(INFO, ProviderKikimr) << "KiSqlInToEquiJoin"; - return ret; - } - - return node.Ptr(); -} - -} // namespace NYql diff --git a/ydb/core/kqp/provider/yql_kikimr_opt_utils.cpp b/ydb/core/kqp/provider/yql_kikimr_opt_utils.cpp deleted file mode 100644 index 912dfca53c3..00000000000 --- a/ydb/core/kqp/provider/yql_kikimr_opt_utils.cpp +++ /dev/null @@ -1,72 +0,0 @@ -#include "yql_kikimr_opt_utils.h" - -namespace NYql { - -using namespace NNodes; -using namespace NKikimr; -using namespace NKikimr::NUdf; - -bool GetEquiJoinKeyTypes(TExprBase leftInput, const TString& leftColumnName, - const TKikimrTableDescription& rightTable, const TString& rightColumnName, - const TDataExprType*& leftData, const TDataExprType*& rightData) -{ - auto rightType = rightTable.GetColumnType(rightColumnName); - YQL_ENSURE(rightType); - if (rightType->GetKind() == ETypeAnnotationKind::Optional) { - rightType = rightType->Cast<TOptionalExprType>()->GetItemType(); - } - - YQL_ENSURE(rightType->GetKind() == ETypeAnnotationKind::Data); - rightData = rightType->Cast<TDataExprType>(); - - auto leftInputType = leftInput.Ref().GetTypeAnn(); - YQL_ENSURE(leftInputType); - YQL_ENSURE(leftInputType->GetKind() == ETypeAnnotationKind::List); - auto itemType = leftInputType->Cast<TListExprType>()->GetItemType(); - YQL_ENSURE(itemType->GetKind() == ETypeAnnotationKind::Struct); - auto structType = itemType->Cast<TStructExprType>(); - auto memberIndex = structType->FindItem(leftColumnName); - YQL_ENSURE(memberIndex, "Column '" << leftColumnName << "' not found in " << *((TTypeAnnotationNode*) structType)); - - auto leftType = structType->GetItems()[*memberIndex]->GetItemType(); - if (leftType->GetKind() == ETypeAnnotationKind::Optional) { - leftType = leftType->Cast<TOptionalExprType>()->GetItemType(); - } - - if (leftType->GetKind() != ETypeAnnotationKind::Data) { - return false; - } - - leftData = leftType->Cast<TDataExprType>(); - return true; -} - -bool CanRewriteSqlInToEquiJoin(const TTypeAnnotationNode* lookupType, const TTypeAnnotationNode* collectionType) { - // SqlIn in Dict - if (collectionType->GetKind() == ETypeAnnotationKind::Dict) { - return IsDataOrOptionalOfData(lookupType); - } - - // SqlIn in List<DataType> or List<Tuple<DataType...>> - if (collectionType->GetKind() == ETypeAnnotationKind::List) { - auto collectionItemType = collectionType->Cast<TListExprType>()->GetItemType(); - - if (collectionItemType->GetKind() == ETypeAnnotationKind::Tuple) { - if (lookupType->GetKind() != ETypeAnnotationKind::Tuple) { - return false; - } - auto lookupItems = lookupType->Cast<TTupleExprType>()->GetItems(); - auto collectionItems = collectionItemType->Cast<TTupleExprType>()->GetItems(); - if (lookupItems.size() != collectionItems.size()) { - return false; - } - return AllOf(collectionItems, [](const auto& item) { return IsDataOrOptionalOfData(item); }); - } - - return IsDataOrOptionalOfData(collectionItemType); - } - - return false; -} - -} // namespace NYql diff --git a/ydb/core/kqp/provider/yql_kikimr_opt_utils.h b/ydb/core/kqp/provider/yql_kikimr_opt_utils.h deleted file mode 100644 index 219a36f5e83..00000000000 --- a/ydb/core/kqp/provider/yql_kikimr_opt_utils.h +++ /dev/null @@ -1,13 +0,0 @@ -#pragma once - -#include "yql_kikimr_provider.h" - -namespace NYql { - -bool GetEquiJoinKeyTypes(NNodes::TExprBase leftInput, const TString& leftColumnName, - const TKikimrTableDescription& rightTable, const TString& rightColumnName, - const TDataExprType*& leftData, const TDataExprType*& rightData); - -bool CanRewriteSqlInToEquiJoin(const TTypeAnnotationNode* lookupType, const TTypeAnnotationNode* collectionType); - -} // namespace NYql diff --git a/ydb/core/kqp/provider/yql_kikimr_provider.cpp b/ydb/core/kqp/provider/yql_kikimr_provider.cpp index 43b39ee0e85..0e4f11db3f1 100644 --- a/ydb/core/kqp/provider/yql_kikimr_provider.cpp +++ b/ydb/core/kqp/provider/yql_kikimr_provider.cpp @@ -38,7 +38,6 @@ struct TKikimrData { DataSourceNames.insert(TKiReadTableScheme::CallableName()); DataSourceNames.insert(TKiReadTableList::CallableName()); - DataSinkNames.insert(TKiClusterConfig::CallableName()); DataSinkNames.insert(TKiWriteTable::CallableName()); DataSinkNames.insert(TKiUpdateTable::CallableName()); DataSinkNames.insert(TKiDeleteTable::CallableName()); @@ -55,19 +54,6 @@ struct TKikimrData { DataSinkNames.insert(TKiExecDataQuery::CallableName()); DataSinkNames.insert(TKiEffects::CallableName()); - KqlNames.insert(TKiSelectRow::CallableName()); - KqlNames.insert(TKiSelectRange::CallableName()); - KqlNames.insert(TKiSelectIndexRange::CallableName()); - KqlNames.insert(TKiUpdateRow::CallableName()); - KqlNames.insert(TKiEraseRow::CallableName()); - KqlNames.insert(TKiSetResult::CallableName()); - KqlNames.insert(TKiMapParameter::CallableName()); - KqlNames.insert(TKiFlatMapParameter::CallableName()); - KqlNames.insert(TKiPartialSort::CallableName()); - KqlNames.insert(TKiPartialTake::CallableName()); - KqlNames.insert(TKiRevertIf::CallableName()); - KqlNames.insert(TKiAbortIf::CallableName()); - CommitModes.insert(CommitModeFlush); CommitModes.insert(CommitModeRollback); CommitModes.insert(CommitModeScheme); @@ -385,20 +371,9 @@ bool TKikimrKey::Extract(const TExprNode& key) { return true; } -NNodes::TKiVersionedTable BuildVersionedTable(const TKikimrTableMetadata& metadata, TPositionHandle pos, TExprContext& ctx) { - return Build<TKiVersionedTable>(ctx, pos) - .Path().Build(metadata.Name) - .SchemaVersion().Build(ToString(metadata.SchemaVersion)) - .PathId().Build(metadata.PathId.ToString()) - .Done(); -} - -NNodes::TCoAtomList BuildColumnsList( - const TKikimrTableDescription& table, - TPositionHandle pos, - TExprContext& ctx, - bool withSystemColumns -) { +TCoAtomList BuildColumnsList(const TKikimrTableDescription& table, TPositionHandle pos, + TExprContext& ctx, bool withSystemColumns) +{ TVector<TExprBase> columnsToSelect; for (const auto& pair : table.Metadata->Columns) { auto atom = Build<TCoAtom>(ctx, pos) @@ -423,68 +398,6 @@ NNodes::TCoAtomList BuildColumnsList( .Done(); } -NNodes::TCoAtomList BuildKeyColumnsList(const TKikimrTableDescription& table, TPositionHandle pos, TExprContext& ctx) { - TVector<TExprBase> columnsToSelect; - columnsToSelect.reserve(table.Metadata->KeyColumnNames.size()); - for (auto key : table.Metadata->KeyColumnNames) { - auto value = table.Metadata->Columns.at(key); - auto atom = Build<TCoAtom>(ctx, pos) - .Value(value.Name) - .Done(); - - columnsToSelect.push_back(atom); - } - - return Build<TCoAtomList>(ctx, pos) - .Add(columnsToSelect) - .Done(); -} - -NNodes::TCoAtomList MergeColumns(const NNodes::TCoAtomList& col1, const TVector<TString>& col2, TExprContext& ctx) { - TVector<TCoAtom> columns; - THashSet<TString> uniqColumns; - columns.reserve(col1.Size() + col2.size()); - - for (const auto& c : col1) { - YQL_ENSURE(uniqColumns.emplace(c.StringValue()).second); - columns.push_back(c); - } - - for (const auto& c : col2) { - if (uniqColumns.emplace(c).second) { - auto atom = Build<TCoAtom>(ctx, col1.Pos()) - .Value(c) - .Done(); - columns.push_back(atom); - } - } - - return Build<TCoAtomList>(ctx, col1.Pos()) - .Add(columns) - .Done(); -} - -TCoNameValueTupleList ExtractNamedKeyTuples(TCoArgument itemArg, const TKikimrTableDescription& tableDesc, - TExprContext& ctx, const TString& tablePrefix) -{ - TVector<TExprBase> keyTuples; - for (TString& keyColumnName : tableDesc.Metadata->KeyColumnNames) { - auto tuple = Build<TCoNameValueTuple>(ctx, itemArg.Pos()) - .Name().Build(keyColumnName) - .Value<TCoMember>() - .Struct(itemArg) - .Name().Build(tablePrefix.empty() ? keyColumnName : TString::Join(tablePrefix, ".", keyColumnName)) - .Build() - .Done(); - - keyTuples.push_back(tuple); - } - - return Build<TCoNameValueTupleList>(ctx, itemArg.Pos()) - .Add(keyTuples) - .Done(); -} - TVector<NKqpProto::TKqpTableOp> TableOperationsToProto(const TCoNameValueTupleList& operations, TExprContext& ctx) { TVector<NKqpProto::TKqpTableOp> protoOps; for (const auto& op : operations) { diff --git a/ydb/core/kqp/provider/yql_kikimr_provider.h b/ydb/core/kqp/provider/yql_kikimr_provider.h index a5cc6e66fb9..352ea6ff4bb 100644 --- a/ydb/core/kqp/provider/yql_kikimr_provider.h +++ b/ydb/core/kqp/provider/yql_kikimr_provider.h @@ -2,7 +2,6 @@ #include "yql_kikimr_gateway.h" #include "yql_kikimr_settings.h" -#include "yql_kikimr_query_traits.h" #include <ydb/library/yql/ast/yql_gc_nodes.h> #include <ydb/library/yql/core/yql_type_annotation.h> @@ -155,10 +154,6 @@ struct TKikimrQueryContext : TThrRefBase, TTimeAndRandomProvider { // full mode can be enabled explicitly. bool DocumentApiRestricted = true; - // Force NewEngine stuff - // remove it after enabling NewEngine - std::optional<NKikimr::NKqp::TQueryTraits> QueryTraits; - std::unique_ptr<NKikimrKqp::TPreparedQuery> PreparingQuery; std::shared_ptr<const NKikimrKqp::TPreparedQuery> PreparedQuery; TKikimrParamsMap Parameters; @@ -178,7 +173,6 @@ struct TKikimrQueryContext : TThrRefBase, TTimeAndRandomProvider { Deadlines = {}; Limits = {}; - QueryTraits.reset(); PreparingQuery.reset(); PreparedQuery.reset(); Parameters.clear(); diff --git a/ydb/core/kqp/provider/yql_kikimr_provider_impl.h b/ydb/core/kqp/provider/yql_kikimr_provider_impl.h index 4c3682082a9..ca775e2e1cf 100644 --- a/ydb/core/kqp/provider/yql_kikimr_provider_impl.h +++ b/ydb/core/kqp/provider/yql_kikimr_provider_impl.h @@ -27,7 +27,6 @@ public: TStatus DoTransform(TExprNode::TPtr input, TExprNode::TPtr& output, TExprContext& ctx) final; private: - virtual TStatus HandleClusterConfig(NNodes::TKiClusterConfig, TExprContext& ctx) = 0; virtual TStatus HandleWriteTable(NNodes::TKiWriteTable node, TExprContext& ctx) = 0; virtual TStatus HandleUpdateTable(NNodes::TKiUpdateTable node, TExprContext& ctx) = 0; virtual TStatus HandleDeleteTable(NNodes::TKiDeleteTable node, TExprContext& ctx) = 0; @@ -106,27 +105,6 @@ struct TKiExecDataQuerySettings { static TKiExecDataQuerySettings Parse(NNodes::TKiExecDataQuery exec); }; -class TKikimrKeyRange { -public: - TKikimrKeyRange(TExprContext& ctx, const TKikimrTableDescription& table); - TKikimrKeyRange(const TKikimrTableDescription& table, const NCommon::TKeyRange& keyRange); - - static bool IsFull(NNodes::TExprList list); - static TMaybe<NCommon::TKeyRange> GetPointKeyRange(TExprContext& ctx, const TKikimrTableDescription& table, NNodes::TExprList range); - static NNodes::TExprBase BuildReadRangeExpr(const TKikimrTableDescription& tableDesc, - const NCommon::TKeyRange& keyRange, NNodes::TCoAtomList select, bool allowNulls, - TExprContext& ctx); - static NNodes::TExprBase BuildIndexReadRangeExpr(const TKikimrTableDescription& lookupTableDesc, - const NCommon::TKeyRange& keyRange, NNodes::TCoAtomList select, bool allowNulls, - const TKikimrTableDescription& dataTableDesc, TExprContext& ctx); - - NNodes::TExprList ToRangeExpr(NNodes::TExprBase owner, TExprContext& ctx); - -private: - const TKikimrTableDescription& Table; - NCommon::TKeyRange KeyRange; -}; - template<typename TResult> class TKikimrFutureResult : public IKikimrAsyncResult<TResult> { public: @@ -180,7 +158,8 @@ TAutoPtr<IGraphTransformer> CreateKiSourceTypeAnnotationTransformer(TIntrusivePt TTypeAnnotationContext& types); TAutoPtr<IGraphTransformer> CreateKiSinkTypeAnnotationTransformer(TIntrusivePtr<IKikimrGateway> gateway, TIntrusivePtr<TKikimrSessionContext> sessionCtx); -TAutoPtr<IGraphTransformer> CreateKiLogicalOptProposalTransformer(TIntrusivePtr<TKikimrSessionContext> sessionCtx, TTypeAnnotationContext& types); +TAutoPtr<IGraphTransformer> CreateKiLogicalOptProposalTransformer(TIntrusivePtr<TKikimrSessionContext> sessionCtx, + TTypeAnnotationContext& types); TAutoPtr<IGraphTransformer> CreateKiPhysicalOptProposalTransformer(TIntrusivePtr<TKikimrSessionContext> sessionCtx); TAutoPtr<IGraphTransformer> CreateKiSourceLoadTableMetadataTransformer(TIntrusivePtr<IKikimrGateway> gateway, TIntrusivePtr<TKikimrSessionContext> sessionCtx); @@ -197,33 +176,8 @@ TAutoPtr<IGraphTransformer> CreateKiSinkCallableExecutionTransformer( TAutoPtr<IGraphTransformer> CreateKiSinkPlanInfoTransformer(TIntrusivePtr<IKikimrQueryExecutor> queryExecutor); -NNodes::TMaybeNode<NNodes::TExprBase> TranslateToMkql(NNodes::TExprBase node, TExprContext& ctx, - const TMaybe<TString>& rtParamName); - -NNodes::TExprBase UnwrapKiReadTableValues(NNodes::TExprBase input, const TKikimrTableDescription& tableDesc, - NNodes::TCoAtomList columns, TExprContext& ctx); - -NNodes::TKiVersionedTable BuildVersionedTable(const TKikimrTableMetadata& metadata, TPositionHandle pos, TExprContext& ctx); -NNodes::TCoAtomList BuildColumnsList( - const TKikimrTableDescription& table, - TPositionHandle pos, - TExprContext& ctx, - bool withSystemColumns = false -); -NNodes::TCoAtomList BuildKeyColumnsList( - const TKikimrTableDescription& table, - TPositionHandle pos, - TExprContext& ctx -); - -NNodes::TCoAtomList MergeColumns(const NNodes::TCoAtomList& col1, const TVector<TString>& col2, TExprContext& ctx); - -bool IsKqlPureExpr(NNodes::TExprBase expr); -bool IsKqlPureLambda(NNodes::TCoLambda lambda); -bool IsKeySelectorPkPrefix(NNodes::TCoLambda lambda, const TKikimrTableDescription& desc, TVector<TString>* columns); - -NNodes::TCoNameValueTupleList ExtractNamedKeyTuples(NNodes::TCoArgument arg, - const TKikimrTableDescription& desc, TExprContext& ctx, const TString& tablePrefix = TString()); +NNodes::TCoAtomList BuildColumnsList(const TKikimrTableDescription& table, TPositionHandle pos, + TExprContext& ctx, bool withSystemColumns); const TTypeAnnotationNode* GetReadTableRowType(TExprContext& ctx, const TKikimrTablesData& tablesData, const TString& cluster, const TString& table, NNodes::TCoAtomList select, bool withSystemColumns = false); @@ -232,34 +186,18 @@ NKikimrKqp::EIsolationLevel GetIsolationLevel(const TMaybe<TString>& isolationLe TMaybe<TString> GetIsolationLevel(const NKikimrKqp::EIsolationLevel& isolationLevel); TYdbOperation GetTableOp(const NNodes::TKiWriteTable& write); -TVector<NKqpProto::TKqpTableOp> TableOperationsToProto(const NNodes::TCoNameValueTupleList& operations, TExprContext& ctx); +TVector<NKqpProto::TKqpTableOp> TableOperationsToProto(const NNodes::TCoNameValueTupleList& operations, + TExprContext& ctx); TVector<NKqpProto::TKqpTableOp> TableOperationsToProto(const NNodes::TKiOperationList& operations, TExprContext& ctx); -void TableDescriptionToTableInfo(const TKikimrTableDescription& desc, TYdbOperation op, NProtoBuf::RepeatedPtrField<NKqpProto::TKqpTableInfo>& infos); -void TableDescriptionToTableInfo(const TKikimrTableDescription& desc, TYdbOperation op, TVector<NKqpProto::TKqpTableInfo>& infos); - -NNodes::TExprBase DeduplicateByMembers(const NNodes::TExprBase& expr, const TSet<TString>& members, TExprContext& ctx, - TPositionHandle pos); +void TableDescriptionToTableInfo(const TKikimrTableDescription& desc, TYdbOperation op, + NProtoBuf::RepeatedPtrField<NKqpProto::TKqpTableInfo>& infos); +void TableDescriptionToTableInfo(const TKikimrTableDescription& desc, TYdbOperation op, + TVector<NKqpProto::TKqpTableInfo>& infos); // Optimizer rules TExprNode::TPtr KiBuildQuery(NNodes::TExprBase node, TExprContext& ctx); TExprNode::TPtr KiBuildResult(NNodes::TExprBase node, const TString& cluster, TExprContext& ctx); -TExprNode::TPtr KiApplyLimitToSelectRange(NNodes::TExprBase node, TExprContext& ctx); -TExprNode::TPtr KiPushPredicateToSelectRange(NNodes::TExprBase node, TExprContext& ctx, - const TKikimrTablesData& tablesData, const TKikimrConfiguration& config); -TExprNode::TPtr KiApplyExtractMembersToSelectRow(NNodes::TExprBase node, TExprContext& ctx); -TExprNode::TPtr KiRewriteEquiJoin(NNodes::TExprBase node, const TKikimrTablesData& tablesData, - const TKikimrConfiguration& config, TExprContext& ctx); -TExprNode::TPtr KiSqlInToEquiJoin(NNodes::TExprBase node, const TKikimrTablesData& tablesData, - const TKikimrConfiguration& config, TExprContext& ctx); - -bool KiTableLookupCanCompare(NNodes::TExprBase node); -NNodes::TMaybeNode<NNodes::TExprBase> KiTableLookupGetValue(NNodes::TExprBase node, const TTypeAnnotationNode* type, - TExprContext& ctx); -NCommon::TTableLookup::TCompareResult KiTableLookupCompare(NNodes::TExprBase left, NNodes::TExprBase right); - -NNodes::TKiProgram BuildKiProgram(NNodes::TKiDataQuery query, const TKikimrTablesData& tablesData, TExprContext& ctx, - bool withSystemColumns); const THashSet<TStringBuf>& KikimrDataSourceFunctions(); const THashSet<TStringBuf>& KikimrDataSinkFunctions(); diff --git a/ydb/core/kqp/provider/yql_kikimr_query_traits.cpp b/ydb/core/kqp/provider/yql_kikimr_query_traits.cpp deleted file mode 100644 index 8a6056d5498..00000000000 --- a/ydb/core/kqp/provider/yql_kikimr_query_traits.cpp +++ /dev/null @@ -1,44 +0,0 @@ -#include "yql_kikimr_query_traits.h" -#include "yql_kikimr_expr_nodes.h" - -#include <ydb/library/yql/ast/yql_expr.h> -#include <ydb/library/yql/core/yql_expr_optimize.h> -#include <ydb/library/yql/utils/log/log.h> - -namespace NKikimr { -namespace NKqp { - -using namespace NYql; - -TQueryTraits CollectQueryTraits(const NNodes::TExprBase& program, TExprContext&) { - using namespace NNodes; - - TQueryTraits traits; - - VisitExpr(program.Ptr(), [&traits](const TExprNode::TPtr& node) { - const auto* x = node.Get(); - - if (TKiUpdateRow::Match(x) || TKiEraseRow::Match(x) || TKiDeleteTable::Match(x) || TKiEffects::Match(x)) { - traits.ReadOnly = 0; - } else if (TKiSelectIndexRange::Match(x)) { - traits.WithIndex = 1; - } else if (TCoEquiJoin::Match(x) || TCoJoin::Match(x) || TCoMapJoinCore::Match(x) || TCoJoinDict::Match(x)) { - traits.WithJoin = 1; - } else if (TCoSqlIn::Match(x)) { - traits.WithSqlIn = 1; - } else if (TCoUdf::Match(x)) { - traits.WithUdf = 1; - } else if (auto selectRange = TMaybeNode<TKiSelectRange>(node)) { - if (selectRange.Cast().Table().Path().Value().ends_with("/indexImplTable"sv)) { - traits.WithIndex = 1; - } - } - - return true; - }); - - return traits; -} - -} // namespace NKqp -} // namespace NKikimr diff --git a/ydb/core/kqp/provider/yql_kikimr_query_traits.h b/ydb/core/kqp/provider/yql_kikimr_query_traits.h deleted file mode 100644 index 26f83cdb0bf..00000000000 --- a/ydb/core/kqp/provider/yql_kikimr_query_traits.h +++ /dev/null @@ -1,31 +0,0 @@ -#pragma once - -#include <util/string/builder.h> - -namespace NYql { -struct TExprContext; -namespace NNodes { -class TExprBase; -} // namespace NNodes -} // namespace NYql - -namespace NKikimr { -namespace NKqp { - -struct TQueryTraits { - ui32 ReadOnly:1 = 1; - ui32 WithJoin:1 = 0; - ui32 WithSqlIn:1 = 0; - ui32 WithIndex:1 = 0; - ui32 WithUdf:1 = 0; - - TString ToString() const { - return TStringBuilder() << "{ro: " << ReadOnly << ", join: " << WithJoin << ", sqlIn: " << WithSqlIn - << ", index: " << WithIndex << ", udf: " << WithUdf << '}'; - } -}; - -TQueryTraits CollectQueryTraits(const NYql::NNodes::TExprBase& program, NYql::TExprContext& ctx); - -} // namespace NKqp -} // namespace NKikimr diff --git a/ydb/core/kqp/provider/yql_kikimr_type_ann.cpp b/ydb/core/kqp/provider/yql_kikimr_type_ann.cpp index 8e84a6e969b..2be0482632b 100644 --- a/ydb/core/kqp/provider/yql_kikimr_type_ann.cpp +++ b/ydb/core/kqp/provider/yql_kikimr_type_ann.cpp @@ -269,19 +269,6 @@ public: , SessionCtx(sessionCtx) {} private: - virtual TStatus HandleClusterConfig(TKiClusterConfig node, TExprContext& ctx) override { - if (!EnsureTuple(node.GrpcData().Ref(), ctx)) { - return TStatus::Error; - } - - if (!EnsureAtom(node.TvmId().Ref(), ctx)) { - return TStatus::Error; - } - - node.Ptr()->SetTypeAnn(ctx.MakeType<TUnitExprType>()); - return TStatus::Ok; - } - virtual TStatus HandleWriteTable(TKiWriteTable node, TExprContext& ctx) override { if (!EnsureWorldType(node.World().Ref(), ctx)) { return TStatus::Error; @@ -1284,151 +1271,6 @@ private: } virtual TStatus HandleKql(TCallable node, TExprContext& ctx) override { - bool sysColumnsEnabled = SessionCtx->Config().SystemColumnsEnabled(); - if (auto call = node.Maybe<TKiSelectRow>()) { - auto selectRow = call.Cast(); - - auto selectType = GetReadTableRowType(ctx, SessionCtx->Tables(), TString(selectRow.Cluster()), - TString(selectRow.Table().Path()), selectRow.Select(), sysColumnsEnabled); - if (!selectType) { - return TStatus::Error; - } - - auto optSelectType = ctx.MakeType<TOptionalExprType>(selectType); - - node.Ptr()->SetTypeAnn(optSelectType); - - return TStatus::Ok; - } - - if (auto call = node.Maybe<TKiSelectRangeBase>()) { - auto selectRange = call.Cast(); - - auto selectType = GetReadTableRowType(ctx, SessionCtx->Tables(), TString(selectRange.Cluster()), - TString(selectRange.Table().Path()), selectRange.Select(), sysColumnsEnabled); - if (!selectType) { - return TStatus::Error; - } - - auto listSelectType = ctx.MakeType<TListExprType>(selectType); - - node.Ptr()->SetTypeAnn(listSelectType); - - return TStatus::Ok; - } - - if (node.Maybe<TKiUpdateRow>()) { - node.Ptr()->SetTypeAnn(ctx.MakeType<TVoidExprType>()); - - return TStatus::Ok; - } - - if (node.Maybe<TKiEraseRow>()) { - node.Ptr()->SetTypeAnn(ctx.MakeType<TVoidExprType>()); - - return TStatus::Ok; - } - - if (node.Maybe<TKiSetResult>()) { - node.Ptr()->SetTypeAnn(ctx.MakeType<TVoidExprType>()); - - return TStatus::Ok; - } - - if (auto maybeMap = node.Maybe<TKiMapParameter>()) { - auto map = maybeMap.Cast(); - - if (!EnsureArgsCount(map.Ref(), 2, ctx)) { - return IGraphTransformer::TStatus::Error; - } - - if (!EnsureListType(map.Input().Ref(), ctx)) { - return IGraphTransformer::TStatus::Error; - } - - auto& lambda = map.Ptr()->ChildRef(TKiMapParameter::idx_Lambda); - auto itemType = map.Input().Ref().GetTypeAnn()->Cast<TListExprType>()->GetItemType(); - if (!UpdateLambdaAllArgumentsTypes(lambda, {itemType}, ctx)) { - return IGraphTransformer::TStatus::Error; - } - - if (!lambda->GetTypeAnn()) { - return IGraphTransformer::TStatus::Repeat; - } - - map.Ptr()->SetTypeAnn(ctx.MakeType<TListExprType>(lambda->GetTypeAnn())); - - return TStatus::Ok; - } - - if (auto maybeMap = node.Maybe<TKiFlatMapParameter>()) { - auto map = maybeMap.Cast(); - - if (!EnsureArgsCount(map.Ref(), 2, ctx)) { - return IGraphTransformer::TStatus::Error; - } - - if (!EnsureListType(map.Input().Ref(), ctx)) { - return IGraphTransformer::TStatus::Error; - } - - auto& lambda = map.Ptr()->ChildRef(TKiFlatMapParameter::idx_Lambda); - auto itemType = map.Input().Ref().GetTypeAnn()->Cast<TListExprType>()->GetItemType(); - if (!UpdateLambdaAllArgumentsTypes(lambda, {itemType}, ctx)) { - return IGraphTransformer::TStatus::Error; - } - - if (!lambda->GetTypeAnn()) { - return IGraphTransformer::TStatus::Repeat; - } - - auto retKind = lambda->GetTypeAnn()->GetKind(); - if (retKind != ETypeAnnotationKind::List) { - ctx.AddError(TIssue(ctx.GetPosition(lambda->Pos()), TStringBuilder() << "Expected list as labmda return type, but got: " << *lambda->GetTypeAnn())); - return IGraphTransformer::TStatus::Error; - } - - map.Ptr()->SetTypeAnn(lambda->GetTypeAnn()); - - return TStatus::Ok; - } - - if (node.Maybe<TKiPartialSort>()) { - NTypeAnnImpl::TContext typeAnnCtx(ctx); - TExprNode::TPtr output; - return NTypeAnnImpl::SortWrapper(node.Ptr(), output, typeAnnCtx); - } - - if (node.Maybe<TKiPartialTake>()) { - NTypeAnnImpl::TContext typeAnnCtx(ctx); - TExprNode::TPtr output; - return NTypeAnnImpl::TakeWrapper(node.Ptr(), output, typeAnnCtx); - } - - if (auto maybeCondEffect = node.Maybe<TKiConditionalEffect>()) { - auto condEffect = maybeCondEffect.Cast(); - - if (!EnsureDataType(condEffect.Predicate().Ref(), ctx)) { - return IGraphTransformer::TStatus::Error; - } - - auto predicateType = condEffect.Predicate().Ref().GetTypeAnn()->Cast<TDataExprType>(); - YQL_ENSURE(predicateType); - - if (predicateType->GetSlot() != EDataSlot::Bool) { - ctx.AddError(TIssue(ctx.GetPosition(condEffect.Pos()), "Expected bool as predicate type")); - return IGraphTransformer::TStatus::Error; - } - - if (!EnsureListOfVoidType(condEffect.Effect().Ref(), ctx)) { - return IGraphTransformer::TStatus::Error; - } - - condEffect.Ptr()->SetTypeAnn(condEffect.Effect().Ref().GetTypeAnn()); - - return TStatus::Ok; - } - ctx.AddError(TIssue(ctx.GetPosition(node.Pos()), TStringBuilder() << "Unknown Kql callable in type annotation: " << node.CallableName())); |