diff options
author | vvvv <vvvv@yandex-team.ru> | 2022-03-18 04:32:25 +0300 |
---|---|---|
committer | vvvv <vvvv@yandex-team.ru> | 2022-03-18 04:32:25 +0300 |
commit | 375c6518aaf3387692ecce32a26e88443606ae3c (patch) | |
tree | 97120351d69b54c3ffe40e3cf29101b63204a807 | |
parent | d14fc69b3bfe40c19e4f7e80ba7f6ba8009ce220 (diff) | |
download | ydb-375c6518aaf3387692ecce32a26e88443606ae3c.tar.gz |
YQL-13710 initial implementation of aggregations (without context), support of traits without merge
ref:c28054a44aa19784cbb64db3d143f82a12563fcb
-rw-r--r-- | ydb/library/yql/ast/yql_expr.cpp | 1 | ||||
-rw-r--r-- | ydb/library/yql/core/common_opt/CMakeLists.txt | 1 | ||||
-rw-r--r-- | ydb/library/yql/core/common_opt/yql_co_simple1.cpp | 71 | ||||
-rw-r--r-- | ydb/library/yql/core/type_ann/type_ann_core.cpp | 246 | ||||
-rw-r--r-- | ydb/library/yql/core/type_ann/type_ann_list.cpp | 5 | ||||
-rw-r--r-- | ydb/library/yql/core/yql_expr_type_annotation.cpp | 36 | ||||
-rw-r--r-- | ydb/library/yql/core/yql_expr_type_annotation.h | 1 | ||||
-rw-r--r-- | ydb/library/yql/core/yql_opt_aggregate.cpp | 8 | ||||
-rw-r--r-- | ydb/library/yql/core/yql_opt_utils.cpp | 3 | ||||
-rw-r--r-- | ydb/library/yql/minikql/comp_nodes/mkql_coalesce.cpp | 2 | ||||
-rw-r--r-- | ydb/library/yql/minikql/mkql_node.cpp | 1 | ||||
-rw-r--r-- | ydb/library/yql/minikql/mkql_program_builder.cpp | 6 | ||||
-rw-r--r-- | ydb/library/yql/parser/pg_catalog/catalog.cpp | 2 | ||||
-rw-r--r-- | ydb/library/yql/parser/pg_catalog/catalog.h | 1 | ||||
-rw-r--r-- | ydb/library/yql/parser/pg_wrapper/CMakeLists.txt | 1 | ||||
-rw-r--r-- | ydb/library/yql/parser/pg_wrapper/comp_factory.cpp | 30 |
16 files changed, 340 insertions, 75 deletions
diff --git a/ydb/library/yql/ast/yql_expr.cpp b/ydb/library/yql/ast/yql_expr.cpp index e91a95adff..128b9bac2f 100644 --- a/ydb/library/yql/ast/yql_expr.cpp +++ b/ydb/library/yql/ast/yql_expr.cpp @@ -3021,6 +3021,7 @@ const TListExprType* TMakeTypeImpl<TListExprType>::Make(TExprContext& ctx, const } const TOptionalExprType* TMakeTypeImpl<TOptionalExprType>::Make(TExprContext& ctx, const TTypeAnnotationNode* itemType) { + Y_ENSURE(itemType->GetKind() != ETypeAnnotationKind::Pg); const auto hash = TOptionalExprType::MakeHash(itemType); TOptionalExprType sample(hash, itemType); if (const auto found = FindType(sample, ctx)) diff --git a/ydb/library/yql/core/common_opt/CMakeLists.txt b/ydb/library/yql/core/common_opt/CMakeLists.txt index 0ec215ab77..5e8019bb47 100644 --- a/ydb/library/yql/core/common_opt/CMakeLists.txt +++ b/ydb/library/yql/core/common_opt/CMakeLists.txt @@ -16,6 +16,7 @@ target_link_libraries(yql-core-common_opt PUBLIC yutil library-yql-core yql-core-expr_nodes + yql-parser-pg_catalog ) target_sources(yql-core-common_opt PRIVATE ${CMAKE_SOURCE_DIR}/ydb/library/yql/core/common_opt/yql_co_extr_members.cpp diff --git a/ydb/library/yql/core/common_opt/yql_co_simple1.cpp b/ydb/library/yql/core/common_opt/yql_co_simple1.cpp index 1fc8746898..dff33a9155 100644 --- a/ydb/library/yql/core/common_opt/yql_co_simple1.cpp +++ b/ydb/library/yql/core/common_opt/yql_co_simple1.cpp @@ -9,6 +9,7 @@ #include <ydb/library/yql/core/yql_expr_optimize.h> #include <ydb/library/yql/utils/log/log.h> +#include <ydb/library/yql/parser/pg_catalog/catalog.h> #include <util/generic/map.h> #include <util/generic/bitmap.h> @@ -6692,30 +6693,52 @@ void RegisterCoSimpleCallables1(TCallableOptimizerMap& map) { TNodeOnNodeOwnedMap deepClones; TExprNode::TListType payloadItems; for (ui32 i = 0; i < aggs.size(); ++i) { - const auto& exports = exportsPtr->Symbols(); auto func = aggs[i].first->Head().Content(); - if (func == "count" && aggs[i].first->ChildrenSize() == 1) { - func = "count_all"; - } + TExprNode::TPtr traits; + if (optCtx.Types->PgTypes) { + auto arg = ctx.NewArgument(node->Pos(), "row"); + auto arguments = ctx.NewArguments(node->Pos(), { arg }); + TExprNode::TListType aggFuncArgs; + for (ui32 j = 1; j < aggs[i].first->ChildrenSize(); ++j) { + aggFuncArgs.push_back(ctx.ReplaceNode(aggs[i].first->ChildPtr(j), *aggs[i].second, arg)); + } - TString factory = TString(func) + "_traits_factory"; - const auto ex = exports.find(factory); - YQL_ENSURE(exports.cend() != ex); - auto lambda = ctx.DeepCopy(*ex->second, exportsPtr->ExprCtx(), deepClones, true, false); - auto arg = ctx.NewArgument(node->Pos(), "row"); - auto arguments = ctx.NewArguments(node->Pos(), { arg }); - auto extractor = ctx.NewLambda(node->Pos(), std::move(arguments), - ctx.ReplaceNode(aggs[i].first->TailPtr(), *aggs[i].second, arg)); - - auto traits = ctx.ReplaceNodes(lambda->TailPtr(), { - {lambda->Head().Child(0), listTypeNode}, - {lambda->Head().Child(1), extractor} - }); - - ctx.Step.Repeat(TExprStep::ExpandApplyForLambdas); - auto status = ExpandApply(traits, traits, ctx); - if (status == IGraphTransformer::TStatus::Error) { - return {}; + auto extractor = ctx.NewLambda(node->Pos(), std::move(arguments), std::move(aggFuncArgs)); + + traits = ctx.Builder(node->Pos()) + .Callable("PgAggregationTraits") + .Atom(0, func) + .Callable(1, "ListItemType") + .Add(0, listTypeNode) + .Seal() + .Add(2, extractor) + .Seal() + .Build(); + } else { + const auto& exports = exportsPtr->Symbols(); + if (func == "count" && aggs[i].first->ChildrenSize() == 1) { + func = "count_all"; + } + + TString factory = TString(func) + "_traits_factory"; + const auto ex = exports.find(factory); + YQL_ENSURE(exports.cend() != ex); + auto lambda = ctx.DeepCopy(*ex->second, exportsPtr->ExprCtx(), deepClones, true, false); + auto arg = ctx.NewArgument(node->Pos(), "row"); + auto arguments = ctx.NewArguments(node->Pos(), { arg }); + auto extractor = ctx.NewLambda(node->Pos(), std::move(arguments), + ctx.ReplaceNode(aggs[i].first->TailPtr(), *aggs[i].second, arg)); + + traits = ctx.ReplaceNodes(lambda->TailPtr(), { + {lambda->Head().Child(0), listTypeNode}, + {lambda->Head().Child(1), extractor} + }); + + ctx.Step.Repeat(TExprStep::ExpandApplyForLambdas); + auto status = ExpandApply(traits, traits, ctx); + if (status == IGraphTransformer::TStatus::Error) { + return {}; + } } payloadItems.push_back(ctx.Builder(node->Pos()) @@ -6759,7 +6782,7 @@ void RegisterCoSimpleCallables1(TCallableOptimizerMap& map) { .Atom(1, "_yql_agg_" + ToString(it->second)) .Seal() .Build(); - if (node->Head().Content() == "count") { + if (!optCtx.Types->PgTypes && node->Head().Content() == "count") { ret = ctx.Builder(node->Pos()) .Callable("SafeCast") .Add(0, ret) @@ -7073,7 +7096,7 @@ void RegisterCoSimpleCallables1(TCallableOptimizerMap& map) { .Seal() .Build(); - if (node->Head().Content() == "row_number" || node->Head().Content() == "count") { + if (!optCtx.Types->PgTypes && (node->Head().Content() == "row_number" || node->Head().Content() == "count")) { ret = ctx.Builder(node->Pos()) .Callable("SafeCast") .Add(0, ret) diff --git a/ydb/library/yql/core/type_ann/type_ann_core.cpp b/ydb/library/yql/core/type_ann/type_ann_core.cpp index 8cbacdfa7a..32ffc0fe9e 100644 --- a/ydb/library/yql/core/type_ann/type_ann_core.cpp +++ b/ydb/library/yql/core/type_ann/type_ann_core.cpp @@ -908,7 +908,9 @@ namespace NTypeAnnImpl { } input->SetTypeAnn(structType->GetItems()[*pos]->GetItemType()); - if (isOptional && input->GetTypeAnn()->GetKind() != ETypeAnnotationKind::Optional && input->GetTypeAnn()->GetKind() != ETypeAnnotationKind::Null) { + if (isOptional && input->GetTypeAnn()->GetKind() != ETypeAnnotationKind::Optional && + input->GetTypeAnn()->GetKind() != ETypeAnnotationKind::Null && + input->GetTypeAnn()->GetKind() != ETypeAnnotationKind::Pg) { input->SetTypeAnn(ctx.Expr.MakeType<TOptionalExprType>(input->GetTypeAnn())); } @@ -8797,40 +8799,6 @@ template <NKikimr::NUdf::EDataSlot DataSlot> } }; - bool ExtractPgType(const TTypeAnnotationNode* type, ui32& pgType, TPositionHandle pos, TExprContext& ctx) { - pgType = 0; - if (type->GetKind() == ETypeAnnotationKind::Null) { - return true; - } - - if (type->GetKind() == ETypeAnnotationKind::Data || type->GetKind() == ETypeAnnotationKind::Optional) { - const TTypeAnnotationNode* unpacked = RemoveOptionalType(type); - if (unpacked->GetKind() != ETypeAnnotationKind::Data) { - ctx.AddError(TIssue(ctx.GetPosition(pos), - "Nested optional type is not compatible to PG")); - return IGraphTransformer::TStatus::Error; - } - - auto slot = unpacked->Cast<TDataExprType>()->GetSlot(); - auto convertedTypeId = ConvertToPgType(slot); - if (!convertedTypeId) { - ctx.AddError(TIssue(ctx.GetPosition(pos), - TStringBuilder() << "Type is not compatible to PG: " << slot)); - return false; - } - - pgType = *convertedTypeId; - return true; - } else if (type->GetKind() != ETypeAnnotationKind::Pg) { - ctx.AddError(TIssue(ctx.GetPosition(pos), - TStringBuilder() << "Expected PG type, but got: " << type->GetKind())); - return false; - } else { - pgType = type->Cast<TPgExprType>()->GetId(); - return true; - } - } - IGraphTransformer::TStatus PgCallWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TExtContext& ctx) { bool isResolved = input->Content() == "PgResolvedCall"; if (!EnsureMinArgsCount(*input, isResolved ? 2 : 1, ctx.Expr)) { @@ -9701,6 +9669,213 @@ template <NKikimr::NUdf::EDataSlot DataSlot> return IGraphTransformer::TStatus::Ok; } + IGraphTransformer::TStatus PgAggregationTraitsWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx) { + if (!EnsureArgsCount(*input, 3, ctx.Expr)) { + return IGraphTransformer::TStatus::Error; + } + + if (!EnsureAtom(*input->Child(0), ctx.Expr)) { + return IGraphTransformer::TStatus::Error; + } + + auto func = input->Child(0)->Content(); + if (!EnsureType(*input->Child(1), ctx.Expr)) { + return IGraphTransformer::TStatus::Error; + } + + auto itemType = input->Child(1)->GetTypeAnn()->Cast<TTypeExprType>()->GetType(); + auto& lambda = input->ChildRef(2); + auto convertStatus = ConvertToLambda(lambda, ctx.Expr, 1); + if (convertStatus.Level != IGraphTransformer::TStatus::Ok) { + return convertStatus; + } + + if (!UpdateLambdaAllArgumentsTypes(lambda, { itemType }, ctx.Expr)) { + return IGraphTransformer::TStatus::Error; + } + + auto lambdaResult = lambda->GetTypeAnn(); + if (!lambdaResult) { + return IGraphTransformer::TStatus::Repeat; + } + + TVector<ui32> argTypes; + for (ui32 i = 1; i < lambda->ChildrenSize(); ++i) { + auto type = lambda->Child(i)->GetTypeAnn(); + ui32 argType; + if (!ExtractPgType(type, argType, lambda->Child(i)->Pos(), ctx.Expr)) { + return IGraphTransformer::TStatus::Error; + } + + argTypes.push_back(argType); + } + + const auto& aggDesc = NPg::LookupAggregation(func, argTypes); + if (aggDesc.Kind != NPg::EAggKind::Normal) { + ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(input->Pos()), + "Only normal aggregation supported")); + return IGraphTransformer::TStatus::Error; + } + + auto idLambda = ctx.Expr.Builder(input->Pos()) + .Lambda() + .Param("state") + .Arg("state") + .Seal() + .Build(); + + auto saveLambda = idLambda; + auto loadLambda = idLambda; + auto finishLambda = idLambda; + if (aggDesc.FinalFuncId) { + finishLambda = ctx.Expr.Builder(input->Pos()) + .Lambda() + .Param("state") + .Callable("PgResolvedCall") + .Atom(0, NPg::LookupProc(aggDesc.FinalFuncId).Name) + .Atom(1, ToString(aggDesc.FinalFuncId)) + .Arg(2, "state") + .Seal() + .Seal() + .Build(); + } + + auto nullValue = ctx.Expr.NewCallable(input->Pos(), "Null", {}); + auto initValue = nullValue; + if (aggDesc.InitValue) { + initValue = ctx.Expr.Builder(input->Pos()) + .Callable("PgCast") + .Callable(0, "PgConst") + .Callable(0, "PgType") + .Atom(0, "text") + .Seal() + .Atom(1, aggDesc.InitValue) + .Seal() + .Callable(1, "PgType") + .Atom(0, NPg::LookupType(aggDesc.TransTypeId).Name) + .Seal() + .Seal() + .Build(); + } + + const auto& transFuncDesc = NPg::LookupProc(aggDesc.TransFuncId); + // use first non-null value as state if transFunc is strict + bool searchNonNullForState = false; + if (transFuncDesc.IsStrict && !aggDesc.InitValue) { + Y_ENSURE(argTypes.size() == 1); + searchNonNullForState = true; + } + + TExprNode::TPtr initLambda, updateLambda; + if (!searchNonNullForState) { + initLambda = ctx.Expr.Builder(input->Pos()) + .Lambda() + .Param("row") + .Callable("PgResolvedCall") + .Atom(0, transFuncDesc.Name) + .Atom(1, ToString(aggDesc.TransFuncId)) + .Add(2, initValue) + .Apply(3, lambda) + .With(0, "row") + .Seal() + .Seal() + .Seal() + .Build(); + + updateLambda = ctx.Expr.Builder(input->Pos()) + .Lambda() + .Param("row") + .Param("state") + .Callable("Coalesce") + .Callable(0, "PgResolvedCall") + .Atom(0, transFuncDesc.Name) + .Atom(1, ToString(aggDesc.TransFuncId)) + .Callable(2, "Coalesce") + .Arg(0, "state") + .Add(1, initValue) + .Seal() + .Apply(3, lambda) + .With(0, "row") + .Seal() + .Seal() + .Arg(1, "state") + .Seal() + .Seal() + .Build(); + } else { + initLambda = ctx.Expr.Builder(input->Pos()) + .Lambda() + .Param("row") + .Apply(lambda) + .With(0, "row") + .Seal() + .Seal() + .Build(); + + updateLambda = ctx.Expr.Builder(input->Pos()) + .Lambda() + .Param("row") + .Param("state") + .Callable("If") + .Callable(0, "Exists") + .Arg(0, "state") + .Seal() + .Callable(1, "Coalesce") + .Callable(0, "PgResolvedCall") + .Atom(0, transFuncDesc.Name) + .Atom(1, ToString(aggDesc.TransFuncId)) + .Arg(2, "state") + .Apply(3, lambda) + .With(0, "row") + .Seal() + .Seal() + .Arg(1, "state") + .Seal() + .Apply(2, lambda) + .With(0, "row") + .Seal() + .Seal() + .Seal() + .Build(); + } + + auto mergeLambda = ctx.Expr.Builder(input->Pos()) + .Lambda() + .Param("state1") + .Param("state2") + .Callable("Void") + .Seal() + .Seal() + .Build(); + + auto zero = ctx.Expr.Builder(input->Pos()) + .Callable("PgConst") + .Callable(0, "PgType") + .Atom(0, "int8") + .Seal() + .Atom(1, "0") + .Seal() + .Build(); + + auto defaultValue = (func != "count") ? nullValue : zero; + + auto typeNode = ExpandType(input->Pos(), *itemType, ctx.Expr); + output = ctx.Expr.Builder(input->Pos()) + .Callable("AggregationTraits") + .Add(0, typeNode) + .Add(1, initLambda) + .Add(2, updateLambda) + .Add(3, saveLambda) + .Add(4, loadLambda) + .Add(5, mergeLambda) + .Add(6, finishLambda) + .Add(7, defaultValue) + .Seal() + .Build(); + + return IGraphTransformer::TStatus::Repeat; + } + IGraphTransformer::TStatus PgTypeWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx) { Y_UNUSED(output); if (!EnsureArgsCount(*input, 1, ctx.Expr)) { @@ -13436,6 +13611,7 @@ template <NKikimr::NUdf::EDataSlot DataSlot> Functions["PgConst"] = &PgConstWrapper; Functions["PgType"] = &PgTypeWrapper; Functions["PgCast"] = &PgCastWrapper; + Functions["PgAggregationTraits"] = &PgAggregationTraitsWrapper; Functions["AutoDemuxList"] = &AutoDemuxListWrapper; Functions["AggrCountInit"] = &AggrCountInitWrapper; Functions["AggrCountUpdate"] = &AggrCountUpdateWrapper; diff --git a/ydb/library/yql/core/type_ann/type_ann_list.cpp b/ydb/library/yql/core/type_ann/type_ann_list.cpp index 92420a574c..568cf0ad32 100644 --- a/ydb/library/yql/core/type_ann/type_ann_list.cpp +++ b/ydb/library/yql/core/type_ann/type_ann_list.cpp @@ -4264,7 +4264,7 @@ namespace { return IGraphTransformer::TStatus::Repeat; } - if (!IsSameAnnotation(*lambdaMerge->GetTypeAnn(), *reduceStateType)) { + if (!lambdaMerge->Tail().IsCallable("Void") && !IsSameAnnotation(*lambdaMerge->GetTypeAnn(), *reduceStateType)) { ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(lambdaUpdate->Pos()), TStringBuilder() << "Mismatch merge lambda result type, expected: " << *reduceStateType << ", but got: " << *lambdaMerge->GetTypeAnn())); return IGraphTransformer::TStatus::Error; @@ -4581,7 +4581,8 @@ namespace { } else { auto defVal = child->Child(1)->Child(7); if (defVal->IsCallable("Null") && !isOptional && !isHopping && input->Child(1)->ChildrenSize() == 0) { - if (finishType->GetKind() != ETypeAnnotationKind::Null) { + if (finishType->GetKind() != ETypeAnnotationKind::Null && + finishType->GetKind() != ETypeAnnotationKind::Pg) { finishType = ctx.Expr.MakeType<TOptionalExprType>(finishType); } } else if (!defVal->IsCallable("Null") && defVal->GetTypeAnn()->GetKind() != ETypeAnnotationKind::Optional diff --git a/ydb/library/yql/core/yql_expr_type_annotation.cpp b/ydb/library/yql/core/yql_expr_type_annotation.cpp index 2ac3fe7f95..223cd1e847 100644 --- a/ydb/library/yql/core/yql_expr_type_annotation.cpp +++ b/ydb/library/yql/core/yql_expr_type_annotation.cpp @@ -2,6 +2,7 @@ #include "yql_opt_proposed_by_data.h" #include "yql_opt_rewrite_io.h" #include "yql_opt_utils.h" +#include "yql_pg_utils.h" #include <ydb/library/yql/public/udf/udf_data_type.h> #include <ydb/library/yql/minikql/dom/json.h> @@ -4961,4 +4962,39 @@ bool IsCallableTypeHasStreams(const TCallableExprType* callableType) { return false; } +bool ExtractPgType(const TTypeAnnotationNode* type, ui32& pgType, TPositionHandle pos, TExprContext& ctx) { + pgType = 0; + if (type->GetKind() == ETypeAnnotationKind::Null) { + return true; + } + + if (type->GetKind() == ETypeAnnotationKind::Data || type->GetKind() == ETypeAnnotationKind::Optional) { + const TTypeAnnotationNode* unpacked = RemoveOptionalType(type); + if (unpacked->GetKind() != ETypeAnnotationKind::Data) { + ctx.AddError(TIssue(ctx.GetPosition(pos), + "Nested optional type is not compatible to PG")); + return IGraphTransformer::TStatus::Error; + } + + auto slot = unpacked->Cast<TDataExprType>()->GetSlot(); + auto convertedTypeId = ConvertToPgType(slot); + if (!convertedTypeId) { + ctx.AddError(TIssue(ctx.GetPosition(pos), + TStringBuilder() << "Type is not compatible to PG: " << slot)); + return false; + } + + pgType = *convertedTypeId; + return true; + } else if (type->GetKind() != ETypeAnnotationKind::Pg) { + ctx.AddError(TIssue(ctx.GetPosition(pos), + TStringBuilder() << "Expected PG type, but got: " << type->GetKind())); + return false; + } else { + pgType = type->Cast<TPgExprType>()->GetId(); + return true; + } +} + + } // NYql diff --git a/ydb/library/yql/core/yql_expr_type_annotation.h b/ydb/library/yql/core/yql_expr_type_annotation.h index 9316dfcacf..ad217ab595 100644 --- a/ydb/library/yql/core/yql_expr_type_annotation.h +++ b/ydb/library/yql/core/yql_expr_type_annotation.h @@ -280,5 +280,6 @@ std::optional<ui32> GetFieldPosition(const TTupleExprType& tupleType, const TStr std::optional<ui32> GetFieldPosition(const TStructExprType& structType, const TStringBuf& field); bool IsCallableTypeHasStreams(const TCallableExprType* callableType); +bool ExtractPgType(const TTypeAnnotationNode* type, ui32& pgType, TPositionHandle pos, TExprContext& ctx); } diff --git a/ydb/library/yql/core/yql_opt_aggregate.cpp b/ydb/library/yql/core/yql_opt_aggregate.cpp index 226339b8c5..5ea30e3340 100644 --- a/ydb/library/yql/core/yql_opt_aggregate.cpp +++ b/ydb/library/yql/core/yql_opt_aggregate.cpp @@ -26,6 +26,14 @@ TExprNode::TPtr ExpandAggregate(const TExprNode::TPtr& node, TExprContext& ctx, TExprNode::TPtr sortOrder = voidNode; bool effectiveCompact = forceCompact || HasSetting(*settings, "compact"); + for (ui32 index = 0; index < aggregatedColumns->ChildrenSize(); ++index) { + auto trait = aggregatedColumns->Child(index)->Child(1); + auto mergeLambda = trait->Child(5); + if (mergeLambda->Tail().IsCallable("Void")) { + effectiveCompact = true; + break; + } + } const TStructExprType* originalRowType = GetSeqItemType(node->Head().GetTypeAnn())->Cast<TStructExprType>(); TVector<const TItemExprType*> rowItems = originalRowType->GetItems(); diff --git a/ydb/library/yql/core/yql_opt_utils.cpp b/ydb/library/yql/core/yql_opt_utils.cpp index 00cba774ae..2c3c52ad24 100644 --- a/ydb/library/yql/core/yql_opt_utils.cpp +++ b/ydb/library/yql/core/yql_opt_utils.cpp @@ -1281,7 +1281,8 @@ TExprNode::TPtr OptimizeExists(const TExprNode::TPtr& node, TExprContext& ctx) return MakeBool<false>(node->Pos(), ctx); } - if (node->Head().GetTypeAnn()->GetKind() != ETypeAnnotationKind::Optional) { + if (node->Head().GetTypeAnn()->GetKind() != ETypeAnnotationKind::Optional && + node->Head().GetTypeAnn()->GetKind() != ETypeAnnotationKind::Pg) { YQL_CLOG(DEBUG, Core) << node->Content() << " over non-optional"; return MakeBool<true>(node->Pos(), ctx); } diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_coalesce.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_coalesce.cpp index 826646a505..a11c05f023 100644 --- a/ydb/library/yql/minikql/comp_nodes/mkql_coalesce.cpp +++ b/ydb/library/yql/minikql/comp_nodes/mkql_coalesce.cpp @@ -64,7 +64,7 @@ IComputationNode* WrapCoalesce(TCallable& callable, const TComputationNodeFactor bool isLeftOptional = false; const auto& leftType = UnpackOptional(callable.GetInput(0), isLeftOptional); - MKQL_ENSURE(isLeftOptional, "Expected optional"); + MKQL_ENSURE(isLeftOptional || leftType->IsPg(), "Expected optional or pg"); bool isRightOptional = false; if (!leftType->IsSameType(*callable.GetInput(1).GetStaticType())) { diff --git a/ydb/library/yql/minikql/mkql_node.cpp b/ydb/library/yql/minikql/mkql_node.cpp index 52a6f56140..3706381103 100644 --- a/ydb/library/yql/minikql/mkql_node.cpp +++ b/ydb/library/yql/minikql/mkql_node.cpp @@ -1017,6 +1017,7 @@ TOptionalType::TOptionalType(TType* itemType, const TTypeEnvironment& env, bool } TOptionalType* TOptionalType::Create(TType* itemType, const TTypeEnvironment& env) { + MKQL_ENSURE(!itemType->IsPg(), "PG type can't be wrapped into optional type"); return ::new(env.Allocate<TOptionalType>()) TOptionalType(itemType, env); } diff --git a/ydb/library/yql/minikql/mkql_program_builder.cpp b/ydb/library/yql/minikql/mkql_program_builder.cpp index 8caa18ad40..e1bc8ed11d 100644 --- a/ydb/library/yql/minikql/mkql_program_builder.cpp +++ b/ydb/library/yql/minikql/mkql_program_builder.cpp @@ -348,7 +348,7 @@ TRuntimeNode TProgramBuilder::Member(TRuntimeNode structObj, const std::string_v const auto type = AS_TYPE(TStructType, UnpackOptional(structObj.GetStaticType(), isOptional)); const auto memberIndex = type->GetMemberIndex(memberName); auto memberType = type->GetMemberType(memberIndex); - if (isOptional && !memberType->IsOptional() && !memberType->IsNull()) { + if (isOptional && !memberType->IsOptional() && !memberType->IsNull() && !memberType->IsPg()) { memberType = NewOptionalType(memberType); } @@ -2146,7 +2146,7 @@ TRuntimeNode TProgramBuilder::NewVariant(TRuntimeNode item, const std::string_vi TRuntimeNode TProgramBuilder::Coalesce(TRuntimeNode data, TRuntimeNode defaultData) { bool isOptional = false; const auto dataType = UnpackOptional(data, isOptional); - if (!isOptional) { + if (!isOptional && !data.GetStaticType()->IsPg()) { MKQL_ENSURE(data.GetStaticType()->IsSameType(*defaultData.GetStaticType()), "Mismatch operand types"); return data; } @@ -2685,7 +2685,7 @@ TRuntimeNode TProgramBuilder::Exists(TRuntimeNode data) { return NewDataLiteral(false); } - if (!nodeType->IsOptional()) { + if (!nodeType->IsOptional() && !nodeType->IsPg()) { return NewDataLiteral(true); } diff --git a/ydb/library/yql/parser/pg_catalog/catalog.cpp b/ydb/library/yql/parser/pg_catalog/catalog.cpp index 0007f8869d..1dc8aa8d56 100644 --- a/ydb/library/yql/parser/pg_catalog/catalog.cpp +++ b/ydb/library/yql/parser/pg_catalog/catalog.cpp @@ -454,6 +454,8 @@ public: } else { ythrow yexception() << "Unknown aggkind value: " << value; } + } else if (key == "agginitval") { + LastAggregation.InitValue = value; } } diff --git a/ydb/library/yql/parser/pg_catalog/catalog.h b/ydb/library/yql/parser/pg_catalog/catalog.h index 76178e470c..4a67d4414d 100644 --- a/ydb/library/yql/parser/pg_catalog/catalog.h +++ b/ydb/library/yql/parser/pg_catalog/catalog.h @@ -73,6 +73,7 @@ struct TAggregateDesc { ui32 CombineFuncId = 0; ui32 SerializeFuncId = 0; ui32 DeserializeFuncId = 0; + TString InitValue; }; const TProcDesc& LookupProc(const TString& name, const TVector<ui32>& argTypeIds); diff --git a/ydb/library/yql/parser/pg_wrapper/CMakeLists.txt b/ydb/library/yql/parser/pg_wrapper/CMakeLists.txt index 5b49fcb41b..92125b02d4 100644 --- a/ydb/library/yql/parser/pg_wrapper/CMakeLists.txt +++ b/ydb/library/yql/parser/pg_wrapper/CMakeLists.txt @@ -94,6 +94,7 @@ target_link_libraries(yql-parser-pg_wrapper PUBLIC yql-minikql-computation yql-parser-pg_catalog library-yql-core + providers-common-codec library-cpp-yson contrib-libs-icu contrib-libs-libc_compat diff --git a/ydb/library/yql/parser/pg_wrapper/comp_factory.cpp b/ydb/library/yql/parser/pg_wrapper/comp_factory.cpp index df5c9944c3..c60ac089c4 100644 --- a/ydb/library/yql/parser/pg_wrapper/comp_factory.cpp +++ b/ydb/library/yql/parser/pg_wrapper/comp_factory.cpp @@ -104,7 +104,7 @@ Datum PointerDatumFromPod(const NUdf::TUnboxedValuePod& value, bool isVar) { struct TPAllocLeakGuard { TPAllocLeakGuard() { - Y_ENSURE(!PAllocList); + PrevList = PAllocList; PAllocList = &Root; Root.Next = &Root; Root.Prev = &Root; @@ -118,9 +118,11 @@ struct TPAllocLeakGuard { current = next; } - PAllocList = nullptr; + Y_VERIFY(PAllocList == &Root); + PAllocList = PrevList; } + TPAllocListItem* PrevList; TPAllocListItem Root; }; @@ -239,8 +241,14 @@ public: NUdf::TUnboxedValuePod DoCalculate(TComputationContext& compCtx) const { SET_MEMORY_CONTEXT; - if (TypeId == INT4OID) { + if (TypeId == INT2OID) { + return ScalarDatumToPod(Int16GetDatum(FromString<i16>(Value))); + } else if (TypeId == INT4OID) { return ScalarDatumToPod(Int32GetDatum(FromString<i32>(Value))); + } else if (TypeId == INT8OID) { + return ScalarDatumToPod(Int64GetDatum(FromString<i64>(Value))); + } else if (TypeId == FLOAT4OID) { + return ScalarDatumToPod(Float4GetDatum(FromString<float>(Value))); } else if (TypeId == FLOAT8OID) { return ScalarDatumToPod(Float8GetDatum(FromString<double>(Value))); } else if (TypeId == TEXTOID) { @@ -311,6 +319,7 @@ public: , RetTypeDesc(NPg::LookupType(ProcDesc.ResultType)) { Zero(FInfo); + Y_ENSURE(Id); fmgr_info(Id, &FInfo); Y_ENSURE(!FInfo.fn_retset); Y_ENSURE(FInfo.fn_addr); @@ -456,6 +465,7 @@ public: } } + Y_ENSURE(funcId); fmgr_info(funcId, &FInfo1); Y_ENSURE(!FInfo1.fn_retset); Y_ENSURE(FInfo1.fn_addr); @@ -467,6 +477,7 @@ public: } if (funcId2) { + Y_ENSURE(funcId2); fmgr_info(funcId2, &FInfo2); Y_ENSURE(!FInfo2.fn_retset); Y_ENSURE(FInfo2.fn_addr); @@ -896,9 +907,9 @@ void WriteYsonValueInTableFormatPg(TOutputBuf& buf, TPgType* type, const NUdf::T SET_MEMORY_CONTEXT; TPAllocLeakGuard leakGuard; const auto& typeInfo = NPg::LookupType(type->GetTypeId()); - Y_ENSURE(typeInfo.SendFuncId); FmgrInfo finfo; Zero(finfo); + Y_ENSURE(typeInfo.SendFuncId); fmgr_info(typeInfo.SendFuncId, &finfo); Y_ENSURE(!finfo.fn_retset); Y_ENSURE(finfo.fn_addr); @@ -985,6 +996,7 @@ void WriteYsonValuePg(TYsonResultWriter& writer, const NUdf::TUnboxedValuePod& v const auto& typeInfo = NPg::LookupType(type->GetTypeId()); FmgrInfo finfo; Zero(finfo); + Y_ENSURE(typeInfo.OutFuncId); fmgr_info(typeInfo.OutFuncId, &finfo); Y_ENSURE(!finfo.fn_retset); Y_ENSURE(finfo.fn_addr); @@ -1078,9 +1090,9 @@ NUdf::TUnboxedValue ReadYsonValuePg(TPgType* type, char cmd, TInputBuf& buf) { const auto& typeInfo = NPg::LookupType(type->GetTypeId()); auto typeIOParam = MakeTypeIOParam(typeInfo); - Y_ENSURE(typeInfo.ReceiveFuncId); FmgrInfo finfo; Zero(finfo); + Y_ENSURE(typeInfo.ReceiveFuncId); fmgr_info(typeInfo.ReceiveFuncId, &finfo); Y_ENSURE(!finfo.fn_retset); Y_ENSURE(finfo.fn_addr); @@ -1190,9 +1202,9 @@ NKikimr::NUdf::TUnboxedValue ReadSkiffPg(NKikimr::NMiniKQL::TPgType* type, NComm const auto& typeInfo = NPg::LookupType(type->GetTypeId()); auto typeIOParam = MakeTypeIOParam(typeInfo); - Y_ENSURE(typeInfo.ReceiveFuncId); FmgrInfo finfo; Zero(finfo); + Y_ENSURE(typeInfo.ReceiveFuncId); fmgr_info(typeInfo.ReceiveFuncId, &finfo); Y_ENSURE(!finfo.fn_retset); Y_ENSURE(finfo.fn_addr); @@ -1282,9 +1294,9 @@ void WriteSkiffPg(NKikimr::NMiniKQL::TPgType* type, const NKikimr::NUdf::TUnboxe SET_MEMORY_CONTEXT; TPAllocLeakGuard leakGuard; const auto& typeInfo = NPg::LookupType(type->GetTypeId()); - Y_ENSURE(typeInfo.SendFuncId); FmgrInfo finfo; Zero(finfo); + Y_ENSURE(typeInfo.SendFuncId); fmgr_info(typeInfo.SendFuncId, &finfo); Y_ENSURE(!finfo.fn_retset); Y_ENSURE(finfo.fn_addr); @@ -1438,9 +1450,9 @@ void PGPackImpl(const TPgType* type, const NUdf::TUnboxedValuePod& value, TBuffe SET_MEMORY_CONTEXT; TPAllocLeakGuard leakGuard; const auto& typeInfo = NPg::LookupType(type->GetTypeId()); - Y_ENSURE(typeInfo.SendFuncId); FmgrInfo finfo; Zero(finfo); + Y_ENSURE(typeInfo.SendFuncId); fmgr_info(typeInfo.SendFuncId, &finfo); Y_ENSURE(!finfo.fn_retset); Y_ENSURE(finfo.fn_addr); @@ -1530,9 +1542,9 @@ NUdf::TUnboxedValue PGUnpackImpl(const TPgType* type, TStringBuf& buf) { const auto& typeInfo = NPg::LookupType(type->GetTypeId()); auto typeIOParam = MakeTypeIOParam(typeInfo); - Y_ENSURE(typeInfo.ReceiveFuncId); FmgrInfo finfo; Zero(finfo); + Y_ENSURE(typeInfo.ReceiveFuncId); fmgr_info(typeInfo.ReceiveFuncId, &finfo); Y_ENSURE(!finfo.fn_retset); Y_ENSURE(finfo.fn_addr); |