aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorvvvv <vvvv@yandex-team.ru>2022-03-18 04:32:25 +0300
committervvvv <vvvv@yandex-team.ru>2022-03-18 04:32:25 +0300
commit375c6518aaf3387692ecce32a26e88443606ae3c (patch)
tree97120351d69b54c3ffe40e3cf29101b63204a807
parentd14fc69b3bfe40c19e4f7e80ba7f6ba8009ce220 (diff)
downloadydb-375c6518aaf3387692ecce32a26e88443606ae3c.tar.gz
YQL-13710 initial implementation of aggregations (without context), support of traits without merge
ref:c28054a44aa19784cbb64db3d143f82a12563fcb
-rw-r--r--ydb/library/yql/ast/yql_expr.cpp1
-rw-r--r--ydb/library/yql/core/common_opt/CMakeLists.txt1
-rw-r--r--ydb/library/yql/core/common_opt/yql_co_simple1.cpp71
-rw-r--r--ydb/library/yql/core/type_ann/type_ann_core.cpp246
-rw-r--r--ydb/library/yql/core/type_ann/type_ann_list.cpp5
-rw-r--r--ydb/library/yql/core/yql_expr_type_annotation.cpp36
-rw-r--r--ydb/library/yql/core/yql_expr_type_annotation.h1
-rw-r--r--ydb/library/yql/core/yql_opt_aggregate.cpp8
-rw-r--r--ydb/library/yql/core/yql_opt_utils.cpp3
-rw-r--r--ydb/library/yql/minikql/comp_nodes/mkql_coalesce.cpp2
-rw-r--r--ydb/library/yql/minikql/mkql_node.cpp1
-rw-r--r--ydb/library/yql/minikql/mkql_program_builder.cpp6
-rw-r--r--ydb/library/yql/parser/pg_catalog/catalog.cpp2
-rw-r--r--ydb/library/yql/parser/pg_catalog/catalog.h1
-rw-r--r--ydb/library/yql/parser/pg_wrapper/CMakeLists.txt1
-rw-r--r--ydb/library/yql/parser/pg_wrapper/comp_factory.cpp30
16 files changed, 340 insertions, 75 deletions
diff --git a/ydb/library/yql/ast/yql_expr.cpp b/ydb/library/yql/ast/yql_expr.cpp
index e91a95adff..128b9bac2f 100644
--- a/ydb/library/yql/ast/yql_expr.cpp
+++ b/ydb/library/yql/ast/yql_expr.cpp
@@ -3021,6 +3021,7 @@ const TListExprType* TMakeTypeImpl<TListExprType>::Make(TExprContext& ctx, const
}
const TOptionalExprType* TMakeTypeImpl<TOptionalExprType>::Make(TExprContext& ctx, const TTypeAnnotationNode* itemType) {
+ Y_ENSURE(itemType->GetKind() != ETypeAnnotationKind::Pg);
const auto hash = TOptionalExprType::MakeHash(itemType);
TOptionalExprType sample(hash, itemType);
if (const auto found = FindType(sample, ctx))
diff --git a/ydb/library/yql/core/common_opt/CMakeLists.txt b/ydb/library/yql/core/common_opt/CMakeLists.txt
index 0ec215ab77..5e8019bb47 100644
--- a/ydb/library/yql/core/common_opt/CMakeLists.txt
+++ b/ydb/library/yql/core/common_opt/CMakeLists.txt
@@ -16,6 +16,7 @@ target_link_libraries(yql-core-common_opt PUBLIC
yutil
library-yql-core
yql-core-expr_nodes
+ yql-parser-pg_catalog
)
target_sources(yql-core-common_opt PRIVATE
${CMAKE_SOURCE_DIR}/ydb/library/yql/core/common_opt/yql_co_extr_members.cpp
diff --git a/ydb/library/yql/core/common_opt/yql_co_simple1.cpp b/ydb/library/yql/core/common_opt/yql_co_simple1.cpp
index 1fc8746898..dff33a9155 100644
--- a/ydb/library/yql/core/common_opt/yql_co_simple1.cpp
+++ b/ydb/library/yql/core/common_opt/yql_co_simple1.cpp
@@ -9,6 +9,7 @@
#include <ydb/library/yql/core/yql_expr_optimize.h>
#include <ydb/library/yql/utils/log/log.h>
+#include <ydb/library/yql/parser/pg_catalog/catalog.h>
#include <util/generic/map.h>
#include <util/generic/bitmap.h>
@@ -6692,30 +6693,52 @@ void RegisterCoSimpleCallables1(TCallableOptimizerMap& map) {
TNodeOnNodeOwnedMap deepClones;
TExprNode::TListType payloadItems;
for (ui32 i = 0; i < aggs.size(); ++i) {
- const auto& exports = exportsPtr->Symbols();
auto func = aggs[i].first->Head().Content();
- if (func == "count" && aggs[i].first->ChildrenSize() == 1) {
- func = "count_all";
- }
+ TExprNode::TPtr traits;
+ if (optCtx.Types->PgTypes) {
+ auto arg = ctx.NewArgument(node->Pos(), "row");
+ auto arguments = ctx.NewArguments(node->Pos(), { arg });
+ TExprNode::TListType aggFuncArgs;
+ for (ui32 j = 1; j < aggs[i].first->ChildrenSize(); ++j) {
+ aggFuncArgs.push_back(ctx.ReplaceNode(aggs[i].first->ChildPtr(j), *aggs[i].second, arg));
+ }
- TString factory = TString(func) + "_traits_factory";
- const auto ex = exports.find(factory);
- YQL_ENSURE(exports.cend() != ex);
- auto lambda = ctx.DeepCopy(*ex->second, exportsPtr->ExprCtx(), deepClones, true, false);
- auto arg = ctx.NewArgument(node->Pos(), "row");
- auto arguments = ctx.NewArguments(node->Pos(), { arg });
- auto extractor = ctx.NewLambda(node->Pos(), std::move(arguments),
- ctx.ReplaceNode(aggs[i].first->TailPtr(), *aggs[i].second, arg));
-
- auto traits = ctx.ReplaceNodes(lambda->TailPtr(), {
- {lambda->Head().Child(0), listTypeNode},
- {lambda->Head().Child(1), extractor}
- });
-
- ctx.Step.Repeat(TExprStep::ExpandApplyForLambdas);
- auto status = ExpandApply(traits, traits, ctx);
- if (status == IGraphTransformer::TStatus::Error) {
- return {};
+ auto extractor = ctx.NewLambda(node->Pos(), std::move(arguments), std::move(aggFuncArgs));
+
+ traits = ctx.Builder(node->Pos())
+ .Callable("PgAggregationTraits")
+ .Atom(0, func)
+ .Callable(1, "ListItemType")
+ .Add(0, listTypeNode)
+ .Seal()
+ .Add(2, extractor)
+ .Seal()
+ .Build();
+ } else {
+ const auto& exports = exportsPtr->Symbols();
+ if (func == "count" && aggs[i].first->ChildrenSize() == 1) {
+ func = "count_all";
+ }
+
+ TString factory = TString(func) + "_traits_factory";
+ const auto ex = exports.find(factory);
+ YQL_ENSURE(exports.cend() != ex);
+ auto lambda = ctx.DeepCopy(*ex->second, exportsPtr->ExprCtx(), deepClones, true, false);
+ auto arg = ctx.NewArgument(node->Pos(), "row");
+ auto arguments = ctx.NewArguments(node->Pos(), { arg });
+ auto extractor = ctx.NewLambda(node->Pos(), std::move(arguments),
+ ctx.ReplaceNode(aggs[i].first->TailPtr(), *aggs[i].second, arg));
+
+ traits = ctx.ReplaceNodes(lambda->TailPtr(), {
+ {lambda->Head().Child(0), listTypeNode},
+ {lambda->Head().Child(1), extractor}
+ });
+
+ ctx.Step.Repeat(TExprStep::ExpandApplyForLambdas);
+ auto status = ExpandApply(traits, traits, ctx);
+ if (status == IGraphTransformer::TStatus::Error) {
+ return {};
+ }
}
payloadItems.push_back(ctx.Builder(node->Pos())
@@ -6759,7 +6782,7 @@ void RegisterCoSimpleCallables1(TCallableOptimizerMap& map) {
.Atom(1, "_yql_agg_" + ToString(it->second))
.Seal()
.Build();
- if (node->Head().Content() == "count") {
+ if (!optCtx.Types->PgTypes && node->Head().Content() == "count") {
ret = ctx.Builder(node->Pos())
.Callable("SafeCast")
.Add(0, ret)
@@ -7073,7 +7096,7 @@ void RegisterCoSimpleCallables1(TCallableOptimizerMap& map) {
.Seal()
.Build();
- if (node->Head().Content() == "row_number" || node->Head().Content() == "count") {
+ if (!optCtx.Types->PgTypes && (node->Head().Content() == "row_number" || node->Head().Content() == "count")) {
ret = ctx.Builder(node->Pos())
.Callable("SafeCast")
.Add(0, ret)
diff --git a/ydb/library/yql/core/type_ann/type_ann_core.cpp b/ydb/library/yql/core/type_ann/type_ann_core.cpp
index 8cbacdfa7a..32ffc0fe9e 100644
--- a/ydb/library/yql/core/type_ann/type_ann_core.cpp
+++ b/ydb/library/yql/core/type_ann/type_ann_core.cpp
@@ -908,7 +908,9 @@ namespace NTypeAnnImpl {
}
input->SetTypeAnn(structType->GetItems()[*pos]->GetItemType());
- if (isOptional && input->GetTypeAnn()->GetKind() != ETypeAnnotationKind::Optional && input->GetTypeAnn()->GetKind() != ETypeAnnotationKind::Null) {
+ if (isOptional && input->GetTypeAnn()->GetKind() != ETypeAnnotationKind::Optional &&
+ input->GetTypeAnn()->GetKind() != ETypeAnnotationKind::Null &&
+ input->GetTypeAnn()->GetKind() != ETypeAnnotationKind::Pg) {
input->SetTypeAnn(ctx.Expr.MakeType<TOptionalExprType>(input->GetTypeAnn()));
}
@@ -8797,40 +8799,6 @@ template <NKikimr::NUdf::EDataSlot DataSlot>
}
};
- bool ExtractPgType(const TTypeAnnotationNode* type, ui32& pgType, TPositionHandle pos, TExprContext& ctx) {
- pgType = 0;
- if (type->GetKind() == ETypeAnnotationKind::Null) {
- return true;
- }
-
- if (type->GetKind() == ETypeAnnotationKind::Data || type->GetKind() == ETypeAnnotationKind::Optional) {
- const TTypeAnnotationNode* unpacked = RemoveOptionalType(type);
- if (unpacked->GetKind() != ETypeAnnotationKind::Data) {
- ctx.AddError(TIssue(ctx.GetPosition(pos),
- "Nested optional type is not compatible to PG"));
- return IGraphTransformer::TStatus::Error;
- }
-
- auto slot = unpacked->Cast<TDataExprType>()->GetSlot();
- auto convertedTypeId = ConvertToPgType(slot);
- if (!convertedTypeId) {
- ctx.AddError(TIssue(ctx.GetPosition(pos),
- TStringBuilder() << "Type is not compatible to PG: " << slot));
- return false;
- }
-
- pgType = *convertedTypeId;
- return true;
- } else if (type->GetKind() != ETypeAnnotationKind::Pg) {
- ctx.AddError(TIssue(ctx.GetPosition(pos),
- TStringBuilder() << "Expected PG type, but got: " << type->GetKind()));
- return false;
- } else {
- pgType = type->Cast<TPgExprType>()->GetId();
- return true;
- }
- }
-
IGraphTransformer::TStatus PgCallWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TExtContext& ctx) {
bool isResolved = input->Content() == "PgResolvedCall";
if (!EnsureMinArgsCount(*input, isResolved ? 2 : 1, ctx.Expr)) {
@@ -9701,6 +9669,213 @@ template <NKikimr::NUdf::EDataSlot DataSlot>
return IGraphTransformer::TStatus::Ok;
}
+ IGraphTransformer::TStatus PgAggregationTraitsWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx) {
+ if (!EnsureArgsCount(*input, 3, ctx.Expr)) {
+ return IGraphTransformer::TStatus::Error;
+ }
+
+ if (!EnsureAtom(*input->Child(0), ctx.Expr)) {
+ return IGraphTransformer::TStatus::Error;
+ }
+
+ auto func = input->Child(0)->Content();
+ if (!EnsureType(*input->Child(1), ctx.Expr)) {
+ return IGraphTransformer::TStatus::Error;
+ }
+
+ auto itemType = input->Child(1)->GetTypeAnn()->Cast<TTypeExprType>()->GetType();
+ auto& lambda = input->ChildRef(2);
+ auto convertStatus = ConvertToLambda(lambda, ctx.Expr, 1);
+ if (convertStatus.Level != IGraphTransformer::TStatus::Ok) {
+ return convertStatus;
+ }
+
+ if (!UpdateLambdaAllArgumentsTypes(lambda, { itemType }, ctx.Expr)) {
+ return IGraphTransformer::TStatus::Error;
+ }
+
+ auto lambdaResult = lambda->GetTypeAnn();
+ if (!lambdaResult) {
+ return IGraphTransformer::TStatus::Repeat;
+ }
+
+ TVector<ui32> argTypes;
+ for (ui32 i = 1; i < lambda->ChildrenSize(); ++i) {
+ auto type = lambda->Child(i)->GetTypeAnn();
+ ui32 argType;
+ if (!ExtractPgType(type, argType, lambda->Child(i)->Pos(), ctx.Expr)) {
+ return IGraphTransformer::TStatus::Error;
+ }
+
+ argTypes.push_back(argType);
+ }
+
+ const auto& aggDesc = NPg::LookupAggregation(func, argTypes);
+ if (aggDesc.Kind != NPg::EAggKind::Normal) {
+ ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(input->Pos()),
+ "Only normal aggregation supported"));
+ return IGraphTransformer::TStatus::Error;
+ }
+
+ auto idLambda = ctx.Expr.Builder(input->Pos())
+ .Lambda()
+ .Param("state")
+ .Arg("state")
+ .Seal()
+ .Build();
+
+ auto saveLambda = idLambda;
+ auto loadLambda = idLambda;
+ auto finishLambda = idLambda;
+ if (aggDesc.FinalFuncId) {
+ finishLambda = ctx.Expr.Builder(input->Pos())
+ .Lambda()
+ .Param("state")
+ .Callable("PgResolvedCall")
+ .Atom(0, NPg::LookupProc(aggDesc.FinalFuncId).Name)
+ .Atom(1, ToString(aggDesc.FinalFuncId))
+ .Arg(2, "state")
+ .Seal()
+ .Seal()
+ .Build();
+ }
+
+ auto nullValue = ctx.Expr.NewCallable(input->Pos(), "Null", {});
+ auto initValue = nullValue;
+ if (aggDesc.InitValue) {
+ initValue = ctx.Expr.Builder(input->Pos())
+ .Callable("PgCast")
+ .Callable(0, "PgConst")
+ .Callable(0, "PgType")
+ .Atom(0, "text")
+ .Seal()
+ .Atom(1, aggDesc.InitValue)
+ .Seal()
+ .Callable(1, "PgType")
+ .Atom(0, NPg::LookupType(aggDesc.TransTypeId).Name)
+ .Seal()
+ .Seal()
+ .Build();
+ }
+
+ const auto& transFuncDesc = NPg::LookupProc(aggDesc.TransFuncId);
+ // use first non-null value as state if transFunc is strict
+ bool searchNonNullForState = false;
+ if (transFuncDesc.IsStrict && !aggDesc.InitValue) {
+ Y_ENSURE(argTypes.size() == 1);
+ searchNonNullForState = true;
+ }
+
+ TExprNode::TPtr initLambda, updateLambda;
+ if (!searchNonNullForState) {
+ initLambda = ctx.Expr.Builder(input->Pos())
+ .Lambda()
+ .Param("row")
+ .Callable("PgResolvedCall")
+ .Atom(0, transFuncDesc.Name)
+ .Atom(1, ToString(aggDesc.TransFuncId))
+ .Add(2, initValue)
+ .Apply(3, lambda)
+ .With(0, "row")
+ .Seal()
+ .Seal()
+ .Seal()
+ .Build();
+
+ updateLambda = ctx.Expr.Builder(input->Pos())
+ .Lambda()
+ .Param("row")
+ .Param("state")
+ .Callable("Coalesce")
+ .Callable(0, "PgResolvedCall")
+ .Atom(0, transFuncDesc.Name)
+ .Atom(1, ToString(aggDesc.TransFuncId))
+ .Callable(2, "Coalesce")
+ .Arg(0, "state")
+ .Add(1, initValue)
+ .Seal()
+ .Apply(3, lambda)
+ .With(0, "row")
+ .Seal()
+ .Seal()
+ .Arg(1, "state")
+ .Seal()
+ .Seal()
+ .Build();
+ } else {
+ initLambda = ctx.Expr.Builder(input->Pos())
+ .Lambda()
+ .Param("row")
+ .Apply(lambda)
+ .With(0, "row")
+ .Seal()
+ .Seal()
+ .Build();
+
+ updateLambda = ctx.Expr.Builder(input->Pos())
+ .Lambda()
+ .Param("row")
+ .Param("state")
+ .Callable("If")
+ .Callable(0, "Exists")
+ .Arg(0, "state")
+ .Seal()
+ .Callable(1, "Coalesce")
+ .Callable(0, "PgResolvedCall")
+ .Atom(0, transFuncDesc.Name)
+ .Atom(1, ToString(aggDesc.TransFuncId))
+ .Arg(2, "state")
+ .Apply(3, lambda)
+ .With(0, "row")
+ .Seal()
+ .Seal()
+ .Arg(1, "state")
+ .Seal()
+ .Apply(2, lambda)
+ .With(0, "row")
+ .Seal()
+ .Seal()
+ .Seal()
+ .Build();
+ }
+
+ auto mergeLambda = ctx.Expr.Builder(input->Pos())
+ .Lambda()
+ .Param("state1")
+ .Param("state2")
+ .Callable("Void")
+ .Seal()
+ .Seal()
+ .Build();
+
+ auto zero = ctx.Expr.Builder(input->Pos())
+ .Callable("PgConst")
+ .Callable(0, "PgType")
+ .Atom(0, "int8")
+ .Seal()
+ .Atom(1, "0")
+ .Seal()
+ .Build();
+
+ auto defaultValue = (func != "count") ? nullValue : zero;
+
+ auto typeNode = ExpandType(input->Pos(), *itemType, ctx.Expr);
+ output = ctx.Expr.Builder(input->Pos())
+ .Callable("AggregationTraits")
+ .Add(0, typeNode)
+ .Add(1, initLambda)
+ .Add(2, updateLambda)
+ .Add(3, saveLambda)
+ .Add(4, loadLambda)
+ .Add(5, mergeLambda)
+ .Add(6, finishLambda)
+ .Add(7, defaultValue)
+ .Seal()
+ .Build();
+
+ return IGraphTransformer::TStatus::Repeat;
+ }
+
IGraphTransformer::TStatus PgTypeWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx) {
Y_UNUSED(output);
if (!EnsureArgsCount(*input, 1, ctx.Expr)) {
@@ -13436,6 +13611,7 @@ template <NKikimr::NUdf::EDataSlot DataSlot>
Functions["PgConst"] = &PgConstWrapper;
Functions["PgType"] = &PgTypeWrapper;
Functions["PgCast"] = &PgCastWrapper;
+ Functions["PgAggregationTraits"] = &PgAggregationTraitsWrapper;
Functions["AutoDemuxList"] = &AutoDemuxListWrapper;
Functions["AggrCountInit"] = &AggrCountInitWrapper;
Functions["AggrCountUpdate"] = &AggrCountUpdateWrapper;
diff --git a/ydb/library/yql/core/type_ann/type_ann_list.cpp b/ydb/library/yql/core/type_ann/type_ann_list.cpp
index 92420a574c..568cf0ad32 100644
--- a/ydb/library/yql/core/type_ann/type_ann_list.cpp
+++ b/ydb/library/yql/core/type_ann/type_ann_list.cpp
@@ -4264,7 +4264,7 @@ namespace {
return IGraphTransformer::TStatus::Repeat;
}
- if (!IsSameAnnotation(*lambdaMerge->GetTypeAnn(), *reduceStateType)) {
+ if (!lambdaMerge->Tail().IsCallable("Void") && !IsSameAnnotation(*lambdaMerge->GetTypeAnn(), *reduceStateType)) {
ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(lambdaUpdate->Pos()), TStringBuilder() << "Mismatch merge lambda result type, expected: "
<< *reduceStateType << ", but got: " << *lambdaMerge->GetTypeAnn()));
return IGraphTransformer::TStatus::Error;
@@ -4581,7 +4581,8 @@ namespace {
} else {
auto defVal = child->Child(1)->Child(7);
if (defVal->IsCallable("Null") && !isOptional && !isHopping && input->Child(1)->ChildrenSize() == 0) {
- if (finishType->GetKind() != ETypeAnnotationKind::Null) {
+ if (finishType->GetKind() != ETypeAnnotationKind::Null &&
+ finishType->GetKind() != ETypeAnnotationKind::Pg) {
finishType = ctx.Expr.MakeType<TOptionalExprType>(finishType);
}
} else if (!defVal->IsCallable("Null") && defVal->GetTypeAnn()->GetKind() != ETypeAnnotationKind::Optional
diff --git a/ydb/library/yql/core/yql_expr_type_annotation.cpp b/ydb/library/yql/core/yql_expr_type_annotation.cpp
index 2ac3fe7f95..223cd1e847 100644
--- a/ydb/library/yql/core/yql_expr_type_annotation.cpp
+++ b/ydb/library/yql/core/yql_expr_type_annotation.cpp
@@ -2,6 +2,7 @@
#include "yql_opt_proposed_by_data.h"
#include "yql_opt_rewrite_io.h"
#include "yql_opt_utils.h"
+#include "yql_pg_utils.h"
#include <ydb/library/yql/public/udf/udf_data_type.h>
#include <ydb/library/yql/minikql/dom/json.h>
@@ -4961,4 +4962,39 @@ bool IsCallableTypeHasStreams(const TCallableExprType* callableType) {
return false;
}
+bool ExtractPgType(const TTypeAnnotationNode* type, ui32& pgType, TPositionHandle pos, TExprContext& ctx) {
+ pgType = 0;
+ if (type->GetKind() == ETypeAnnotationKind::Null) {
+ return true;
+ }
+
+ if (type->GetKind() == ETypeAnnotationKind::Data || type->GetKind() == ETypeAnnotationKind::Optional) {
+ const TTypeAnnotationNode* unpacked = RemoveOptionalType(type);
+ if (unpacked->GetKind() != ETypeAnnotationKind::Data) {
+ ctx.AddError(TIssue(ctx.GetPosition(pos),
+ "Nested optional type is not compatible to PG"));
+ return IGraphTransformer::TStatus::Error;
+ }
+
+ auto slot = unpacked->Cast<TDataExprType>()->GetSlot();
+ auto convertedTypeId = ConvertToPgType(slot);
+ if (!convertedTypeId) {
+ ctx.AddError(TIssue(ctx.GetPosition(pos),
+ TStringBuilder() << "Type is not compatible to PG: " << slot));
+ return false;
+ }
+
+ pgType = *convertedTypeId;
+ return true;
+ } else if (type->GetKind() != ETypeAnnotationKind::Pg) {
+ ctx.AddError(TIssue(ctx.GetPosition(pos),
+ TStringBuilder() << "Expected PG type, but got: " << type->GetKind()));
+ return false;
+ } else {
+ pgType = type->Cast<TPgExprType>()->GetId();
+ return true;
+ }
+}
+
+
} // NYql
diff --git a/ydb/library/yql/core/yql_expr_type_annotation.h b/ydb/library/yql/core/yql_expr_type_annotation.h
index 9316dfcacf..ad217ab595 100644
--- a/ydb/library/yql/core/yql_expr_type_annotation.h
+++ b/ydb/library/yql/core/yql_expr_type_annotation.h
@@ -280,5 +280,6 @@ std::optional<ui32> GetFieldPosition(const TTupleExprType& tupleType, const TStr
std::optional<ui32> GetFieldPosition(const TStructExprType& structType, const TStringBuf& field);
bool IsCallableTypeHasStreams(const TCallableExprType* callableType);
+bool ExtractPgType(const TTypeAnnotationNode* type, ui32& pgType, TPositionHandle pos, TExprContext& ctx);
}
diff --git a/ydb/library/yql/core/yql_opt_aggregate.cpp b/ydb/library/yql/core/yql_opt_aggregate.cpp
index 226339b8c5..5ea30e3340 100644
--- a/ydb/library/yql/core/yql_opt_aggregate.cpp
+++ b/ydb/library/yql/core/yql_opt_aggregate.cpp
@@ -26,6 +26,14 @@ TExprNode::TPtr ExpandAggregate(const TExprNode::TPtr& node, TExprContext& ctx,
TExprNode::TPtr sortOrder = voidNode;
bool effectiveCompact = forceCompact || HasSetting(*settings, "compact");
+ for (ui32 index = 0; index < aggregatedColumns->ChildrenSize(); ++index) {
+ auto trait = aggregatedColumns->Child(index)->Child(1);
+ auto mergeLambda = trait->Child(5);
+ if (mergeLambda->Tail().IsCallable("Void")) {
+ effectiveCompact = true;
+ break;
+ }
+ }
const TStructExprType* originalRowType = GetSeqItemType(node->Head().GetTypeAnn())->Cast<TStructExprType>();
TVector<const TItemExprType*> rowItems = originalRowType->GetItems();
diff --git a/ydb/library/yql/core/yql_opt_utils.cpp b/ydb/library/yql/core/yql_opt_utils.cpp
index 00cba774ae..2c3c52ad24 100644
--- a/ydb/library/yql/core/yql_opt_utils.cpp
+++ b/ydb/library/yql/core/yql_opt_utils.cpp
@@ -1281,7 +1281,8 @@ TExprNode::TPtr OptimizeExists(const TExprNode::TPtr& node, TExprContext& ctx)
return MakeBool<false>(node->Pos(), ctx);
}
- if (node->Head().GetTypeAnn()->GetKind() != ETypeAnnotationKind::Optional) {
+ if (node->Head().GetTypeAnn()->GetKind() != ETypeAnnotationKind::Optional &&
+ node->Head().GetTypeAnn()->GetKind() != ETypeAnnotationKind::Pg) {
YQL_CLOG(DEBUG, Core) << node->Content() << " over non-optional";
return MakeBool<true>(node->Pos(), ctx);
}
diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_coalesce.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_coalesce.cpp
index 826646a505..a11c05f023 100644
--- a/ydb/library/yql/minikql/comp_nodes/mkql_coalesce.cpp
+++ b/ydb/library/yql/minikql/comp_nodes/mkql_coalesce.cpp
@@ -64,7 +64,7 @@ IComputationNode* WrapCoalesce(TCallable& callable, const TComputationNodeFactor
bool isLeftOptional = false;
const auto& leftType = UnpackOptional(callable.GetInput(0), isLeftOptional);
- MKQL_ENSURE(isLeftOptional, "Expected optional");
+ MKQL_ENSURE(isLeftOptional || leftType->IsPg(), "Expected optional or pg");
bool isRightOptional = false;
if (!leftType->IsSameType(*callable.GetInput(1).GetStaticType())) {
diff --git a/ydb/library/yql/minikql/mkql_node.cpp b/ydb/library/yql/minikql/mkql_node.cpp
index 52a6f56140..3706381103 100644
--- a/ydb/library/yql/minikql/mkql_node.cpp
+++ b/ydb/library/yql/minikql/mkql_node.cpp
@@ -1017,6 +1017,7 @@ TOptionalType::TOptionalType(TType* itemType, const TTypeEnvironment& env, bool
}
TOptionalType* TOptionalType::Create(TType* itemType, const TTypeEnvironment& env) {
+ MKQL_ENSURE(!itemType->IsPg(), "PG type can't be wrapped into optional type");
return ::new(env.Allocate<TOptionalType>()) TOptionalType(itemType, env);
}
diff --git a/ydb/library/yql/minikql/mkql_program_builder.cpp b/ydb/library/yql/minikql/mkql_program_builder.cpp
index 8caa18ad40..e1bc8ed11d 100644
--- a/ydb/library/yql/minikql/mkql_program_builder.cpp
+++ b/ydb/library/yql/minikql/mkql_program_builder.cpp
@@ -348,7 +348,7 @@ TRuntimeNode TProgramBuilder::Member(TRuntimeNode structObj, const std::string_v
const auto type = AS_TYPE(TStructType, UnpackOptional(structObj.GetStaticType(), isOptional));
const auto memberIndex = type->GetMemberIndex(memberName);
auto memberType = type->GetMemberType(memberIndex);
- if (isOptional && !memberType->IsOptional() && !memberType->IsNull()) {
+ if (isOptional && !memberType->IsOptional() && !memberType->IsNull() && !memberType->IsPg()) {
memberType = NewOptionalType(memberType);
}
@@ -2146,7 +2146,7 @@ TRuntimeNode TProgramBuilder::NewVariant(TRuntimeNode item, const std::string_vi
TRuntimeNode TProgramBuilder::Coalesce(TRuntimeNode data, TRuntimeNode defaultData) {
bool isOptional = false;
const auto dataType = UnpackOptional(data, isOptional);
- if (!isOptional) {
+ if (!isOptional && !data.GetStaticType()->IsPg()) {
MKQL_ENSURE(data.GetStaticType()->IsSameType(*defaultData.GetStaticType()), "Mismatch operand types");
return data;
}
@@ -2685,7 +2685,7 @@ TRuntimeNode TProgramBuilder::Exists(TRuntimeNode data) {
return NewDataLiteral(false);
}
- if (!nodeType->IsOptional()) {
+ if (!nodeType->IsOptional() && !nodeType->IsPg()) {
return NewDataLiteral(true);
}
diff --git a/ydb/library/yql/parser/pg_catalog/catalog.cpp b/ydb/library/yql/parser/pg_catalog/catalog.cpp
index 0007f8869d..1dc8aa8d56 100644
--- a/ydb/library/yql/parser/pg_catalog/catalog.cpp
+++ b/ydb/library/yql/parser/pg_catalog/catalog.cpp
@@ -454,6 +454,8 @@ public:
} else {
ythrow yexception() << "Unknown aggkind value: " << value;
}
+ } else if (key == "agginitval") {
+ LastAggregation.InitValue = value;
}
}
diff --git a/ydb/library/yql/parser/pg_catalog/catalog.h b/ydb/library/yql/parser/pg_catalog/catalog.h
index 76178e470c..4a67d4414d 100644
--- a/ydb/library/yql/parser/pg_catalog/catalog.h
+++ b/ydb/library/yql/parser/pg_catalog/catalog.h
@@ -73,6 +73,7 @@ struct TAggregateDesc {
ui32 CombineFuncId = 0;
ui32 SerializeFuncId = 0;
ui32 DeserializeFuncId = 0;
+ TString InitValue;
};
const TProcDesc& LookupProc(const TString& name, const TVector<ui32>& argTypeIds);
diff --git a/ydb/library/yql/parser/pg_wrapper/CMakeLists.txt b/ydb/library/yql/parser/pg_wrapper/CMakeLists.txt
index 5b49fcb41b..92125b02d4 100644
--- a/ydb/library/yql/parser/pg_wrapper/CMakeLists.txt
+++ b/ydb/library/yql/parser/pg_wrapper/CMakeLists.txt
@@ -94,6 +94,7 @@ target_link_libraries(yql-parser-pg_wrapper PUBLIC
yql-minikql-computation
yql-parser-pg_catalog
library-yql-core
+ providers-common-codec
library-cpp-yson
contrib-libs-icu
contrib-libs-libc_compat
diff --git a/ydb/library/yql/parser/pg_wrapper/comp_factory.cpp b/ydb/library/yql/parser/pg_wrapper/comp_factory.cpp
index df5c9944c3..c60ac089c4 100644
--- a/ydb/library/yql/parser/pg_wrapper/comp_factory.cpp
+++ b/ydb/library/yql/parser/pg_wrapper/comp_factory.cpp
@@ -104,7 +104,7 @@ Datum PointerDatumFromPod(const NUdf::TUnboxedValuePod& value, bool isVar) {
struct TPAllocLeakGuard {
TPAllocLeakGuard() {
- Y_ENSURE(!PAllocList);
+ PrevList = PAllocList;
PAllocList = &Root;
Root.Next = &Root;
Root.Prev = &Root;
@@ -118,9 +118,11 @@ struct TPAllocLeakGuard {
current = next;
}
- PAllocList = nullptr;
+ Y_VERIFY(PAllocList == &Root);
+ PAllocList = PrevList;
}
+ TPAllocListItem* PrevList;
TPAllocListItem Root;
};
@@ -239,8 +241,14 @@ public:
NUdf::TUnboxedValuePod DoCalculate(TComputationContext& compCtx) const {
SET_MEMORY_CONTEXT;
- if (TypeId == INT4OID) {
+ if (TypeId == INT2OID) {
+ return ScalarDatumToPod(Int16GetDatum(FromString<i16>(Value)));
+ } else if (TypeId == INT4OID) {
return ScalarDatumToPod(Int32GetDatum(FromString<i32>(Value)));
+ } else if (TypeId == INT8OID) {
+ return ScalarDatumToPod(Int64GetDatum(FromString<i64>(Value)));
+ } else if (TypeId == FLOAT4OID) {
+ return ScalarDatumToPod(Float4GetDatum(FromString<float>(Value)));
} else if (TypeId == FLOAT8OID) {
return ScalarDatumToPod(Float8GetDatum(FromString<double>(Value)));
} else if (TypeId == TEXTOID) {
@@ -311,6 +319,7 @@ public:
, RetTypeDesc(NPg::LookupType(ProcDesc.ResultType))
{
Zero(FInfo);
+ Y_ENSURE(Id);
fmgr_info(Id, &FInfo);
Y_ENSURE(!FInfo.fn_retset);
Y_ENSURE(FInfo.fn_addr);
@@ -456,6 +465,7 @@ public:
}
}
+ Y_ENSURE(funcId);
fmgr_info(funcId, &FInfo1);
Y_ENSURE(!FInfo1.fn_retset);
Y_ENSURE(FInfo1.fn_addr);
@@ -467,6 +477,7 @@ public:
}
if (funcId2) {
+ Y_ENSURE(funcId2);
fmgr_info(funcId2, &FInfo2);
Y_ENSURE(!FInfo2.fn_retset);
Y_ENSURE(FInfo2.fn_addr);
@@ -896,9 +907,9 @@ void WriteYsonValueInTableFormatPg(TOutputBuf& buf, TPgType* type, const NUdf::T
SET_MEMORY_CONTEXT;
TPAllocLeakGuard leakGuard;
const auto& typeInfo = NPg::LookupType(type->GetTypeId());
- Y_ENSURE(typeInfo.SendFuncId);
FmgrInfo finfo;
Zero(finfo);
+ Y_ENSURE(typeInfo.SendFuncId);
fmgr_info(typeInfo.SendFuncId, &finfo);
Y_ENSURE(!finfo.fn_retset);
Y_ENSURE(finfo.fn_addr);
@@ -985,6 +996,7 @@ void WriteYsonValuePg(TYsonResultWriter& writer, const NUdf::TUnboxedValuePod& v
const auto& typeInfo = NPg::LookupType(type->GetTypeId());
FmgrInfo finfo;
Zero(finfo);
+ Y_ENSURE(typeInfo.OutFuncId);
fmgr_info(typeInfo.OutFuncId, &finfo);
Y_ENSURE(!finfo.fn_retset);
Y_ENSURE(finfo.fn_addr);
@@ -1078,9 +1090,9 @@ NUdf::TUnboxedValue ReadYsonValuePg(TPgType* type, char cmd, TInputBuf& buf) {
const auto& typeInfo = NPg::LookupType(type->GetTypeId());
auto typeIOParam = MakeTypeIOParam(typeInfo);
- Y_ENSURE(typeInfo.ReceiveFuncId);
FmgrInfo finfo;
Zero(finfo);
+ Y_ENSURE(typeInfo.ReceiveFuncId);
fmgr_info(typeInfo.ReceiveFuncId, &finfo);
Y_ENSURE(!finfo.fn_retset);
Y_ENSURE(finfo.fn_addr);
@@ -1190,9 +1202,9 @@ NKikimr::NUdf::TUnboxedValue ReadSkiffPg(NKikimr::NMiniKQL::TPgType* type, NComm
const auto& typeInfo = NPg::LookupType(type->GetTypeId());
auto typeIOParam = MakeTypeIOParam(typeInfo);
- Y_ENSURE(typeInfo.ReceiveFuncId);
FmgrInfo finfo;
Zero(finfo);
+ Y_ENSURE(typeInfo.ReceiveFuncId);
fmgr_info(typeInfo.ReceiveFuncId, &finfo);
Y_ENSURE(!finfo.fn_retset);
Y_ENSURE(finfo.fn_addr);
@@ -1282,9 +1294,9 @@ void WriteSkiffPg(NKikimr::NMiniKQL::TPgType* type, const NKikimr::NUdf::TUnboxe
SET_MEMORY_CONTEXT;
TPAllocLeakGuard leakGuard;
const auto& typeInfo = NPg::LookupType(type->GetTypeId());
- Y_ENSURE(typeInfo.SendFuncId);
FmgrInfo finfo;
Zero(finfo);
+ Y_ENSURE(typeInfo.SendFuncId);
fmgr_info(typeInfo.SendFuncId, &finfo);
Y_ENSURE(!finfo.fn_retset);
Y_ENSURE(finfo.fn_addr);
@@ -1438,9 +1450,9 @@ void PGPackImpl(const TPgType* type, const NUdf::TUnboxedValuePod& value, TBuffe
SET_MEMORY_CONTEXT;
TPAllocLeakGuard leakGuard;
const auto& typeInfo = NPg::LookupType(type->GetTypeId());
- Y_ENSURE(typeInfo.SendFuncId);
FmgrInfo finfo;
Zero(finfo);
+ Y_ENSURE(typeInfo.SendFuncId);
fmgr_info(typeInfo.SendFuncId, &finfo);
Y_ENSURE(!finfo.fn_retset);
Y_ENSURE(finfo.fn_addr);
@@ -1530,9 +1542,9 @@ NUdf::TUnboxedValue PGUnpackImpl(const TPgType* type, TStringBuf& buf) {
const auto& typeInfo = NPg::LookupType(type->GetTypeId());
auto typeIOParam = MakeTypeIOParam(typeInfo);
- Y_ENSURE(typeInfo.ReceiveFuncId);
FmgrInfo finfo;
Zero(finfo);
+ Y_ENSURE(typeInfo.ReceiveFuncId);
fmgr_info(typeInfo.ReceiveFuncId, &finfo);
Y_ENSURE(!finfo.fn_retset);
Y_ENSURE(finfo.fn_addr);