aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorvvvv <vvvv@yandex-team.ru>2022-04-06 21:36:55 +0300
committervvvv <vvvv@yandex-team.ru>2022-04-06 21:36:55 +0300
commit3b53fc30368aa56b2cdcfeaf8215cbc680fce307 (patch)
tree189aca9705759329f615ef610efdabdd960d1552
parent670c434be4ba87d1e30f5b674278b13c9dc30cb3 (diff)
downloadydb-3b53fc30368aa56b2cdcfeaf8215cbc680fce307.tar.gz
YQL-13710 refactoring, part1: extract code to another files
ref:ecb9adbf0ffe5548f5fe355a5a811ff61a4b6005
-rw-r--r--ydb/library/yql/core/common_opt/CMakeLists.txt1
-rw-r--r--ydb/library/yql/core/common_opt/yql_co_pgselect.cpp1030
-rw-r--r--ydb/library/yql/core/common_opt/yql_co_pgselect.h12
-rw-r--r--ydb/library/yql/core/common_opt/yql_co_simple1.cpp1023
-rw-r--r--ydb/library/yql/core/type_ann/CMakeLists.txt3
-rw-r--r--ydb/library/yql/core/type_ann/type_ann_core.cpp2541
-rw-r--r--ydb/library/yql/core/type_ann/type_ann_pg.cpp2555
-rw-r--r--ydb/library/yql/core/type_ann/type_ann_pg.h34
8 files changed, 3637 insertions, 3562 deletions
diff --git a/ydb/library/yql/core/common_opt/CMakeLists.txt b/ydb/library/yql/core/common_opt/CMakeLists.txt
index 5e8019bb475..9aafea51818 100644
--- a/ydb/library/yql/core/common_opt/CMakeLists.txt
+++ b/ydb/library/yql/core/common_opt/CMakeLists.txt
@@ -24,6 +24,7 @@ target_sources(yql-core-common_opt PRIVATE
${CMAKE_SOURCE_DIR}/ydb/library/yql/core/common_opt/yql_co_flow1.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/core/common_opt/yql_co_flow2.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/core/common_opt/yql_co_last.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/core/common_opt/yql_co_pgselect.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/core/common_opt/yql_co_simple1.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/core/common_opt/yql_co_simple2.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/core/common_opt/yql_co_transformer.cpp
diff --git a/ydb/library/yql/core/common_opt/yql_co_pgselect.cpp b/ydb/library/yql/core/common_opt/yql_co_pgselect.cpp
new file mode 100644
index 00000000000..1f37486486a
--- /dev/null
+++ b/ydb/library/yql/core/common_opt/yql_co_pgselect.cpp
@@ -0,0 +1,1030 @@
+#include "yql_co_pgselect.h"
+
+#include <ydb/library/yql/core/yql_expr_optimize.h>
+#include <ydb/library/yql/core/yql_opt_utils.h>
+
+namespace NYql {
+
+TExprNode::TPtr ExpandPositionalUnionAll(const TExprNode& node, const TVector<TColumnOrder>& columnOrders,
+ TExprNode::TListType children, TExprContext& ctx, TOptimizeContext& optCtx) {
+ auto targetColumnOrder = optCtx.Types->LookupColumnOrder(node);
+ YQL_ENSURE(targetColumnOrder);
+
+ for (ui32 childIndex = 0; childIndex < children.size(); ++childIndex) {
+ const auto& childColumnOrder = columnOrders[childIndex];
+ auto& child = children[childIndex];
+ if (childColumnOrder == *targetColumnOrder) {
+ continue;
+ }
+
+ YQL_ENSURE(childColumnOrder.size() == targetColumnOrder->size());
+ child = ctx.Builder(child->Pos())
+ .Callable("Map")
+ .Add(0, child)
+ .Lambda(1)
+ .Param("row")
+ .Callable("AsStruct")
+ .Do([&](TExprNodeBuilder &parent) -> TExprNodeBuilder & {
+ for (size_t i = 0; i < childColumnOrder.size(); ++i) {
+ parent
+ .List(i)
+ .Atom(0, child->Pos(), (*targetColumnOrder)[i])
+ .Callable(1, "Member")
+ .Arg(0, "row")
+ .Atom(1, childColumnOrder[i])
+ .Seal()
+ .Seal();
+ }
+ return parent;
+ })
+ .Seal()
+ .Seal()
+ .Seal()
+ .Build();
+ }
+
+ auto res = ctx.NewCallable(node.Pos(), "UnionAll", std::move(children));
+ return KeepColumnOrder(res, node, ctx, *optCtx.Types);
+}
+
+TExprNode::TPtr ExpandPgSelect(const TExprNode::TPtr& node, TExprContext& ctx, TOptimizeContext& optCtx) {
+ auto setItems = GetSetting(node->Head(), "set_items");
+ auto order = optCtx.Types->LookupColumnOrder(*node);
+ YQL_ENSURE(order);
+ TExprNode::TListType columnsItems;
+ for (const auto& x : *order) {
+ columnsItems.push_back(ctx.NewAtom(node->Pos(), x));
+ }
+
+ auto columns = ctx.NewList(node->Pos(), std::move(columnsItems));
+ TExprNode::TListType setItemNodes;
+ TVector<TColumnOrder> columnOrders;
+ for (auto setItem : setItems->Tail().Children()) {
+ auto childOrder = optCtx.Types->LookupColumnOrder(*setItem);
+ YQL_ENSURE(*childOrder);
+ columnOrders.push_back(*childOrder);
+ auto result = GetSetting(setItem->Tail(), "result");
+ auto values = GetSetting(setItem->Tail(), "values");
+ auto from = GetSetting(setItem->Tail(), "from");
+ auto filter = GetSetting(setItem->Tail(), "where");
+ auto joinOps = GetSetting(setItem->Tail(), "join_ops");
+ auto groupBy = GetSetting(setItem->Tail(), "group_by");
+ auto having = GetSetting(setItem->Tail(), "having");
+ auto window = GetSetting(setItem->Tail(), "window");
+ bool oneRow = !from;
+ TExprNode::TPtr list;
+ if (values) {
+ YQL_ENSURE(!result);
+ list = ctx.Builder(node->Pos())
+ .Callable("Map")
+ .Add(0, values->ChildPtr(2))
+ .Lambda(1)
+ .Param("row")
+ .Callable("AsStruct")
+ .Do([&](TExprNodeBuilder& parent) -> TExprNodeBuilder& {
+ for (ui32 index = 0; index < values->Child(1)->ChildrenSize(); ++index) {
+ parent
+ .List(index)
+ .Atom(0, values->Child(1)->Child(index)->Content())
+ .Callable(1, "Nth")
+ .Arg(0, "row")
+ .Atom(1, ToString(index))
+ .Seal()
+ .Seal();
+ }
+
+ return parent;
+ })
+ .Seal()
+ .Seal()
+ .Seal()
+ .Build();
+ } else {
+ YQL_ENSURE(result);
+ TExprNode::TPtr projectionLambda;
+ if (oneRow) {
+ TExprNode::TListType rowItems;
+ for (const auto& x : result->Tail().Children()) {
+ rowItems.push_back(ctx.NewList(x->Pos(), { x->HeadPtr(), x->Tail().TailPtr() }));
+ }
+
+ auto row = ctx.NewCallable(node->Pos(), "AsStruct", std::move(rowItems));
+ list = ctx.Builder(node->Pos())
+ .Callable("AsList")
+ .Callable(0, "AsStruct")
+ .Seal()
+ .Seal()
+ .Build();
+
+ projectionLambda = ctx.Builder(node->Pos())
+ .Lambda()
+ .Param("row")
+ .Set(row)
+ .Seal()
+ .Build();
+ } else {
+ // extract all used columns
+ TMap<TString,std::pair<ui32, TString>> usedColumns;
+ for (const auto& x : result->Tail().Children()) {
+ auto type = x->Child(1)->GetTypeAnn()->Cast<TTypeExprType>()->GetType()->Cast<TStructExprType>();
+ for (auto item : type->GetItems()) {
+ usedColumns.insert(std::make_pair(TString(item->GetName()), std::make_pair(Max<ui32>(), TString())));
+ }
+ }
+
+ for (ui32 groupNo = 0; groupNo < joinOps->Tail().ChildrenSize(); ++groupNo) {
+ auto groupTuple = joinOps->Tail().Child(groupNo);
+ for (ui32 i = 0; i < groupTuple->ChildrenSize(); ++i) {
+ auto join = groupTuple->Child(i);
+ auto joinType = join->Child(0)->Content();
+ if (joinType != "cross") {
+ auto type = join->Tail().Child(0)->GetTypeAnn()->Cast<TTypeExprType>()->GetType()->Cast<TStructExprType>();
+ for (auto item : type->GetItems()) {
+ usedColumns.insert(std::make_pair(TString(item->GetName()), std::make_pair(Max<ui32>(), TString())));
+ }
+ }
+ }
+ }
+
+ // fill index of input for each column
+ for (auto& x : usedColumns) {
+ bool foundColumn = false;
+ for (ui32 inputIndex = 0; inputIndex < from->Tail().ChildrenSize(); ++inputIndex) {
+ const auto& read = from->Tail().Child(inputIndex)->Head();
+ const auto& columns = from->Tail().Child(inputIndex)->Tail();
+ if (columns.ChildrenSize() > 0) {
+ auto readOrder = optCtx.Types->LookupColumnOrder(read);
+ YQL_ENSURE(*readOrder);
+ for (ui32 i = 0; i < columns.ChildrenSize(); ++i) {
+ if (columns.Child(i)->Content() == x.first) {
+ foundColumn = true;
+ x.second.second = (*readOrder)[i];
+ break;
+ }
+ }
+ } else {
+ auto type = read.GetTypeAnn()->
+ Cast<TListExprType>()->GetItemType()->Cast<TStructExprType>();
+ auto pos = type->FindItem(x.first);
+ foundColumn = pos.Defined();
+ }
+
+ if (foundColumn) {
+ x.second.first = inputIndex;
+ break;
+ }
+ }
+
+ YQL_ENSURE(foundColumn, "Missing column: " << x.first);
+ }
+
+ TVector<TExprNode::TPtr> cleanedInputs;
+ for (ui32 i = 0; i < from->Tail().ChildrenSize(); ++i) {
+ auto cleaned = ctx.Builder(node->Pos())
+ .Callable("OrderedMap")
+ .Add(0, from->Tail().Child(i)->HeadPtr())
+ .Lambda(1)
+ .Param("row")
+ .Callable("AsStruct")
+ .Do([&](TExprNodeBuilder& parent) -> TExprNodeBuilder& {
+ ui32 index = 0;
+ for (const auto& x : usedColumns) {
+ if (x.second.first != i) {
+ continue;
+ }
+
+ auto listBuilder = parent.List(index++);
+ listBuilder.Atom(0, x.first);
+ listBuilder.Callable(1, "Member")
+ .Arg(0, "row")
+ .Atom(1, x.second.second ? x.second.second : x.first)
+ .Seal();
+ listBuilder.Seal();
+ }
+
+ return parent;
+ })
+ .Seal()
+ .Seal()
+ .Seal()
+ .Build();
+
+ cleanedInputs.push_back(cleaned);
+ }
+
+ if (cleanedInputs.size() == 1) {
+ list = cleanedInputs.front();
+ } else {
+ TVector<ui32> groupForIndex;
+ TExprNode::TListType joinGroups;
+ ui32 inputIndex = 0;
+ for (ui32 groupNo = 0; groupNo < joinOps->Tail().ChildrenSize(); ++groupNo) {
+ groupForIndex.push_back(groupNo);
+ auto groupTuple = joinOps->Tail().Child(groupNo);
+ if (groupTuple->ChildrenSize() == 0) {
+ joinGroups.push_back(cleanedInputs[inputIndex++]);
+ continue;
+ }
+
+ auto current = cleanedInputs[inputIndex++];
+ for (ui32 i = 0; i < groupTuple->ChildrenSize(); ++i) {
+ groupForIndex.push_back(groupNo);
+ auto with = cleanedInputs[inputIndex++];
+ // current = join current & with
+ auto join = groupTuple->Child(i);
+ auto joinType = join->Child(0)->Content();
+ auto cartesian = ctx.Builder(node->Pos())
+ .Callable("OrderedFlatMap")
+ .Add(0, current)
+ .Lambda(1)
+ .Param("x")
+ .Callable("Map")
+ .Add(0, with)
+ .Lambda(1)
+ .Param("y")
+ .Callable("FlattenMembers")
+ .List(0)
+ .Atom(0, "")
+ .Arg(1,"x")
+ .Seal()
+ .List(1)
+ .Atom(0, "")
+ .Arg(1, "y")
+ .Seal()
+ .Seal()
+ .Seal()
+ .Seal()
+ .Seal()
+ .Seal()
+ .Build();
+
+ auto buildMinus = [&](auto left, auto right) {
+ return ctx.Builder(node->Pos())
+ .Callable("OrderedFlatMap")
+ .Add(0, left)
+ .Lambda(1)
+ .Param("x")
+ .Callable("OptionalIf")
+ .Callable(0, "Not")
+ .Callable(0, "HasItems")
+ .Callable(0, "Filter")
+ .Add(0, right)
+ .Lambda(1)
+ .Param("y")
+ .Apply(join->Tail().TailPtr())
+ .With(0)
+ .Callable("FlattenMembers")
+ .List(0)
+ .Atom(0, "")
+ .Arg(1,"x")
+ .Seal()
+ .List(1)
+ .Atom(0, "")
+ .Arg(1, "y")
+ .Seal()
+ .Seal()
+ .Done()
+ .Seal()
+ .Seal()
+ .Seal()
+ .Seal()
+ .Seal()
+ .Arg(1, "x")
+ .Seal()
+ .Seal()
+ .Seal()
+ .Build();
+ };
+
+ TExprNode::TPtr filteredCartesian;
+ if (joinType != "cross") {
+ filteredCartesian = ctx.Builder(node->Pos())
+ .Callable("OrderedFilter")
+ .Add(0, cartesian)
+ .Lambda(1)
+ .Param("row")
+ .Apply(join->Tail().TailPtr())
+ .With(0, "row")
+ .Seal()
+ .Seal()
+ .Seal()
+ .Build();
+ }
+
+ if (joinType == "cross") {
+ current = cartesian;
+ } else if (joinType == "inner") {
+ current = filteredCartesian;
+ } else if (joinType == "left") {
+ current = ctx.Builder(node->Pos())
+ .Callable("UnionAll")
+ .Add(0, filteredCartesian)
+ .Add(1, buildMinus(current, with))
+ .Seal()
+ .Build();
+ } else if (joinType == "right") {
+ current = ctx.Builder(node->Pos())
+ .Callable("UnionAll")
+ .Add(0, filteredCartesian)
+ .Add(1, buildMinus(with, current))
+ .Seal()
+ .Build();
+ } else {
+ YQL_ENSURE(joinType == "full");
+ current = ctx.Builder(node->Pos())
+ .Callable("UnionAll")
+ .Add(0, filteredCartesian)
+ .Add(1, buildMinus(current, with))
+ .Add(2, buildMinus(with, current))
+ .Seal()
+ .Build();
+ }
+ }
+
+ joinGroups.push_back(current);
+ }
+
+ if (joinGroups.size() == 1) {
+ list = joinGroups.front();
+ } else {
+ TExprNode::TListType args;
+ for (ui32 i = 0; i < joinGroups.size(); ++i) {
+ args.push_back(ctx.Builder(node->Pos())
+ .List()
+ .Add(0, joinGroups[i])
+ .Atom(1, ToString(i))
+ .Seal()
+ .Build());
+ }
+
+ auto tree = ctx.Builder(node->Pos())
+ .List()
+ .Atom(0, "Cross")
+ .Atom(1, "0")
+ .Atom(2, "1")
+ .List(3)
+ .Seal()
+ .List(4)
+ .Seal()
+ .List(5)
+ .Seal()
+ .Seal()
+ .Build();
+
+ for (ui32 i = 2; i < joinGroups.size(); ++i) {
+ tree = ctx.Builder(node->Pos())
+ .List()
+ .Atom(0, "Cross")
+ .Add(1, tree)
+ .Atom(2, ToString(i))
+ .List(3)
+ .Seal()
+ .List(4)
+ .Seal()
+ .List(5)
+ .Seal()
+ .Seal()
+ .Build();
+ }
+
+ args.push_back(tree);
+ TExprNode::TListType settings;
+ for (const auto& x : usedColumns) {
+ settings.push_back(ctx.Builder(node->Pos())
+ .List()
+ .Atom(0, "rename")
+ .Atom(1, ToString(groupForIndex[x.second.first]) + "." + x.first)
+ .Atom(2, x.first)
+ .Seal()
+ .Build());
+ }
+
+ auto settingsNode = ctx.NewList(node->Pos(), std::move(settings));
+ args.push_back(settingsNode);
+ list = ctx.NewCallable(node->Pos(), "EquiJoin", std::move(args));
+ }
+ }
+
+ projectionLambda = ctx.Builder(node->Pos())
+ .Lambda()
+ .Param("row")
+ .Callable("AsStruct")
+ .Do([&](TExprNodeBuilder& parent) -> TExprNodeBuilder& {
+ ui32 index = 0;
+ for (const auto& x : result->Tail().Children()) {
+ if (x->HeadPtr()->IsAtom()) {
+ auto listBuilder = parent.List(index++);
+ listBuilder.Add(0, x->HeadPtr());
+ listBuilder.Apply(1, x->TailPtr())
+ .With(0, "row")
+ .Seal();
+ listBuilder.Seal();
+ } else {
+ for (ui32 i = 0; i < x->Head().ChildrenSize(); ++i) {
+ auto listBuilder = parent.List(index++);
+ listBuilder.Add(0, x->Head().ChildPtr(i));
+ listBuilder.Callable(1, "Member")
+ .Arg(0, "row")
+ .Add(1, x->Head().ChildPtr(i));
+ listBuilder.Seal();
+ }
+ }
+ }
+
+ return parent;
+ })
+ .Seal()
+ .Seal()
+ .Build();
+ }
+
+ if (filter) {
+ list = ctx.Builder(node->Pos())
+ .Callable("Filter")
+ .Add(0, list)
+ .Lambda(1)
+ .Param("row")
+ .Callable("Coalesce")
+ .Callable(0, "FromPg")
+ .Apply(0, filter->Tail().TailPtr())
+ .With(0, "row")
+ .Seal()
+ .Seal()
+ .Callable(1, "Bool")
+ .Atom(0, "0")
+ .Seal()
+ .Seal()
+ .Seal()
+ .Seal()
+ .Build();
+ }
+
+ TVector<std::pair<TExprNode::TPtr, TExprNode::TPtr>> aggs;
+ TNodeMap<ui32> aggId;
+ VisitExpr(projectionLambda->TailPtr(), [&](const TExprNode::TPtr& node) {
+ if (node->IsCallable("PgAgg") || node->IsCallable("PgAggAll")) {
+ aggId[node.Get()] = aggs.size();
+ aggs.push_back({ node, projectionLambda->Head().HeadPtr() });
+ }
+
+ return true;
+ });
+
+ if (having) {
+ auto havingLambda = having->Tail().TailPtr();
+ VisitExpr(having->Tail().TailPtr(), [&](const TExprNode::TPtr& node) {
+ if (node->IsCallable("PgAgg") || node->IsCallable("PgAggAll")) {
+ aggId[node.Get()] = aggs.size();
+ aggs.push_back({ node, havingLambda->Head().HeadPtr() });
+ }
+
+ return true;
+ });
+ }
+
+ if (!aggs.empty() || groupBy) {
+ auto listTypeNode = ctx.Builder(node->Pos())
+ .Callable("TypeOf")
+ .Add(0, list)
+ .Seal()
+ .Build();
+
+ auto exportsPtr = optCtx.Types->Modules->GetModule("/lib/yql/aggregate.yql");
+ YQL_ENSURE(exportsPtr);
+
+ TNodeOnNodeOwnedMap deepClones;
+ TExprNode::TListType payloadItems;
+ for (ui32 i = 0; i < aggs.size(); ++i) {
+ auto func = aggs[i].first->Head().Content();
+ TExprNode::TPtr traits;
+ if (optCtx.Types->PgTypes) {
+ auto arg = ctx.NewArgument(node->Pos(), "row");
+ auto arguments = ctx.NewArguments(node->Pos(), { arg });
+ TExprNode::TListType aggFuncArgs;
+ for (ui32 j = 1; j < aggs[i].first->ChildrenSize(); ++j) {
+ aggFuncArgs.push_back(ctx.ReplaceNode(aggs[i].first->ChildPtr(j), *aggs[i].second, arg));
+ }
+
+ auto extractor = ctx.NewLambda(node->Pos(), std::move(arguments), std::move(aggFuncArgs));
+
+ traits = ctx.Builder(node->Pos())
+ .Callable("PgAggregationTraits")
+ .Atom(0, func)
+ .Callable(1, "ListItemType")
+ .Add(0, listTypeNode)
+ .Seal()
+ .Add(2, extractor)
+ .Seal()
+ .Build();
+ } else {
+ const auto& exports = exportsPtr->Symbols();
+ if (func == "count" && aggs[i].first->ChildrenSize() == 1) {
+ func = "count_all";
+ }
+
+ TString factory = TString(func) + "_traits_factory";
+ const auto ex = exports.find(factory);
+ YQL_ENSURE(exports.cend() != ex);
+ auto lambda = ctx.DeepCopy(*ex->second, exportsPtr->ExprCtx(), deepClones, true, false);
+ auto arg = ctx.NewArgument(node->Pos(), "row");
+ auto arguments = ctx.NewArguments(node->Pos(), { arg });
+ auto extractor = ctx.NewLambda(node->Pos(), std::move(arguments),
+ ctx.ReplaceNode(aggs[i].first->TailPtr(), *aggs[i].second, arg));
+
+ traits = ctx.ReplaceNodes(lambda->TailPtr(), {
+ {lambda->Head().Child(0), listTypeNode},
+ {lambda->Head().Child(1), extractor}
+ });
+
+ ctx.Step.Repeat(TExprStep::ExpandApplyForLambdas);
+ auto status = ExpandApply(traits, traits, ctx);
+ if (status == IGraphTransformer::TStatus::Error) {
+ return {};
+ }
+ }
+
+ payloadItems.push_back(ctx.Builder(node->Pos())
+ .List()
+ .Atom(0, "_yql_agg_" + ToString(i))
+ .Add(1, traits)
+ .Seal()
+ .Build());
+ }
+
+ auto payloadsNode = ctx.NewList(node->Pos(), std::move(payloadItems));
+ TExprNode::TListType keysItems;
+ if (groupBy) {
+ for (const auto& group : groupBy->Tail().Children()) {
+ const auto& lambda = group->Tail();
+ YQL_ENSURE(lambda.IsLambda());
+ YQL_ENSURE(lambda.Tail().IsCallable("Member"));
+ keysItems.push_back(lambda.Tail().TailPtr());
+ }
+ }
+
+ auto keys = ctx.NewList(node->Pos(), std::move(keysItems));
+
+ list = ctx.Builder(node->Pos())
+ .Callable("Aggregate")
+ .Add(0, list)
+ .Add(1, keys)
+ .Add(2, payloadsNode)
+ .List(3) // options
+ .Seal()
+ .Seal()
+ .Build();
+
+ auto rewriteAggs = [&](auto& lambda) {
+ auto status = OptimizeExpr(lambda, lambda, [&](const TExprNode::TPtr& node, TExprContext& ctx) -> TExprNode::TPtr {
+ auto it = aggId.find(node.Get());
+ if (it != aggId.end()) {
+ auto ret = ctx.Builder(node->Pos())
+ .Callable("Member")
+ .Add(0, lambda->Head().HeadPtr())
+ .Atom(1, "_yql_agg_" + ToString(it->second))
+ .Seal()
+ .Build();
+ if (!optCtx.Types->PgTypes && node->Head().Content() == "count") {
+ ret = ctx.Builder(node->Pos())
+ .Callable("SafeCast")
+ .Add(0, ret)
+ .Atom(1, "Int64")
+ .Seal()
+ .Build();
+ }
+
+ return ret;
+ }
+
+ return node;
+ }, ctx, TOptimizeExprSettings(optCtx.Types));
+
+ return status.Level != IGraphTransformer::TStatus::Error;
+ };
+
+ if (!rewriteAggs(projectionLambda)) {
+ return {};
+ }
+
+ if (having) {
+ auto havingLambda = having->Tail().TailPtr();
+ if (!rewriteAggs(havingLambda)) {
+ return {};
+ }
+
+ list = ctx.Builder(node->Pos())
+ .Callable("Filter")
+ .Add(0, list)
+ .Lambda(1)
+ .Param("row")
+ .Callable("Coalesce")
+ .Callable(0, "FromPg")
+ .Apply(0, havingLambda)
+ .With(0, "row")
+ .Seal()
+ .Seal()
+ .Callable(1, "Bool")
+ .Atom(0, "0")
+ .Seal()
+ .Seal()
+ .Seal()
+ .Seal()
+ .Build();
+ }
+ }
+
+ TVector<std::pair<TExprNode::TPtr, TExprNode::TPtr>> winFuncs;
+ TMap<ui32, TVector<ui32>> window2funcs;
+ TNodeMap<ui32> winFuncsId;
+ bool hasAggsOverWindow = false;
+ VisitExpr(projectionLambda->TailPtr(), [&](const TExprNode::TPtr& node) {
+ if (node->IsCallable("PgWindowCall") || node->IsCallable("PgAggWindowCall")) {
+ hasAggsOverWindow = hasAggsOverWindow || node->IsCallable("PgAggWindowCall");
+ YQL_ENSURE(window);
+ ui32 windowIndex;
+ if (node->Child(1)->IsCallable("PgAnonWindow")) {
+ windowIndex = FromString<ui32>(node->Child(1)->Head().Content());
+ } else {
+ auto name = node->Child(1)->Content();
+ bool found = false;
+ for (ui32 index = 0; index < window->Tail().ChildrenSize(); ++index) {
+ if (window->Tail().Child(index)->Head().Content() == name) {
+ windowIndex = index;
+ found = true;
+ break;
+ }
+ }
+
+ YQL_ENSURE(found);
+ }
+
+ window2funcs[windowIndex].push_back(winFuncs.size());
+ winFuncsId[node.Get()] = winFuncs.size();
+ winFuncs.push_back({ node, projectionLambda->Head().HeadPtr() });
+ }
+
+ return true;
+ });
+
+ if (!winFuncs.empty()) {
+ auto listTypeNode = ctx.Builder(node->Pos())
+ .Callable("TypeOf")
+ .Add(0, list)
+ .Seal()
+ .Build();
+
+ TNodeOnNodeOwnedMap deepClones;
+ const TExportTable* exportsPtr = nullptr;
+ if (hasAggsOverWindow) {
+ exportsPtr = optCtx.Types->Modules->GetModule("/lib/yql/window.yql");
+ YQL_ENSURE(exportsPtr);
+ }
+
+ for (const auto& x : window2funcs) {
+ auto win = window->Tail().Child(x.first);
+ const auto& frameSettings = win->Tail();
+
+ TExprNode::TListType args;
+ // default frame
+ auto begin = ctx.NewCallable(node->Pos(), "Void", {});
+ auto end = win->Child(3)->ChildrenSize() > 0 ?
+ ctx.NewCallable(node->Pos(), "Int32", { ctx.NewAtom(node->Pos(), "0") }) :
+ ctx.NewCallable(node->Pos(), "Void", {});
+ if (HasSetting(frameSettings, "type")) {
+ const auto& from = GetSetting(frameSettings, "from");
+ const auto& fromValue = GetSetting(frameSettings, "from_value");
+
+ auto fromName = from->Tail().Content();
+ if (fromName == "up") {
+ begin = ctx.NewCallable(node->Pos(), "Void", {});
+ } else if (fromName == "p") {
+ auto val = FromString<i32>(fromValue->Tail().Head().Content());
+ begin = ctx.NewCallable(node->Pos(), "Int32", { ctx.NewAtom(node->Pos(), ToString(-val)) });
+ } else if (fromName == "c") {
+ begin = ctx.NewCallable(node->Pos(), "Int32", { ctx.NewAtom(node->Pos(), "0") });
+ } else {
+ YQL_ENSURE(fromName == "f");
+ auto val = FromString<i32>(fromValue->Tail().Head().Content());
+ begin = ctx.NewCallable(node->Pos(), "Int32", { ctx.NewAtom(node->Pos(), ToString(val)) });
+ }
+
+ const auto& to = GetSetting(frameSettings, "to");
+ const auto& toValue = GetSetting(frameSettings, "to_value");
+
+ auto toName = to->Tail().Content();
+ if (toName == "p") {
+ auto val = FromString<i32>(toValue->Tail().Head().Content());
+ end = ctx.NewCallable(node->Pos(), "Int32", { ctx.NewAtom(node->Pos(), ToString(-val)) });
+ } else if (toName == "c") {
+ end = ctx.NewCallable(node->Pos(), "Int32", { ctx.NewAtom(node->Pos(), "0") });
+ } else if (toName == "f") {
+ auto val = FromString<i32>(toValue->Tail().Head().Content());
+ end = ctx.NewCallable(node->Pos(), "Int32", { ctx.NewAtom(node->Pos(), ToString(val)) });
+ } else {
+ YQL_ENSURE(toName == "uf");
+ end = ctx.NewCallable(node->Pos(), "Void", {});
+ }
+ }
+
+ args.push_back(ctx.Builder(node->Pos())
+ .List()
+ .List(0)
+ .Atom(0, "begin")
+ .Add(1, begin)
+ .Seal()
+ .List(1)
+ .Atom(0, "end")
+ .Add(1, end)
+ .Seal()
+ .Seal()
+ .Build());
+
+ for (const auto& index : x.second) {
+ auto p = winFuncs[index];
+ auto name = p.first->Head().Content();
+ bool isAgg = p.first->IsCallable("PgAggWindowCall");
+ TExprNode::TPtr value;
+ if (isAgg) {
+ const auto& exports = exportsPtr->Symbols();
+ if (name == "count" && p.first->ChildrenSize() == 2) {
+ name = "count_all";
+ }
+
+ TString factory = TString(name) + "_traits_factory";
+ const auto ex = exports.find(factory);
+ YQL_ENSURE(exports.cend() != ex);
+ auto lambda = ctx.DeepCopy(*ex->second, exportsPtr->ExprCtx(), deepClones, true, false);
+ auto arg = ctx.NewArgument(node->Pos(), "row");
+ auto arguments = ctx.NewArguments(node->Pos(), { arg });
+ auto extractor = ctx.NewLambda(node->Pos(), std::move(arguments),
+ ctx.ReplaceNode(p.first->TailPtr(), *p.second, arg));
+
+ auto traits = ctx.ReplaceNodes(lambda->TailPtr(), {
+ {lambda->Head().Child(0), listTypeNode},
+ {lambda->Head().Child(1), extractor}
+ });
+
+ ctx.Step.Repeat(TExprStep::ExpandApplyForLambdas);
+ auto status = ExpandApply(traits, traits, ctx);
+ if (status == IGraphTransformer::TStatus::Error) {
+ return {};
+ }
+
+ value = traits;
+ } else {
+ if (name == "row_number") {
+ value = ctx.Builder(node->Pos())
+ .Callable("RowNumber")
+ .Callable(0, "TypeOf")
+ .Add(0, list)
+ .Seal()
+ .Seal()
+ .Build();
+ } else if (name == "lead" || name == "lag") {
+ auto arg = ctx.NewArgument(node->Pos(), "row");
+ auto arguments = ctx.NewArguments(node->Pos(), { arg });
+ auto extractor = ctx.NewLambda(node->Pos(), std::move(arguments),
+ ctx.ReplaceNode(p.first->TailPtr(), *p.second, arg));
+
+ value = ctx.Builder(node->Pos())
+ .Callable(name == "lead" ? "Lead" : "Lag")
+ .Callable(0, "TypeOf")
+ .Add(0, list)
+ .Seal()
+ .Add(1, extractor)
+ .Seal()
+ .Build();
+ } else {
+ ythrow yexception() << "Not supported function: " << name;
+ }
+ }
+
+ args.push_back(ctx.Builder(node->Pos())
+ .List()
+ .Atom(0, "_yql_win_" + ToString(index))
+ .Add(1, value)
+ .Seal()
+ .Build());
+ }
+
+ auto winOnRows = ctx.NewCallable(node->Pos(), "WinOnRows", std::move(args));
+
+ auto frames = ctx.Builder(node->Pos())
+ .List()
+ .Add(0, winOnRows)
+ .Seal()
+ .Build();
+
+ TExprNode::TListType keys;
+ for (auto p : win->Child(2)->Children()) {
+ YQL_ENSURE(p->IsCallable("PgGroup"));
+ const auto& member = p->Tail().Tail();
+ YQL_ENSURE(member.IsCallable("Member"));
+ keys.push_back(member.TailPtr());
+ }
+
+ auto keysNode = ctx.NewList(node->Pos(), std::move(keys));
+ auto sortNode = ctx.NewCallable(node->Pos(), "Void", {});
+ if (win->Child(3)->ChildrenSize() > 0) {
+ if (win->Child(3)->ChildrenSize() == 1) {
+ sortNode = ctx.Builder(node->Pos())
+ .Callable("SortTraits")
+ .Callable(0, "TypeOf")
+ .Add(0, list)
+ .Seal()
+ .Callable(1, "Bool")
+ .Atom(0, win->Child(3)->Head().Tail().Content() == "asc" ? "true" : "false")
+ .Seal()
+ .Lambda(2)
+ .Param("row")
+ .Apply(win->Child(3)->Head().ChildPtr(1))
+ .With(0, "row")
+ .Seal()
+ .Seal()
+ .Seal()
+ .Build();
+ } else {
+ sortNode = ctx.Builder(node->Pos())
+ .Callable("SortTraits")
+ .Callable(0, "TypeOf")
+ .Add(0, list)
+ .Seal()
+ .List(1)
+ .Do([&](TExprNodeBuilder& parent) -> TExprNodeBuilder& {
+ for (ui32 i = 0; i < win->Child(3)->ChildrenSize(); ++i) {
+ parent.Callable(i, "Bool")
+ .Atom(0, win->Child(3)->Child(i)->Tail().Content() == "asc" ? "true" : "false")
+ .Seal();
+ }
+ return parent;
+ })
+ .Seal()
+ .Lambda(2)
+ .Param("row")
+ .List()
+ .Do([&](TExprNodeBuilder& parent) -> TExprNodeBuilder& {
+ for (ui32 i = 0; i < win->Child(3)->ChildrenSize(); ++i) {
+ parent.Apply(i, win->Child(3)->Child(i)->ChildPtr(1))
+ .With(0, "row")
+ .Seal();
+ }
+
+ return parent;
+ })
+ .Seal()
+ .Seal()
+ .Seal()
+ .Build();
+ }
+ }
+
+ list = ctx.Builder(node->Pos())
+ .Callable("CalcOverWindow")
+ .Add(0, list)
+ .Add(1, keysNode)
+ .Add(2, sortNode)
+ .Add(3, frames)
+ .Seal()
+ .Build();
+ }
+
+ auto status = OptimizeExpr(projectionLambda, projectionLambda, [&](const TExprNode::TPtr& node, TExprContext& ctx) -> TExprNode::TPtr {
+ auto it = winFuncsId.find(node.Get());
+ if (it != winFuncsId.end()) {
+ auto ret = ctx.Builder(node->Pos())
+ .Callable("Member")
+ .Add(0, projectionLambda->Head().HeadPtr())
+ .Atom(1, "_yql_win_" + ToString(it->second))
+ .Seal()
+ .Build();
+
+ if (!optCtx.Types->PgTypes && (node->Head().Content() == "row_number" || node->Head().Content() == "count")) {
+ ret = ctx.Builder(node->Pos())
+ .Callable("SafeCast")
+ .Add(0, ret)
+ .Atom(1, "Int64")
+ .Seal()
+ .Build();
+ }
+
+ return ret;
+ }
+
+ return node;
+ }, ctx, TOptimizeExprSettings(optCtx.Types));
+
+ if (status.Level == IGraphTransformer::TStatus::Error) {
+ return nullptr;
+ }
+ }
+
+ list = ctx.Builder(node->Pos())
+ .Callable("Map")
+ .Add(0, list)
+ .Add(1, projectionLambda)
+ .Seal()
+ .Build();
+ }
+
+ setItemNodes.push_back(list);
+ }
+
+ TExprNode::TPtr list;
+ if (setItemNodes.size() == 1) {
+ list = setItemNodes.front();
+ } else {
+ list = ExpandPositionalUnionAll(*node, columnOrders, setItemNodes, ctx, optCtx);
+ }
+
+ auto sort = GetSetting(node->Head(), "sort");
+ if (sort && sort->Tail().ChildrenSize() > 0) {
+ const auto& keys = sort->Tail();
+ auto argNode = ctx.NewArgument(node->Pos(), "row");
+ auto argsNode = ctx.NewArguments(node->Pos(), { argNode });
+
+ TExprNode::TListType dirItems;
+ TExprNode::TListType rootItems;
+ for (const auto& key : keys.Children()) {
+ dirItems.push_back(ctx.Builder(node->Pos())
+ .Callable("Bool")
+ .Atom(0, key->Tail().Content() == "asc" ? "true" : "false")
+ .Seal()
+ .Build());
+
+ auto keyLambda = key->ChildPtr(1);
+ rootItems.push_back(ctx.ReplaceNode(keyLambda->TailPtr(), keyLambda->Head().Head(), argNode));
+ }
+
+ auto root = ctx.NewList(node->Pos(), std::move(rootItems));
+ auto dir = ctx.NewList(node->Pos(), std::move(dirItems));
+ auto lambda = ctx.NewLambda(node->Pos(), std::move(argsNode), std::move(root));
+
+ list = ctx.Builder(node->Pos())
+ .Callable("Sort")
+ .Add(0, list)
+ .Add(1, dir)
+ .Add(2, lambda)
+ .Seal()
+ .Build();
+ }
+
+ auto limit = GetSetting(node->Head(), "limit");
+ auto offset = GetSetting(node->Head(), "offset");
+
+ if (offset) {
+ list = ctx.Builder(node->Pos())
+ .Callable("Skip")
+ .Add(0, list)
+ .Callable(1, "Unwrap")
+ .Callable(0, "SafeCast")
+ .Callable(0, "Coalesce")
+ .Callable(0,"FromPg")
+ .Add(0, offset->ChildPtr(1))
+ .Seal()
+ .Callable(1, "Int64")
+ .Atom(0, "0")
+ .Seal()
+ .Seal()
+ .Atom(1, "Uint64")
+ .Seal()
+ .Callable(1, "String")
+ .Atom(0, "Negative offset")
+ .Seal()
+ .Seal()
+ .Seal()
+ .Build();
+ }
+
+ if (limit) {
+ list = ctx.Builder(node->Pos())
+ .Callable("Take")
+ .Add(0, list)
+ .Callable(1, "Unwrap")
+ .Callable(0, "SafeCast")
+ .Callable(0, "Coalesce")
+ .Callable(0,"FromPg")
+ .Add(0, limit->ChildPtr(1))
+ .Seal()
+ .Callable(1, "Int64")
+ .Atom(0, "9223372036854775807") // 2**63-1
+ .Seal()
+ .Seal()
+ .Atom(1, "Uint64")
+ .Seal()
+ .Callable(1, "String")
+ .Atom(0, "Negative limit")
+ .Seal()
+ .Seal()
+ .Seal()
+ .Build();
+ }
+
+ return ctx.Builder(node->Pos())
+ .Callable("AssumeColumnOrder")
+ .Add(0, list)
+ .Add(1, columns)
+ .Seal()
+ .Build();
+}
+
+} // namespace NYql
diff --git a/ydb/library/yql/core/common_opt/yql_co_pgselect.h b/ydb/library/yql/core/common_opt/yql_co_pgselect.h
new file mode 100644
index 00000000000..b84bf752c9f
--- /dev/null
+++ b/ydb/library/yql/core/common_opt/yql_co_pgselect.h
@@ -0,0 +1,12 @@
+#pragma once
+
+#include "yql_co.h"
+
+namespace NYql {
+
+TExprNode::TPtr ExpandPgSelect(const TExprNode::TPtr& node, TExprContext& ctx, TOptimizeContext& optCtx);
+
+TExprNode::TPtr ExpandPositionalUnionAll(const TExprNode& node, const TVector<TColumnOrder>& columnOrders,
+ TExprNode::TListType children, TExprContext& ctx, TOptimizeContext& optCtx);
+
+} // namespace NYql
diff --git a/ydb/library/yql/core/common_opt/yql_co_simple1.cpp b/ydb/library/yql/core/common_opt/yql_co_simple1.cpp
index 7408d750ce4..69fe01caac8 100644
--- a/ydb/library/yql/core/common_opt/yql_co_simple1.cpp
+++ b/ydb/library/yql/core/common_opt/yql_co_simple1.cpp
@@ -1,5 +1,6 @@
#include "yql_co.h"
#include "yql_co_sqlin.h"
+#include "yql_co_pgselect.h"
#include <ydb/library/yql/core/yql_atom_enums.h>
#include <ydb/library/yql/core/yql_expr_type_annotation.h>
@@ -164,48 +165,6 @@ TExprNode::TPtr ConstFoldNodeIntAggregate(const TExprNode::TPtr& node, TExprCont
return node;
}
-TExprNode::TPtr ExpandPositionalUnionAll(const TExprNode& node, const TVector<TColumnOrder>& columnOrders,
- TExprNode::TListType children, TExprContext& ctx, TOptimizeContext& optCtx) {
- auto targetColumnOrder = optCtx.Types->LookupColumnOrder(node);
- YQL_ENSURE(targetColumnOrder);
-
- for (ui32 childIndex = 0; childIndex < children.size(); ++childIndex) {
- const auto& childColumnOrder = columnOrders[childIndex];
- auto& child = children[childIndex];
- if (childColumnOrder == *targetColumnOrder) {
- continue;
- }
-
- YQL_ENSURE(childColumnOrder.size() == targetColumnOrder->size());
- child = ctx.Builder(child->Pos())
- .Callable("Map")
- .Add(0, child)
- .Lambda(1)
- .Param("row")
- .Callable("AsStruct")
- .Do([&](TExprNodeBuilder &parent) -> TExprNodeBuilder & {
- for (size_t i = 0; i < childColumnOrder.size(); ++i) {
- parent
- .List(i)
- .Atom(0, child->Pos(), (*targetColumnOrder)[i])
- .Callable(1, "Member")
- .Arg(0, "row")
- .Atom(1, childColumnOrder[i])
- .Seal()
- .Seal();
- }
- return parent;
- })
- .Seal()
- .Seal()
- .Seal()
- .Build();
- }
-
- auto res = ctx.NewCallable(node.Pos(), "UnionAll", std::move(children));
- return KeepColumnOrder(res, node, ctx, *optCtx.Types);
-}
-
TExprNode::TPtr ExpandFlattenEquiJoin(const TExprNode::TPtr& node, TExprContext& ctx) {
auto settings = node->Children().back();
TExprNode::TListType settingsChildren;
@@ -6259,985 +6218,7 @@ void RegisterCoSimpleCallables1(TCallableOptimizerMap& map) {
return node;
};
- map["PgSelect"] = [](const TExprNode::TPtr& node, TExprContext& ctx, TOptimizeContext& optCtx) -> TExprNode::TPtr {
- auto setItems = GetSetting(node->Head(), "set_items");
- auto order = optCtx.Types->LookupColumnOrder(*node);
- YQL_ENSURE(order);
- TExprNode::TListType columnsItems;
- for (const auto& x : *order) {
- columnsItems.push_back(ctx.NewAtom(node->Pos(), x));
- }
-
- auto columns = ctx.NewList(node->Pos(), std::move(columnsItems));
- TExprNode::TListType setItemNodes;
- TVector<TColumnOrder> columnOrders;
- for (auto setItem : setItems->Tail().Children()) {
- auto childOrder = optCtx.Types->LookupColumnOrder(*setItem);
- YQL_ENSURE(*childOrder);
- columnOrders.push_back(*childOrder);
- auto result = GetSetting(setItem->Tail(), "result");
- auto values = GetSetting(setItem->Tail(), "values");
- auto from = GetSetting(setItem->Tail(), "from");
- auto filter = GetSetting(setItem->Tail(), "where");
- auto joinOps = GetSetting(setItem->Tail(), "join_ops");
- auto groupBy = GetSetting(setItem->Tail(), "group_by");
- auto having = GetSetting(setItem->Tail(), "having");
- auto window = GetSetting(setItem->Tail(), "window");
- bool oneRow = !from;
- TExprNode::TPtr list;
- if (values) {
- YQL_ENSURE(!result);
- list = ctx.Builder(node->Pos())
- .Callable("Map")
- .Add(0, values->ChildPtr(2))
- .Lambda(1)
- .Param("row")
- .Callable("AsStruct")
- .Do([&](TExprNodeBuilder& parent) -> TExprNodeBuilder& {
- for (ui32 index = 0; index < values->Child(1)->ChildrenSize(); ++index) {
- parent
- .List(index)
- .Atom(0, values->Child(1)->Child(index)->Content())
- .Callable(1, "Nth")
- .Arg(0, "row")
- .Atom(1, ToString(index))
- .Seal()
- .Seal();
- }
-
- return parent;
- })
- .Seal()
- .Seal()
- .Seal()
- .Build();
- } else {
- YQL_ENSURE(result);
- TExprNode::TPtr projectionLambda;
- if (oneRow) {
- TExprNode::TListType rowItems;
- for (const auto& x : result->Tail().Children()) {
- rowItems.push_back(ctx.NewList(x->Pos(), { x->HeadPtr(), x->Tail().TailPtr() }));
- }
-
- auto row = ctx.NewCallable(node->Pos(), "AsStruct", std::move(rowItems));
- list = ctx.Builder(node->Pos())
- .Callable("AsList")
- .Callable(0, "AsStruct")
- .Seal()
- .Seal()
- .Build();
-
- projectionLambda = ctx.Builder(node->Pos())
- .Lambda()
- .Param("row")
- .Set(row)
- .Seal()
- .Build();
- } else {
- // extract all used columns
- TMap<TString,std::pair<ui32, TString>> usedColumns;
- for (const auto& x : result->Tail().Children()) {
- auto type = x->Child(1)->GetTypeAnn()->Cast<TTypeExprType>()->GetType()->Cast<TStructExprType>();
- for (auto item : type->GetItems()) {
- usedColumns.insert(std::make_pair(TString(item->GetName()), std::make_pair(Max<ui32>(), TString())));
- }
- }
-
- for (ui32 groupNo = 0; groupNo < joinOps->Tail().ChildrenSize(); ++groupNo) {
- auto groupTuple = joinOps->Tail().Child(groupNo);
- for (ui32 i = 0; i < groupTuple->ChildrenSize(); ++i) {
- auto join = groupTuple->Child(i);
- auto joinType = join->Child(0)->Content();
- if (joinType != "cross") {
- auto type = join->Tail().Child(0)->GetTypeAnn()->Cast<TTypeExprType>()->GetType()->Cast<TStructExprType>();
- for (auto item : type->GetItems()) {
- usedColumns.insert(std::make_pair(TString(item->GetName()), std::make_pair(Max<ui32>(), TString())));
- }
- }
- }
- }
-
- // fill index of input for each column
- for (auto& x : usedColumns) {
- bool foundColumn = false;
- for (ui32 inputIndex = 0; inputIndex < from->Tail().ChildrenSize(); ++inputIndex) {
- const auto& read = from->Tail().Child(inputIndex)->Head();
- const auto& columns = from->Tail().Child(inputIndex)->Tail();
- if (columns.ChildrenSize() > 0) {
- auto readOrder = optCtx.Types->LookupColumnOrder(read);
- YQL_ENSURE(*readOrder);
- for (ui32 i = 0; i < columns.ChildrenSize(); ++i) {
- if (columns.Child(i)->Content() == x.first) {
- foundColumn = true;
- x.second.second = (*readOrder)[i];
- break;
- }
- }
- } else {
- auto type = read.GetTypeAnn()->
- Cast<TListExprType>()->GetItemType()->Cast<TStructExprType>();
- auto pos = type->FindItem(x.first);
- foundColumn = pos.Defined();
- }
-
- if (foundColumn) {
- x.second.first = inputIndex;
- break;
- }
- }
-
- YQL_ENSURE(foundColumn, "Missing column: " << x.first);
- }
-
- TVector<TExprNode::TPtr> cleanedInputs;
- for (ui32 i = 0; i < from->Tail().ChildrenSize(); ++i) {
- auto cleaned = ctx.Builder(node->Pos())
- .Callable("OrderedMap")
- .Add(0, from->Tail().Child(i)->HeadPtr())
- .Lambda(1)
- .Param("row")
- .Callable("AsStruct")
- .Do([&](TExprNodeBuilder& parent) -> TExprNodeBuilder& {
- ui32 index = 0;
- for (const auto& x : usedColumns) {
- if (x.second.first != i) {
- continue;
- }
-
- auto listBuilder = parent.List(index++);
- listBuilder.Atom(0, x.first);
- listBuilder.Callable(1, "Member")
- .Arg(0, "row")
- .Atom(1, x.second.second ? x.second.second : x.first)
- .Seal();
- listBuilder.Seal();
- }
-
- return parent;
- })
- .Seal()
- .Seal()
- .Seal()
- .Build();
-
- cleanedInputs.push_back(cleaned);
- }
-
- if (cleanedInputs.size() == 1) {
- list = cleanedInputs.front();
- } else {
- TVector<ui32> groupForIndex;
- TExprNode::TListType joinGroups;
- ui32 inputIndex = 0;
- for (ui32 groupNo = 0; groupNo < joinOps->Tail().ChildrenSize(); ++groupNo) {
- groupForIndex.push_back(groupNo);
- auto groupTuple = joinOps->Tail().Child(groupNo);
- if (groupTuple->ChildrenSize() == 0) {
- joinGroups.push_back(cleanedInputs[inputIndex++]);
- continue;
- }
-
- auto current = cleanedInputs[inputIndex++];
- for (ui32 i = 0; i < groupTuple->ChildrenSize(); ++i) {
- groupForIndex.push_back(groupNo);
- auto with = cleanedInputs[inputIndex++];
- // current = join current & with
- auto join = groupTuple->Child(i);
- auto joinType = join->Child(0)->Content();
- auto cartesian = ctx.Builder(node->Pos())
- .Callable("OrderedFlatMap")
- .Add(0, current)
- .Lambda(1)
- .Param("x")
- .Callable("Map")
- .Add(0, with)
- .Lambda(1)
- .Param("y")
- .Callable("FlattenMembers")
- .List(0)
- .Atom(0, "")
- .Arg(1,"x")
- .Seal()
- .List(1)
- .Atom(0, "")
- .Arg(1, "y")
- .Seal()
- .Seal()
- .Seal()
- .Seal()
- .Seal()
- .Seal()
- .Build();
-
- auto buildMinus = [&](auto left, auto right) {
- return ctx.Builder(node->Pos())
- .Callable("OrderedFlatMap")
- .Add(0, left)
- .Lambda(1)
- .Param("x")
- .Callable("OptionalIf")
- .Callable(0, "Not")
- .Callable(0, "HasItems")
- .Callable(0, "Filter")
- .Add(0, right)
- .Lambda(1)
- .Param("y")
- .Apply(join->Tail().TailPtr())
- .With(0)
- .Callable("FlattenMembers")
- .List(0)
- .Atom(0, "")
- .Arg(1,"x")
- .Seal()
- .List(1)
- .Atom(0, "")
- .Arg(1, "y")
- .Seal()
- .Seal()
- .Done()
- .Seal()
- .Seal()
- .Seal()
- .Seal()
- .Seal()
- .Arg(1, "x")
- .Seal()
- .Seal()
- .Seal()
- .Build();
- };
-
- TExprNode::TPtr filteredCartesian;
- if (joinType != "cross") {
- filteredCartesian = ctx.Builder(node->Pos())
- .Callable("OrderedFilter")
- .Add(0, cartesian)
- .Lambda(1)
- .Param("row")
- .Apply(join->Tail().TailPtr())
- .With(0, "row")
- .Seal()
- .Seal()
- .Seal()
- .Build();
- }
-
- if (joinType == "cross") {
- current = cartesian;
- } else if (joinType == "inner") {
- current = filteredCartesian;
- } else if (joinType == "left") {
- current = ctx.Builder(node->Pos())
- .Callable("UnionAll")
- .Add(0, filteredCartesian)
- .Add(1, buildMinus(current, with))
- .Seal()
- .Build();
- } else if (joinType == "right") {
- current = ctx.Builder(node->Pos())
- .Callable("UnionAll")
- .Add(0, filteredCartesian)
- .Add(1, buildMinus(with, current))
- .Seal()
- .Build();
- } else {
- YQL_ENSURE(joinType == "full");
- current = ctx.Builder(node->Pos())
- .Callable("UnionAll")
- .Add(0, filteredCartesian)
- .Add(1, buildMinus(current, with))
- .Add(2, buildMinus(with, current))
- .Seal()
- .Build();
- }
- }
-
- joinGroups.push_back(current);
- }
-
- if (joinGroups.size() == 1) {
- list = joinGroups.front();
- } else {
- TExprNode::TListType args;
- for (ui32 i = 0; i < joinGroups.size(); ++i) {
- args.push_back(ctx.Builder(node->Pos())
- .List()
- .Add(0, joinGroups[i])
- .Atom(1, ToString(i))
- .Seal()
- .Build());
- }
-
- auto tree = ctx.Builder(node->Pos())
- .List()
- .Atom(0, "Cross")
- .Atom(1, "0")
- .Atom(2, "1")
- .List(3)
- .Seal()
- .List(4)
- .Seal()
- .List(5)
- .Seal()
- .Seal()
- .Build();
-
- for (ui32 i = 2; i < joinGroups.size(); ++i) {
- tree = ctx.Builder(node->Pos())
- .List()
- .Atom(0, "Cross")
- .Add(1, tree)
- .Atom(2, ToString(i))
- .List(3)
- .Seal()
- .List(4)
- .Seal()
- .List(5)
- .Seal()
- .Seal()
- .Build();
- }
-
- args.push_back(tree);
- TExprNode::TListType settings;
- for (const auto& x : usedColumns) {
- settings.push_back(ctx.Builder(node->Pos())
- .List()
- .Atom(0, "rename")
- .Atom(1, ToString(groupForIndex[x.second.first]) + "." + x.first)
- .Atom(2, x.first)
- .Seal()
- .Build());
- }
-
- auto settingsNode = ctx.NewList(node->Pos(), std::move(settings));
- args.push_back(settingsNode);
- list = ctx.NewCallable(node->Pos(), "EquiJoin", std::move(args));
- }
- }
-
- projectionLambda = ctx.Builder(node->Pos())
- .Lambda()
- .Param("row")
- .Callable("AsStruct")
- .Do([&](TExprNodeBuilder& parent) -> TExprNodeBuilder& {
- ui32 index = 0;
- for (const auto& x : result->Tail().Children()) {
- if (x->HeadPtr()->IsAtom()) {
- auto listBuilder = parent.List(index++);
- listBuilder.Add(0, x->HeadPtr());
- listBuilder.Apply(1, x->TailPtr())
- .With(0, "row")
- .Seal();
- listBuilder.Seal();
- } else {
- for (ui32 i = 0; i < x->Head().ChildrenSize(); ++i) {
- auto listBuilder = parent.List(index++);
- listBuilder.Add(0, x->Head().ChildPtr(i));
- listBuilder.Callable(1, "Member")
- .Arg(0, "row")
- .Add(1, x->Head().ChildPtr(i));
- listBuilder.Seal();
- }
- }
- }
-
- return parent;
- })
- .Seal()
- .Seal()
- .Build();
- }
-
- if (filter) {
- list = ctx.Builder(node->Pos())
- .Callable("Filter")
- .Add(0, list)
- .Lambda(1)
- .Param("row")
- .Callable("Coalesce")
- .Callable(0, "FromPg")
- .Apply(0, filter->Tail().TailPtr())
- .With(0, "row")
- .Seal()
- .Seal()
- .Callable(1, "Bool")
- .Atom(0, "0")
- .Seal()
- .Seal()
- .Seal()
- .Seal()
- .Build();
- }
-
- TVector<std::pair<TExprNode::TPtr, TExprNode::TPtr>> aggs;
- TNodeMap<ui32> aggId;
- VisitExpr(projectionLambda->TailPtr(), [&](const TExprNode::TPtr& node) {
- if (node->IsCallable("PgAgg") || node->IsCallable("PgAggAll")) {
- aggId[node.Get()] = aggs.size();
- aggs.push_back({ node, projectionLambda->Head().HeadPtr() });
- }
-
- return true;
- });
-
- if (having) {
- auto havingLambda = having->Tail().TailPtr();
- VisitExpr(having->Tail().TailPtr(), [&](const TExprNode::TPtr& node) {
- if (node->IsCallable("PgAgg") || node->IsCallable("PgAggAll")) {
- aggId[node.Get()] = aggs.size();
- aggs.push_back({ node, havingLambda->Head().HeadPtr() });
- }
-
- return true;
- });
- }
-
- if (!aggs.empty() || groupBy) {
- auto listTypeNode = ctx.Builder(node->Pos())
- .Callable("TypeOf")
- .Add(0, list)
- .Seal()
- .Build();
-
- auto exportsPtr = optCtx.Types->Modules->GetModule("/lib/yql/aggregate.yql");
- YQL_ENSURE(exportsPtr);
-
- TNodeOnNodeOwnedMap deepClones;
- TExprNode::TListType payloadItems;
- for (ui32 i = 0; i < aggs.size(); ++i) {
- auto func = aggs[i].first->Head().Content();
- TExprNode::TPtr traits;
- if (optCtx.Types->PgTypes) {
- auto arg = ctx.NewArgument(node->Pos(), "row");
- auto arguments = ctx.NewArguments(node->Pos(), { arg });
- TExprNode::TListType aggFuncArgs;
- for (ui32 j = 1; j < aggs[i].first->ChildrenSize(); ++j) {
- aggFuncArgs.push_back(ctx.ReplaceNode(aggs[i].first->ChildPtr(j), *aggs[i].second, arg));
- }
-
- auto extractor = ctx.NewLambda(node->Pos(), std::move(arguments), std::move(aggFuncArgs));
-
- traits = ctx.Builder(node->Pos())
- .Callable("PgAggregationTraits")
- .Atom(0, func)
- .Callable(1, "ListItemType")
- .Add(0, listTypeNode)
- .Seal()
- .Add(2, extractor)
- .Seal()
- .Build();
- } else {
- const auto& exports = exportsPtr->Symbols();
- if (func == "count" && aggs[i].first->ChildrenSize() == 1) {
- func = "count_all";
- }
-
- TString factory = TString(func) + "_traits_factory";
- const auto ex = exports.find(factory);
- YQL_ENSURE(exports.cend() != ex);
- auto lambda = ctx.DeepCopy(*ex->second, exportsPtr->ExprCtx(), deepClones, true, false);
- auto arg = ctx.NewArgument(node->Pos(), "row");
- auto arguments = ctx.NewArguments(node->Pos(), { arg });
- auto extractor = ctx.NewLambda(node->Pos(), std::move(arguments),
- ctx.ReplaceNode(aggs[i].first->TailPtr(), *aggs[i].second, arg));
-
- traits = ctx.ReplaceNodes(lambda->TailPtr(), {
- {lambda->Head().Child(0), listTypeNode},
- {lambda->Head().Child(1), extractor}
- });
-
- ctx.Step.Repeat(TExprStep::ExpandApplyForLambdas);
- auto status = ExpandApply(traits, traits, ctx);
- if (status == IGraphTransformer::TStatus::Error) {
- return {};
- }
- }
-
- payloadItems.push_back(ctx.Builder(node->Pos())
- .List()
- .Atom(0, "_yql_agg_" + ToString(i))
- .Add(1, traits)
- .Seal()
- .Build());
- }
-
- auto payloadsNode = ctx.NewList(node->Pos(), std::move(payloadItems));
- TExprNode::TListType keysItems;
- if (groupBy) {
- for (const auto& group : groupBy->Tail().Children()) {
- const auto& lambda = group->Tail();
- YQL_ENSURE(lambda.IsLambda());
- YQL_ENSURE(lambda.Tail().IsCallable("Member"));
- keysItems.push_back(lambda.Tail().TailPtr());
- }
- }
-
- auto keys = ctx.NewList(node->Pos(), std::move(keysItems));
-
- list = ctx.Builder(node->Pos())
- .Callable("Aggregate")
- .Add(0, list)
- .Add(1, keys)
- .Add(2, payloadsNode)
- .List(3) // options
- .Seal()
- .Seal()
- .Build();
-
- auto rewriteAggs = [&](auto& lambda) {
- auto status = OptimizeExpr(lambda, lambda, [&](const TExprNode::TPtr& node, TExprContext& ctx) -> TExprNode::TPtr {
- auto it = aggId.find(node.Get());
- if (it != aggId.end()) {
- auto ret = ctx.Builder(node->Pos())
- .Callable("Member")
- .Add(0, lambda->Head().HeadPtr())
- .Atom(1, "_yql_agg_" + ToString(it->second))
- .Seal()
- .Build();
- if (!optCtx.Types->PgTypes && node->Head().Content() == "count") {
- ret = ctx.Builder(node->Pos())
- .Callable("SafeCast")
- .Add(0, ret)
- .Atom(1, "Int64")
- .Seal()
- .Build();
- }
-
- return ret;
- }
-
- return node;
- }, ctx, TOptimizeExprSettings(optCtx.Types));
-
- return status.Level != IGraphTransformer::TStatus::Error;
- };
-
- if (!rewriteAggs(projectionLambda)) {
- return {};
- }
-
- if (having) {
- auto havingLambda = having->Tail().TailPtr();
- if (!rewriteAggs(havingLambda)) {
- return {};
- }
-
- list = ctx.Builder(node->Pos())
- .Callable("Filter")
- .Add(0, list)
- .Lambda(1)
- .Param("row")
- .Callable("Coalesce")
- .Callable(0, "FromPg")
- .Apply(0, havingLambda)
- .With(0, "row")
- .Seal()
- .Seal()
- .Callable(1, "Bool")
- .Atom(0, "0")
- .Seal()
- .Seal()
- .Seal()
- .Seal()
- .Build();
- }
- }
-
- TVector<std::pair<TExprNode::TPtr, TExprNode::TPtr>> winFuncs;
- TMap<ui32, TVector<ui32>> window2funcs;
- TNodeMap<ui32> winFuncsId;
- bool hasAggsOverWindow = false;
- VisitExpr(projectionLambda->TailPtr(), [&](const TExprNode::TPtr& node) {
- if (node->IsCallable("PgWindowCall") || node->IsCallable("PgAggWindowCall")) {
- hasAggsOverWindow = hasAggsOverWindow || node->IsCallable("PgAggWindowCall");
- YQL_ENSURE(window);
- ui32 windowIndex;
- if (node->Child(1)->IsCallable("PgAnonWindow")) {
- windowIndex = FromString<ui32>(node->Child(1)->Head().Content());
- } else {
- auto name = node->Child(1)->Content();
- bool found = false;
- for (ui32 index = 0; index < window->Tail().ChildrenSize(); ++index) {
- if (window->Tail().Child(index)->Head().Content() == name) {
- windowIndex = index;
- found = true;
- break;
- }
- }
-
- YQL_ENSURE(found);
- }
-
- window2funcs[windowIndex].push_back(winFuncs.size());
- winFuncsId[node.Get()] = winFuncs.size();
- winFuncs.push_back({ node, projectionLambda->Head().HeadPtr() });
- }
-
- return true;
- });
-
- if (!winFuncs.empty()) {
- auto listTypeNode = ctx.Builder(node->Pos())
- .Callable("TypeOf")
- .Add(0, list)
- .Seal()
- .Build();
-
- TNodeOnNodeOwnedMap deepClones;
- const TExportTable* exportsPtr = nullptr;
- if (hasAggsOverWindow) {
- exportsPtr = optCtx.Types->Modules->GetModule("/lib/yql/window.yql");
- YQL_ENSURE(exportsPtr);
- }
-
- for (const auto& x : window2funcs) {
- auto win = window->Tail().Child(x.first);
- const auto& frameSettings = win->Tail();
-
- TExprNode::TListType args;
- // default frame
- auto begin = ctx.NewCallable(node->Pos(), "Void", {});
- auto end = win->Child(3)->ChildrenSize() > 0 ?
- ctx.NewCallable(node->Pos(), "Int32", { ctx.NewAtom(node->Pos(), "0") }) :
- ctx.NewCallable(node->Pos(), "Void", {});
- if (HasSetting(frameSettings, "type")) {
- const auto& from = GetSetting(frameSettings, "from");
- const auto& fromValue = GetSetting(frameSettings, "from_value");
-
- auto fromName = from->Tail().Content();
- if (fromName == "up") {
- begin = ctx.NewCallable(node->Pos(), "Void", {});
- } else if (fromName == "p") {
- auto val = FromString<i32>(fromValue->Tail().Head().Content());
- begin = ctx.NewCallable(node->Pos(), "Int32", { ctx.NewAtom(node->Pos(), ToString(-val)) });
- } else if (fromName == "c") {
- begin = ctx.NewCallable(node->Pos(), "Int32", { ctx.NewAtom(node->Pos(), "0") });
- } else {
- YQL_ENSURE(fromName == "f");
- auto val = FromString<i32>(fromValue->Tail().Head().Content());
- begin = ctx.NewCallable(node->Pos(), "Int32", { ctx.NewAtom(node->Pos(), ToString(val)) });
- }
-
- const auto& to = GetSetting(frameSettings, "to");
- const auto& toValue = GetSetting(frameSettings, "to_value");
-
- auto toName = to->Tail().Content();
- if (toName == "p") {
- auto val = FromString<i32>(toValue->Tail().Head().Content());
- end = ctx.NewCallable(node->Pos(), "Int32", { ctx.NewAtom(node->Pos(), ToString(-val)) });
- } else if (toName == "c") {
- end = ctx.NewCallable(node->Pos(), "Int32", { ctx.NewAtom(node->Pos(), "0") });
- } else if (toName == "f") {
- auto val = FromString<i32>(toValue->Tail().Head().Content());
- end = ctx.NewCallable(node->Pos(), "Int32", { ctx.NewAtom(node->Pos(), ToString(val)) });
- } else {
- YQL_ENSURE(toName == "uf");
- end = ctx.NewCallable(node->Pos(), "Void", {});
- }
- }
-
- args.push_back(ctx.Builder(node->Pos())
- .List()
- .List(0)
- .Atom(0, "begin")
- .Add(1, begin)
- .Seal()
- .List(1)
- .Atom(0, "end")
- .Add(1, end)
- .Seal()
- .Seal()
- .Build());
-
- for (const auto& index : x.second) {
- auto p = winFuncs[index];
- auto name = p.first->Head().Content();
- bool isAgg = p.first->IsCallable("PgAggWindowCall");
- TExprNode::TPtr value;
- if (isAgg) {
- const auto& exports = exportsPtr->Symbols();
- if (name == "count" && p.first->ChildrenSize() == 2) {
- name = "count_all";
- }
-
- TString factory = TString(name) + "_traits_factory";
- const auto ex = exports.find(factory);
- YQL_ENSURE(exports.cend() != ex);
- auto lambda = ctx.DeepCopy(*ex->second, exportsPtr->ExprCtx(), deepClones, true, false);
- auto arg = ctx.NewArgument(node->Pos(), "row");
- auto arguments = ctx.NewArguments(node->Pos(), { arg });
- auto extractor = ctx.NewLambda(node->Pos(), std::move(arguments),
- ctx.ReplaceNode(p.first->TailPtr(), *p.second, arg));
-
- auto traits = ctx.ReplaceNodes(lambda->TailPtr(), {
- {lambda->Head().Child(0), listTypeNode},
- {lambda->Head().Child(1), extractor}
- });
-
- ctx.Step.Repeat(TExprStep::ExpandApplyForLambdas);
- auto status = ExpandApply(traits, traits, ctx);
- if (status == IGraphTransformer::TStatus::Error) {
- return {};
- }
-
- value = traits;
- } else {
- if (name == "row_number") {
- value = ctx.Builder(node->Pos())
- .Callable("RowNumber")
- .Callable(0, "TypeOf")
- .Add(0, list)
- .Seal()
- .Seal()
- .Build();
- } else if (name == "lead" || name == "lag") {
- auto arg = ctx.NewArgument(node->Pos(), "row");
- auto arguments = ctx.NewArguments(node->Pos(), { arg });
- auto extractor = ctx.NewLambda(node->Pos(), std::move(arguments),
- ctx.ReplaceNode(p.first->TailPtr(), *p.second, arg));
-
- value = ctx.Builder(node->Pos())
- .Callable(name == "lead" ? "Lead" : "Lag")
- .Callable(0, "TypeOf")
- .Add(0, list)
- .Seal()
- .Add(1, extractor)
- .Seal()
- .Build();
- } else {
- ythrow yexception() << "Not supported function: " << name;
- }
- }
-
- args.push_back(ctx.Builder(node->Pos())
- .List()
- .Atom(0, "_yql_win_" + ToString(index))
- .Add(1, value)
- .Seal()
- .Build());
- }
-
- auto winOnRows = ctx.NewCallable(node->Pos(), "WinOnRows", std::move(args));
-
- auto frames = ctx.Builder(node->Pos())
- .List()
- .Add(0, winOnRows)
- .Seal()
- .Build();
-
- TExprNode::TListType keys;
- for (auto p : win->Child(2)->Children()) {
- YQL_ENSURE(p->IsCallable("PgGroup"));
- const auto& member = p->Tail().Tail();
- YQL_ENSURE(member.IsCallable("Member"));
- keys.push_back(member.TailPtr());
- }
-
- auto keysNode = ctx.NewList(node->Pos(), std::move(keys));
- auto sortNode = ctx.NewCallable(node->Pos(), "Void", {});
- if (win->Child(3)->ChildrenSize() > 0) {
- if (win->Child(3)->ChildrenSize() == 1) {
- sortNode = ctx.Builder(node->Pos())
- .Callable("SortTraits")
- .Callable(0, "TypeOf")
- .Add(0, list)
- .Seal()
- .Callable(1, "Bool")
- .Atom(0, win->Child(3)->Head().Tail().Content() == "asc" ? "true" : "false")
- .Seal()
- .Lambda(2)
- .Param("row")
- .Apply(win->Child(3)->Head().ChildPtr(1))
- .With(0, "row")
- .Seal()
- .Seal()
- .Seal()
- .Build();
- } else {
- sortNode = ctx.Builder(node->Pos())
- .Callable("SortTraits")
- .Callable(0, "TypeOf")
- .Add(0, list)
- .Seal()
- .List(1)
- .Do([&](TExprNodeBuilder& parent) -> TExprNodeBuilder& {
- for (ui32 i = 0; i < win->Child(3)->ChildrenSize(); ++i) {
- parent.Callable(i, "Bool")
- .Atom(0, win->Child(3)->Child(i)->Tail().Content() == "asc" ? "true" : "false")
- .Seal();
- }
- return parent;
- })
- .Seal()
- .Lambda(2)
- .Param("row")
- .List()
- .Do([&](TExprNodeBuilder& parent) -> TExprNodeBuilder& {
- for (ui32 i = 0; i < win->Child(3)->ChildrenSize(); ++i) {
- parent.Apply(i, win->Child(3)->Child(i)->ChildPtr(1))
- .With(0, "row")
- .Seal();
- }
-
- return parent;
- })
- .Seal()
- .Seal()
- .Seal()
- .Build();
- }
- }
-
- list = ctx.Builder(node->Pos())
- .Callable("CalcOverWindow")
- .Add(0, list)
- .Add(1, keysNode)
- .Add(2, sortNode)
- .Add(3, frames)
- .Seal()
- .Build();
- }
-
- auto status = OptimizeExpr(projectionLambda, projectionLambda, [&](const TExprNode::TPtr& node, TExprContext& ctx) -> TExprNode::TPtr {
- auto it = winFuncsId.find(node.Get());
- if (it != winFuncsId.end()) {
- auto ret = ctx.Builder(node->Pos())
- .Callable("Member")
- .Add(0, projectionLambda->Head().HeadPtr())
- .Atom(1, "_yql_win_" + ToString(it->second))
- .Seal()
- .Build();
-
- if (!optCtx.Types->PgTypes && (node->Head().Content() == "row_number" || node->Head().Content() == "count")) {
- ret = ctx.Builder(node->Pos())
- .Callable("SafeCast")
- .Add(0, ret)
- .Atom(1, "Int64")
- .Seal()
- .Build();
- }
-
- return ret;
- }
-
- return node;
- }, ctx, TOptimizeExprSettings(optCtx.Types));
-
- if (status.Level == IGraphTransformer::TStatus::Error) {
- return nullptr;
- }
- }
-
- list = ctx.Builder(node->Pos())
- .Callable("Map")
- .Add(0, list)
- .Add(1, projectionLambda)
- .Seal()
- .Build();
- }
-
- setItemNodes.push_back(list);
- }
-
- TExprNode::TPtr list;
- if (setItemNodes.size() == 1) {
- list = setItemNodes.front();
- } else {
- list = ExpandPositionalUnionAll(*node, columnOrders, setItemNodes, ctx, optCtx);
- }
-
- auto sort = GetSetting(node->Head(), "sort");
- if (sort && sort->Tail().ChildrenSize() > 0) {
- const auto& keys = sort->Tail();
- auto argNode = ctx.NewArgument(node->Pos(), "row");
- auto argsNode = ctx.NewArguments(node->Pos(), { argNode });
-
- TExprNode::TListType dirItems;
- TExprNode::TListType rootItems;
- for (const auto& key : keys.Children()) {
- dirItems.push_back(ctx.Builder(node->Pos())
- .Callable("Bool")
- .Atom(0, key->Tail().Content() == "asc" ? "true" : "false")
- .Seal()
- .Build());
-
- auto keyLambda = key->ChildPtr(1);
- rootItems.push_back(ctx.ReplaceNode(keyLambda->TailPtr(), keyLambda->Head().Head(), argNode));
- }
-
- auto root = ctx.NewList(node->Pos(), std::move(rootItems));
- auto dir = ctx.NewList(node->Pos(), std::move(dirItems));
- auto lambda = ctx.NewLambda(node->Pos(), std::move(argsNode), std::move(root));
-
- list = ctx.Builder(node->Pos())
- .Callable("Sort")
- .Add(0, list)
- .Add(1, dir)
- .Add(2, lambda)
- .Seal()
- .Build();
- }
-
- auto limit = GetSetting(node->Head(), "limit");
- auto offset = GetSetting(node->Head(), "offset");
-
- if (offset) {
- list = ctx.Builder(node->Pos())
- .Callable("Skip")
- .Add(0, list)
- .Callable(1, "Unwrap")
- .Callable(0, "SafeCast")
- .Callable(0, "Coalesce")
- .Callable(0,"FromPg")
- .Add(0, offset->ChildPtr(1))
- .Seal()
- .Callable(1, "Int64")
- .Atom(0, "0")
- .Seal()
- .Seal()
- .Atom(1, "Uint64")
- .Seal()
- .Callable(1, "String")
- .Atom(0, "Negative offset")
- .Seal()
- .Seal()
- .Seal()
- .Build();
- }
-
- if (limit) {
- list = ctx.Builder(node->Pos())
- .Callable("Take")
- .Add(0, list)
- .Callable(1, "Unwrap")
- .Callable(0, "SafeCast")
- .Callable(0, "Coalesce")
- .Callable(0,"FromPg")
- .Add(0, limit->ChildPtr(1))
- .Seal()
- .Callable(1, "Int64")
- .Atom(0, "9223372036854775807") // 2**63-1
- .Seal()
- .Seal()
- .Atom(1, "Uint64")
- .Seal()
- .Callable(1, "String")
- .Atom(0, "Negative limit")
- .Seal()
- .Seal()
- .Seal()
- .Build();
- }
-
- return ctx.Builder(node->Pos())
- .Callable("AssumeColumnOrder")
- .Add(0, list)
- .Add(1, columns)
- .Seal()
- .Build();
- };
+ map["PgSelect"] = &ExpandPgSelect;
map["SqlColumnOrType"] = map["SqlPlainColumnOrType"] = [](const TExprNode::TPtr& node, TExprContext& ctx, TOptimizeContext& /*optCtx*/) {
YQL_CLOG(DEBUG, Core) << "Decay of never inspected " << node->Content();
diff --git a/ydb/library/yql/core/type_ann/CMakeLists.txt b/ydb/library/yql/core/type_ann/CMakeLists.txt
index 9f71af3f105..8f751e5e20e 100644
--- a/ydb/library/yql/core/type_ann/CMakeLists.txt
+++ b/ydb/library/yql/core/type_ann/CMakeLists.txt
@@ -26,11 +26,12 @@ target_link_libraries(yql-core-type_ann PUBLIC
yql-parser-pg_catalog
)
target_sources(yql-core-type_ann PRIVATE
- ${CMAKE_SOURCE_DIR}/ydb/library/yql/core/type_ann/type_ann_core.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/core/type_ann/type_ann_columnorder.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/core/type_ann/type_ann_core.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/core/type_ann/type_ann_expr.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/core/type_ann/type_ann_join.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/core/type_ann/type_ann_list.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/core/type_ann/type_ann_pg.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/core/type_ann/type_ann_types.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/core/type_ann/type_ann_wide.cpp
)
diff --git a/ydb/library/yql/core/type_ann/type_ann_core.cpp b/ydb/library/yql/core/type_ann/type_ann_core.cpp
index 7c6473734be..5d4000e8b39 100644
--- a/ydb/library/yql/core/type_ann/type_ann_core.cpp
+++ b/ydb/library/yql/core/type_ann/type_ann_core.cpp
@@ -5,6 +5,7 @@
#include "type_ann_columnorder.h"
#include "type_ann_wide.h"
#include "type_ann_types.h"
+#include "type_ann_pg.h"
#include <ydb/library/yql/core/yql_atom_enums.h>
#include <ydb/library/yql/core/yql_expr_optimize.h>
@@ -12,7 +13,6 @@
#include <ydb/library/yql/core/yql_callable_transform.h>
#include <ydb/library/yql/core/yql_opt_utils.h>
#include <ydb/library/yql/core/yql_type_helpers.h>
-#include <ydb/library/yql/core/yql_pg_utils.h>
#include <ydb/library/yql/core/issue/protos/issue_id.pb.h>
#include <ydb/library/yql/core/issue/yql_issue.h>
#include <ydb/library/yql/core/expr_nodes_gen/yql_expr_nodes_gen.h>
@@ -22,7 +22,6 @@
#include <ydb/library/yql/public/udf/udf_data_type.h>
#include <ydb/library/yql/providers/common/schema/expr/yql_expr_schema.h>
#include <ydb/library/yql/utils/utf8.h>
-#include <ydb/library/yql/parser/pg_catalog/catalog.h>
#include <ydb/library/yql/minikql/mkql_program_builder.h>
#include <ydb/library/yql/minikql/mkql_type_ops.h>
@@ -8796,2544 +8795,6 @@ template <NKikimr::NUdf::EDataSlot DataSlot>
return IGraphTransformer::TStatus::Ok;
}
- IGraphTransformer::TStatus PgStarWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx) {
- Y_UNUSED(output);
- if (!EnsureArgsCount(*input, 0, ctx.Expr)) {
- return IGraphTransformer::TStatus::Error;
- }
-
- input->SetTypeAnn(ctx.Expr.MakeType<TUnitExprType>());
- return IGraphTransformer::TStatus::Ok;
- }
-
- struct TPgFuncDesc {
- ui32 MinArgs;
- ui32 MaxArgs;
- EDataSlot ReturnType;
- TVector<EDataSlot> DataTypes;
- };
-
- class TPgFuncMap {
- public:
- static const TPgFuncMap& Instance() {
- return *Singleton<TPgFuncMap>();
- }
-
- THashMap<TString, TPgFuncDesc> Funcs;
-
- TPgFuncMap() {
- Funcs["substring"] = { 3, 3, EDataSlot::Utf8, { EDataSlot::Utf8, EDataSlot::Int32, EDataSlot::Int32 } };
- }
- };
-
- IGraphTransformer::TStatus PgCallWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TExtContext& ctx) {
- bool isResolved = input->Content().StartsWith("PgResolvedCall");
- if (!EnsureMinArgsCount(*input, isResolved ? 2 : 1, ctx.Expr)) {
- return IGraphTransformer::TStatus::Error;
- }
-
- if (!EnsureAtom(input->Head(), ctx.Expr)) {
- return IGraphTransformer::TStatus::Error;
- }
-
- auto name = input->Head().Content();
-
- if (isResolved) {
- if (!EnsureAtom(*input->Child(1), ctx.Expr)) {
- return IGraphTransformer::TStatus::Error;
- }
- }
-
- if (ctx.Types.PgTypes || isResolved) {
- TVector<ui32> argTypes;
- bool needRetype = false;
- for (ui32 i = isResolved ? 2 : 1; i < input->ChildrenSize(); ++i) {
- auto type = input->Child(i)->GetTypeAnn();
- ui32 argType;
- bool convertToPg;
- if (!ExtractPgType(type, argType, convertToPg, input->Child(i)->Pos(), ctx.Expr)) {
- return IGraphTransformer::TStatus::Error;
- }
-
- if (convertToPg) {
- input->ChildRef(i) = ctx.Expr.NewCallable(input->Child(i)->Pos(), "ToPg", { input->ChildPtr(i) });
- needRetype = true;
- }
-
- argTypes.push_back(argType);
- }
-
- if (needRetype) {
- return IGraphTransformer::TStatus::Repeat;
- }
-
- if (isResolved) {
- auto procId = FromString<ui32>(input->Child(1)->Content());
- const auto& proc = NPg::LookupProc(procId, argTypes);
- if (proc.Name != name) {
- ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(input->Pos()),
- TStringBuilder() << "Mismatch of resolved function name, expected: " << name << ", but got:" << proc.Name));
- return IGraphTransformer::TStatus::Error;
- }
-
- auto result = ctx.Expr.MakeType<TPgExprType>(proc.ResultType);
- input->SetTypeAnn(result);
- return IGraphTransformer::TStatus::Ok;
- } else {
- const auto& proc = NPg::LookupProc(TString(name), argTypes);
- auto children = input->ChildrenList();
- auto idNode = ctx.Expr.NewAtom(input->Pos(), ToString(proc.ProcId));
- children.insert(children.begin() + 1, idNode);
- output = ctx.Expr.NewCallable(input->Pos(), "PgResolvedCall", std::move(children));
- return IGraphTransformer::TStatus::Repeat;
- }
- } else {
- const TTypeAnnotationNode* result = nullptr;
- TVector<const TTypeAnnotationNode*> argTypes;
- bool isNull = false;
- bool isOptional = false;
- for (ui32 i = 1; i < input->ChildrenSize(); ++i) {
- auto type = input->Child(i)->GetTypeAnn();
- if (type->GetKind() == ETypeAnnotationKind::Null) {
- argTypes.push_back(type);
- isNull = true;
- result = type;
- continue;
- }
-
- if (type->GetKind() == ETypeAnnotationKind::Optional) {
- type = RemoveOptionalType(type);
- isOptional = true;
- }
-
- argTypes.push_back(type);
- }
-
- const auto& funcs = TPgFuncMap::Instance().Funcs;
- auto it = funcs.find(name);
- if (it != funcs.end()) {
- const auto& desc = it->second;
- if (argTypes.size() > desc.MaxArgs || argTypes.size() < desc.MinArgs) {
- ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(input->Pos()),
- TStringBuilder() << "Incorrect arguments count: " << argTypes.size() << " for function: " << name));
- return IGraphTransformer::TStatus::Error;
- }
-
- for (ui32 i = 0; i < argTypes.size(); ++i) {
- auto expectedType = desc.DataTypes[i];
- if (argTypes[i]->GetKind() != ETypeAnnotationKind::Null) {
- if (argTypes[i]->GetKind() != ETypeAnnotationKind::Data) {
- ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(input->Pos()),
- TStringBuilder() << "Expected type " << expectedType << " for argument " << (i + 1) << ", but got: " << argTypes[i]->GetKind() << " for function: " << name));
- return IGraphTransformer::TStatus::Error;
- } else {
- auto dataType = argTypes[i]->Cast<TDataExprType>()->GetSlot();
- if (dataType != expectedType) {
- ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(input->Pos()),
- TStringBuilder() << "Expected type " << expectedType << " for argument " << (i + 1) << ", but got: " << dataType << " for function: " << name));
- return IGraphTransformer::TStatus::Error;
- }
- }
- }
- }
-
- result = ctx.Expr.MakeType<TDataExprType>(desc.ReturnType);
- } else {
- ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(input->Pos()),
- TStringBuilder() << "Unsupported function: " << name));
- return IGraphTransformer::TStatus::Error;
- }
-
- if (!isNull && isOptional && result->GetKind() != ETypeAnnotationKind::Optional) {
- result = ctx.Expr.MakeType<TOptionalExprType>(result);
- }
-
- input->SetTypeAnn(result);
- return IGraphTransformer::TStatus::Ok;
- }
- }
-
- IGraphTransformer::TStatus FromPgWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx) {
- if (!EnsureArgsCount(*input, 1, ctx.Expr)) {
- return IGraphTransformer::TStatus::Error;
- }
-
- if (!EnsureComputable(input->Head(), ctx.Expr)) {
- return IGraphTransformer::TStatus::Error;
- }
-
- if (IsNull(input->Head())) {
- output = input->TailPtr();
- return IGraphTransformer::TStatus::Repeat;
- }
-
- if (input->Head().GetTypeAnn()->GetKind() != ETypeAnnotationKind::Pg) {
- output = input->HeadPtr();
- return IGraphTransformer::TStatus::Repeat;
- }
-
- auto name = input->Head().GetTypeAnn()->Cast<TPgExprType>()->GetName();
- const TDataExprType* dataType;
- if (name == "bool") {
- dataType = ctx.Expr.MakeType<TDataExprType>(EDataSlot::Bool);
- } else if (name == "int2") {
- dataType = ctx.Expr.MakeType<TDataExprType>(EDataSlot::Int16);
- } else if (name == "int4") {
- dataType = ctx.Expr.MakeType<TDataExprType>(EDataSlot::Int32);
- } else if (name == "int8") {
- dataType = ctx.Expr.MakeType<TDataExprType>(EDataSlot::Int64);
- } else if (name == "float4") {
- dataType = ctx.Expr.MakeType<TDataExprType>(EDataSlot::Float);
- } else if (name == "float8") {
- dataType = ctx.Expr.MakeType<TDataExprType>(EDataSlot::Double);
- } else if (name == "text" || name == "varchar" || name == "cstring") {
- dataType = ctx.Expr.MakeType<TDataExprType>(EDataSlot::Utf8);
- } else if (name == "bytea") {
- dataType = ctx.Expr.MakeType<TDataExprType>(EDataSlot::String);
- } else {
- ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(input->Pos()),
- TStringBuilder() << "Unsupported type: " << name));
- return IGraphTransformer::TStatus::Error;
- }
-
- auto result = ctx.Expr.MakeType<TOptionalExprType>(dataType);
- input->SetTypeAnn(result);
- return IGraphTransformer::TStatus::Ok;
- }
-
- IGraphTransformer::TStatus ToPgWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx) {
- if (!EnsureArgsCount(*input, 1, ctx.Expr)) {
- return IGraphTransformer::TStatus::Error;
- }
-
- if (!EnsureComputable(input->Head(), ctx.Expr)) {
- return IGraphTransformer::TStatus::Error;
- }
-
- if (IsNull(input->Head())) {
- output = input->TailPtr();
- return IGraphTransformer::TStatus::Repeat;
- }
-
- if (input->Head().GetTypeAnn()->GetKind() == ETypeAnnotationKind::Pg) {
- output = input->HeadPtr();
- return IGraphTransformer::TStatus::Repeat;
- }
-
- bool isOptional;
- const TDataExprType* dataType;
- if (!EnsureDataOrOptionalOfData(input->Head(), isOptional, dataType, ctx.Expr)) {
- return IGraphTransformer::TStatus::Error;
- }
-
- TString pgType;
- switch (dataType->GetSlot()) {
- case NUdf::EDataSlot::Bool:
- pgType = "bool";
- break;
- case NUdf::EDataSlot::Int16:
- pgType = "int2";
- break;
- case NUdf::EDataSlot::Int32:
- pgType = "int4";
- break;
- case NUdf::EDataSlot::Int64:
- pgType = "int8";
- break;
- case NUdf::EDataSlot::Float:
- pgType = "float4";
- break;
- case NUdf::EDataSlot::Double:
- pgType = "float8";
- break;
- case NUdf::EDataSlot::String:
- pgType = "bytea";
- break;
- case NUdf::EDataSlot::Utf8:
- pgType = "text";
- break;
- default:
- ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(input->Pos()),
- TStringBuilder() << "Unsupported type: " << dataType->GetName()));
- return IGraphTransformer::TStatus::Error;
- }
-
- auto result = ctx.Expr.MakeType<TPgExprType>(NPg::LookupType(pgType).TypeId);
- input->SetTypeAnn(result);
- return IGraphTransformer::TStatus::Ok;
- }
-
- IGraphTransformer::TStatus PgOpWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TExtContext& ctx) {
- bool isResolved = input->IsCallable("PgResolvedOp");
- if (!EnsureMinArgsCount(*input, isResolved ? 3 : 2, ctx.Expr)) {
- return IGraphTransformer::TStatus::Error;
- }
-
- if (!EnsureMaxArgsCount(*input, isResolved ? 4 : 3, ctx.Expr)) {
- return IGraphTransformer::TStatus::Error;
- }
-
- if (!EnsureAtom(input->Head(), ctx.Expr)) {
- return IGraphTransformer::TStatus::Error;
- }
-
- auto name = input->Head().Content();
- if (isResolved) {
- if (!EnsureAtom(*input->Child(1), ctx.Expr)) {
- return IGraphTransformer::TStatus::Error;
- }
- }
-
- TVector<ui32> argTypes;
- bool needRetype = false;
- for (ui32 i = isResolved ? 2 : 1; i < input->ChildrenSize(); ++i) {
- auto type = input->Child(i)->GetTypeAnn();
- ui32 argType;
- bool convertToPg;
- if (!ExtractPgType(type, argType, convertToPg, input->Child(i)->Pos(), ctx.Expr)) {
- return IGraphTransformer::TStatus::Error;
- }
-
- if (convertToPg) {
- input->ChildRef(i) = ctx.Expr.NewCallable(input->Child(i)->Pos(), "ToPg", { input->ChildPtr(i) });
- needRetype = true;
- }
-
- argTypes.push_back(argType);
- }
-
- if (needRetype) {
- return IGraphTransformer::TStatus::Repeat;
- }
-
- if (isResolved) {
- auto operId = FromString<ui32>(input->Child(1)->Content());
- const auto& oper = NPg::LookupOper(operId, argTypes);
- if (oper.Name != name) {
- ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(input->Pos()),
- TStringBuilder() << "Mismatch of resolved operator name, expected: " << name << ", but got:" << oper.Name));
- return IGraphTransformer::TStatus::Error;
- }
-
- auto result = ctx.Expr.MakeType<TPgExprType>(oper.ResultType);
- input->SetTypeAnn(result);
- return IGraphTransformer::TStatus::Ok;
- } else {
- const auto& oper = NPg::LookupOper(TString(name), argTypes);
- auto children = input->ChildrenList();
- auto idNode = ctx.Expr.NewAtom(input->Pos(), ToString(oper.OperId));
- children.insert(children.begin() + 1, idNode);
- output = ctx.Expr.NewCallable(input->Pos(), "PgResolvedOp", std::move(children));
- return IGraphTransformer::TStatus::Repeat;
- }
- }
-
- IGraphTransformer::TStatus PgWindowCallWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx) {
- Y_UNUSED(output);
- if (!EnsureMinArgsCount(*input, 2, ctx.Expr)) {
- return IGraphTransformer::TStatus::Error;
- }
-
- if (!EnsureAtom(*input->Child(0), ctx.Expr)) {
- return IGraphTransformer::TStatus::Error;
- }
-
- auto name = input->Child(0)->Content();
- if (!input->Child(1)->IsAtom() && !input->Child(1)->IsCallable("PgAnonWindow")) {
- ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(input->Pos()),
- "Expected either window name or reference to an inline window"));
- return IGraphTransformer::TStatus::Error;
- }
-
- if (name == "lead" || name == "lag") {
- if (input->ChildrenSize() != 3) {
- ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(input->Pos()),
- TStringBuilder() << "Expected one argument in " << name << " function"));
- return IGraphTransformer::TStatus::Error;
- }
-
- auto arg = input->Child(2)->GetTypeAnn();
- if (arg->GetKind() == ETypeAnnotationKind::Null) {
- input->SetTypeAnn(arg);
- } else if (arg->GetKind() == ETypeAnnotationKind::Optional) {
- input->SetTypeAnn(arg);
- } else {
- input->SetTypeAnn(ctx.Expr.MakeType<TOptionalExprType>(arg));
- }
- } else if (name == "row_number") {
- if (input->ChildrenSize() != 2) {
- ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(input->Pos()),
- "Expected no arguments in row_number function"));
- return IGraphTransformer::TStatus::Error;
- }
-
- auto result = ctx.Expr.MakeType<TOptionalExprType>(ctx.Expr.MakeType<TDataExprType>(EDataSlot::Int64));
- input->SetTypeAnn(result);
- } else {
- ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(input->Pos()),
- TStringBuilder() << "Unsupported function: " << name));
- return IGraphTransformer::TStatus::Error;
- }
-
- return IGraphTransformer::TStatus::Ok;
- }
-
- IGraphTransformer::TStatus PgAggWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TExtContext& ctx) {
- Y_UNUSED(output);
- bool overWindow = (input->Content() == "PgAggWindowCall");
- if (!EnsureMinArgsCount(*input, overWindow ? 2 : 1, ctx.Expr)) {
- return IGraphTransformer::TStatus::Error;
- }
-
- if (!EnsureAtom(input->Head(), ctx.Expr)) {
- return IGraphTransformer::TStatus::Error;
- }
-
- auto name = input->Head().Content();
- if (overWindow) {
- if (!input->Child(1)->IsAtom() && !input->Child(1)->IsCallable("PgAnonWindow")) {
- ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(input->Pos()),
- "Expected either window name or reference to an inline window"));
- return IGraphTransformer::TStatus::Error;
- }
- }
-
- if (ctx.Types.PgTypes) {
- TVector<ui32> argTypes;
- bool needRetype = false;
- for (ui32 i = overWindow ? 2 : 1; i < input->ChildrenSize(); ++i) {
- auto type = input->Child(i)->GetTypeAnn();
- ui32 argType;
- bool convertToPg;
- if (!ExtractPgType(type, argType, convertToPg, input->Child(i)->Pos(), ctx.Expr)) {
- return IGraphTransformer::TStatus::Error;
- }
-
- if (convertToPg) {
- input->ChildRef(i) = ctx.Expr.NewCallable(input->Child(i)->Pos(), "ToPg", { input->ChildPtr(i) });
- needRetype = true;
- }
-
- argTypes.push_back(argType);
- }
-
- if (needRetype) {
- return IGraphTransformer::TStatus::Repeat;
- }
-
- const auto& aggDesc = NPg::LookupAggregation(name, argTypes);
- if (aggDesc.Kind != NPg::EAggKind::Normal) {
- ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(input->Pos()),
- "Only normal aggregation supported"));
- return IGraphTransformer::TStatus::Error;
- }
-
- ui32 resultType;
- if (!aggDesc.FinalFuncId) {
- resultType = aggDesc.TransTypeId;
- } else {
- resultType = NPg::LookupProc(aggDesc.FinalFuncId).ResultType;
- }
-
- auto result = ctx.Expr.MakeType<TPgExprType>(resultType);
- input->SetTypeAnn(result);
- return IGraphTransformer::TStatus::Ok;
- } else {
- const TTypeAnnotationNode* result = nullptr;
- TVector<const TTypeAnnotationNode*> argTypes;
- bool isNull = false;
- bool isOptional = false;
- for (ui32 i = overWindow ? 2 : 1; i < input->ChildrenSize(); ++i) {
- auto type = input->Child(i)->GetTypeAnn();
- if (type->GetKind() == ETypeAnnotationKind::Null) {
- argTypes.push_back(type);
- isNull = true;
- result = type;
- continue;
- }
-
- if (type->GetKind() == ETypeAnnotationKind::Optional) {
- type = RemoveOptionalType(type);
- isOptional = true;
- }
-
- argTypes.push_back(type);
- }
-
- if (name == "count") {
- if (argTypes.size() > 1) {
- ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(input->Pos()),
- TStringBuilder() << "Too many arguments for function: " << name));
- return IGraphTransformer::TStatus::Error;
- }
-
- isNull = false;
- isOptional = true;
- result = ctx.Expr.MakeType<TDataExprType>(EDataSlot::Int64);
- } else if (name == "min" || name == "max") {
- if (argTypes.size() != 1) {
- ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(input->Pos()),
- TStringBuilder() << "Expected one argument for function: " << name));
- return IGraphTransformer::TStatus::Error;
- }
-
- if (!isNull) {
- auto argType = argTypes[0];
- if (argType->GetKind() != ETypeAnnotationKind::Data) {
- ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(input->Pos()),
- TStringBuilder() << "Expected comparable type, but got: " << argType->GetKind() << " for function: " << name));
- return IGraphTransformer::TStatus::Error;
- }
-
- auto slot = argType->Cast<TDataExprType>()->GetSlot();
- if (slot == EDataSlot::Utf8 || slot == EDataSlot::Int32 || slot == EDataSlot::Double || slot == EDataSlot::Bool) {
- result = argType;
- } else {
- ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(input->Pos()),
- TStringBuilder() << "Expected comparable type, but got: " << slot << " for function: " << name));
- return IGraphTransformer::TStatus::Error;
- }
- }
-
- isOptional = true;
- } else if (name == "sum") {
- if (argTypes.size() != 1) {
- ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(input->Pos()),
- TStringBuilder() << "Expected one argument for function: " << name));
- return IGraphTransformer::TStatus::Error;
- }
-
- if (!isNull) {
- auto argType = argTypes[0];
- if (argType->GetKind() != ETypeAnnotationKind::Data) {
- ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(input->Pos()),
- TStringBuilder() << "Expected additive type, but got: " << argType->GetKind() << " for function: " << name));
- return IGraphTransformer::TStatus::Error;
- }
-
- auto slot = argType->Cast<TDataExprType>()->GetSlot();
- if (slot == EDataSlot::Int32) {
- result = ctx.Expr.MakeType<TDataExprType>(EDataSlot::Int64);
- } else if (slot == EDataSlot::Double) {
- result = ctx.Expr.MakeType<TDataExprType>(EDataSlot::Double);
- } else {
- ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(input->Pos()),
- TStringBuilder() << "Expected additive type, but got: " << slot << " for function: " << name));
- return IGraphTransformer::TStatus::Error;
- }
- }
-
- isOptional = true;
- } else {
- ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(input->Pos()),
- TStringBuilder() << "Unsupported function: " << name));
- return IGraphTransformer::TStatus::Error;
- }
-
- if (!isNull && isOptional && result->GetKind() != ETypeAnnotationKind::Optional) {
- result = ctx.Expr.MakeType<TOptionalExprType>(result);
- }
-
- input->SetTypeAnn(result);
- return IGraphTransformer::TStatus::Ok;
- }
- }
-
- IGraphTransformer::TStatus PgQualifiedStarWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx) {
- Y_UNUSED(output);
- if (!EnsureArgsCount(*input, 1, ctx.Expr)) {
- return IGraphTransformer::TStatus::Error;
- }
-
- if (!EnsureAtom(input->Head(), ctx.Expr)) {
- return IGraphTransformer::TStatus::Error;
- }
-
- input->SetTypeAnn(ctx.Expr.MakeType<TUnitExprType>());
- return IGraphTransformer::TStatus::Ok;
- }
-
- IGraphTransformer::TStatus PgColumnRefWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx) {
- Y_UNUSED(output);
- if (!EnsureMinArgsCount(*input, 1, ctx.Expr)) {
- return IGraphTransformer::TStatus::Error;
- }
-
- if (!EnsureMaxArgsCount(*input, 2, ctx.Expr)) {
- return IGraphTransformer::TStatus::Error;
- }
-
- for (const auto& child : input->Children()) {
- if (!EnsureAtom(*child, ctx.Expr)) {
- return IGraphTransformer::TStatus::Error;
- }
- }
-
- input->SetTypeAnn(ctx.Expr.MakeType<TUnitExprType>());
- return IGraphTransformer::TStatus::Ok;
- }
-
- IGraphTransformer::TStatus PgResultItemWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx) {
- Y_UNUSED(output);
- if (!EnsureArgsCount(*input, 3, ctx.Expr)) {
- return IGraphTransformer::TStatus::Error;
- }
-
- if (input->Head().IsList()) {
- for (const auto& x : input->Head().Children()) {
- if (!EnsureAtom(*x, ctx.Expr)) {
- return IGraphTransformer::TStatus::Error;
- }
- }
- } else {
- if (!EnsureAtom(input->Head(), ctx.Expr)) {
- return IGraphTransformer::TStatus::Error;
- }
- }
-
- bool hasType = false;
- if (!input->Child(1)->IsCallable("Void")) {
- hasType = true;
- if (auto status = EnsureTypeRewrite(input->ChildRef(1), ctx.Expr); status != IGraphTransformer::TStatus::Ok) {
- return status;
- }
- }
-
- auto& lambda = input->ChildRef(2);
- const auto status = ConvertToLambda(lambda, ctx.Expr, hasType ? 1 : 0);
- if (status.Level != IGraphTransformer::TStatus::Ok) {
- return status;
- }
-
- if (!hasType) {
- input->SetTypeAnn(ctx.Expr.MakeType<TUnitExprType>());
- return IGraphTransformer::TStatus::Ok;
- }
-
- if (!UpdateLambdaAllArgumentsTypes(lambda, { input->Child(1)->GetTypeAnn()->Cast<TTypeExprType>()->GetType() }, ctx.Expr)) {
- return IGraphTransformer::TStatus::Error;
- }
-
- if (!lambda->GetTypeAnn()) {
- return IGraphTransformer::TStatus::Repeat;
- }
-
- input->SetTypeAnn(lambda->GetTypeAnn());
- return IGraphTransformer::TStatus::Ok;
- }
-
- IGraphTransformer::TStatus PgWhereWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx) {
- Y_UNUSED(output);
- if (!EnsureArgsCount(*input, 2, ctx.Expr)) {
- return IGraphTransformer::TStatus::Error;
- }
-
- bool hasType = false;
- if (!input->Child(0)->IsCallable("Void")) {
- hasType = true;
- if (auto status = EnsureTypeRewrite(input->ChildRef(0), ctx.Expr); status != IGraphTransformer::TStatus::Ok) {
- return status;
- }
- }
-
- auto& lambda = input->ChildRef(1);
- const auto status = ConvertToLambda(lambda, ctx.Expr, hasType ? 1 : 0);
- if (status.Level != IGraphTransformer::TStatus::Ok) {
- return status;
- }
-
- if (!hasType) {
- input->SetTypeAnn(ctx.Expr.MakeType<TUnitExprType>());
- return IGraphTransformer::TStatus::Ok;
- }
-
- if (!UpdateLambdaAllArgumentsTypes(lambda, { input->Child(0)->GetTypeAnn()->Cast<TTypeExprType>()->GetType() }, ctx.Expr)) {
- return IGraphTransformer::TStatus::Error;
- }
-
- if (!lambda->GetTypeAnn()) {
- return IGraphTransformer::TStatus::Repeat;
- }
-
- input->SetTypeAnn(lambda->GetTypeAnn());
- return IGraphTransformer::TStatus::Ok;
- }
-
- IGraphTransformer::TStatus PgSortWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx) {
- Y_UNUSED(output);
- if (!EnsureArgsCount(*input, 3, ctx.Expr)) {
- return IGraphTransformer::TStatus::Error;
- }
-
- if (!EnsureAtom(*input->Child(2), ctx.Expr)) {
- return IGraphTransformer::TStatus::Error;
- }
-
- if (input->Child(2)->Content() != "asc" && input->Child(2)->Content() != "desc") {
- ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(input->Child(2)->Pos()),
- TStringBuilder() << "Unsupported sort direction: " << input->Child(2)->Content()));
- return IGraphTransformer::TStatus::Error;
- }
-
- bool hasType = false;
- if (!input->Child(0)->IsCallable("Void")) {
- hasType = true;
- if (auto status = EnsureTypeRewrite(input->ChildRef(0), ctx.Expr); status != IGraphTransformer::TStatus::Ok) {
- return status;
- }
- }
-
- auto& lambda = input->ChildRef(1);
- const auto status = ConvertToLambda(lambda, ctx.Expr, hasType ? 1 : 0);
- if (status.Level != IGraphTransformer::TStatus::Ok) {
- return status;
- }
-
- if (!hasType) {
- input->SetTypeAnn(ctx.Expr.MakeType<TUnitExprType>());
- return IGraphTransformer::TStatus::Ok;
- }
-
- if (!UpdateLambdaAllArgumentsTypes(lambda, { input->Child(0)->GetTypeAnn()->Cast<TTypeExprType>()->GetType() }, ctx.Expr)) {
- return IGraphTransformer::TStatus::Error;
- }
-
- if (!lambda->GetTypeAnn()) {
- return IGraphTransformer::TStatus::Repeat;
- }
-
- input->SetTypeAnn(lambda->GetTypeAnn());
- return IGraphTransformer::TStatus::Ok;
- }
-
- IGraphTransformer::TStatus PgWindowWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx) {
- Y_UNUSED(output);
- if (!EnsureArgsCount(*input, 5, ctx.Expr)) {
- return IGraphTransformer::TStatus::Error;
- }
-
- if (!EnsureAtom(*input->Child(0), ctx.Expr)) {
- return IGraphTransformer::TStatus::Error;
- }
-
- if (!EnsureAtom(*input->Child(1), ctx.Expr)) {
- return IGraphTransformer::TStatus::Error;
- }
-
- if (input->Child(1)->Content()) {
- ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(input->Child(1)->Pos()), "Window reference is not supported"));
- return IGraphTransformer::TStatus::Error;
- }
-
- if (!EnsureTuple(*input->Child(2), ctx.Expr)) {
- return IGraphTransformer::TStatus::Error;
- }
-
- for (const auto& x : input->Child(2)->Children()) {
- if (!x->IsCallable("PgGroup")) {
- ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(x->Pos()), "Expected PgGroup"));
- return IGraphTransformer::TStatus::Error;
- }
- }
-
- if (!EnsureTuple(*input->Child(3), ctx.Expr)) {
- return IGraphTransformer::TStatus::Error;
- }
-
- for (const auto& x : input->Child(3)->Children()) {
- if (!x->IsCallable("PgSort")) {
- ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(x->Pos()), "Expected PgSort"));
- return IGraphTransformer::TStatus::Error;
- }
- }
-
- if (!EnsureTuple(*input->Child(4), ctx.Expr)) {
- return IGraphTransformer::TStatus::Error;
- }
-
- bool hasType = false;
- bool hasFrom = false;
- bool hasTo = false;
- bool hasFromValue = false;
- bool hasToValue = false;
- bool needFromValue = false;
- bool needToValue = false;
-
- for (const auto& x : input->Child(4)->Children()) {
- if (!EnsureTupleMinSize(*x, 1, ctx.Expr)) {
- return IGraphTransformer::TStatus::Error;
- }
-
- if (!EnsureAtom(x->Head(), ctx.Expr)) {
- return IGraphTransformer::TStatus::Error;
- }
-
- auto optionName = x->Head().Content();
- if (optionName == "exclude") {
- ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(x->Pos()), "Excludes are not supported"));
- return IGraphTransformer::TStatus::Error;
- } else if (optionName == "from_value" || optionName == "to_value") {
- hasFromValue = hasFromValue || (optionName == "from_value");
- hasToValue = hasToValue || (optionName == "to_value");
- if (!EnsureTupleSize(*x, 2, ctx.Expr)) {
- return IGraphTransformer::TStatus::Error;
- }
-
- if (!x->Tail().IsCallable("Int32")) {
- ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(x->Pos()), "Expected Int32 as frame offset"));
- return IGraphTransformer::TStatus::Error;
- }
-
- auto val = FromString<i32>(x->Tail().Head().Content());
- if (val < 0) {
- ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(x->Pos()), "Expected non-negative value as frame offset"));
- return IGraphTransformer::TStatus::Error;
- }
- } else if (optionName == "type") {
- hasType = true;
- if (!EnsureTupleSize(*x, 2, ctx.Expr)) {
- return IGraphTransformer::TStatus::Error;
- }
-
- if (!EnsureAtom(x->Tail(), ctx.Expr)) {
- return IGraphTransformer::TStatus::Error;
- }
-
- auto type = x->Tail().Content();
- if (type != "rows") {
- ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(x->Pos()), TStringBuilder() << "Unsupported frame type: " << type));
- return IGraphTransformer::TStatus::Error;
- }
- } else if (optionName == "from" || optionName == "to") {
- hasFrom = hasFrom || (optionName == "from");
- hasTo = hasTo || (optionName == "to");
- if (!EnsureTupleSize(*x, 2, ctx.Expr)) {
- return IGraphTransformer::TStatus::Error;
- }
-
- if (!EnsureAtom(x->Tail(), ctx.Expr)) {
- return IGraphTransformer::TStatus::Error;
- }
-
- auto bound = x->Tail().Content();
- if (!(bound == "up" || bound == "p" || bound == "c" || bound == "f" || bound == "uf")) {
- ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(x->Pos()), TStringBuilder() << "Unsupported frame bound: " << bound));
- return IGraphTransformer::TStatus::Error;
- }
-
- if (bound == "p" || bound == "f") {
- needFromValue = needFromValue || (optionName == "from");
- needToValue = needToValue || (optionName == "to");
- }
-
- if (optionName == "from" && bound == "uf") {
- ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(x->Pos()), "Unbounded following is unsupported as start offset"));
- return IGraphTransformer::TStatus::Error;
- }
-
- if (optionName == "to" && bound == "up") {
- ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(x->Pos()), "Unbounded preceding is unsupported as end offset"));
- return IGraphTransformer::TStatus::Error;
- }
- } else {
- ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(x->Pos()), TStringBuilder() << "Unknown option: " << optionName));
- return IGraphTransformer::TStatus::Error;
- }
- }
-
- if (hasType) {
- if (!hasFrom || !hasTo) {
- ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(input->Pos()), "Missing offset specification in the frame"));
- return IGraphTransformer::TStatus::Error;
- }
- } else {
- if (hasFrom || hasTo) {
- ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(input->Pos()), "Unexpected offset specification in the frame"));
- return IGraphTransformer::TStatus::Error;
- }
- }
-
- if (needFromValue != hasFromValue || needToValue != hasToValue) {
- ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(input->Pos()), "Wrong offset value in the frame"));
- return IGraphTransformer::TStatus::Error;
- }
-
- input->SetTypeAnn(ctx.Expr.MakeType<TUnitExprType>());
- return IGraphTransformer::TStatus::Ok;
- }
-
- IGraphTransformer::TStatus PgAnonWindowWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx) {
- Y_UNUSED(output);
- if (!EnsureArgsCount(*input, 1, ctx.Expr)) {
- return IGraphTransformer::TStatus::Error;
- }
-
- if (!EnsureAtom(*input->Child(0), ctx.Expr)) {
- return IGraphTransformer::TStatus::Error;
- }
-
- ui32 n;
- if (!TryFromString(input->Child(0)->Content(), n)) {
- ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(input->Pos()), "Expected number"));
- return IGraphTransformer::TStatus::Error;
- }
-
- input->SetTypeAnn(ctx.Expr.MakeType<TUnitExprType>());
- return IGraphTransformer::TStatus::Ok;
- }
-
- IGraphTransformer::TStatus PgConstWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx) {
- Y_UNUSED(output);
- if (!EnsureArgsCount(*input, 2, ctx.Expr)) {
- return IGraphTransformer::TStatus::Error;
- }
-
- if (!EnsureTypePg(input->Tail(), ctx.Expr)) {
- return IGraphTransformer::TStatus::Error;
- }
-
- // TODO: validate value
- if (!EnsureAtom(input->Head(), ctx.Expr)) {
- return IGraphTransformer::TStatus::Error;
- }
-
- input->SetTypeAnn(input->Tail().GetTypeAnn()->Cast<TTypeExprType>()->GetType());
- return IGraphTransformer::TStatus::Ok;
- }
-
- IGraphTransformer::TStatus PgInternal0Wrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx) {
- Y_UNUSED(output);
- if (!EnsureArgsCount(*input, 0, ctx.Expr)) {
- return IGraphTransformer::TStatus::Error;
- }
-
- auto internalId = NPg::LookupType("internal").TypeId;
- input->SetTypeAnn(ctx.Expr.MakeType<TPgExprType>(internalId));
- return IGraphTransformer::TStatus::Ok;
- }
-
- IGraphTransformer::TStatus PgCastWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx) {
- Y_UNUSED(output);
- if (!EnsureArgsCount(*input, 2, ctx.Expr)) {
- return IGraphTransformer::TStatus::Error;
- }
-
- auto type = input->Head().GetTypeAnn();
- ui32 inputTypeId = 0;
- bool convertToPg;
- if (!ExtractPgType(type, inputTypeId, convertToPg, input->Pos(), ctx.Expr)) {
- return IGraphTransformer::TStatus::Error;
- }
-
- if (convertToPg) {
- input->ChildRef(0) = ctx.Expr.NewCallable(input->Child(0)->Pos(), "ToPg", { input->ChildPtr(0) });
- return IGraphTransformer::TStatus::Repeat;
- }
-
- if (!EnsureTypePg(input->Tail(), ctx.Expr)) {
- return IGraphTransformer::TStatus::Error;
- }
- auto targetTypeId = input->Tail().GetTypeAnn()->Cast<TTypeExprType>()->GetType()->Cast<TPgExprType>()->GetId();
-
- if (inputTypeId != 0 && inputTypeId != targetTypeId) {
- if (NPg::LookupType(inputTypeId).Category != 'S' &&
- NPg::LookupType(targetTypeId).Category != 'S') {
- Y_UNUSED(NPg::LookupCast(inputTypeId, targetTypeId));
- }
- }
-
- input->SetTypeAnn(ctx.Expr.MakeType<TPgExprType>(targetTypeId));
- return IGraphTransformer::TStatus::Ok;
- }
-
- IGraphTransformer::TStatus PgAggregationTraitsWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx) {
- if (!EnsureArgsCount(*input, 3, ctx.Expr)) {
- return IGraphTransformer::TStatus::Error;
- }
-
- if (!EnsureAtom(*input->Child(0), ctx.Expr)) {
- return IGraphTransformer::TStatus::Error;
- }
-
- auto func = input->Child(0)->Content();
- if (!EnsureType(*input->Child(1), ctx.Expr)) {
- return IGraphTransformer::TStatus::Error;
- }
-
- auto itemType = input->Child(1)->GetTypeAnn()->Cast<TTypeExprType>()->GetType();
- auto& lambda = input->ChildRef(2);
- auto convertStatus = ConvertToLambda(lambda, ctx.Expr, 1);
- if (convertStatus.Level != IGraphTransformer::TStatus::Ok) {
- return convertStatus;
- }
-
- if (!UpdateLambdaAllArgumentsTypes(lambda, { itemType }, ctx.Expr)) {
- return IGraphTransformer::TStatus::Error;
- }
-
- auto lambdaResult = lambda->GetTypeAnn();
- if (!lambdaResult) {
- return IGraphTransformer::TStatus::Repeat;
- }
-
- TVector<ui32> argTypes;
- bool needRetype = false;
- for (ui32 i = 1; i < lambda->ChildrenSize(); ++i) {
- auto type = lambda->Child(i)->GetTypeAnn();
- ui32 argType;
- bool convertToPg;
- if (!ExtractPgType(type, argType, convertToPg, lambda->Child(i)->Pos(), ctx.Expr)) {
- return IGraphTransformer::TStatus::Error;
- }
-
- if (convertToPg) {
- needRetype = true;
- }
-
- argTypes.push_back(argType);
- }
-
- if (needRetype) {
- lambda = ctx.Expr.DeepCopyLambda(*lambda);
- for (ui32 i = 1; i < lambda->ChildrenSize(); ++i) {
- auto type = lambda->Child(i)->GetTypeAnn();
- ui32 argType;
- bool convertToPg;
- if (!ExtractPgType(type, argType, convertToPg, lambda->Child(i)->Pos(), ctx.Expr)) {
- return IGraphTransformer::TStatus::Error;
- }
-
- if (convertToPg) {
- lambda->ChildRef(i) = ctx.Expr.NewCallable(lambda->Child(i)->Pos(), "ToPg", { lambda->ChildPtr(i) });
- }
- }
-
- return IGraphTransformer::TStatus::Repeat;
- }
-
- const auto& aggDesc = NPg::LookupAggregation(func, argTypes);
- if (aggDesc.Kind != NPg::EAggKind::Normal) {
- ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(input->Pos()),
- "Only normal aggregation supported"));
- return IGraphTransformer::TStatus::Error;
- }
-
- auto idLambda = ctx.Expr.Builder(input->Pos())
- .Lambda()
- .Param("state")
- .Arg("state")
- .Seal()
- .Build();
-
- auto saveLambda = idLambda;
- auto loadLambda = idLambda;
- auto finishLambda = idLambda;
- if (aggDesc.FinalFuncId) {
- finishLambda = ctx.Expr.Builder(input->Pos())
- .Lambda()
- .Param("state")
- .Callable("PgResolvedCallCtx")
- .Atom(0, NPg::LookupProc(aggDesc.FinalFuncId).Name)
- .Atom(1, ToString(aggDesc.FinalFuncId))
- .Arg(2, "state")
- .Seal()
- .Seal()
- .Build();
- }
-
- auto nullValue = ctx.Expr.NewCallable(input->Pos(), "Null", {});
- auto initValue = nullValue;
- if (aggDesc.InitValue) {
- initValue = ctx.Expr.Builder(input->Pos())
- .Callable("PgCast")
- .Callable(0, "PgConst")
- .Atom(0, aggDesc.InitValue)
- .Callable(1, "PgType")
- .Atom(0, "text")
- .Seal()
- .Seal()
- .Callable(1, "PgType")
- .Atom(0, NPg::LookupType(aggDesc.TransTypeId).Name)
- .Seal()
- .Seal()
- .Build();
- }
-
- const auto& transFuncDesc = NPg::LookupProc(aggDesc.TransFuncId);
- // use first non-null value as state if transFunc is strict
- bool searchNonNullForState = false;
- if (transFuncDesc.IsStrict && !aggDesc.InitValue) {
- Y_ENSURE(argTypes.size() == 1);
- searchNonNullForState = true;
- }
-
- TExprNode::TPtr initLambda, updateLambda;
- if (!searchNonNullForState) {
- initLambda = ctx.Expr.Builder(input->Pos())
- .Lambda()
- .Param("row")
- .Callable("PgResolvedCallCtx")
- .Atom(0, transFuncDesc.Name)
- .Atom(1, ToString(aggDesc.TransFuncId))
- .Add(2, initValue)
- .Apply(3, lambda)
- .With(0, "row")
- .Seal()
- .Seal()
- .Seal()
- .Build();
-
- updateLambda = ctx.Expr.Builder(input->Pos())
- .Lambda()
- .Param("row")
- .Param("state")
- .Callable("Coalesce")
- .Callable(0, "PgResolvedCallCtx")
- .Atom(0, transFuncDesc.Name)
- .Atom(1, ToString(aggDesc.TransFuncId))
- .Callable(2, "Coalesce")
- .Arg(0, "state")
- .Add(1, initValue)
- .Seal()
- .Apply(3, lambda)
- .With(0, "row")
- .Seal()
- .Seal()
- .Arg(1, "state")
- .Seal()
- .Seal()
- .Build();
- } else {
- initLambda = ctx.Expr.Builder(input->Pos())
- .Lambda()
- .Param("row")
- .Apply(lambda)
- .With(0, "row")
- .Seal()
- .Seal()
- .Build();
-
- updateLambda = ctx.Expr.Builder(input->Pos())
- .Lambda()
- .Param("row")
- .Param("state")
- .Callable("If")
- .Callable(0, "Exists")
- .Arg(0, "state")
- .Seal()
- .Callable(1, "Coalesce")
- .Callable(0, "PgResolvedCallCtx")
- .Atom(0, transFuncDesc.Name)
- .Atom(1, ToString(aggDesc.TransFuncId))
- .Arg(2, "state")
- .Apply(3, lambda)
- .With(0, "row")
- .Seal()
- .Seal()
- .Arg(1, "state")
- .Seal()
- .Apply(2, lambda)
- .With(0, "row")
- .Seal()
- .Seal()
- .Seal()
- .Build();
- }
-
- auto mergeLambda = ctx.Expr.Builder(input->Pos())
- .Lambda()
- .Param("state1")
- .Param("state2")
- .Callable("Void")
- .Seal()
- .Seal()
- .Build();
-
- auto zero = ctx.Expr.Builder(input->Pos())
- .Callable("PgConst")
- .Atom(0, "0")
- .Callable(1, "PgType")
- .Atom(0, "int8")
- .Seal()
- .Seal()
- .Build();
-
- auto defaultValue = (func != "count") ? nullValue : zero;
-
- if (aggDesc.SerializeFuncId) {
- const auto& serializeFuncDesc = NPg::LookupProc(aggDesc.SerializeFuncId);
- saveLambda = ctx.Expr.Builder(input->Pos())
- .Lambda()
- .Param("state")
- .Callable("PgResolvedCallCtx")
- .Atom(0, serializeFuncDesc.Name)
- .Atom(1, ToString(aggDesc.SerializeFuncId))
- .Arg(2, "state")
- .Seal()
- .Seal()
- .Build();
- }
-
- if (aggDesc.DeserializeFuncId) {
- const auto& deserializeFuncDesc = NPg::LookupProc(aggDesc.DeserializeFuncId);
- loadLambda = ctx.Expr.Builder(input->Pos())
- .Lambda()
- .Param("state")
- .Callable("PgResolvedCallCtx")
- .Atom(0, deserializeFuncDesc.Name)
- .Atom(1, ToString(aggDesc.DeserializeFuncId))
- .Arg(2, "state")
- .Callable(3, "PgInternal0")
- .Seal()
- .Seal()
- .Seal()
- .Build();
- }
-
- if (aggDesc.CombineFuncId) {
- const auto& combineFuncDesc = NPg::LookupProc(aggDesc.CombineFuncId);
- if (combineFuncDesc.IsStrict) {
- mergeLambda = ctx.Expr.Builder(input->Pos())
- .Lambda()
- .Param("state1")
- .Param("state2")
- .Callable("If")
- .Callable(0, "Exists")
- .Arg(0, "state1")
- .Seal()
- .Callable(1, "Coalesce")
- .Callable(0, "PgResolvedCallCtx")
- .Atom(0, combineFuncDesc.Name)
- .Atom(1, ToString(aggDesc.CombineFuncId))
- .Arg(2, "state1")
- .Arg(3, "state2")
- .Seal()
- .Arg(1, "state1")
- .Seal()
- .Arg(2, "state2")
- .Seal()
- .Seal()
- .Build();
- } else {
- mergeLambda = ctx.Expr.Builder(input->Pos())
- .Lambda()
- .Param("state1")
- .Param("state2")
- .Callable("PgResolvedCallCtx")
- .Atom(0, combineFuncDesc.Name)
- .Atom(1, ToString(aggDesc.CombineFuncId))
- .Arg(2, "state1")
- .Arg(3, "state2")
- .Seal()
- .Seal()
- .Build();
- }
- }
-
- auto typeNode = ExpandType(input->Pos(), *itemType, ctx.Expr);
- output = ctx.Expr.Builder(input->Pos())
- .Callable("AggregationTraits")
- .Add(0, typeNode)
- .Add(1, initLambda)
- .Add(2, updateLambda)
- .Add(3, saveLambda)
- .Add(4, loadLambda)
- .Add(5, mergeLambda)
- .Add(6, finishLambda)
- .Add(7, defaultValue)
- .Seal()
- .Build();
-
- return IGraphTransformer::TStatus::Repeat;
- }
-
- IGraphTransformer::TStatus PgTypeWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx) {
- Y_UNUSED(output);
- if (!EnsureArgsCount(*input, 1, ctx.Expr)) {
- return IGraphTransformer::TStatus::Error;
- }
-
- if (!EnsureAtom(*input->Child(0), ctx.Expr)) {
- return IGraphTransformer::TStatus::Error;
- }
-
- auto type = TString(input->Child(0)->Content());
- if (!NPg::HasType(type)) {
- ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(input->Child(0)->Pos()),
- TStringBuilder() << "Unknown type: '" << type << "'"));
- return IGraphTransformer::TStatus::Error;
- }
-
- auto typeId = NPg::LookupType(type).TypeId;
- input->SetTypeAnn(ctx.Expr.MakeType<TTypeExprType>(ctx.Expr.MakeType<TPgExprType>(typeId)));
- return IGraphTransformer::TStatus::Ok;
- }
-
- using TInputs = TVector<std::tuple<TString, const TStructExprType*, TMaybe<TColumnOrder>>>;
-
- bool ScanColumns(TExprNode::TPtr root, const TInputs& inputs, const THashSet<TString>& possibleAliases,
- bool* hasStar, bool& hasColumnRef, THashSet<TString>& refs, THashMap<TString, THashSet<TString>>* qualifiedRefs,
- TExtContext& ctx) {
- bool isError = false;
- VisitExpr(root, [&](const TExprNode::TPtr& node) {
- if (node->IsCallable("PgStar")) {
- if (!hasStar) {
- ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(node->Pos()), "Star is not allowed here"));
- isError = true;
- return false;
- }
-
- if (*hasStar) {
- ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(node->Pos()), "Duplicate star"));
- isError = true;
- return false;
- }
-
- if (hasColumnRef) {
- ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(node->Pos()), "Star is incompatible to column reference"));
- isError = true;
- return false;
- }
-
- *hasStar = true;
- if (inputs.empty()) {
- ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(node->Pos()), "Star can't be used without FROM"));
- isError = true;
- return false;
- }
- }
- else if (node->IsCallable("PgQualifiedStar")) {
- if (!hasStar || !qualifiedRefs) {
- ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(node->Pos()), "Star is not allowed here"));
- isError = true;
- return false;
- }
-
- if (*hasStar) {
- ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(node->Pos()), "Star is incompatible to column reference"));
- isError = true;
- return false;
- }
-
- hasColumnRef = true;
- if (inputs.empty()) {
- ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(node->Pos()), "Column reference can't be used without FROM"));
- isError = true;
- return false;
- }
-
- TString alias(node->Head().Content());
- if (possibleAliases.find(alias) == possibleAliases.end()) {
- ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(node->Pos()), TStringBuilder() << "Unknown alias: " << alias));
- isError = true;
- return false;
- }
-
- for (const auto& x : inputs) {
- if (std::get<0>(x).empty() || alias != std::get<0>(x)) {
- continue;
- }
-
- for (const auto& item : std::get<1>(x)->GetItems()) {
- if (!item->GetName().StartsWith("_yql_")) {
- (*qualifiedRefs)[alias].insert(TString(item->GetName()));
- }
- }
- }
- }
- else if (node->IsCallable("PgColumnRef")) {
- if (hasStar && *hasStar) {
- ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(node->Pos()), "Star is incompatible to column reference"));
- isError = true;
- return false;
- }
-
- hasColumnRef = true;
- if (inputs.empty()) {
- ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(node->Pos()), "Column reference can't be used without FROM"));
- isError = true;
- return false;
- }
-
- if (node->ChildrenSize() == 2 && possibleAliases.find(node->Head().Content()) == possibleAliases.end()) {
- ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(node->Pos()), TStringBuilder() << "Unknown alias: " << node->Head().Content()));
- isError = true;
- return false;
- }
-
- ui32 matches = 0;
- for (const auto& x : inputs) {
- if (node->ChildrenSize() == 2) {
- if (std::get<0>(x).empty() || node->Head().Content() != std::get<0>(x)) {
- continue;
- }
- }
-
- auto pos = std::get<1>(x)->FindItem(node->Tail().Content());
- if (pos) {
- ++matches;
- if (matches > 1) {
- ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(node->Pos()),
- TStringBuilder() << "Column reference is ambiguous: " << node->Tail().Content()));
- isError = true;
- return false;
- }
- }
- }
-
- refs.insert(TString(node->Tail().Content()));
- }
-
- return true;
- });
-
- return !isError;
- }
-
- bool ValidateWindowRefs(const TExprNode::TPtr& root, const TExprNode* windows, TExprContext& ctx) {
- bool isError = false;
- VisitExpr(root, [&](const TExprNode::TPtr& node) {
- if (node->IsCallable("PgWindowCall")) {
- if (!windows) {
- ctx.AddError(TIssue(ctx.GetPosition(node->Pos()),
- "No window definitions"));
- isError = true;
- return false;
- }
-
- auto ref = node->Child(1);
- if (ref->IsAtom()) {
- auto name = ref->Content();
- if (!name) {
- ctx.AddError(TIssue(ctx.GetPosition(node->Pos()),
- "Empty window name is not allowed"));
- isError = true;
- return false;
- }
-
- bool found = false;
- for (const auto& x : windows->Children()) {
- if (x->Head().Content() == name) {
- found = true;
- break;
- }
- }
-
- if (!found) {
- ctx.AddError(TIssue(ctx.GetPosition(node->Pos()),
- TStringBuilder() << "Not found window name: " << name));
- isError = true;
- return false;
- }
- } else {
- YQL_ENSURE(ref->IsCallable("PgAnonWindow"));
- auto index = FromString<ui32>(ref->Head().Content());
- if (index >= windows->ChildrenSize()) {
- ctx.AddError(TIssue(ctx.GetPosition(node->Pos()),
- "Wrong index of window"));
- isError = true;
- return false;
- }
- }
- }
-
- return true;
- });
-
- return !isError;
- }
-
- void AddColumns(const TInputs& inputs, const bool* hasStar, const THashSet<TString>& refs,
- const THashMap<TString, THashSet<TString>>* qualifiedRefs,
- TVector<const TItemExprType*>& items) {
- for (const auto& x : inputs) {
- if (hasStar && *hasStar) {
- for (ui32 i = 0; i < std::get<1>(x)->GetSize(); ++i) {
- auto item = std::get<1>(x)->GetItems()[i];
- if (!item->GetName().StartsWith("_yql_")) {
- items.push_back(item);
- }
- }
-
- continue;
- }
-
- for (const auto& ref : refs) {
- auto pos = std::get<1>(x)->FindItem(ref);
- if (pos) {
- items.push_back(std::get<1>(x)->GetItems()[*pos]);
- }
- }
-
- if (qualifiedRefs && qualifiedRefs->contains(std::get<0>(x))) {
- for (const auto& ref : qualifiedRefs->find(std::get<0>(x))->second) {
- auto pos = std::get<1>(x)->FindItem(ref);
- if (pos) {
- items.push_back(std::get<1>(x)->GetItems()[*pos]);
- }
- }
- }
- }
- }
-
- IGraphTransformer::TStatus RebuildLambdaColumns(const TExprNode::TPtr& root, const TExprNode::TPtr& argNode,
- TExprNode::TPtr& newRoot, const TInputs& inputs, TExprNode::TPtr* expandedColumns, TExtContext& ctx) {
- return OptimizeExpr(root, newRoot, [&](const TExprNode::TPtr& node, TExprContext&) -> TExprNode::TPtr {
- if (node->IsCallable("PgStar")) {
- TExprNode::TListType orderAtoms;
- for (const auto& x : inputs) {
- auto order = std::get<2>(x);
- for (const auto& item : std::get<1>(x)->GetItems()) {
- if (!item->GetName().StartsWith("_yql_")) {
- if (!order) {
- orderAtoms.push_back(ctx.Expr.NewAtom(node->Pos(), item->GetName()));
- }
- }
- }
-
- if (order) {
- for (const auto& o : *order) {
- if (!o.StartsWith("_yql_")) {
- orderAtoms.push_back(ctx.Expr.NewAtom(node->Pos(), o));
- }
- }
- }
- }
-
- if (expandedColumns) {
- *expandedColumns = ctx.Expr.NewList(node->Pos(), std::move(orderAtoms));
- }
-
- return argNode;
- }
-
- if (node->IsCallable("PgColumnRef")) {
- return ctx.Expr.Builder(node->Pos())
- .Callable("Member")
- .Add(0, argNode)
- .Atom(1, node->Tail().Content())
- .Seal()
- .Build();
- }
-
- if (node->IsCallable("PgQualifiedStar")) {
- TExprNode::TListType members;
- for (const auto& x : inputs) {
- if (std::get<0>(x).empty() || node->Head().Content() != std::get<0>(x)) {
- continue;
- }
-
- auto order = std::get<2>(x);
- TExprNode::TListType orderAtoms;
- for (const auto& item : std::get<1>(x)->GetItems()) {
- if (!item->GetName().StartsWith("_yql_")) {
- if (!order) {
- orderAtoms.push_back(ctx.Expr.NewAtom(node->Pos(), item->GetName()));
- }
-
- members.push_back(ctx.Expr.Builder(node->Pos())
- .List()
- .Atom(0, item->GetName())
- .Callable(1, "Member")
- .Add(0, argNode)
- .Atom(1, item->GetName())
- .Seal()
- .Seal()
- .Build());
- }
- }
-
- if (order) {
- for (const auto& o : *order) {
- if (!o.StartsWith("_yql_")) {
- orderAtoms.push_back(ctx.Expr.NewAtom(node->Pos(), o));
- }
- }
- }
-
- if (expandedColumns) {
- *expandedColumns = ctx.Expr.NewList(node->Pos(), std::move(orderAtoms));
- }
-
- return ctx.Expr.NewCallable(node->Pos(), "AsStruct", std::move(members));
- }
-
- YQL_ENSURE(false, "missing input");
- }
-
- return node;
- }, ctx.Expr, TOptimizeExprSettings(nullptr));
- }
-
- void MakeOptionalColumns(const TStructExprType*& structType, TExprContext& ctx) {
- bool needRebuild = false;
- for (const auto& item : structType->GetItems()) {
- if (item->GetItemType()->GetKind() != ETypeAnnotationKind::Optional
- && item->GetItemType()->GetKind() != ETypeAnnotationKind::Null) {
- needRebuild = true;
- break;
- }
- }
-
- if (!needRebuild) {
- return;
- }
-
- auto newItems = structType->GetItems();
- for (auto& item : newItems) {
- if (item->GetItemType()->GetKind() != ETypeAnnotationKind::Optional
- && item->GetItemType()->GetKind() != ETypeAnnotationKind::Null) {
- item = ctx.MakeType<TItemExprType>(item->GetName(), ctx.MakeType<TOptionalExprType>(item->GetItemType()));
- }
- }
-
- structType = ctx.MakeType<TStructExprType>(newItems);
- }
-
- bool ValidateGroups(const TInputs& inputs, const THashSet<TString>& possibleAliases,
- const TExprNode& data, TExtContext& ctx, TExprNode::TListType& newGroups) {
- newGroups.clear();
- bool hasColumnRef = false;
- for (const auto& group : data.Children()) {
- if (!group->IsCallable("PgGroup")) {
- ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(group->Pos()), "Expected PgGroup"));
- return false;
- }
-
- YQL_ENSURE(group->Tail().IsLambda());
- THashSet<TString> refs;
- THashMap<TString, THashSet<TString>> qualifiedRefs;
- if (group->Child(0)->IsCallable("Void")) {
- // no effective type yet, scan lambda body
- if (!ScanColumns(group->Tail().TailPtr(), inputs, possibleAliases, nullptr, hasColumnRef,
- refs, &qualifiedRefs, ctx)) {
- return false;
- }
-
- TVector<const TItemExprType*> items;
- AddColumns(inputs, nullptr, refs, &qualifiedRefs, items);
- auto effectiveType = ctx.Expr.MakeType<TStructExprType>(items);
- if (!effectiveType->Validate(group->Pos(), ctx.Expr)) {
- return false;
- }
-
- auto typeNode = ExpandType(group->Pos(), *effectiveType, ctx.Expr);
-
- auto argNode = ctx.Expr.NewArgument(group->Pos(), "row");
- auto arguments = ctx.Expr.NewArguments(group->Pos(), { argNode });
- TExprNode::TPtr newRoot;
- auto status = RebuildLambdaColumns(group->Tail().TailPtr(), argNode, newRoot, inputs, nullptr, ctx);
- if (status == IGraphTransformer::TStatus::Error) {
- return false;
- }
-
- auto newLambda = ctx.Expr.NewLambda(group->Pos(), std::move(arguments), std::move(newRoot));
-
- auto newChildren = group->ChildrenList();
- newChildren[0] = typeNode;
- newChildren[1] = newLambda;
- auto newGroup = ctx.Expr.NewCallable(group->Pos(), "PgGroup", std::move(newChildren));
- newGroups.push_back(newGroup);
- }
- }
-
- return true;
- }
-
- bool ValidateSort(const TInputs& inputs, const THashSet<TString>& possibleAliases,
- const TExprNode& data, TExtContext& ctx, TExprNode::TListType& newSorts) {
- newSorts.clear();
- for (auto oneSort : data.Children()) {
- bool hasColumnRef;
- THashSet<TString> refs;
- THashMap<TString, THashSet<TString>> qualifiedRefs;
- if (!ScanColumns(oneSort->Child(1)->TailPtr(), inputs, possibleAliases, nullptr, hasColumnRef,
- refs, &qualifiedRefs, ctx)) {
- return false;
- }
-
- TVector<const TItemExprType*> items;
- AddColumns(inputs, nullptr, refs, &qualifiedRefs, items);
- auto effectiveType = ctx.Expr.MakeType<TStructExprType>(items);
- if (!effectiveType->Validate(oneSort->Pos(), ctx.Expr)) {
- return false;
- }
-
- auto typeNode = ExpandType(oneSort->Pos(), *effectiveType, ctx.Expr);
-
- auto argNode = ctx.Expr.NewArgument(oneSort->Pos(), "row");
- auto arguments = ctx.Expr.NewArguments(oneSort->Pos(), { argNode });
- TExprNode::TPtr newRoot;
- auto status = RebuildLambdaColumns(oneSort->Child(1)->TailPtr(), argNode, newRoot, inputs, nullptr, ctx);
- if (status == IGraphTransformer::TStatus::Error) {
- return false;
- }
-
- auto newLambda = ctx.Expr.NewLambda(oneSort->Pos(), std::move(arguments), std::move(newRoot));
-
- auto newChildren = oneSort->ChildrenList();
- newChildren[0] = typeNode;
- newChildren[1] = newLambda;
- auto newSort = ctx.Expr.ChangeChildren(*oneSort, std::move(newChildren));
- newSorts.push_back(newSort);
- }
-
- return true;
- }
-
- IGraphTransformer::TStatus PgSetItemWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TExtContext& ctx) {
- if (!EnsureArgsCount(*input, 1, ctx.Expr)) {
- return IGraphTransformer::TStatus::Error;
- }
-
- const auto& options = input->Head();
- if (!EnsureTuple(options, ctx.Expr)) {
- return IGraphTransformer::TStatus::Error;
- }
-
- const TStructExprType* outputRowType = nullptr;
- TInputs inputs;
- TInputs joinInputs;
- THashSet<TString> possibleAliases;
- bool hasResult = false;
- bool hasValues = false;
- bool hasJoinOps = false;
-
- // pass 0 - from/values
- // pass 1 - join
- // pass 2 - where, group_by,
- // pass 3 - window
- // pass 4 - result
- for (ui32 pass = 0; pass < 5; ++pass) {
- if (pass > 1 && !inputs.empty() && !hasJoinOps) {
- ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(input->Pos()), "Missing join_ops"));
- return IGraphTransformer::TStatus::Error;
- }
-
- for (const auto& option : options.Children()) {
- if (!EnsureTupleMinSize(*option, 1, ctx.Expr)) {
- return IGraphTransformer::TStatus::Error;
- }
-
- if (!EnsureAtom(option->Head(), ctx.Expr)) {
- return IGraphTransformer::TStatus::Error;
- }
-
- const auto optionName = option->Head().Content();
- if (optionName == "values") {
- hasValues = true;
- if (pass != 0) {
- continue;
- }
-
- if (!EnsureTupleSize(*option, 3, ctx.Expr)) {
- return IGraphTransformer::TStatus::Error;
- }
-
- auto values = option->Child(2);
- if (!EnsureListType(*values, ctx.Expr)) {
- return IGraphTransformer::TStatus::Error;
- }
-
- auto listType = values->GetTypeAnn()->Cast<TListExprType>();
- if (!EnsureTupleType(values->Pos(), *listType->GetItemType(), ctx.Expr)) {
- return IGraphTransformer::TStatus::Error;
- }
-
- auto tupleType = listType->GetItemType()->Cast<TTupleExprType>();
- auto names = option->Child(1);
- if (!EnsureTupleSize(*names, tupleType->GetSize(), ctx.Expr)) {
- return IGraphTransformer::TStatus::Error;
- }
-
- TVector<const TItemExprType*> outputItems;
- TVector<TString> columns;
- for (ui32 i = 0; i < names->ChildrenSize(); ++i) {
- if (!EnsureAtom(*names->Child(i), ctx.Expr)) {
- return IGraphTransformer::TStatus::Error;
- }
-
- outputItems.push_back(ctx.Expr.MakeType<TItemExprType>(names->Child(i)->Content(), tupleType->GetItems()[i]));
- }
-
- outputRowType = ctx.Expr.MakeType<TStructExprType>(outputItems);
- if (!outputRowType->Validate(names->Pos(), ctx.Expr)) {
- return IGraphTransformer::TStatus::Error;
- }
- } else if (optionName == "result") {
- hasResult = true;
- if (pass != 4) {
- continue;
- }
-
- if (!EnsureTupleSize(*option, 2, ctx.Expr)) {
- return IGraphTransformer::TStatus::Error;
- }
-
- const auto& data = option->Tail();
- if (!EnsureTuple(data, ctx.Expr)) {
- return IGraphTransformer::TStatus::Error;
- }
-
- TVector<const TItemExprType*> outputItems;
- TExprNode::TListType newResult;
- bool hasStar = false;
- bool hasColumnRef = false;
- for (const auto& column : data.Children()) {
- if (!column->IsCallable("PgResultItem")) {
- ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(column->Pos()), "Expected PgResultItem"));
- return IGraphTransformer::TStatus::Error;
- }
-
- YQL_ENSURE(column->Tail().IsLambda());
- THashSet<TString> refs;
- THashMap<TString, THashSet<TString>> qualifiedRefs;
- if (column->Child(1)->IsCallable("Void")) {
- // no effective type yet, scan lambda body
- if (!ScanColumns(column->Tail().TailPtr(), joinInputs, possibleAliases, &hasStar, hasColumnRef,
- refs, &qualifiedRefs, ctx)) {
- return IGraphTransformer::TStatus::Error;
- }
-
- TVector<const TItemExprType*> items;
- AddColumns(joinInputs, &hasStar, refs, &qualifiedRefs, items);
- auto effectiveType = ctx.Expr.MakeType<TStructExprType>(items);
- if (!effectiveType->Validate(column->Pos(), ctx.Expr)) {
- return IGraphTransformer::TStatus::Error;
- }
-
- auto expandedColumns = column->HeadPtr();
- auto typeNode = ExpandType(column->Pos(), *effectiveType, ctx.Expr);
-
- auto argNode = ctx.Expr.NewArgument(column->Pos(), "row");
- auto arguments = ctx.Expr.NewArguments(column->Pos(), { argNode });
- TExprNode::TPtr newRoot;
- auto status = RebuildLambdaColumns(column->Tail().TailPtr(), argNode, newRoot, joinInputs, &expandedColumns, ctx);
- if (status == IGraphTransformer::TStatus::Error) {
- return IGraphTransformer::TStatus::Error;
- }
-
- auto newLambda = ctx.Expr.NewLambda(column->Pos(), std::move(arguments), std::move(newRoot));
-
- auto newColumnChildren = column->ChildrenList();
- newColumnChildren[0] = expandedColumns;
- newColumnChildren[1] = typeNode;
- newColumnChildren[2] = newLambda;
- auto newColumn = ctx.Expr.NewCallable(column->Pos(), "PgResultItem", std::move(newColumnChildren));
- newResult.push_back(newColumn);
- } else {
- if (column->Head().IsAtom()) {
- outputItems.push_back(ctx.Expr.MakeType<TItemExprType>(column->Head().Content(), column->Tail().GetTypeAnn()));
- } else {
- // star or qualified star
- for (const auto& item : column->Tail().GetTypeAnn()->Cast<TStructExprType>()->GetItems()) {
- outputItems.push_back(item);
- }
- }
-
- // scan lambda for window references
- auto windows = GetSetting(options, "window");
- if (!ValidateWindowRefs(column->TailPtr(), windows ? &windows->Tail() : nullptr, ctx.Expr)) {
- return IGraphTransformer::TStatus::Error;
- }
- }
- }
-
- if (!newResult.empty()) {
- auto resultValue = ctx.Expr.NewList(options.Pos(), std::move(newResult));
- auto newSettings = ReplaceSetting(options, {}, "result", resultValue, ctx.Expr);
- output = ctx.Expr.ChangeChild(*input, 0, std::move(newSettings));
- return IGraphTransformer::TStatus::Repeat;
- }
-
- outputRowType = ctx.Expr.MakeType<TStructExprType>(outputItems);
- if (!outputRowType->Validate(data.Pos(), ctx.Expr)) {
- return IGraphTransformer::TStatus::Error;
- }
- } else if (optionName == "from") {
- if (pass != 0) {
- continue;
- }
-
- if (!EnsureTuple(*option, ctx.Expr)) {
- return IGraphTransformer::TStatus::Error;
- }
-
- const auto& data = option->Tail();
- if (!EnsureTupleMinSize(data, 1, ctx.Expr)) {
- return IGraphTransformer::TStatus::Error;
- }
-
- for (const auto& p : data.Children()) {
- if (!EnsureTupleSize(*p, 3, ctx.Expr)) {
- return IGraphTransformer::TStatus::Error;
- }
-
- if (!EnsureAtom(*p->Child(1), ctx.Expr)) {
- return IGraphTransformer::TStatus::Error;
- }
-
- if (!EnsureTuple(*p->Child(2), ctx.Expr)) {
- return IGraphTransformer::TStatus::Error;
- }
-
- for (const auto& name : p->Child(2)->Children()) {
- if (!EnsureAtom(*name, ctx.Expr)) {
- return IGraphTransformer::TStatus::Error;
- }
- }
-
- auto columnOrder = ctx.Types.LookupColumnOrder(p->Head());
- if (!EnsureListType(p->Head(), ctx.Expr)) {
- return IGraphTransformer::TStatus::Error;
- }
-
- auto inputRowType = p->Head().GetTypeAnn()->Cast<TListExprType>()->GetItemType();
- if (!EnsureStructType(p->Head().Pos(), *inputRowType, ctx.Expr)) {
- return IGraphTransformer::TStatus::Error;
- }
-
- auto inputStructType = inputRowType->Cast<TStructExprType>();
- auto alias = TString(p->Child(1)->Content());
- if (!alias.empty()) {
- if (!possibleAliases.insert(alias).second) {
- ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(option->Head().Pos()),
- TStringBuilder() << "Duplicated alias: " << alias));
- return IGraphTransformer::TStatus::Error;
- }
- }
-
- if (p->Child(2)->ChildrenSize() > 0) {
- // explicit columns
- ui32 realColumns = 0;
- for (const auto& item : inputStructType->GetItems()) {
- if (!item->GetName().StartsWith("_yql_")) {
- ++realColumns;
- }
- }
-
- if (realColumns != p->Child(2)->ChildrenSize()) {
- ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(option->Head().Pos()),
- TStringBuilder() << "Wrong number of columns, expected: " << realColumns
- << ", got: " << p->Child(2)->ChildrenSize()));
- return IGraphTransformer::TStatus::Error;
- }
-
- if (!columnOrder) {
- ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(option->Head().Pos()),
- "No column order at source"));
- return IGraphTransformer::TStatus::Error;
- }
-
- TVector<const TItemExprType*> newStructItems;
- TColumnOrder newOrder;
- for (ui32 i = 0; i < p->Child(2)->ChildrenSize(); ++i) {
- auto pos = inputStructType->FindItem((*columnOrder)[i]);
- YQL_ENSURE(pos);
- auto type = inputStructType->GetItems()[*pos]->GetItemType();
- newOrder.push_back(TString(p->Child(2)->Child(i)->Content()));
- newStructItems.push_back(ctx.Expr.MakeType<TItemExprType>(p->Child(2)->Child(i)->Content(), type));
- }
-
- auto newStructType = ctx.Expr.MakeType<TStructExprType>(newStructItems);
- if (!newStructType->Validate(p->Child(2)->Pos(), ctx.Expr)) {
- return IGraphTransformer::TStatus::Error;
- }
-
- inputs.push_back(std::make_tuple(alias, newStructType, newOrder));
- } else {
- inputs.push_back(std::make_tuple(alias, inputStructType, columnOrder));
- }
- }
- } else if (optionName == "where" || optionName == "having") {
- if (pass != 2) {
- continue;
- }
-
- if (!EnsureTupleSize(*option, 2, ctx.Expr)) {
- return IGraphTransformer::TStatus::Error;
- }
-
- const auto& data = option->Tail();
- if (!data.IsCallable("PgWhere")) {
- ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(option->Head().Pos()), "Expected PgWhere"));
- return IGraphTransformer::TStatus::Error;
- }
-
- if (data.Child(0)->IsCallable("Void")) {
- // no effective type yet, scan lambda body
- bool hasColumnRef;
- THashSet<TString> refs;
- if (!ScanColumns(data.Child(1)->TailPtr(), joinInputs, possibleAliases, nullptr, hasColumnRef,
- refs, nullptr, ctx)) {
- return IGraphTransformer::TStatus::Error;
- }
-
- TVector<const TItemExprType*> items;
- AddColumns(joinInputs, nullptr, refs, nullptr, items);
- auto effectiveType = ctx.Expr.MakeType<TStructExprType>(items);
- if (!effectiveType->Validate(data.Pos(), ctx.Expr)) {
- return IGraphTransformer::TStatus::Error;
- }
-
- auto typeNode = ExpandType(data.Pos(), *effectiveType, ctx.Expr);
-
- auto argNode = ctx.Expr.NewArgument(data.Pos(), "row");
- auto arguments = ctx.Expr.NewArguments(data.Pos(), { argNode });
- TExprNode::TPtr newRoot;
- auto status = RebuildLambdaColumns(data.Child(1)->TailPtr(), argNode, newRoot, joinInputs, nullptr, ctx);
- if (status == IGraphTransformer::TStatus::Error) {
- return IGraphTransformer::TStatus::Error;
- }
-
- auto newLambda = ctx.Expr.NewLambda(data.Pos(), std::move(arguments), std::move(newRoot));
-
- auto newChildren = data.ChildrenList();
- newChildren[0] = typeNode;
- newChildren[1] = newLambda;
- auto newWhere= ctx.Expr.NewCallable(data.Pos(), "PgWhere", std::move(newChildren));
- auto newSettings = ReplaceSetting(options, {}, TString(optionName), newWhere, ctx.Expr);
- output = ctx.Expr.ChangeChild(*input, 0, std::move(newSettings));
- return IGraphTransformer::TStatus::Repeat;
- } else {
- if (data.GetTypeAnn() && data.GetTypeAnn()->GetKind() == ETypeAnnotationKind::Null) {
- // nothing to do
- } else if (data.GetTypeAnn() && data.GetTypeAnn()->GetKind() == ETypeAnnotationKind::Pg) {
- auto name = data.GetTypeAnn()->Cast<TPgExprType>()->GetName();
- if (name != "bool") {
- ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(data.Pos()), TStringBuilder() <<
- "Expected bool type, but got: " << name));
- return IGraphTransformer::TStatus::Error;
- }
- } else if (!EnsureSpecificDataType(data, EDataSlot::Bool, ctx.Expr, true)) {
- return IGraphTransformer::TStatus::Error;
- }
- }
- } else if (optionName == "join_ops") {
- if (pass != 1) {
- continue;
- }
-
- hasJoinOps = true;
- if (hasValues) {
- ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(input->Pos()), "Join and values options are not compatible"));
- return IGraphTransformer::TStatus::Error;
- }
-
- if (inputs.empty()) {
- ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(input->Pos()), "At least one input expected"));
- return IGraphTransformer::TStatus::Error;
- }
-
- if (!EnsureTupleSize(*option, 2, ctx.Expr)) {
- return IGraphTransformer::TStatus::Error;
- }
-
- const auto& data = option->Tail();
- if (!EnsureTuple(data, ctx.Expr)) {
- return IGraphTransformer::TStatus::Error;
- }
-
- ui32 totalTupleSizes = 0;
- for (auto child: data.Children()) {
- if (!EnsureTuple(*child, ctx.Expr)) {
- return IGraphTransformer::TStatus::Error;
- }
-
- totalTupleSizes += child->ChildrenSize() + 1;
- }
-
- if (totalTupleSizes != inputs.size()) {
- ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(option->Head().Pos()),
- TStringBuilder() << "Unexpected number of joins, got: " << totalTupleSizes
- << ", expected:" << inputs.size()));
- return IGraphTransformer::TStatus::Error;
- }
-
- bool needRewrite = false;
- ui32 inputIndex = 0;
- for (ui32 joinGroupNo = 0; joinGroupNo < data.ChildrenSize(); ++joinGroupNo) {
- joinInputs.push_back(inputs[inputIndex]);
- ++inputIndex;
- for (ui32 i = 0; i < data.Child(joinGroupNo)->ChildrenSize(); ++i) {
- auto child = data.Child(joinGroupNo)->Child(i);
- if (!EnsureTupleMinSize(*child, 1, ctx.Expr)) {
- return IGraphTransformer::TStatus::Error;
- }
-
- if (!EnsureAtom(child->Head(), ctx.Expr)) {
- return IGraphTransformer::TStatus::Error;
- }
-
- auto joinType = child->Head().Content();
- if (joinType != "cross" && joinType != "inner" && joinType != "left"
- && joinType != "right" && joinType != "full") {
- ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(option->Head().Pos()),
- TStringBuilder() << "Unsupported join type: " << joinType));
- return IGraphTransformer::TStatus::Error;
- }
-
- if (joinType == "cross") {
- if (!EnsureTupleSize(*child, 1, ctx.Expr)) {
- return IGraphTransformer::TStatus::Error;
- }
-
- joinInputs.push_back(inputs[inputIndex]);
- ++inputIndex;
- } else {
- if (!EnsureTupleSize(*child, 2, ctx.Expr)) {
- return IGraphTransformer::TStatus::Error;
- }
-
- bool leftSideIsOptional = (joinType == "right" || joinType == "full");
- bool rightSideIsOptional = (joinType == "left" || joinType == "full");
- if (leftSideIsOptional) {
- for (ui32 j = 0; j < inputIndex; ++j) {
- MakeOptionalColumns(std::get<1>(joinInputs[j]), ctx.Expr);
- }
- }
-
- joinInputs.push_back(inputs[inputIndex]);
- ++inputIndex;
- if (rightSideIsOptional) {
- MakeOptionalColumns(std::get<1>(joinInputs.back()), ctx.Expr);
- }
-
- const auto& quals = child->Tail();
- if (!quals.IsCallable("PgWhere")) {
- ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(quals.Pos()), "Expected PgWhere"));
- return IGraphTransformer::TStatus::Error;
- }
-
- needRewrite = needRewrite || quals.Child(0)->IsCallable("Void");
- }
- }
- }
-
- if (needRewrite) {
- TExprNode::TListType newJoinGroups;
- inputIndex = 0;
- for (ui32 joinGroupNo = 0; joinGroupNo < data.ChildrenSize(); ++joinGroupNo) {
- TExprNode::TListType newGroupItems;
- TInputs groupInputs;
- THashSet<TString> groupPossibleAliases;
- if (data.Child(joinGroupNo)->ChildrenSize() > 0) {
- groupInputs.push_back(inputs[inputIndex]);
- auto alias = std::get<0>(inputs[inputIndex]);
- if (!alias.empty()) {
- groupPossibleAliases.insert(alias);
- }
- }
-
- ++inputIndex;
- for (ui32 i = 0; i < data.Child(joinGroupNo)->ChildrenSize(); ++i, ++inputIndex) {
- groupInputs.push_back(inputs[inputIndex]);
- auto alias = std::get<0>(inputs[inputIndex]);
- if (!alias.empty()) {
- groupPossibleAliases.insert(alias);
- }
-
- auto child = data.Child(joinGroupNo)->Child(i);
- auto joinType = child->Head().Content();
- if (joinType == "cross") {
- newGroupItems.push_back(data.Child(joinGroupNo)->ChildPtr(i));
- } else {
- const auto& quals = child->Tail();
- bool hasColumnRef;
- THashSet<TString> refs;
- if (!ScanColumns(quals.Child(1)->TailPtr(), groupInputs, groupPossibleAliases, nullptr, hasColumnRef,
- refs, nullptr, ctx)) {
- return IGraphTransformer::TStatus::Error;
- }
-
- TVector<const TItemExprType*> items;
- AddColumns(groupInputs, nullptr, refs, nullptr, items);
- auto effectiveType = ctx.Expr.MakeType<TStructExprType>(items);
- if (!effectiveType->Validate(quals.Pos(), ctx.Expr)) {
- return IGraphTransformer::TStatus::Error;
- }
-
- auto typeNode = ExpandType(quals.Pos(), *effectiveType, ctx.Expr);
-
- auto argNode = ctx.Expr.NewArgument(quals.Pos(), "row");
- auto arguments = ctx.Expr.NewArguments(quals.Pos(), { argNode });
- TExprNode::TPtr newRoot;
- auto status = RebuildLambdaColumns(quals.Child(1)->TailPtr(), argNode, newRoot, groupInputs, nullptr, ctx);
- if (status == IGraphTransformer::TStatus::Error) {
- return IGraphTransformer::TStatus::Error;
- }
-
- auto predicate = ctx.Expr.Builder(quals.Pos())
- .Callable("Coalesce")
- .Add(0, newRoot)
- .Callable(1, "Bool")
- .Atom(0, "0")
- .Seal()
- .Seal()
- .Build();
-
- auto newLambda = ctx.Expr.NewLambda(quals.Pos(), std::move(arguments), std::move(predicate));
-
- auto newChildren = quals.ChildrenList();
- newChildren[0] = typeNode;
- newChildren[1] = newLambda;
- auto newWhere= ctx.Expr.NewCallable(quals.Pos(), "PgWhere", std::move(newChildren));
- newGroupItems.push_back(ctx.Expr.ChangeChild(*child, 1, std::move(newWhere)));
- }
-
- // after left,right,full join type of inputs in current group may be changed for next predicates
- bool leftSideIsOptional = (joinType == "right" || joinType == "full");
- bool rightSideIsOptional = (joinType == "left" || joinType == "full");
- if (leftSideIsOptional) {
- for (ui32 j = 0; j < inputIndex; ++j) {
- MakeOptionalColumns(std::get<1>(groupInputs[j]), ctx.Expr);
- }
- }
-
- if (rightSideIsOptional) {
- MakeOptionalColumns(std::get<1>(groupInputs[inputIndex]), ctx.Expr);
- }
- }
-
- auto newGroup = ctx.Expr.NewList(option->Pos(), std::move(newGroupItems));
- newJoinGroups.push_back(newGroup);
- }
-
- auto newJoinGroupsNode = ctx.Expr.NewList(option->Pos(), std::move(newJoinGroups));
- auto newSettings = ReplaceSetting(options, {}, TString(optionName), newJoinGroupsNode, ctx.Expr);
- output = ctx.Expr.ChangeChild(*input, 0, std::move(newSettings));
- return IGraphTransformer::TStatus::Repeat;
- }
- } else if (optionName == "group_by") {
- if (pass != 2) {
- continue;
- }
-
- if (!EnsureTupleSize(*option, 2, ctx.Expr)) {
- return IGraphTransformer::TStatus::Error;
- }
-
- const auto& data = option->Tail();
- if (!EnsureTuple(data, ctx.Expr)) {
- return IGraphTransformer::TStatus::Error;
- }
-
- TExprNode::TListType newGroups;
- if (!ValidateGroups(joinInputs, possibleAliases, data, ctx, newGroups)) {
- return IGraphTransformer::TStatus::Error;
- }
-
- if (!newGroups.empty()) {
- auto resultValue = ctx.Expr.NewList(options.Pos(), std::move(newGroups));
- auto newSettings = ReplaceSetting(options, {}, "group_by", resultValue, ctx.Expr);
- output = ctx.Expr.ChangeChild(*input, 0, std::move(newSettings));
- return IGraphTransformer::TStatus::Repeat;
- }
- } else if (optionName == "window") {
- if (pass != 3) {
- continue;
- }
-
- if (!EnsureTupleSize(*option, 2, ctx.Expr)) {
- return IGraphTransformer::TStatus::Error;
- }
-
- const auto& data = option->Tail();
- if (!EnsureTupleMinSize(data, 1, ctx.Expr)) {
- return IGraphTransformer::TStatus::Error;
- }
-
- THashSet<TStringBuf> windowNames;
- TExprNode::TListType newWindow;
- bool hasChanges = false;
- for (ui32 i = 0; i < data.ChildrenSize(); ++i) {
- auto x = data.ChildPtr(i);
- if (!x->IsCallable("PgWindow")) {
- ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(x->Pos()), "Expected PgWindow"));
- return IGraphTransformer::TStatus::Error;
- }
-
- if (x->Head().Content() && !windowNames.insert(x->Head().Content()).second) {
- ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(x->Pos()),
- TStringBuilder() << "Duplicated window name: " << x->Head().Content()));
- return IGraphTransformer::TStatus::Error;
- }
-
- auto partitions = x->Child(2);
- auto sort = x->Child(3);
- bool needRebuildSort = false;
- bool needRebuildPartition = false;
- for (const auto& p : partitions->Children()) {
- if (p->Child(0)->IsCallable("Void")) {
- needRebuildPartition = true;
- break;
- }
- }
-
- for (const auto& s : sort->Children()) {
- if (s->Child(0)->IsCallable("Void")) {
- needRebuildSort = true;
- break;
- }
- }
-
- if (!needRebuildSort && !needRebuildPartition) {
- newWindow.push_back(x);
- continue;
- }
-
- hasChanges = true;
- auto newChildren = x->ChildrenList();
- if (needRebuildPartition) {
- TExprNode::TListType newGroups;
- if (!ValidateGroups(joinInputs, possibleAliases, *partitions, ctx, newGroups)) {
- return IGraphTransformer::TStatus::Error;
- }
-
- newChildren[2] = ctx.Expr.NewList(x->Pos(), std::move(newGroups));
- }
-
- if (needRebuildSort) {
- TExprNode::TListType newSorts;
- if (!ValidateSort(joinInputs, possibleAliases, *sort, ctx, newSorts)) {
- return IGraphTransformer::TStatus::Error;
- }
-
- newChildren[3] = ctx.Expr.NewList(x->Pos(), std::move(newSorts));
- }
-
- newWindow.push_back(ctx.Expr.ChangeChildren(*x, std::move(newChildren)));
- }
-
- if (hasChanges) {
- auto windowValue = ctx.Expr.NewList(options.Pos(), std::move(newWindow));
- auto newSettings = ReplaceSetting(options, {}, "window", windowValue, ctx.Expr);
- output = ctx.Expr.ChangeChild(*input, 0, std::move(newSettings));
- return IGraphTransformer::TStatus::Repeat;
- }
- } else {
- ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(option->Head().Pos()),
- TStringBuilder() << "Unsupported option: " << optionName));
- return IGraphTransformer::TStatus::Error;
- }
- }
- }
-
- if (!hasResult && !hasValues) {
- ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(input->Pos()), "Missing result and values"));
- return IGraphTransformer::TStatus::Error;
- }
-
- if (hasResult && hasValues) {
- ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(input->Pos()), "Either result or values should be specified"));
- return IGraphTransformer::TStatus::Error;
- }
-
- input->SetTypeAnn(ctx.Expr.MakeType<TListExprType>(outputRowType));
- return IGraphTransformer::TStatus::Ok;
- }
-
- IGraphTransformer::TStatus PgSelectWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TExtContext& ctx) {
- if (!EnsureArgsCount(*input, 1, ctx.Expr)) {
- return IGraphTransformer::TStatus::Error;
- }
-
- const auto& options = input->Head();
- if (!EnsureTuple(options, ctx.Expr)) {
- return IGraphTransformer::TStatus::Error;
- }
-
- const TStructExprType* outputRowType = nullptr;
- TExprNode* setItems = nullptr;
- TExprNode* setOps = nullptr;
- bool hasSort = false;
-
- for (ui32 pass = 0; pass < 2; ++pass) {
- for (const auto& option : options.Children()) {
- if (!EnsureTupleMinSize(*option, 1, ctx.Expr)) {
- return IGraphTransformer::TStatus::Error;
- }
-
- if (!EnsureAtom(option->Head(), ctx.Expr)) {
- return IGraphTransformer::TStatus::Error;
- }
-
- const auto optionName = option->Head().Content();
- if (optionName == "set_ops") {
- if (!EnsureTupleSize(*option, 2, ctx.Expr)) {
- return IGraphTransformer::TStatus::Error;
- }
-
- if (pass == 0) {
- if (!EnsureTupleMinSize(option->Tail(), 1, ctx.Expr)) {
- return IGraphTransformer::TStatus::Error;
- }
-
- for (const auto& child : option->Tail().Children()) {
- if (!EnsureAtom(*child, ctx.Expr)) {
- return IGraphTransformer::TStatus::Error;
- }
-
- if (child->Content() != "push" && child->Content() != "union_all") {
- ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(child->Pos()),
- TStringBuilder() << "Unexpected operation: " << child->Content()));
- return IGraphTransformer::TStatus::Error;
- }
- }
-
- setOps = &option->Tail();
- }
- } else if (optionName == "set_items") {
- if (!EnsureTupleSize(*option, 2, ctx.Expr)) {
- return IGraphTransformer::TStatus::Error;
- }
-
- if (pass == 0) {
- if (!EnsureTupleMinSize(option->Tail(), 1, ctx.Expr)) {
- return IGraphTransformer::TStatus::Error;
- }
-
- for (const auto& child : option->Tail().Children()) {
- if (!child->IsCallable("PgSetItem")) {
- ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(child->Pos()), "Expected PgSetItem"));
- return IGraphTransformer::TStatus::Error;
- }
- }
-
- setItems = &option->Tail();
- } else {
- outputRowType = option->Tail().Head().GetTypeAnn()->Cast<TListExprType>()->GetItemType()->
- Cast<TStructExprType>();
- }
- } else if (optionName == "limit" || optionName == "offset") {
- if (pass != 0) {
- continue;
- }
-
- if (!EnsureTupleSize(*option, 2, ctx.Expr)) {
- return IGraphTransformer::TStatus::Error;
- }
-
- auto& data = option->ChildRef(1);
- if (data->GetTypeAnn() && data->GetTypeAnn()->GetKind() == ETypeAnnotationKind::Null) {
- // nothing to do
- } else if (data->GetTypeAnn() && data->GetTypeAnn()->GetKind() == ETypeAnnotationKind::Pg) {
- auto name = data->GetTypeAnn()->Cast<TPgExprType>()->GetName();
- if (name != "int4" && name != "int8") {
- ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(data->Pos()), TStringBuilder() <<
- "Expected int4/int8 type, but got: " << name));
- return IGraphTransformer::TStatus::Error;
- }
- } else {
- const TTypeAnnotationNode* expectedType = ctx.Expr.MakeType<TOptionalExprType>(
- ctx.Expr.MakeType<TDataExprType>(EDataSlot::Int64));
- auto convertStatus = TryConvertTo(data, *expectedType, ctx.Expr);
- if (convertStatus.Level == IGraphTransformer::TStatus::Error) {
- ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(data->Pos()), "Mismatch argument types"));
- return IGraphTransformer::TStatus::Error;
- }
-
- if (convertStatus.Level != IGraphTransformer::TStatus::Ok) {
- auto newSettings = ReplaceSetting(options, {}, TString(optionName), option->ChildPtr(1), ctx.Expr);
- output = ctx.Expr.ChangeChild(*input, 0, std::move(newSettings));
- return IGraphTransformer::TStatus::Repeat;
- }
- }
- } else if (optionName == "sort") {
- if (pass != 1) {
- continue;
- }
-
- if (!EnsureTupleSize(*option, 2, ctx.Expr)) {
- return IGraphTransformer::TStatus::Error;
- }
-
- const auto& data = option->Tail();
- if (!EnsureTuple(data, ctx.Expr)) {
- return IGraphTransformer::TStatus::Error;
- }
-
- for (const auto& x : data.Children()) {
- if (!x->IsCallable("PgSort")) {
- ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(x->Pos()), "Expected PgSort"));
- }
- }
-
- hasSort = true;
- } else {
- ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(option->Head().Pos()),
- TStringBuilder() << "Unsupported option: " << optionName));
- return IGraphTransformer::TStatus::Error;
- }
- }
- }
-
- if (!setItems) {
- ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(input->Pos()), "Missing set_items"));
- return IGraphTransformer::TStatus::Error;
- }
-
- if (!setOps) {
- ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(input->Pos()), "Missing set_ops"));
- return IGraphTransformer::TStatus::Error;
- }
-
- if (setOps->ChildrenSize() != setItems->ChildrenSize() * 2 - 1) {
- ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(input->Pos()), "Mismatched count of items in set_items and set_ops"));
- return IGraphTransformer::TStatus::Error;
- }
-
- ui32 balance = 0;
- for (const auto& op : setOps->Children()) {
- if (op->Content() == "push") {
- balance += 1;
- } else {
- if (balance < 2) {
- ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(input->Pos()), "Disbalanced set_ops"));
- return IGraphTransformer::TStatus::Error;
- }
-
- balance -= 1;
- }
- }
-
- if (balance != 1) {
- ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(input->Pos()), "Disbalanced set_ops"));
- return IGraphTransformer::TStatus::Error;
- }
-
- TColumnOrder resultColumnOrder;
- const TStructExprType* resultStructType = nullptr;
- auto status = InferPositionalUnionType(input->Pos(), setItems->ChildrenList(), resultColumnOrder, resultStructType, ctx);
- if (status != IGraphTransformer::TStatus::Ok) {
- return status;
- }
-
- if (hasSort) {
- auto option = GetSetting(options, "sort");
- YQL_ENSURE(option);
- const auto& data = option->Tail();
- TInputs projectionInputs;
- projectionInputs.push_back(std::make_tuple(TString(), resultStructType, resultColumnOrder));
- TExprNode::TListType newSortTupleItems;
-
- if (data.ChildrenSize() > 0 && data.Child(0)->Child(0)->IsCallable("Void")) {
- // no effective types yet, scan lambda bodies
- if (!ValidateSort(projectionInputs, {}, data, ctx, newSortTupleItems)) {
- return IGraphTransformer::TStatus::Error;
- }
-
- auto newSortTuple = ctx.Expr.NewList(data.Pos(), std::move(newSortTupleItems));
- auto newSettings = ReplaceSetting(options, {}, "sort", newSortTuple, ctx.Expr);
- output = ctx.Expr.ChangeChild(*input, 0, std::move(newSettings));
- return IGraphTransformer::TStatus::Repeat;
- }
- }
-
- input->SetTypeAnn(ctx.Expr.MakeType<TListExprType>(resultStructType));
- return ctx.Types.SetColumnOrder(*input, resultColumnOrder, ctx.Expr);
- }
-
IGraphTransformer::TStatus SqlProjectItemWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx) {
Y_UNUSED(output);
YQL_ENSURE(input->IsCallable({"SqlProjectItem", "SqlProjectStarItem"}));
diff --git a/ydb/library/yql/core/type_ann/type_ann_pg.cpp b/ydb/library/yql/core/type_ann/type_ann_pg.cpp
new file mode 100644
index 00000000000..c0313f9bfee
--- /dev/null
+++ b/ydb/library/yql/core/type_ann/type_ann_pg.cpp
@@ -0,0 +1,2555 @@
+#include "type_ann_pg.h"
+#include "type_ann_list.h"
+
+#include <ydb/library/yql/core/expr_nodes/yql_expr_nodes.h>
+
+#include <ydb/library/yql/core/yql_expr_type_annotation.h>
+#include <ydb/library/yql/core/yql_expr_optimize.h>
+#include <ydb/library/yql/core/yql_opt_utils.h>
+#include <ydb/library/yql/core/yql_pg_utils.h>
+
+#include <ydb/library/yql/parser/pg_catalog/catalog.h>
+
+namespace NYql {
+namespace NTypeAnnImpl {
+
+IGraphTransformer::TStatus PgStarWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx) {
+ Y_UNUSED(output);
+ if (!EnsureArgsCount(*input, 0, ctx.Expr)) {
+ return IGraphTransformer::TStatus::Error;
+ }
+
+ input->SetTypeAnn(ctx.Expr.MakeType<TUnitExprType>());
+ return IGraphTransformer::TStatus::Ok;
+}
+
+struct TPgFuncDesc {
+ ui32 MinArgs;
+ ui32 MaxArgs;
+ EDataSlot ReturnType;
+ TVector<EDataSlot> DataTypes;
+};
+
+class TPgFuncMap {
+public:
+ static const TPgFuncMap& Instance() {
+ return *Singleton<TPgFuncMap>();
+ }
+
+ THashMap<TString, TPgFuncDesc> Funcs;
+
+ TPgFuncMap() {
+ Funcs["substring"] = { 3, 3, EDataSlot::Utf8, { EDataSlot::Utf8, EDataSlot::Int32, EDataSlot::Int32 } };
+ }
+};
+
+IGraphTransformer::TStatus PgCallWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TExtContext& ctx) {
+ bool isResolved = input->Content().StartsWith("PgResolvedCall");
+ if (!EnsureMinArgsCount(*input, isResolved ? 2 : 1, ctx.Expr)) {
+ return IGraphTransformer::TStatus::Error;
+ }
+
+ if (!EnsureAtom(input->Head(), ctx.Expr)) {
+ return IGraphTransformer::TStatus::Error;
+ }
+
+ auto name = input->Head().Content();
+
+ if (isResolved) {
+ if (!EnsureAtom(*input->Child(1), ctx.Expr)) {
+ return IGraphTransformer::TStatus::Error;
+ }
+ }
+
+ if (ctx.Types.PgTypes || isResolved) {
+ TVector<ui32> argTypes;
+ bool needRetype = false;
+ for (ui32 i = isResolved ? 2 : 1; i < input->ChildrenSize(); ++i) {
+ auto type = input->Child(i)->GetTypeAnn();
+ ui32 argType;
+ bool convertToPg;
+ if (!ExtractPgType(type, argType, convertToPg, input->Child(i)->Pos(), ctx.Expr)) {
+ return IGraphTransformer::TStatus::Error;
+ }
+
+ if (convertToPg) {
+ input->ChildRef(i) = ctx.Expr.NewCallable(input->Child(i)->Pos(), "ToPg", { input->ChildPtr(i) });
+ needRetype = true;
+ }
+
+ argTypes.push_back(argType);
+ }
+
+ if (needRetype) {
+ return IGraphTransformer::TStatus::Repeat;
+ }
+
+ if (isResolved) {
+ auto procId = FromString<ui32>(input->Child(1)->Content());
+ const auto& proc = NPg::LookupProc(procId, argTypes);
+ if (proc.Name != name) {
+ ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(input->Pos()),
+ TStringBuilder() << "Mismatch of resolved function name, expected: " << name << ", but got:" << proc.Name));
+ return IGraphTransformer::TStatus::Error;
+ }
+
+ auto result = ctx.Expr.MakeType<TPgExprType>(proc.ResultType);
+ input->SetTypeAnn(result);
+ return IGraphTransformer::TStatus::Ok;
+ } else {
+ const auto& proc = NPg::LookupProc(TString(name), argTypes);
+ auto children = input->ChildrenList();
+ auto idNode = ctx.Expr.NewAtom(input->Pos(), ToString(proc.ProcId));
+ children.insert(children.begin() + 1, idNode);
+ output = ctx.Expr.NewCallable(input->Pos(), "PgResolvedCall", std::move(children));
+ return IGraphTransformer::TStatus::Repeat;
+ }
+ } else {
+ const TTypeAnnotationNode* result = nullptr;
+ TVector<const TTypeAnnotationNode*> argTypes;
+ bool isNull = false;
+ bool isOptional = false;
+ for (ui32 i = 1; i < input->ChildrenSize(); ++i) {
+ auto type = input->Child(i)->GetTypeAnn();
+ if (type->GetKind() == ETypeAnnotationKind::Null) {
+ argTypes.push_back(type);
+ isNull = true;
+ result = type;
+ continue;
+ }
+
+ if (type->GetKind() == ETypeAnnotationKind::Optional) {
+ type = RemoveOptionalType(type);
+ isOptional = true;
+ }
+
+ argTypes.push_back(type);
+ }
+
+ const auto& funcs = TPgFuncMap::Instance().Funcs;
+ auto it = funcs.find(name);
+ if (it != funcs.end()) {
+ const auto& desc = it->second;
+ if (argTypes.size() > desc.MaxArgs || argTypes.size() < desc.MinArgs) {
+ ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(input->Pos()),
+ TStringBuilder() << "Incorrect arguments count: " << argTypes.size() << " for function: " << name));
+ return IGraphTransformer::TStatus::Error;
+ }
+
+ for (ui32 i = 0; i < argTypes.size(); ++i) {
+ auto expectedType = desc.DataTypes[i];
+ if (argTypes[i]->GetKind() != ETypeAnnotationKind::Null) {
+ if (argTypes[i]->GetKind() != ETypeAnnotationKind::Data) {
+ ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(input->Pos()),
+ TStringBuilder() << "Expected type " << expectedType << " for argument " << (i + 1) << ", but got: " << argTypes[i]->GetKind() << " for function: " << name));
+ return IGraphTransformer::TStatus::Error;
+ } else {
+ auto dataType = argTypes[i]->Cast<TDataExprType>()->GetSlot();
+ if (dataType != expectedType) {
+ ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(input->Pos()),
+ TStringBuilder() << "Expected type " << expectedType << " for argument " << (i + 1) << ", but got: " << dataType << " for function: " << name));
+ return IGraphTransformer::TStatus::Error;
+ }
+ }
+ }
+ }
+
+ result = ctx.Expr.MakeType<TDataExprType>(desc.ReturnType);
+ } else {
+ ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(input->Pos()),
+ TStringBuilder() << "Unsupported function: " << name));
+ return IGraphTransformer::TStatus::Error;
+ }
+
+ if (!isNull && isOptional && result->GetKind() != ETypeAnnotationKind::Optional) {
+ result = ctx.Expr.MakeType<TOptionalExprType>(result);
+ }
+
+ input->SetTypeAnn(result);
+ return IGraphTransformer::TStatus::Ok;
+ }
+}
+
+IGraphTransformer::TStatus FromPgWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx) {
+ if (!EnsureArgsCount(*input, 1, ctx.Expr)) {
+ return IGraphTransformer::TStatus::Error;
+ }
+
+ if (!EnsureComputable(input->Head(), ctx.Expr)) {
+ return IGraphTransformer::TStatus::Error;
+ }
+
+ if (IsNull(input->Head())) {
+ output = input->TailPtr();
+ return IGraphTransformer::TStatus::Repeat;
+ }
+
+ if (input->Head().GetTypeAnn()->GetKind() != ETypeAnnotationKind::Pg) {
+ output = input->HeadPtr();
+ return IGraphTransformer::TStatus::Repeat;
+ }
+
+ auto name = input->Head().GetTypeAnn()->Cast<TPgExprType>()->GetName();
+ const TDataExprType* dataType;
+ if (name == "bool") {
+ dataType = ctx.Expr.MakeType<TDataExprType>(EDataSlot::Bool);
+ } else if (name == "int2") {
+ dataType = ctx.Expr.MakeType<TDataExprType>(EDataSlot::Int16);
+ } else if (name == "int4") {
+ dataType = ctx.Expr.MakeType<TDataExprType>(EDataSlot::Int32);
+ } else if (name == "int8") {
+ dataType = ctx.Expr.MakeType<TDataExprType>(EDataSlot::Int64);
+ } else if (name == "float4") {
+ dataType = ctx.Expr.MakeType<TDataExprType>(EDataSlot::Float);
+ } else if (name == "float8") {
+ dataType = ctx.Expr.MakeType<TDataExprType>(EDataSlot::Double);
+ } else if (name == "text" || name == "varchar" || name == "cstring") {
+ dataType = ctx.Expr.MakeType<TDataExprType>(EDataSlot::Utf8);
+ } else if (name == "bytea") {
+ dataType = ctx.Expr.MakeType<TDataExprType>(EDataSlot::String);
+ } else {
+ ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(input->Pos()),
+ TStringBuilder() << "Unsupported type: " << name));
+ return IGraphTransformer::TStatus::Error;
+ }
+
+ auto result = ctx.Expr.MakeType<TOptionalExprType>(dataType);
+ input->SetTypeAnn(result);
+ return IGraphTransformer::TStatus::Ok;
+}
+
+IGraphTransformer::TStatus ToPgWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx) {
+ if (!EnsureArgsCount(*input, 1, ctx.Expr)) {
+ return IGraphTransformer::TStatus::Error;
+ }
+
+ if (!EnsureComputable(input->Head(), ctx.Expr)) {
+ return IGraphTransformer::TStatus::Error;
+ }
+
+ if (IsNull(input->Head())) {
+ output = input->TailPtr();
+ return IGraphTransformer::TStatus::Repeat;
+ }
+
+ if (input->Head().GetTypeAnn()->GetKind() == ETypeAnnotationKind::Pg) {
+ output = input->HeadPtr();
+ return IGraphTransformer::TStatus::Repeat;
+ }
+
+ bool isOptional;
+ const TDataExprType* dataType;
+ if (!EnsureDataOrOptionalOfData(input->Head(), isOptional, dataType, ctx.Expr)) {
+ return IGraphTransformer::TStatus::Error;
+ }
+
+ TString pgType;
+ switch (dataType->GetSlot()) {
+ case NUdf::EDataSlot::Bool:
+ pgType = "bool";
+ break;
+ case NUdf::EDataSlot::Int16:
+ pgType = "int2";
+ break;
+ case NUdf::EDataSlot::Int32:
+ pgType = "int4";
+ break;
+ case NUdf::EDataSlot::Int64:
+ pgType = "int8";
+ break;
+ case NUdf::EDataSlot::Float:
+ pgType = "float4";
+ break;
+ case NUdf::EDataSlot::Double:
+ pgType = "float8";
+ break;
+ case NUdf::EDataSlot::String:
+ pgType = "bytea";
+ break;
+ case NUdf::EDataSlot::Utf8:
+ pgType = "text";
+ break;
+ default:
+ ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(input->Pos()),
+ TStringBuilder() << "Unsupported type: " << dataType->GetName()));
+ return IGraphTransformer::TStatus::Error;
+ }
+
+ auto result = ctx.Expr.MakeType<TPgExprType>(NPg::LookupType(pgType).TypeId);
+ input->SetTypeAnn(result);
+ return IGraphTransformer::TStatus::Ok;
+}
+
+IGraphTransformer::TStatus PgOpWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TExtContext& ctx) {
+ bool isResolved = input->IsCallable("PgResolvedOp");
+ if (!EnsureMinArgsCount(*input, isResolved ? 3 : 2, ctx.Expr)) {
+ return IGraphTransformer::TStatus::Error;
+ }
+
+ if (!EnsureMaxArgsCount(*input, isResolved ? 4 : 3, ctx.Expr)) {
+ return IGraphTransformer::TStatus::Error;
+ }
+
+ if (!EnsureAtom(input->Head(), ctx.Expr)) {
+ return IGraphTransformer::TStatus::Error;
+ }
+
+ auto name = input->Head().Content();
+ if (isResolved) {
+ if (!EnsureAtom(*input->Child(1), ctx.Expr)) {
+ return IGraphTransformer::TStatus::Error;
+ }
+ }
+
+ TVector<ui32> argTypes;
+ bool needRetype = false;
+ for (ui32 i = isResolved ? 2 : 1; i < input->ChildrenSize(); ++i) {
+ auto type = input->Child(i)->GetTypeAnn();
+ ui32 argType;
+ bool convertToPg;
+ if (!ExtractPgType(type, argType, convertToPg, input->Child(i)->Pos(), ctx.Expr)) {
+ return IGraphTransformer::TStatus::Error;
+ }
+
+ if (convertToPg) {
+ input->ChildRef(i) = ctx.Expr.NewCallable(input->Child(i)->Pos(), "ToPg", { input->ChildPtr(i) });
+ needRetype = true;
+ }
+
+ argTypes.push_back(argType);
+ }
+
+ if (needRetype) {
+ return IGraphTransformer::TStatus::Repeat;
+ }
+
+ if (isResolved) {
+ auto operId = FromString<ui32>(input->Child(1)->Content());
+ const auto& oper = NPg::LookupOper(operId, argTypes);
+ if (oper.Name != name) {
+ ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(input->Pos()),
+ TStringBuilder() << "Mismatch of resolved operator name, expected: " << name << ", but got:" << oper.Name));
+ return IGraphTransformer::TStatus::Error;
+ }
+
+ auto result = ctx.Expr.MakeType<TPgExprType>(oper.ResultType);
+ input->SetTypeAnn(result);
+ return IGraphTransformer::TStatus::Ok;
+ } else {
+ const auto& oper = NPg::LookupOper(TString(name), argTypes);
+ auto children = input->ChildrenList();
+ auto idNode = ctx.Expr.NewAtom(input->Pos(), ToString(oper.OperId));
+ children.insert(children.begin() + 1, idNode);
+ output = ctx.Expr.NewCallable(input->Pos(), "PgResolvedOp", std::move(children));
+ return IGraphTransformer::TStatus::Repeat;
+ }
+}
+
+IGraphTransformer::TStatus PgWindowCallWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx) {
+ Y_UNUSED(output);
+ if (!EnsureMinArgsCount(*input, 2, ctx.Expr)) {
+ return IGraphTransformer::TStatus::Error;
+ }
+
+ if (!EnsureAtom(*input->Child(0), ctx.Expr)) {
+ return IGraphTransformer::TStatus::Error;
+ }
+
+ auto name = input->Child(0)->Content();
+ if (!input->Child(1)->IsAtom() && !input->Child(1)->IsCallable("PgAnonWindow")) {
+ ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(input->Pos()),
+ "Expected either window name or reference to an inline window"));
+ return IGraphTransformer::TStatus::Error;
+ }
+
+ if (name == "lead" || name == "lag") {
+ if (input->ChildrenSize() != 3) {
+ ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(input->Pos()),
+ TStringBuilder() << "Expected one argument in " << name << " function"));
+ return IGraphTransformer::TStatus::Error;
+ }
+
+ auto arg = input->Child(2)->GetTypeAnn();
+ if (arg->GetKind() == ETypeAnnotationKind::Null) {
+ input->SetTypeAnn(arg);
+ } else if (arg->GetKind() == ETypeAnnotationKind::Optional) {
+ input->SetTypeAnn(arg);
+ } else {
+ input->SetTypeAnn(ctx.Expr.MakeType<TOptionalExprType>(arg));
+ }
+ } else if (name == "row_number") {
+ if (input->ChildrenSize() != 2) {
+ ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(input->Pos()),
+ "Expected no arguments in row_number function"));
+ return IGraphTransformer::TStatus::Error;
+ }
+
+ auto result = ctx.Expr.MakeType<TOptionalExprType>(ctx.Expr.MakeType<TDataExprType>(EDataSlot::Int64));
+ input->SetTypeAnn(result);
+ } else {
+ ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(input->Pos()),
+ TStringBuilder() << "Unsupported function: " << name));
+ return IGraphTransformer::TStatus::Error;
+ }
+
+ return IGraphTransformer::TStatus::Ok;
+}
+
+IGraphTransformer::TStatus PgAggWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TExtContext& ctx) {
+ Y_UNUSED(output);
+ bool overWindow = (input->Content() == "PgAggWindowCall");
+ if (!EnsureMinArgsCount(*input, overWindow ? 2 : 1, ctx.Expr)) {
+ return IGraphTransformer::TStatus::Error;
+ }
+
+ if (!EnsureAtom(input->Head(), ctx.Expr)) {
+ return IGraphTransformer::TStatus::Error;
+ }
+
+ auto name = input->Head().Content();
+ if (overWindow) {
+ if (!input->Child(1)->IsAtom() && !input->Child(1)->IsCallable("PgAnonWindow")) {
+ ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(input->Pos()),
+ "Expected either window name or reference to an inline window"));
+ return IGraphTransformer::TStatus::Error;
+ }
+ }
+
+ if (ctx.Types.PgTypes) {
+ TVector<ui32> argTypes;
+ bool needRetype = false;
+ for (ui32 i = overWindow ? 2 : 1; i < input->ChildrenSize(); ++i) {
+ auto type = input->Child(i)->GetTypeAnn();
+ ui32 argType;
+ bool convertToPg;
+ if (!ExtractPgType(type, argType, convertToPg, input->Child(i)->Pos(), ctx.Expr)) {
+ return IGraphTransformer::TStatus::Error;
+ }
+
+ if (convertToPg) {
+ input->ChildRef(i) = ctx.Expr.NewCallable(input->Child(i)->Pos(), "ToPg", { input->ChildPtr(i) });
+ needRetype = true;
+ }
+
+ argTypes.push_back(argType);
+ }
+
+ if (needRetype) {
+ return IGraphTransformer::TStatus::Repeat;
+ }
+
+ const auto& aggDesc = NPg::LookupAggregation(name, argTypes);
+ if (aggDesc.Kind != NPg::EAggKind::Normal) {
+ ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(input->Pos()),
+ "Only normal aggregation supported"));
+ return IGraphTransformer::TStatus::Error;
+ }
+
+ ui32 resultType;
+ if (!aggDesc.FinalFuncId) {
+ resultType = aggDesc.TransTypeId;
+ } else {
+ resultType = NPg::LookupProc(aggDesc.FinalFuncId).ResultType;
+ }
+
+ auto result = ctx.Expr.MakeType<TPgExprType>(resultType);
+ input->SetTypeAnn(result);
+ return IGraphTransformer::TStatus::Ok;
+ } else {
+ const TTypeAnnotationNode* result = nullptr;
+ TVector<const TTypeAnnotationNode*> argTypes;
+ bool isNull = false;
+ bool isOptional = false;
+ for (ui32 i = overWindow ? 2 : 1; i < input->ChildrenSize(); ++i) {
+ auto type = input->Child(i)->GetTypeAnn();
+ if (type->GetKind() == ETypeAnnotationKind::Null) {
+ argTypes.push_back(type);
+ isNull = true;
+ result = type;
+ continue;
+ }
+
+ if (type->GetKind() == ETypeAnnotationKind::Optional) {
+ type = RemoveOptionalType(type);
+ isOptional = true;
+ }
+
+ argTypes.push_back(type);
+ }
+
+ if (name == "count") {
+ if (argTypes.size() > 1) {
+ ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(input->Pos()),
+ TStringBuilder() << "Too many arguments for function: " << name));
+ return IGraphTransformer::TStatus::Error;
+ }
+
+ isNull = false;
+ isOptional = true;
+ result = ctx.Expr.MakeType<TDataExprType>(EDataSlot::Int64);
+ } else if (name == "min" || name == "max") {
+ if (argTypes.size() != 1) {
+ ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(input->Pos()),
+ TStringBuilder() << "Expected one argument for function: " << name));
+ return IGraphTransformer::TStatus::Error;
+ }
+
+ if (!isNull) {
+ auto argType = argTypes[0];
+ if (argType->GetKind() != ETypeAnnotationKind::Data) {
+ ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(input->Pos()),
+ TStringBuilder() << "Expected comparable type, but got: " << argType->GetKind() << " for function: " << name));
+ return IGraphTransformer::TStatus::Error;
+ }
+
+ auto slot = argType->Cast<TDataExprType>()->GetSlot();
+ if (slot == EDataSlot::Utf8 || slot == EDataSlot::Int32 || slot == EDataSlot::Double || slot == EDataSlot::Bool) {
+ result = argType;
+ } else {
+ ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(input->Pos()),
+ TStringBuilder() << "Expected comparable type, but got: " << slot << " for function: " << name));
+ return IGraphTransformer::TStatus::Error;
+ }
+ }
+
+ isOptional = true;
+ } else if (name == "sum") {
+ if (argTypes.size() != 1) {
+ ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(input->Pos()),
+ TStringBuilder() << "Expected one argument for function: " << name));
+ return IGraphTransformer::TStatus::Error;
+ }
+
+ if (!isNull) {
+ auto argType = argTypes[0];
+ if (argType->GetKind() != ETypeAnnotationKind::Data) {
+ ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(input->Pos()),
+ TStringBuilder() << "Expected additive type, but got: " << argType->GetKind() << " for function: " << name));
+ return IGraphTransformer::TStatus::Error;
+ }
+
+ auto slot = argType->Cast<TDataExprType>()->GetSlot();
+ if (slot == EDataSlot::Int32) {
+ result = ctx.Expr.MakeType<TDataExprType>(EDataSlot::Int64);
+ } else if (slot == EDataSlot::Double) {
+ result = ctx.Expr.MakeType<TDataExprType>(EDataSlot::Double);
+ } else {
+ ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(input->Pos()),
+ TStringBuilder() << "Expected additive type, but got: " << slot << " for function: " << name));
+ return IGraphTransformer::TStatus::Error;
+ }
+ }
+
+ isOptional = true;
+ } else {
+ ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(input->Pos()),
+ TStringBuilder() << "Unsupported function: " << name));
+ return IGraphTransformer::TStatus::Error;
+ }
+
+ if (!isNull && isOptional && result->GetKind() != ETypeAnnotationKind::Optional) {
+ result = ctx.Expr.MakeType<TOptionalExprType>(result);
+ }
+
+ input->SetTypeAnn(result);
+ return IGraphTransformer::TStatus::Ok;
+ }
+}
+
+IGraphTransformer::TStatus PgQualifiedStarWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx) {
+ Y_UNUSED(output);
+ if (!EnsureArgsCount(*input, 1, ctx.Expr)) {
+ return IGraphTransformer::TStatus::Error;
+ }
+
+ if (!EnsureAtom(input->Head(), ctx.Expr)) {
+ return IGraphTransformer::TStatus::Error;
+ }
+
+ input->SetTypeAnn(ctx.Expr.MakeType<TUnitExprType>());
+ return IGraphTransformer::TStatus::Ok;
+}
+
+IGraphTransformer::TStatus PgColumnRefWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx) {
+ Y_UNUSED(output);
+ if (!EnsureMinArgsCount(*input, 1, ctx.Expr)) {
+ return IGraphTransformer::TStatus::Error;
+ }
+
+ if (!EnsureMaxArgsCount(*input, 2, ctx.Expr)) {
+ return IGraphTransformer::TStatus::Error;
+ }
+
+ for (const auto& child : input->Children()) {
+ if (!EnsureAtom(*child, ctx.Expr)) {
+ return IGraphTransformer::TStatus::Error;
+ }
+ }
+
+ input->SetTypeAnn(ctx.Expr.MakeType<TUnitExprType>());
+ return IGraphTransformer::TStatus::Ok;
+}
+
+IGraphTransformer::TStatus PgResultItemWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx) {
+ Y_UNUSED(output);
+ if (!EnsureArgsCount(*input, 3, ctx.Expr)) {
+ return IGraphTransformer::TStatus::Error;
+ }
+
+ if (input->Head().IsList()) {
+ for (const auto& x : input->Head().Children()) {
+ if (!EnsureAtom(*x, ctx.Expr)) {
+ return IGraphTransformer::TStatus::Error;
+ }
+ }
+ } else {
+ if (!EnsureAtom(input->Head(), ctx.Expr)) {
+ return IGraphTransformer::TStatus::Error;
+ }
+ }
+
+ bool hasType = false;
+ if (!input->Child(1)->IsCallable("Void")) {
+ hasType = true;
+ if (auto status = EnsureTypeRewrite(input->ChildRef(1), ctx.Expr); status != IGraphTransformer::TStatus::Ok) {
+ return status;
+ }
+ }
+
+ auto& lambda = input->ChildRef(2);
+ const auto status = ConvertToLambda(lambda, ctx.Expr, hasType ? 1 : 0);
+ if (status.Level != IGraphTransformer::TStatus::Ok) {
+ return status;
+ }
+
+ if (!hasType) {
+ input->SetTypeAnn(ctx.Expr.MakeType<TUnitExprType>());
+ return IGraphTransformer::TStatus::Ok;
+ }
+
+ if (!UpdateLambdaAllArgumentsTypes(lambda, { input->Child(1)->GetTypeAnn()->Cast<TTypeExprType>()->GetType() }, ctx.Expr)) {
+ return IGraphTransformer::TStatus::Error;
+ }
+
+ if (!lambda->GetTypeAnn()) {
+ return IGraphTransformer::TStatus::Repeat;
+ }
+
+ input->SetTypeAnn(lambda->GetTypeAnn());
+ return IGraphTransformer::TStatus::Ok;
+}
+
+IGraphTransformer::TStatus PgWhereWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx) {
+ Y_UNUSED(output);
+ if (!EnsureArgsCount(*input, 2, ctx.Expr)) {
+ return IGraphTransformer::TStatus::Error;
+ }
+
+ bool hasType = false;
+ if (!input->Child(0)->IsCallable("Void")) {
+ hasType = true;
+ if (auto status = EnsureTypeRewrite(input->ChildRef(0), ctx.Expr); status != IGraphTransformer::TStatus::Ok) {
+ return status;
+ }
+ }
+
+ auto& lambda = input->ChildRef(1);
+ const auto status = ConvertToLambda(lambda, ctx.Expr, hasType ? 1 : 0);
+ if (status.Level != IGraphTransformer::TStatus::Ok) {
+ return status;
+ }
+
+ if (!hasType) {
+ input->SetTypeAnn(ctx.Expr.MakeType<TUnitExprType>());
+ return IGraphTransformer::TStatus::Ok;
+ }
+
+ if (!UpdateLambdaAllArgumentsTypes(lambda, { input->Child(0)->GetTypeAnn()->Cast<TTypeExprType>()->GetType() }, ctx.Expr)) {
+ return IGraphTransformer::TStatus::Error;
+ }
+
+ if (!lambda->GetTypeAnn()) {
+ return IGraphTransformer::TStatus::Repeat;
+ }
+
+ input->SetTypeAnn(lambda->GetTypeAnn());
+ return IGraphTransformer::TStatus::Ok;
+}
+
+IGraphTransformer::TStatus PgSortWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx) {
+ Y_UNUSED(output);
+ if (!EnsureArgsCount(*input, 3, ctx.Expr)) {
+ return IGraphTransformer::TStatus::Error;
+ }
+
+ if (!EnsureAtom(*input->Child(2), ctx.Expr)) {
+ return IGraphTransformer::TStatus::Error;
+ }
+
+ if (input->Child(2)->Content() != "asc" && input->Child(2)->Content() != "desc") {
+ ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(input->Child(2)->Pos()),
+ TStringBuilder() << "Unsupported sort direction: " << input->Child(2)->Content()));
+ return IGraphTransformer::TStatus::Error;
+ }
+
+ bool hasType = false;
+ if (!input->Child(0)->IsCallable("Void")) {
+ hasType = true;
+ if (auto status = EnsureTypeRewrite(input->ChildRef(0), ctx.Expr); status != IGraphTransformer::TStatus::Ok) {
+ return status;
+ }
+ }
+
+ auto& lambda = input->ChildRef(1);
+ const auto status = ConvertToLambda(lambda, ctx.Expr, hasType ? 1 : 0);
+ if (status.Level != IGraphTransformer::TStatus::Ok) {
+ return status;
+ }
+
+ if (!hasType) {
+ input->SetTypeAnn(ctx.Expr.MakeType<TUnitExprType>());
+ return IGraphTransformer::TStatus::Ok;
+ }
+
+ if (!UpdateLambdaAllArgumentsTypes(lambda, { input->Child(0)->GetTypeAnn()->Cast<TTypeExprType>()->GetType() }, ctx.Expr)) {
+ return IGraphTransformer::TStatus::Error;
+ }
+
+ if (!lambda->GetTypeAnn()) {
+ return IGraphTransformer::TStatus::Repeat;
+ }
+
+ input->SetTypeAnn(lambda->GetTypeAnn());
+ return IGraphTransformer::TStatus::Ok;
+}
+
+IGraphTransformer::TStatus PgWindowWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx) {
+ Y_UNUSED(output);
+ if (!EnsureArgsCount(*input, 5, ctx.Expr)) {
+ return IGraphTransformer::TStatus::Error;
+ }
+
+ if (!EnsureAtom(*input->Child(0), ctx.Expr)) {
+ return IGraphTransformer::TStatus::Error;
+ }
+
+ if (!EnsureAtom(*input->Child(1), ctx.Expr)) {
+ return IGraphTransformer::TStatus::Error;
+ }
+
+ if (input->Child(1)->Content()) {
+ ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(input->Child(1)->Pos()), "Window reference is not supported"));
+ return IGraphTransformer::TStatus::Error;
+ }
+
+ if (!EnsureTuple(*input->Child(2), ctx.Expr)) {
+ return IGraphTransformer::TStatus::Error;
+ }
+
+ for (const auto& x : input->Child(2)->Children()) {
+ if (!x->IsCallable("PgGroup")) {
+ ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(x->Pos()), "Expected PgGroup"));
+ return IGraphTransformer::TStatus::Error;
+ }
+ }
+
+ if (!EnsureTuple(*input->Child(3), ctx.Expr)) {
+ return IGraphTransformer::TStatus::Error;
+ }
+
+ for (const auto& x : input->Child(3)->Children()) {
+ if (!x->IsCallable("PgSort")) {
+ ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(x->Pos()), "Expected PgSort"));
+ return IGraphTransformer::TStatus::Error;
+ }
+ }
+
+ if (!EnsureTuple(*input->Child(4), ctx.Expr)) {
+ return IGraphTransformer::TStatus::Error;
+ }
+
+ bool hasType = false;
+ bool hasFrom = false;
+ bool hasTo = false;
+ bool hasFromValue = false;
+ bool hasToValue = false;
+ bool needFromValue = false;
+ bool needToValue = false;
+
+ for (const auto& x : input->Child(4)->Children()) {
+ if (!EnsureTupleMinSize(*x, 1, ctx.Expr)) {
+ return IGraphTransformer::TStatus::Error;
+ }
+
+ if (!EnsureAtom(x->Head(), ctx.Expr)) {
+ return IGraphTransformer::TStatus::Error;
+ }
+
+ auto optionName = x->Head().Content();
+ if (optionName == "exclude") {
+ ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(x->Pos()), "Excludes are not supported"));
+ return IGraphTransformer::TStatus::Error;
+ } else if (optionName == "from_value" || optionName == "to_value") {
+ hasFromValue = hasFromValue || (optionName == "from_value");
+ hasToValue = hasToValue || (optionName == "to_value");
+ if (!EnsureTupleSize(*x, 2, ctx.Expr)) {
+ return IGraphTransformer::TStatus::Error;
+ }
+
+ if (!x->Tail().IsCallable("Int32")) {
+ ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(x->Pos()), "Expected Int32 as frame offset"));
+ return IGraphTransformer::TStatus::Error;
+ }
+
+ auto val = FromString<i32>(x->Tail().Head().Content());
+ if (val < 0) {
+ ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(x->Pos()), "Expected non-negative value as frame offset"));
+ return IGraphTransformer::TStatus::Error;
+ }
+ } else if (optionName == "type") {
+ hasType = true;
+ if (!EnsureTupleSize(*x, 2, ctx.Expr)) {
+ return IGraphTransformer::TStatus::Error;
+ }
+
+ if (!EnsureAtom(x->Tail(), ctx.Expr)) {
+ return IGraphTransformer::TStatus::Error;
+ }
+
+ auto type = x->Tail().Content();
+ if (type != "rows") {
+ ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(x->Pos()), TStringBuilder() << "Unsupported frame type: " << type));
+ return IGraphTransformer::TStatus::Error;
+ }
+ } else if (optionName == "from" || optionName == "to") {
+ hasFrom = hasFrom || (optionName == "from");
+ hasTo = hasTo || (optionName == "to");
+ if (!EnsureTupleSize(*x, 2, ctx.Expr)) {
+ return IGraphTransformer::TStatus::Error;
+ }
+
+ if (!EnsureAtom(x->Tail(), ctx.Expr)) {
+ return IGraphTransformer::TStatus::Error;
+ }
+
+ auto bound = x->Tail().Content();
+ if (!(bound == "up" || bound == "p" || bound == "c" || bound == "f" || bound == "uf")) {
+ ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(x->Pos()), TStringBuilder() << "Unsupported frame bound: " << bound));
+ return IGraphTransformer::TStatus::Error;
+ }
+
+ if (bound == "p" || bound == "f") {
+ needFromValue = needFromValue || (optionName == "from");
+ needToValue = needToValue || (optionName == "to");
+ }
+
+ if (optionName == "from" && bound == "uf") {
+ ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(x->Pos()), "Unbounded following is unsupported as start offset"));
+ return IGraphTransformer::TStatus::Error;
+ }
+
+ if (optionName == "to" && bound == "up") {
+ ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(x->Pos()), "Unbounded preceding is unsupported as end offset"));
+ return IGraphTransformer::TStatus::Error;
+ }
+ } else {
+ ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(x->Pos()), TStringBuilder() << "Unknown option: " << optionName));
+ return IGraphTransformer::TStatus::Error;
+ }
+ }
+
+ if (hasType) {
+ if (!hasFrom || !hasTo) {
+ ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(input->Pos()), "Missing offset specification in the frame"));
+ return IGraphTransformer::TStatus::Error;
+ }
+ } else {
+ if (hasFrom || hasTo) {
+ ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(input->Pos()), "Unexpected offset specification in the frame"));
+ return IGraphTransformer::TStatus::Error;
+ }
+ }
+
+ if (needFromValue != hasFromValue || needToValue != hasToValue) {
+ ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(input->Pos()), "Wrong offset value in the frame"));
+ return IGraphTransformer::TStatus::Error;
+ }
+
+ input->SetTypeAnn(ctx.Expr.MakeType<TUnitExprType>());
+ return IGraphTransformer::TStatus::Ok;
+}
+
+IGraphTransformer::TStatus PgAnonWindowWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx) {
+ Y_UNUSED(output);
+ if (!EnsureArgsCount(*input, 1, ctx.Expr)) {
+ return IGraphTransformer::TStatus::Error;
+ }
+
+ if (!EnsureAtom(*input->Child(0), ctx.Expr)) {
+ return IGraphTransformer::TStatus::Error;
+ }
+
+ ui32 n;
+ if (!TryFromString(input->Child(0)->Content(), n)) {
+ ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(input->Pos()), "Expected number"));
+ return IGraphTransformer::TStatus::Error;
+ }
+
+ input->SetTypeAnn(ctx.Expr.MakeType<TUnitExprType>());
+ return IGraphTransformer::TStatus::Ok;
+}
+
+IGraphTransformer::TStatus PgConstWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx) {
+ Y_UNUSED(output);
+ if (!EnsureArgsCount(*input, 2, ctx.Expr)) {
+ return IGraphTransformer::TStatus::Error;
+ }
+
+ if (!EnsureTypePg(input->Tail(), ctx.Expr)) {
+ return IGraphTransformer::TStatus::Error;
+ }
+
+ // TODO: validate value
+ if (!EnsureAtom(input->Head(), ctx.Expr)) {
+ return IGraphTransformer::TStatus::Error;
+ }
+
+ input->SetTypeAnn(input->Tail().GetTypeAnn()->Cast<TTypeExprType>()->GetType());
+ return IGraphTransformer::TStatus::Ok;
+}
+
+IGraphTransformer::TStatus PgInternal0Wrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx) {
+ Y_UNUSED(output);
+ if (!EnsureArgsCount(*input, 0, ctx.Expr)) {
+ return IGraphTransformer::TStatus::Error;
+ }
+
+ auto internalId = NPg::LookupType("internal").TypeId;
+ input->SetTypeAnn(ctx.Expr.MakeType<TPgExprType>(internalId));
+ return IGraphTransformer::TStatus::Ok;
+}
+
+IGraphTransformer::TStatus PgCastWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx) {
+ Y_UNUSED(output);
+ if (!EnsureArgsCount(*input, 2, ctx.Expr)) {
+ return IGraphTransformer::TStatus::Error;
+ }
+
+ auto type = input->Head().GetTypeAnn();
+ ui32 inputTypeId = 0;
+ bool convertToPg;
+ if (!ExtractPgType(type, inputTypeId, convertToPg, input->Pos(), ctx.Expr)) {
+ return IGraphTransformer::TStatus::Error;
+ }
+
+ if (convertToPg) {
+ input->ChildRef(0) = ctx.Expr.NewCallable(input->Child(0)->Pos(), "ToPg", { input->ChildPtr(0) });
+ return IGraphTransformer::TStatus::Repeat;
+ }
+
+ if (!EnsureTypePg(input->Tail(), ctx.Expr)) {
+ return IGraphTransformer::TStatus::Error;
+ }
+ auto targetTypeId = input->Tail().GetTypeAnn()->Cast<TTypeExprType>()->GetType()->Cast<TPgExprType>()->GetId();
+
+ if (inputTypeId != 0 && inputTypeId != targetTypeId) {
+ if (NPg::LookupType(inputTypeId).Category != 'S' &&
+ NPg::LookupType(targetTypeId).Category != 'S') {
+ Y_UNUSED(NPg::LookupCast(inputTypeId, targetTypeId));
+ }
+ }
+
+ input->SetTypeAnn(ctx.Expr.MakeType<TPgExprType>(targetTypeId));
+ return IGraphTransformer::TStatus::Ok;
+}
+
+IGraphTransformer::TStatus PgAggregationTraitsWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx) {
+ if (!EnsureArgsCount(*input, 3, ctx.Expr)) {
+ return IGraphTransformer::TStatus::Error;
+ }
+
+ if (!EnsureAtom(*input->Child(0), ctx.Expr)) {
+ return IGraphTransformer::TStatus::Error;
+ }
+
+ auto func = input->Child(0)->Content();
+ if (!EnsureType(*input->Child(1), ctx.Expr)) {
+ return IGraphTransformer::TStatus::Error;
+ }
+
+ auto itemType = input->Child(1)->GetTypeAnn()->Cast<TTypeExprType>()->GetType();
+ auto& lambda = input->ChildRef(2);
+ auto convertStatus = ConvertToLambda(lambda, ctx.Expr, 1);
+ if (convertStatus.Level != IGraphTransformer::TStatus::Ok) {
+ return convertStatus;
+ }
+
+ if (!UpdateLambdaAllArgumentsTypes(lambda, { itemType }, ctx.Expr)) {
+ return IGraphTransformer::TStatus::Error;
+ }
+
+ auto lambdaResult = lambda->GetTypeAnn();
+ if (!lambdaResult) {
+ return IGraphTransformer::TStatus::Repeat;
+ }
+
+ TVector<ui32> argTypes;
+ bool needRetype = false;
+ for (ui32 i = 1; i < lambda->ChildrenSize(); ++i) {
+ auto type = lambda->Child(i)->GetTypeAnn();
+ ui32 argType;
+ bool convertToPg;
+ if (!ExtractPgType(type, argType, convertToPg, lambda->Child(i)->Pos(), ctx.Expr)) {
+ return IGraphTransformer::TStatus::Error;
+ }
+
+ if (convertToPg) {
+ needRetype = true;
+ }
+
+ argTypes.push_back(argType);
+ }
+
+ if (needRetype) {
+ lambda = ctx.Expr.DeepCopyLambda(*lambda);
+ for (ui32 i = 1; i < lambda->ChildrenSize(); ++i) {
+ auto type = lambda->Child(i)->GetTypeAnn();
+ ui32 argType;
+ bool convertToPg;
+ if (!ExtractPgType(type, argType, convertToPg, lambda->Child(i)->Pos(), ctx.Expr)) {
+ return IGraphTransformer::TStatus::Error;
+ }
+
+ if (convertToPg) {
+ lambda->ChildRef(i) = ctx.Expr.NewCallable(lambda->Child(i)->Pos(), "ToPg", { lambda->ChildPtr(i) });
+ }
+ }
+
+ return IGraphTransformer::TStatus::Repeat;
+ }
+
+ const auto& aggDesc = NPg::LookupAggregation(func, argTypes);
+ if (aggDesc.Kind != NPg::EAggKind::Normal) {
+ ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(input->Pos()),
+ "Only normal aggregation supported"));
+ return IGraphTransformer::TStatus::Error;
+ }
+
+ auto idLambda = ctx.Expr.Builder(input->Pos())
+ .Lambda()
+ .Param("state")
+ .Arg("state")
+ .Seal()
+ .Build();
+
+ auto saveLambda = idLambda;
+ auto loadLambda = idLambda;
+ auto finishLambda = idLambda;
+ if (aggDesc.FinalFuncId) {
+ finishLambda = ctx.Expr.Builder(input->Pos())
+ .Lambda()
+ .Param("state")
+ .Callable("PgResolvedCallCtx")
+ .Atom(0, NPg::LookupProc(aggDesc.FinalFuncId).Name)
+ .Atom(1, ToString(aggDesc.FinalFuncId))
+ .Arg(2, "state")
+ .Seal()
+ .Seal()
+ .Build();
+ }
+
+ auto nullValue = ctx.Expr.NewCallable(input->Pos(), "Null", {});
+ auto initValue = nullValue;
+ if (aggDesc.InitValue) {
+ initValue = ctx.Expr.Builder(input->Pos())
+ .Callable("PgCast")
+ .Callable(0, "PgConst")
+ .Atom(0, aggDesc.InitValue)
+ .Callable(1, "PgType")
+ .Atom(0, "text")
+ .Seal()
+ .Seal()
+ .Callable(1, "PgType")
+ .Atom(0, NPg::LookupType(aggDesc.TransTypeId).Name)
+ .Seal()
+ .Seal()
+ .Build();
+ }
+
+ const auto& transFuncDesc = NPg::LookupProc(aggDesc.TransFuncId);
+ // use first non-null value as state if transFunc is strict
+ bool searchNonNullForState = false;
+ if (transFuncDesc.IsStrict && !aggDesc.InitValue) {
+ Y_ENSURE(argTypes.size() == 1);
+ searchNonNullForState = true;
+ }
+
+ TExprNode::TPtr initLambda, updateLambda;
+ if (!searchNonNullForState) {
+ initLambda = ctx.Expr.Builder(input->Pos())
+ .Lambda()
+ .Param("row")
+ .Callable("PgResolvedCallCtx")
+ .Atom(0, transFuncDesc.Name)
+ .Atom(1, ToString(aggDesc.TransFuncId))
+ .Add(2, initValue)
+ .Apply(3, lambda)
+ .With(0, "row")
+ .Seal()
+ .Seal()
+ .Seal()
+ .Build();
+
+ updateLambda = ctx.Expr.Builder(input->Pos())
+ .Lambda()
+ .Param("row")
+ .Param("state")
+ .Callable("Coalesce")
+ .Callable(0, "PgResolvedCallCtx")
+ .Atom(0, transFuncDesc.Name)
+ .Atom(1, ToString(aggDesc.TransFuncId))
+ .Callable(2, "Coalesce")
+ .Arg(0, "state")
+ .Add(1, initValue)
+ .Seal()
+ .Apply(3, lambda)
+ .With(0, "row")
+ .Seal()
+ .Seal()
+ .Arg(1, "state")
+ .Seal()
+ .Seal()
+ .Build();
+ } else {
+ initLambda = ctx.Expr.Builder(input->Pos())
+ .Lambda()
+ .Param("row")
+ .Apply(lambda)
+ .With(0, "row")
+ .Seal()
+ .Seal()
+ .Build();
+
+ updateLambda = ctx.Expr.Builder(input->Pos())
+ .Lambda()
+ .Param("row")
+ .Param("state")
+ .Callable("If")
+ .Callable(0, "Exists")
+ .Arg(0, "state")
+ .Seal()
+ .Callable(1, "Coalesce")
+ .Callable(0, "PgResolvedCallCtx")
+ .Atom(0, transFuncDesc.Name)
+ .Atom(1, ToString(aggDesc.TransFuncId))
+ .Arg(2, "state")
+ .Apply(3, lambda)
+ .With(0, "row")
+ .Seal()
+ .Seal()
+ .Arg(1, "state")
+ .Seal()
+ .Apply(2, lambda)
+ .With(0, "row")
+ .Seal()
+ .Seal()
+ .Seal()
+ .Build();
+ }
+
+ auto mergeLambda = ctx.Expr.Builder(input->Pos())
+ .Lambda()
+ .Param("state1")
+ .Param("state2")
+ .Callable("Void")
+ .Seal()
+ .Seal()
+ .Build();
+
+ auto zero = ctx.Expr.Builder(input->Pos())
+ .Callable("PgConst")
+ .Atom(0, "0")
+ .Callable(1, "PgType")
+ .Atom(0, "int8")
+ .Seal()
+ .Seal()
+ .Build();
+
+ auto defaultValue = (func != "count") ? nullValue : zero;
+
+ if (aggDesc.SerializeFuncId) {
+ const auto& serializeFuncDesc = NPg::LookupProc(aggDesc.SerializeFuncId);
+ saveLambda = ctx.Expr.Builder(input->Pos())
+ .Lambda()
+ .Param("state")
+ .Callable("PgResolvedCallCtx")
+ .Atom(0, serializeFuncDesc.Name)
+ .Atom(1, ToString(aggDesc.SerializeFuncId))
+ .Arg(2, "state")
+ .Seal()
+ .Seal()
+ .Build();
+ }
+
+ if (aggDesc.DeserializeFuncId) {
+ const auto& deserializeFuncDesc = NPg::LookupProc(aggDesc.DeserializeFuncId);
+ loadLambda = ctx.Expr.Builder(input->Pos())
+ .Lambda()
+ .Param("state")
+ .Callable("PgResolvedCallCtx")
+ .Atom(0, deserializeFuncDesc.Name)
+ .Atom(1, ToString(aggDesc.DeserializeFuncId))
+ .Arg(2, "state")
+ .Callable(3, "PgInternal0")
+ .Seal()
+ .Seal()
+ .Seal()
+ .Build();
+ }
+
+ if (aggDesc.CombineFuncId) {
+ const auto& combineFuncDesc = NPg::LookupProc(aggDesc.CombineFuncId);
+ if (combineFuncDesc.IsStrict) {
+ mergeLambda = ctx.Expr.Builder(input->Pos())
+ .Lambda()
+ .Param("state1")
+ .Param("state2")
+ .Callable("If")
+ .Callable(0, "Exists")
+ .Arg(0, "state1")
+ .Seal()
+ .Callable(1, "Coalesce")
+ .Callable(0, "PgResolvedCallCtx")
+ .Atom(0, combineFuncDesc.Name)
+ .Atom(1, ToString(aggDesc.CombineFuncId))
+ .Arg(2, "state1")
+ .Arg(3, "state2")
+ .Seal()
+ .Arg(1, "state1")
+ .Seal()
+ .Arg(2, "state2")
+ .Seal()
+ .Seal()
+ .Build();
+ } else {
+ mergeLambda = ctx.Expr.Builder(input->Pos())
+ .Lambda()
+ .Param("state1")
+ .Param("state2")
+ .Callable("PgResolvedCallCtx")
+ .Atom(0, combineFuncDesc.Name)
+ .Atom(1, ToString(aggDesc.CombineFuncId))
+ .Arg(2, "state1")
+ .Arg(3, "state2")
+ .Seal()
+ .Seal()
+ .Build();
+ }
+ }
+
+ auto typeNode = ExpandType(input->Pos(), *itemType, ctx.Expr);
+ output = ctx.Expr.Builder(input->Pos())
+ .Callable("AggregationTraits")
+ .Add(0, typeNode)
+ .Add(1, initLambda)
+ .Add(2, updateLambda)
+ .Add(3, saveLambda)
+ .Add(4, loadLambda)
+ .Add(5, mergeLambda)
+ .Add(6, finishLambda)
+ .Add(7, defaultValue)
+ .Seal()
+ .Build();
+
+ return IGraphTransformer::TStatus::Repeat;
+}
+
+IGraphTransformer::TStatus PgTypeWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx) {
+ Y_UNUSED(output);
+ if (!EnsureArgsCount(*input, 1, ctx.Expr)) {
+ return IGraphTransformer::TStatus::Error;
+ }
+
+ if (!EnsureAtom(*input->Child(0), ctx.Expr)) {
+ return IGraphTransformer::TStatus::Error;
+ }
+
+ auto type = TString(input->Child(0)->Content());
+ if (!NPg::HasType(type)) {
+ ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(input->Child(0)->Pos()),
+ TStringBuilder() << "Unknown type: '" << type << "'"));
+ return IGraphTransformer::TStatus::Error;
+ }
+
+ auto typeId = NPg::LookupType(type).TypeId;
+ input->SetTypeAnn(ctx.Expr.MakeType<TTypeExprType>(ctx.Expr.MakeType<TPgExprType>(typeId)));
+ return IGraphTransformer::TStatus::Ok;
+}
+
+using TInputs = TVector<std::tuple<TString, const TStructExprType*, TMaybe<TColumnOrder>>>;
+
+bool ScanColumns(TExprNode::TPtr root, const TInputs& inputs, const THashSet<TString>& possibleAliases,
+ bool* hasStar, bool& hasColumnRef, THashSet<TString>& refs, THashMap<TString, THashSet<TString>>* qualifiedRefs,
+ TExtContext& ctx) {
+ bool isError = false;
+ VisitExpr(root, [&](const TExprNode::TPtr& node) {
+ if (node->IsCallable("PgStar")) {
+ if (!hasStar) {
+ ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(node->Pos()), "Star is not allowed here"));
+ isError = true;
+ return false;
+ }
+
+ if (*hasStar) {
+ ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(node->Pos()), "Duplicate star"));
+ isError = true;
+ return false;
+ }
+
+ if (hasColumnRef) {
+ ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(node->Pos()), "Star is incompatible to column reference"));
+ isError = true;
+ return false;
+ }
+
+ *hasStar = true;
+ if (inputs.empty()) {
+ ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(node->Pos()), "Star can't be used without FROM"));
+ isError = true;
+ return false;
+ }
+ }
+ else if (node->IsCallable("PgQualifiedStar")) {
+ if (!hasStar || !qualifiedRefs) {
+ ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(node->Pos()), "Star is not allowed here"));
+ isError = true;
+ return false;
+ }
+
+ if (*hasStar) {
+ ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(node->Pos()), "Star is incompatible to column reference"));
+ isError = true;
+ return false;
+ }
+
+ hasColumnRef = true;
+ if (inputs.empty()) {
+ ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(node->Pos()), "Column reference can't be used without FROM"));
+ isError = true;
+ return false;
+ }
+
+ TString alias(node->Head().Content());
+ if (possibleAliases.find(alias) == possibleAliases.end()) {
+ ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(node->Pos()), TStringBuilder() << "Unknown alias: " << alias));
+ isError = true;
+ return false;
+ }
+
+ for (const auto& x : inputs) {
+ if (std::get<0>(x).empty() || alias != std::get<0>(x)) {
+ continue;
+ }
+
+ for (const auto& item : std::get<1>(x)->GetItems()) {
+ if (!item->GetName().StartsWith("_yql_")) {
+ (*qualifiedRefs)[alias].insert(TString(item->GetName()));
+ }
+ }
+ }
+ }
+ else if (node->IsCallable("PgColumnRef")) {
+ if (hasStar && *hasStar) {
+ ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(node->Pos()), "Star is incompatible to column reference"));
+ isError = true;
+ return false;
+ }
+
+ hasColumnRef = true;
+ if (inputs.empty()) {
+ ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(node->Pos()), "Column reference can't be used without FROM"));
+ isError = true;
+ return false;
+ }
+
+ if (node->ChildrenSize() == 2 && possibleAliases.find(node->Head().Content()) == possibleAliases.end()) {
+ ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(node->Pos()), TStringBuilder() << "Unknown alias: " << node->Head().Content()));
+ isError = true;
+ return false;
+ }
+
+ ui32 matches = 0;
+ for (const auto& x : inputs) {
+ if (node->ChildrenSize() == 2) {
+ if (std::get<0>(x).empty() || node->Head().Content() != std::get<0>(x)) {
+ continue;
+ }
+ }
+
+ auto pos = std::get<1>(x)->FindItem(node->Tail().Content());
+ if (pos) {
+ ++matches;
+ if (matches > 1) {
+ ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(node->Pos()),
+ TStringBuilder() << "Column reference is ambiguous: " << node->Tail().Content()));
+ isError = true;
+ return false;
+ }
+ }
+ }
+
+ refs.insert(TString(node->Tail().Content()));
+ }
+
+ return true;
+ });
+
+ return !isError;
+}
+
+bool ValidateWindowRefs(const TExprNode::TPtr& root, const TExprNode* windows, TExprContext& ctx) {
+ bool isError = false;
+ VisitExpr(root, [&](const TExprNode::TPtr& node) {
+ if (node->IsCallable("PgWindowCall")) {
+ if (!windows) {
+ ctx.AddError(TIssue(ctx.GetPosition(node->Pos()),
+ "No window definitions"));
+ isError = true;
+ return false;
+ }
+
+ auto ref = node->Child(1);
+ if (ref->IsAtom()) {
+ auto name = ref->Content();
+ if (!name) {
+ ctx.AddError(TIssue(ctx.GetPosition(node->Pos()),
+ "Empty window name is not allowed"));
+ isError = true;
+ return false;
+ }
+
+ bool found = false;
+ for (const auto& x : windows->Children()) {
+ if (x->Head().Content() == name) {
+ found = true;
+ break;
+ }
+ }
+
+ if (!found) {
+ ctx.AddError(TIssue(ctx.GetPosition(node->Pos()),
+ TStringBuilder() << "Not found window name: " << name));
+ isError = true;
+ return false;
+ }
+ } else {
+ YQL_ENSURE(ref->IsCallable("PgAnonWindow"));
+ auto index = FromString<ui32>(ref->Head().Content());
+ if (index >= windows->ChildrenSize()) {
+ ctx.AddError(TIssue(ctx.GetPosition(node->Pos()),
+ "Wrong index of window"));
+ isError = true;
+ return false;
+ }
+ }
+ }
+
+ return true;
+ });
+
+ return !isError;
+}
+
+void AddColumns(const TInputs& inputs, const bool* hasStar, const THashSet<TString>& refs,
+ const THashMap<TString, THashSet<TString>>* qualifiedRefs,
+ TVector<const TItemExprType*>& items) {
+ for (const auto& x : inputs) {
+ if (hasStar && *hasStar) {
+ for (ui32 i = 0; i < std::get<1>(x)->GetSize(); ++i) {
+ auto item = std::get<1>(x)->GetItems()[i];
+ if (!item->GetName().StartsWith("_yql_")) {
+ items.push_back(item);
+ }
+ }
+
+ continue;
+ }
+
+ for (const auto& ref : refs) {
+ auto pos = std::get<1>(x)->FindItem(ref);
+ if (pos) {
+ items.push_back(std::get<1>(x)->GetItems()[*pos]);
+ }
+ }
+
+ if (qualifiedRefs && qualifiedRefs->contains(std::get<0>(x))) {
+ for (const auto& ref : qualifiedRefs->find(std::get<0>(x))->second) {
+ auto pos = std::get<1>(x)->FindItem(ref);
+ if (pos) {
+ items.push_back(std::get<1>(x)->GetItems()[*pos]);
+ }
+ }
+ }
+ }
+}
+
+IGraphTransformer::TStatus RebuildLambdaColumns(const TExprNode::TPtr& root, const TExprNode::TPtr& argNode,
+ TExprNode::TPtr& newRoot, const TInputs& inputs, TExprNode::TPtr* expandedColumns, TExtContext& ctx) {
+ return OptimizeExpr(root, newRoot, [&](const TExprNode::TPtr& node, TExprContext&) -> TExprNode::TPtr {
+ if (node->IsCallable("PgStar")) {
+ TExprNode::TListType orderAtoms;
+ for (const auto& x : inputs) {
+ auto order = std::get<2>(x);
+ for (const auto& item : std::get<1>(x)->GetItems()) {
+ if (!item->GetName().StartsWith("_yql_")) {
+ if (!order) {
+ orderAtoms.push_back(ctx.Expr.NewAtom(node->Pos(), item->GetName()));
+ }
+ }
+ }
+
+ if (order) {
+ for (const auto& o : *order) {
+ if (!o.StartsWith("_yql_")) {
+ orderAtoms.push_back(ctx.Expr.NewAtom(node->Pos(), o));
+ }
+ }
+ }
+ }
+
+ if (expandedColumns) {
+ *expandedColumns = ctx.Expr.NewList(node->Pos(), std::move(orderAtoms));
+ }
+
+ return argNode;
+ }
+
+ if (node->IsCallable("PgColumnRef")) {
+ return ctx.Expr.Builder(node->Pos())
+ .Callable("Member")
+ .Add(0, argNode)
+ .Atom(1, node->Tail().Content())
+ .Seal()
+ .Build();
+ }
+
+ if (node->IsCallable("PgQualifiedStar")) {
+ TExprNode::TListType members;
+ for (const auto& x : inputs) {
+ if (std::get<0>(x).empty() || node->Head().Content() != std::get<0>(x)) {
+ continue;
+ }
+
+ auto order = std::get<2>(x);
+ TExprNode::TListType orderAtoms;
+ for (const auto& item : std::get<1>(x)->GetItems()) {
+ if (!item->GetName().StartsWith("_yql_")) {
+ if (!order) {
+ orderAtoms.push_back(ctx.Expr.NewAtom(node->Pos(), item->GetName()));
+ }
+
+ members.push_back(ctx.Expr.Builder(node->Pos())
+ .List()
+ .Atom(0, item->GetName())
+ .Callable(1, "Member")
+ .Add(0, argNode)
+ .Atom(1, item->GetName())
+ .Seal()
+ .Seal()
+ .Build());
+ }
+ }
+
+ if (order) {
+ for (const auto& o : *order) {
+ if (!o.StartsWith("_yql_")) {
+ orderAtoms.push_back(ctx.Expr.NewAtom(node->Pos(), o));
+ }
+ }
+ }
+
+ if (expandedColumns) {
+ *expandedColumns = ctx.Expr.NewList(node->Pos(), std::move(orderAtoms));
+ }
+
+ return ctx.Expr.NewCallable(node->Pos(), "AsStruct", std::move(members));
+ }
+
+ YQL_ENSURE(false, "missing input");
+ }
+
+ return node;
+ }, ctx.Expr, TOptimizeExprSettings(nullptr));
+}
+
+void MakeOptionalColumns(const TStructExprType*& structType, TExprContext& ctx) {
+ bool needRebuild = false;
+ for (const auto& item : structType->GetItems()) {
+ if (item->GetItemType()->GetKind() != ETypeAnnotationKind::Optional
+ && item->GetItemType()->GetKind() != ETypeAnnotationKind::Null) {
+ needRebuild = true;
+ break;
+ }
+ }
+
+ if (!needRebuild) {
+ return;
+ }
+
+ auto newItems = structType->GetItems();
+ for (auto& item : newItems) {
+ if (item->GetItemType()->GetKind() != ETypeAnnotationKind::Optional
+ && item->GetItemType()->GetKind() != ETypeAnnotationKind::Null) {
+ item = ctx.MakeType<TItemExprType>(item->GetName(), ctx.MakeType<TOptionalExprType>(item->GetItemType()));
+ }
+ }
+
+ structType = ctx.MakeType<TStructExprType>(newItems);
+}
+
+bool ValidateGroups(const TInputs& inputs, const THashSet<TString>& possibleAliases,
+ const TExprNode& data, TExtContext& ctx, TExprNode::TListType& newGroups) {
+ newGroups.clear();
+ bool hasColumnRef = false;
+ for (const auto& group : data.Children()) {
+ if (!group->IsCallable("PgGroup")) {
+ ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(group->Pos()), "Expected PgGroup"));
+ return false;
+ }
+
+ YQL_ENSURE(group->Tail().IsLambda());
+ THashSet<TString> refs;
+ THashMap<TString, THashSet<TString>> qualifiedRefs;
+ if (group->Child(0)->IsCallable("Void")) {
+ // no effective type yet, scan lambda body
+ if (!ScanColumns(group->Tail().TailPtr(), inputs, possibleAliases, nullptr, hasColumnRef,
+ refs, &qualifiedRefs, ctx)) {
+ return false;
+ }
+
+ TVector<const TItemExprType*> items;
+ AddColumns(inputs, nullptr, refs, &qualifiedRefs, items);
+ auto effectiveType = ctx.Expr.MakeType<TStructExprType>(items);
+ if (!effectiveType->Validate(group->Pos(), ctx.Expr)) {
+ return false;
+ }
+
+ auto typeNode = ExpandType(group->Pos(), *effectiveType, ctx.Expr);
+
+ auto argNode = ctx.Expr.NewArgument(group->Pos(), "row");
+ auto arguments = ctx.Expr.NewArguments(group->Pos(), { argNode });
+ TExprNode::TPtr newRoot;
+ auto status = RebuildLambdaColumns(group->Tail().TailPtr(), argNode, newRoot, inputs, nullptr, ctx);
+ if (status == IGraphTransformer::TStatus::Error) {
+ return false;
+ }
+
+ auto newLambda = ctx.Expr.NewLambda(group->Pos(), std::move(arguments), std::move(newRoot));
+
+ auto newChildren = group->ChildrenList();
+ newChildren[0] = typeNode;
+ newChildren[1] = newLambda;
+ auto newGroup = ctx.Expr.NewCallable(group->Pos(), "PgGroup", std::move(newChildren));
+ newGroups.push_back(newGroup);
+ }
+ }
+
+ return true;
+}
+
+bool ValidateSort(const TInputs& inputs, const THashSet<TString>& possibleAliases,
+ const TExprNode& data, TExtContext& ctx, TExprNode::TListType& newSorts) {
+ newSorts.clear();
+ for (auto oneSort : data.Children()) {
+ bool hasColumnRef;
+ THashSet<TString> refs;
+ THashMap<TString, THashSet<TString>> qualifiedRefs;
+ if (!ScanColumns(oneSort->Child(1)->TailPtr(), inputs, possibleAliases, nullptr, hasColumnRef,
+ refs, &qualifiedRefs, ctx)) {
+ return false;
+ }
+
+ TVector<const TItemExprType*> items;
+ AddColumns(inputs, nullptr, refs, &qualifiedRefs, items);
+ auto effectiveType = ctx.Expr.MakeType<TStructExprType>(items);
+ if (!effectiveType->Validate(oneSort->Pos(), ctx.Expr)) {
+ return false;
+ }
+
+ auto typeNode = ExpandType(oneSort->Pos(), *effectiveType, ctx.Expr);
+
+ auto argNode = ctx.Expr.NewArgument(oneSort->Pos(), "row");
+ auto arguments = ctx.Expr.NewArguments(oneSort->Pos(), { argNode });
+ TExprNode::TPtr newRoot;
+ auto status = RebuildLambdaColumns(oneSort->Child(1)->TailPtr(), argNode, newRoot, inputs, nullptr, ctx);
+ if (status == IGraphTransformer::TStatus::Error) {
+ return false;
+ }
+
+ auto newLambda = ctx.Expr.NewLambda(oneSort->Pos(), std::move(arguments), std::move(newRoot));
+
+ auto newChildren = oneSort->ChildrenList();
+ newChildren[0] = typeNode;
+ newChildren[1] = newLambda;
+ auto newSort = ctx.Expr.ChangeChildren(*oneSort, std::move(newChildren));
+ newSorts.push_back(newSort);
+ }
+
+ return true;
+}
+
+IGraphTransformer::TStatus PgSetItemWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TExtContext& ctx) {
+ if (!EnsureArgsCount(*input, 1, ctx.Expr)) {
+ return IGraphTransformer::TStatus::Error;
+ }
+
+ const auto& options = input->Head();
+ if (!EnsureTuple(options, ctx.Expr)) {
+ return IGraphTransformer::TStatus::Error;
+ }
+
+ const TStructExprType* outputRowType = nullptr;
+ TInputs inputs;
+ TInputs joinInputs;
+ THashSet<TString> possibleAliases;
+ bool hasResult = false;
+ bool hasValues = false;
+ bool hasJoinOps = false;
+
+ // pass 0 - from/values
+ // pass 1 - join
+ // pass 2 - where, group_by,
+ // pass 3 - window
+ // pass 4 - result
+ for (ui32 pass = 0; pass < 5; ++pass) {
+ if (pass > 1 && !inputs.empty() && !hasJoinOps) {
+ ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(input->Pos()), "Missing join_ops"));
+ return IGraphTransformer::TStatus::Error;
+ }
+
+ for (const auto& option : options.Children()) {
+ if (!EnsureTupleMinSize(*option, 1, ctx.Expr)) {
+ return IGraphTransformer::TStatus::Error;
+ }
+
+ if (!EnsureAtom(option->Head(), ctx.Expr)) {
+ return IGraphTransformer::TStatus::Error;
+ }
+
+ const auto optionName = option->Head().Content();
+ if (optionName == "values") {
+ hasValues = true;
+ if (pass != 0) {
+ continue;
+ }
+
+ if (!EnsureTupleSize(*option, 3, ctx.Expr)) {
+ return IGraphTransformer::TStatus::Error;
+ }
+
+ auto values = option->Child(2);
+ if (!EnsureListType(*values, ctx.Expr)) {
+ return IGraphTransformer::TStatus::Error;
+ }
+
+ auto listType = values->GetTypeAnn()->Cast<TListExprType>();
+ if (!EnsureTupleType(values->Pos(), *listType->GetItemType(), ctx.Expr)) {
+ return IGraphTransformer::TStatus::Error;
+ }
+
+ auto tupleType = listType->GetItemType()->Cast<TTupleExprType>();
+ auto names = option->Child(1);
+ if (!EnsureTupleSize(*names, tupleType->GetSize(), ctx.Expr)) {
+ return IGraphTransformer::TStatus::Error;
+ }
+
+ TVector<const TItemExprType*> outputItems;
+ TVector<TString> columns;
+ for (ui32 i = 0; i < names->ChildrenSize(); ++i) {
+ if (!EnsureAtom(*names->Child(i), ctx.Expr)) {
+ return IGraphTransformer::TStatus::Error;
+ }
+
+ outputItems.push_back(ctx.Expr.MakeType<TItemExprType>(names->Child(i)->Content(), tupleType->GetItems()[i]));
+ }
+
+ outputRowType = ctx.Expr.MakeType<TStructExprType>(outputItems);
+ if (!outputRowType->Validate(names->Pos(), ctx.Expr)) {
+ return IGraphTransformer::TStatus::Error;
+ }
+ } else if (optionName == "result") {
+ hasResult = true;
+ if (pass != 4) {
+ continue;
+ }
+
+ if (!EnsureTupleSize(*option, 2, ctx.Expr)) {
+ return IGraphTransformer::TStatus::Error;
+ }
+
+ const auto& data = option->Tail();
+ if (!EnsureTuple(data, ctx.Expr)) {
+ return IGraphTransformer::TStatus::Error;
+ }
+
+ TVector<const TItemExprType*> outputItems;
+ TExprNode::TListType newResult;
+ bool hasStar = false;
+ bool hasColumnRef = false;
+ for (const auto& column : data.Children()) {
+ if (!column->IsCallable("PgResultItem")) {
+ ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(column->Pos()), "Expected PgResultItem"));
+ return IGraphTransformer::TStatus::Error;
+ }
+
+ YQL_ENSURE(column->Tail().IsLambda());
+ THashSet<TString> refs;
+ THashMap<TString, THashSet<TString>> qualifiedRefs;
+ if (column->Child(1)->IsCallable("Void")) {
+ // no effective type yet, scan lambda body
+ if (!ScanColumns(column->Tail().TailPtr(), joinInputs, possibleAliases, &hasStar, hasColumnRef,
+ refs, &qualifiedRefs, ctx)) {
+ return IGraphTransformer::TStatus::Error;
+ }
+
+ TVector<const TItemExprType*> items;
+ AddColumns(joinInputs, &hasStar, refs, &qualifiedRefs, items);
+ auto effectiveType = ctx.Expr.MakeType<TStructExprType>(items);
+ if (!effectiveType->Validate(column->Pos(), ctx.Expr)) {
+ return IGraphTransformer::TStatus::Error;
+ }
+
+ auto expandedColumns = column->HeadPtr();
+ auto typeNode = ExpandType(column->Pos(), *effectiveType, ctx.Expr);
+
+ auto argNode = ctx.Expr.NewArgument(column->Pos(), "row");
+ auto arguments = ctx.Expr.NewArguments(column->Pos(), { argNode });
+ TExprNode::TPtr newRoot;
+ auto status = RebuildLambdaColumns(column->Tail().TailPtr(), argNode, newRoot, joinInputs, &expandedColumns, ctx);
+ if (status == IGraphTransformer::TStatus::Error) {
+ return IGraphTransformer::TStatus::Error;
+ }
+
+ auto newLambda = ctx.Expr.NewLambda(column->Pos(), std::move(arguments), std::move(newRoot));
+
+ auto newColumnChildren = column->ChildrenList();
+ newColumnChildren[0] = expandedColumns;
+ newColumnChildren[1] = typeNode;
+ newColumnChildren[2] = newLambda;
+ auto newColumn = ctx.Expr.NewCallable(column->Pos(), "PgResultItem", std::move(newColumnChildren));
+ newResult.push_back(newColumn);
+ } else {
+ if (column->Head().IsAtom()) {
+ outputItems.push_back(ctx.Expr.MakeType<TItemExprType>(column->Head().Content(), column->Tail().GetTypeAnn()));
+ } else {
+ // star or qualified star
+ for (const auto& item : column->Tail().GetTypeAnn()->Cast<TStructExprType>()->GetItems()) {
+ outputItems.push_back(item);
+ }
+ }
+
+ // scan lambda for window references
+ auto windows = GetSetting(options, "window");
+ if (!ValidateWindowRefs(column->TailPtr(), windows ? &windows->Tail() : nullptr, ctx.Expr)) {
+ return IGraphTransformer::TStatus::Error;
+ }
+ }
+ }
+
+ if (!newResult.empty()) {
+ auto resultValue = ctx.Expr.NewList(options.Pos(), std::move(newResult));
+ auto newSettings = ReplaceSetting(options, {}, "result", resultValue, ctx.Expr);
+ output = ctx.Expr.ChangeChild(*input, 0, std::move(newSettings));
+ return IGraphTransformer::TStatus::Repeat;
+ }
+
+ outputRowType = ctx.Expr.MakeType<TStructExprType>(outputItems);
+ if (!outputRowType->Validate(data.Pos(), ctx.Expr)) {
+ return IGraphTransformer::TStatus::Error;
+ }
+ } else if (optionName == "from") {
+ if (pass != 0) {
+ continue;
+ }
+
+ if (!EnsureTuple(*option, ctx.Expr)) {
+ return IGraphTransformer::TStatus::Error;
+ }
+
+ const auto& data = option->Tail();
+ if (!EnsureTupleMinSize(data, 1, ctx.Expr)) {
+ return IGraphTransformer::TStatus::Error;
+ }
+
+ for (const auto& p : data.Children()) {
+ if (!EnsureTupleSize(*p, 3, ctx.Expr)) {
+ return IGraphTransformer::TStatus::Error;
+ }
+
+ if (!EnsureAtom(*p->Child(1), ctx.Expr)) {
+ return IGraphTransformer::TStatus::Error;
+ }
+
+ if (!EnsureTuple(*p->Child(2), ctx.Expr)) {
+ return IGraphTransformer::TStatus::Error;
+ }
+
+ for (const auto& name : p->Child(2)->Children()) {
+ if (!EnsureAtom(*name, ctx.Expr)) {
+ return IGraphTransformer::TStatus::Error;
+ }
+ }
+
+ auto columnOrder = ctx.Types.LookupColumnOrder(p->Head());
+ if (!EnsureListType(p->Head(), ctx.Expr)) {
+ return IGraphTransformer::TStatus::Error;
+ }
+
+ auto inputRowType = p->Head().GetTypeAnn()->Cast<TListExprType>()->GetItemType();
+ if (!EnsureStructType(p->Head().Pos(), *inputRowType, ctx.Expr)) {
+ return IGraphTransformer::TStatus::Error;
+ }
+
+ auto inputStructType = inputRowType->Cast<TStructExprType>();
+ auto alias = TString(p->Child(1)->Content());
+ if (!alias.empty()) {
+ if (!possibleAliases.insert(alias).second) {
+ ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(option->Head().Pos()),
+ TStringBuilder() << "Duplicated alias: " << alias));
+ return IGraphTransformer::TStatus::Error;
+ }
+ }
+
+ if (p->Child(2)->ChildrenSize() > 0) {
+ // explicit columns
+ ui32 realColumns = 0;
+ for (const auto& item : inputStructType->GetItems()) {
+ if (!item->GetName().StartsWith("_yql_")) {
+ ++realColumns;
+ }
+ }
+
+ if (realColumns != p->Child(2)->ChildrenSize()) {
+ ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(option->Head().Pos()),
+ TStringBuilder() << "Wrong number of columns, expected: " << realColumns
+ << ", got: " << p->Child(2)->ChildrenSize()));
+ return IGraphTransformer::TStatus::Error;
+ }
+
+ if (!columnOrder) {
+ ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(option->Head().Pos()),
+ "No column order at source"));
+ return IGraphTransformer::TStatus::Error;
+ }
+
+ TVector<const TItemExprType*> newStructItems;
+ TColumnOrder newOrder;
+ for (ui32 i = 0; i < p->Child(2)->ChildrenSize(); ++i) {
+ auto pos = inputStructType->FindItem((*columnOrder)[i]);
+ YQL_ENSURE(pos);
+ auto type = inputStructType->GetItems()[*pos]->GetItemType();
+ newOrder.push_back(TString(p->Child(2)->Child(i)->Content()));
+ newStructItems.push_back(ctx.Expr.MakeType<TItemExprType>(p->Child(2)->Child(i)->Content(), type));
+ }
+
+ auto newStructType = ctx.Expr.MakeType<TStructExprType>(newStructItems);
+ if (!newStructType->Validate(p->Child(2)->Pos(), ctx.Expr)) {
+ return IGraphTransformer::TStatus::Error;
+ }
+
+ inputs.push_back(std::make_tuple(alias, newStructType, newOrder));
+ } else {
+ inputs.push_back(std::make_tuple(alias, inputStructType, columnOrder));
+ }
+ }
+ } else if (optionName == "where" || optionName == "having") {
+ if (pass != 2) {
+ continue;
+ }
+
+ if (!EnsureTupleSize(*option, 2, ctx.Expr)) {
+ return IGraphTransformer::TStatus::Error;
+ }
+
+ const auto& data = option->Tail();
+ if (!data.IsCallable("PgWhere")) {
+ ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(option->Head().Pos()), "Expected PgWhere"));
+ return IGraphTransformer::TStatus::Error;
+ }
+
+ if (data.Child(0)->IsCallable("Void")) {
+ // no effective type yet, scan lambda body
+ bool hasColumnRef;
+ THashSet<TString> refs;
+ if (!ScanColumns(data.Child(1)->TailPtr(), joinInputs, possibleAliases, nullptr, hasColumnRef,
+ refs, nullptr, ctx)) {
+ return IGraphTransformer::TStatus::Error;
+ }
+
+ TVector<const TItemExprType*> items;
+ AddColumns(joinInputs, nullptr, refs, nullptr, items);
+ auto effectiveType = ctx.Expr.MakeType<TStructExprType>(items);
+ if (!effectiveType->Validate(data.Pos(), ctx.Expr)) {
+ return IGraphTransformer::TStatus::Error;
+ }
+
+ auto typeNode = ExpandType(data.Pos(), *effectiveType, ctx.Expr);
+
+ auto argNode = ctx.Expr.NewArgument(data.Pos(), "row");
+ auto arguments = ctx.Expr.NewArguments(data.Pos(), { argNode });
+ TExprNode::TPtr newRoot;
+ auto status = RebuildLambdaColumns(data.Child(1)->TailPtr(), argNode, newRoot, joinInputs, nullptr, ctx);
+ if (status == IGraphTransformer::TStatus::Error) {
+ return IGraphTransformer::TStatus::Error;
+ }
+
+ auto newLambda = ctx.Expr.NewLambda(data.Pos(), std::move(arguments), std::move(newRoot));
+
+ auto newChildren = data.ChildrenList();
+ newChildren[0] = typeNode;
+ newChildren[1] = newLambda;
+ auto newWhere= ctx.Expr.NewCallable(data.Pos(), "PgWhere", std::move(newChildren));
+ auto newSettings = ReplaceSetting(options, {}, TString(optionName), newWhere, ctx.Expr);
+ output = ctx.Expr.ChangeChild(*input, 0, std::move(newSettings));
+ return IGraphTransformer::TStatus::Repeat;
+ } else {
+ if (data.GetTypeAnn() && data.GetTypeAnn()->GetKind() == ETypeAnnotationKind::Null) {
+ // nothing to do
+ } else if (data.GetTypeAnn() && data.GetTypeAnn()->GetKind() == ETypeAnnotationKind::Pg) {
+ auto name = data.GetTypeAnn()->Cast<TPgExprType>()->GetName();
+ if (name != "bool") {
+ ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(data.Pos()), TStringBuilder() <<
+ "Expected bool type, but got: " << name));
+ return IGraphTransformer::TStatus::Error;
+ }
+ } else if (!EnsureSpecificDataType(data, EDataSlot::Bool, ctx.Expr, true)) {
+ return IGraphTransformer::TStatus::Error;
+ }
+ }
+ } else if (optionName == "join_ops") {
+ if (pass != 1) {
+ continue;
+ }
+
+ hasJoinOps = true;
+ if (hasValues) {
+ ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(input->Pos()), "Join and values options are not compatible"));
+ return IGraphTransformer::TStatus::Error;
+ }
+
+ if (inputs.empty()) {
+ ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(input->Pos()), "At least one input expected"));
+ return IGraphTransformer::TStatus::Error;
+ }
+
+ if (!EnsureTupleSize(*option, 2, ctx.Expr)) {
+ return IGraphTransformer::TStatus::Error;
+ }
+
+ const auto& data = option->Tail();
+ if (!EnsureTuple(data, ctx.Expr)) {
+ return IGraphTransformer::TStatus::Error;
+ }
+
+ ui32 totalTupleSizes = 0;
+ for (auto child: data.Children()) {
+ if (!EnsureTuple(*child, ctx.Expr)) {
+ return IGraphTransformer::TStatus::Error;
+ }
+
+ totalTupleSizes += child->ChildrenSize() + 1;
+ }
+
+ if (totalTupleSizes != inputs.size()) {
+ ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(option->Head().Pos()),
+ TStringBuilder() << "Unexpected number of joins, got: " << totalTupleSizes
+ << ", expected:" << inputs.size()));
+ return IGraphTransformer::TStatus::Error;
+ }
+
+ bool needRewrite = false;
+ ui32 inputIndex = 0;
+ for (ui32 joinGroupNo = 0; joinGroupNo < data.ChildrenSize(); ++joinGroupNo) {
+ joinInputs.push_back(inputs[inputIndex]);
+ ++inputIndex;
+ for (ui32 i = 0; i < data.Child(joinGroupNo)->ChildrenSize(); ++i) {
+ auto child = data.Child(joinGroupNo)->Child(i);
+ if (!EnsureTupleMinSize(*child, 1, ctx.Expr)) {
+ return IGraphTransformer::TStatus::Error;
+ }
+
+ if (!EnsureAtom(child->Head(), ctx.Expr)) {
+ return IGraphTransformer::TStatus::Error;
+ }
+
+ auto joinType = child->Head().Content();
+ if (joinType != "cross" && joinType != "inner" && joinType != "left"
+ && joinType != "right" && joinType != "full") {
+ ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(option->Head().Pos()),
+ TStringBuilder() << "Unsupported join type: " << joinType));
+ return IGraphTransformer::TStatus::Error;
+ }
+
+ if (joinType == "cross") {
+ if (!EnsureTupleSize(*child, 1, ctx.Expr)) {
+ return IGraphTransformer::TStatus::Error;
+ }
+
+ joinInputs.push_back(inputs[inputIndex]);
+ ++inputIndex;
+ } else {
+ if (!EnsureTupleSize(*child, 2, ctx.Expr)) {
+ return IGraphTransformer::TStatus::Error;
+ }
+
+ bool leftSideIsOptional = (joinType == "right" || joinType == "full");
+ bool rightSideIsOptional = (joinType == "left" || joinType == "full");
+ if (leftSideIsOptional) {
+ for (ui32 j = 0; j < inputIndex; ++j) {
+ MakeOptionalColumns(std::get<1>(joinInputs[j]), ctx.Expr);
+ }
+ }
+
+ joinInputs.push_back(inputs[inputIndex]);
+ ++inputIndex;
+ if (rightSideIsOptional) {
+ MakeOptionalColumns(std::get<1>(joinInputs.back()), ctx.Expr);
+ }
+
+ const auto& quals = child->Tail();
+ if (!quals.IsCallable("PgWhere")) {
+ ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(quals.Pos()), "Expected PgWhere"));
+ return IGraphTransformer::TStatus::Error;
+ }
+
+ needRewrite = needRewrite || quals.Child(0)->IsCallable("Void");
+ }
+ }
+ }
+
+ if (needRewrite) {
+ TExprNode::TListType newJoinGroups;
+ inputIndex = 0;
+ for (ui32 joinGroupNo = 0; joinGroupNo < data.ChildrenSize(); ++joinGroupNo) {
+ TExprNode::TListType newGroupItems;
+ TInputs groupInputs;
+ THashSet<TString> groupPossibleAliases;
+ if (data.Child(joinGroupNo)->ChildrenSize() > 0) {
+ groupInputs.push_back(inputs[inputIndex]);
+ auto alias = std::get<0>(inputs[inputIndex]);
+ if (!alias.empty()) {
+ groupPossibleAliases.insert(alias);
+ }
+ }
+
+ ++inputIndex;
+ for (ui32 i = 0; i < data.Child(joinGroupNo)->ChildrenSize(); ++i, ++inputIndex) {
+ groupInputs.push_back(inputs[inputIndex]);
+ auto alias = std::get<0>(inputs[inputIndex]);
+ if (!alias.empty()) {
+ groupPossibleAliases.insert(alias);
+ }
+
+ auto child = data.Child(joinGroupNo)->Child(i);
+ auto joinType = child->Head().Content();
+ if (joinType == "cross") {
+ newGroupItems.push_back(data.Child(joinGroupNo)->ChildPtr(i));
+ } else {
+ const auto& quals = child->Tail();
+ bool hasColumnRef;
+ THashSet<TString> refs;
+ if (!ScanColumns(quals.Child(1)->TailPtr(), groupInputs, groupPossibleAliases, nullptr, hasColumnRef,
+ refs, nullptr, ctx)) {
+ return IGraphTransformer::TStatus::Error;
+ }
+
+ TVector<const TItemExprType*> items;
+ AddColumns(groupInputs, nullptr, refs, nullptr, items);
+ auto effectiveType = ctx.Expr.MakeType<TStructExprType>(items);
+ if (!effectiveType->Validate(quals.Pos(), ctx.Expr)) {
+ return IGraphTransformer::TStatus::Error;
+ }
+
+ auto typeNode = ExpandType(quals.Pos(), *effectiveType, ctx.Expr);
+
+ auto argNode = ctx.Expr.NewArgument(quals.Pos(), "row");
+ auto arguments = ctx.Expr.NewArguments(quals.Pos(), { argNode });
+ TExprNode::TPtr newRoot;
+ auto status = RebuildLambdaColumns(quals.Child(1)->TailPtr(), argNode, newRoot, groupInputs, nullptr, ctx);
+ if (status == IGraphTransformer::TStatus::Error) {
+ return IGraphTransformer::TStatus::Error;
+ }
+
+ auto predicate = ctx.Expr.Builder(quals.Pos())
+ .Callable("Coalesce")
+ .Add(0, newRoot)
+ .Callable(1, "Bool")
+ .Atom(0, "0")
+ .Seal()
+ .Seal()
+ .Build();
+
+ auto newLambda = ctx.Expr.NewLambda(quals.Pos(), std::move(arguments), std::move(predicate));
+
+ auto newChildren = quals.ChildrenList();
+ newChildren[0] = typeNode;
+ newChildren[1] = newLambda;
+ auto newWhere= ctx.Expr.NewCallable(quals.Pos(), "PgWhere", std::move(newChildren));
+ newGroupItems.push_back(ctx.Expr.ChangeChild(*child, 1, std::move(newWhere)));
+ }
+
+ // after left,right,full join type of inputs in current group may be changed for next predicates
+ bool leftSideIsOptional = (joinType == "right" || joinType == "full");
+ bool rightSideIsOptional = (joinType == "left" || joinType == "full");
+ if (leftSideIsOptional) {
+ for (ui32 j = 0; j < inputIndex; ++j) {
+ MakeOptionalColumns(std::get<1>(groupInputs[j]), ctx.Expr);
+ }
+ }
+
+ if (rightSideIsOptional) {
+ MakeOptionalColumns(std::get<1>(groupInputs[inputIndex]), ctx.Expr);
+ }
+ }
+
+ auto newGroup = ctx.Expr.NewList(option->Pos(), std::move(newGroupItems));
+ newJoinGroups.push_back(newGroup);
+ }
+
+ auto newJoinGroupsNode = ctx.Expr.NewList(option->Pos(), std::move(newJoinGroups));
+ auto newSettings = ReplaceSetting(options, {}, TString(optionName), newJoinGroupsNode, ctx.Expr);
+ output = ctx.Expr.ChangeChild(*input, 0, std::move(newSettings));
+ return IGraphTransformer::TStatus::Repeat;
+ }
+ } else if (optionName == "group_by") {
+ if (pass != 2) {
+ continue;
+ }
+
+ if (!EnsureTupleSize(*option, 2, ctx.Expr)) {
+ return IGraphTransformer::TStatus::Error;
+ }
+
+ const auto& data = option->Tail();
+ if (!EnsureTuple(data, ctx.Expr)) {
+ return IGraphTransformer::TStatus::Error;
+ }
+
+ TExprNode::TListType newGroups;
+ if (!ValidateGroups(joinInputs, possibleAliases, data, ctx, newGroups)) {
+ return IGraphTransformer::TStatus::Error;
+ }
+
+ if (!newGroups.empty()) {
+ auto resultValue = ctx.Expr.NewList(options.Pos(), std::move(newGroups));
+ auto newSettings = ReplaceSetting(options, {}, "group_by", resultValue, ctx.Expr);
+ output = ctx.Expr.ChangeChild(*input, 0, std::move(newSettings));
+ return IGraphTransformer::TStatus::Repeat;
+ }
+ } else if (optionName == "window") {
+ if (pass != 3) {
+ continue;
+ }
+
+ if (!EnsureTupleSize(*option, 2, ctx.Expr)) {
+ return IGraphTransformer::TStatus::Error;
+ }
+
+ const auto& data = option->Tail();
+ if (!EnsureTupleMinSize(data, 1, ctx.Expr)) {
+ return IGraphTransformer::TStatus::Error;
+ }
+
+ THashSet<TStringBuf> windowNames;
+ TExprNode::TListType newWindow;
+ bool hasChanges = false;
+ for (ui32 i = 0; i < data.ChildrenSize(); ++i) {
+ auto x = data.ChildPtr(i);
+ if (!x->IsCallable("PgWindow")) {
+ ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(x->Pos()), "Expected PgWindow"));
+ return IGraphTransformer::TStatus::Error;
+ }
+
+ if (x->Head().Content() && !windowNames.insert(x->Head().Content()).second) {
+ ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(x->Pos()),
+ TStringBuilder() << "Duplicated window name: " << x->Head().Content()));
+ return IGraphTransformer::TStatus::Error;
+ }
+
+ auto partitions = x->Child(2);
+ auto sort = x->Child(3);
+ bool needRebuildSort = false;
+ bool needRebuildPartition = false;
+ for (const auto& p : partitions->Children()) {
+ if (p->Child(0)->IsCallable("Void")) {
+ needRebuildPartition = true;
+ break;
+ }
+ }
+
+ for (const auto& s : sort->Children()) {
+ if (s->Child(0)->IsCallable("Void")) {
+ needRebuildSort = true;
+ break;
+ }
+ }
+
+ if (!needRebuildSort && !needRebuildPartition) {
+ newWindow.push_back(x);
+ continue;
+ }
+
+ hasChanges = true;
+ auto newChildren = x->ChildrenList();
+ if (needRebuildPartition) {
+ TExprNode::TListType newGroups;
+ if (!ValidateGroups(joinInputs, possibleAliases, *partitions, ctx, newGroups)) {
+ return IGraphTransformer::TStatus::Error;
+ }
+
+ newChildren[2] = ctx.Expr.NewList(x->Pos(), std::move(newGroups));
+ }
+
+ if (needRebuildSort) {
+ TExprNode::TListType newSorts;
+ if (!ValidateSort(joinInputs, possibleAliases, *sort, ctx, newSorts)) {
+ return IGraphTransformer::TStatus::Error;
+ }
+
+ newChildren[3] = ctx.Expr.NewList(x->Pos(), std::move(newSorts));
+ }
+
+ newWindow.push_back(ctx.Expr.ChangeChildren(*x, std::move(newChildren)));
+ }
+
+ if (hasChanges) {
+ auto windowValue = ctx.Expr.NewList(options.Pos(), std::move(newWindow));
+ auto newSettings = ReplaceSetting(options, {}, "window", windowValue, ctx.Expr);
+ output = ctx.Expr.ChangeChild(*input, 0, std::move(newSettings));
+ return IGraphTransformer::TStatus::Repeat;
+ }
+ } else {
+ ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(option->Head().Pos()),
+ TStringBuilder() << "Unsupported option: " << optionName));
+ return IGraphTransformer::TStatus::Error;
+ }
+ }
+ }
+
+ if (!hasResult && !hasValues) {
+ ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(input->Pos()), "Missing result and values"));
+ return IGraphTransformer::TStatus::Error;
+ }
+
+ if (hasResult && hasValues) {
+ ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(input->Pos()), "Either result or values should be specified"));
+ return IGraphTransformer::TStatus::Error;
+ }
+
+ input->SetTypeAnn(ctx.Expr.MakeType<TListExprType>(outputRowType));
+ return IGraphTransformer::TStatus::Ok;
+}
+
+IGraphTransformer::TStatus PgSelectWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TExtContext& ctx) {
+ if (!EnsureArgsCount(*input, 1, ctx.Expr)) {
+ return IGraphTransformer::TStatus::Error;
+ }
+
+ const auto& options = input->Head();
+ if (!EnsureTuple(options, ctx.Expr)) {
+ return IGraphTransformer::TStatus::Error;
+ }
+
+ const TStructExprType* outputRowType = nullptr;
+ TExprNode* setItems = nullptr;
+ TExprNode* setOps = nullptr;
+ bool hasSort = false;
+
+ for (ui32 pass = 0; pass < 2; ++pass) {
+ for (const auto& option : options.Children()) {
+ if (!EnsureTupleMinSize(*option, 1, ctx.Expr)) {
+ return IGraphTransformer::TStatus::Error;
+ }
+
+ if (!EnsureAtom(option->Head(), ctx.Expr)) {
+ return IGraphTransformer::TStatus::Error;
+ }
+
+ const auto optionName = option->Head().Content();
+ if (optionName == "set_ops") {
+ if (!EnsureTupleSize(*option, 2, ctx.Expr)) {
+ return IGraphTransformer::TStatus::Error;
+ }
+
+ if (pass == 0) {
+ if (!EnsureTupleMinSize(option->Tail(), 1, ctx.Expr)) {
+ return IGraphTransformer::TStatus::Error;
+ }
+
+ for (const auto& child : option->Tail().Children()) {
+ if (!EnsureAtom(*child, ctx.Expr)) {
+ return IGraphTransformer::TStatus::Error;
+ }
+
+ if (child->Content() != "push" && child->Content() != "union_all") {
+ ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(child->Pos()),
+ TStringBuilder() << "Unexpected operation: " << child->Content()));
+ return IGraphTransformer::TStatus::Error;
+ }
+ }
+
+ setOps = &option->Tail();
+ }
+ } else if (optionName == "set_items") {
+ if (!EnsureTupleSize(*option, 2, ctx.Expr)) {
+ return IGraphTransformer::TStatus::Error;
+ }
+
+ if (pass == 0) {
+ if (!EnsureTupleMinSize(option->Tail(), 1, ctx.Expr)) {
+ return IGraphTransformer::TStatus::Error;
+ }
+
+ for (const auto& child : option->Tail().Children()) {
+ if (!child->IsCallable("PgSetItem")) {
+ ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(child->Pos()), "Expected PgSetItem"));
+ return IGraphTransformer::TStatus::Error;
+ }
+ }
+
+ setItems = &option->Tail();
+ } else {
+ outputRowType = option->Tail().Head().GetTypeAnn()->Cast<TListExprType>()->GetItemType()->
+ Cast<TStructExprType>();
+ }
+ } else if (optionName == "limit" || optionName == "offset") {
+ if (pass != 0) {
+ continue;
+ }
+
+ if (!EnsureTupleSize(*option, 2, ctx.Expr)) {
+ return IGraphTransformer::TStatus::Error;
+ }
+
+ auto& data = option->ChildRef(1);
+ if (data->GetTypeAnn() && data->GetTypeAnn()->GetKind() == ETypeAnnotationKind::Null) {
+ // nothing to do
+ } else if (data->GetTypeAnn() && data->GetTypeAnn()->GetKind() == ETypeAnnotationKind::Pg) {
+ auto name = data->GetTypeAnn()->Cast<TPgExprType>()->GetName();
+ if (name != "int4" && name != "int8") {
+ ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(data->Pos()), TStringBuilder() <<
+ "Expected int4/int8 type, but got: " << name));
+ return IGraphTransformer::TStatus::Error;
+ }
+ } else {
+ const TTypeAnnotationNode* expectedType = ctx.Expr.MakeType<TOptionalExprType>(
+ ctx.Expr.MakeType<TDataExprType>(EDataSlot::Int64));
+ auto convertStatus = TryConvertTo(data, *expectedType, ctx.Expr);
+ if (convertStatus.Level == IGraphTransformer::TStatus::Error) {
+ ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(data->Pos()), "Mismatch argument types"));
+ return IGraphTransformer::TStatus::Error;
+ }
+
+ if (convertStatus.Level != IGraphTransformer::TStatus::Ok) {
+ auto newSettings = ReplaceSetting(options, {}, TString(optionName), option->ChildPtr(1), ctx.Expr);
+ output = ctx.Expr.ChangeChild(*input, 0, std::move(newSettings));
+ return IGraphTransformer::TStatus::Repeat;
+ }
+ }
+ } else if (optionName == "sort") {
+ if (pass != 1) {
+ continue;
+ }
+
+ if (!EnsureTupleSize(*option, 2, ctx.Expr)) {
+ return IGraphTransformer::TStatus::Error;
+ }
+
+ const auto& data = option->Tail();
+ if (!EnsureTuple(data, ctx.Expr)) {
+ return IGraphTransformer::TStatus::Error;
+ }
+
+ for (const auto& x : data.Children()) {
+ if (!x->IsCallable("PgSort")) {
+ ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(x->Pos()), "Expected PgSort"));
+ }
+ }
+
+ hasSort = true;
+ } else {
+ ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(option->Head().Pos()),
+ TStringBuilder() << "Unsupported option: " << optionName));
+ return IGraphTransformer::TStatus::Error;
+ }
+ }
+ }
+
+ if (!setItems) {
+ ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(input->Pos()), "Missing set_items"));
+ return IGraphTransformer::TStatus::Error;
+ }
+
+ if (!setOps) {
+ ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(input->Pos()), "Missing set_ops"));
+ return IGraphTransformer::TStatus::Error;
+ }
+
+ if (setOps->ChildrenSize() != setItems->ChildrenSize() * 2 - 1) {
+ ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(input->Pos()), "Mismatched count of items in set_items and set_ops"));
+ return IGraphTransformer::TStatus::Error;
+ }
+
+ ui32 balance = 0;
+ for (const auto& op : setOps->Children()) {
+ if (op->Content() == "push") {
+ balance += 1;
+ } else {
+ if (balance < 2) {
+ ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(input->Pos()), "Disbalanced set_ops"));
+ return IGraphTransformer::TStatus::Error;
+ }
+
+ balance -= 1;
+ }
+ }
+
+ if (balance != 1) {
+ ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(input->Pos()), "Disbalanced set_ops"));
+ return IGraphTransformer::TStatus::Error;
+ }
+
+ TColumnOrder resultColumnOrder;
+ const TStructExprType* resultStructType = nullptr;
+ auto status = InferPositionalUnionType(input->Pos(), setItems->ChildrenList(), resultColumnOrder, resultStructType, ctx);
+ if (status != IGraphTransformer::TStatus::Ok) {
+ return status;
+ }
+
+ if (hasSort) {
+ auto option = GetSetting(options, "sort");
+ YQL_ENSURE(option);
+ const auto& data = option->Tail();
+ TInputs projectionInputs;
+ projectionInputs.push_back(std::make_tuple(TString(), resultStructType, resultColumnOrder));
+ TExprNode::TListType newSortTupleItems;
+
+ if (data.ChildrenSize() > 0 && data.Child(0)->Child(0)->IsCallable("Void")) {
+ // no effective types yet, scan lambda bodies
+ if (!ValidateSort(projectionInputs, {}, data, ctx, newSortTupleItems)) {
+ return IGraphTransformer::TStatus::Error;
+ }
+
+ auto newSortTuple = ctx.Expr.NewList(data.Pos(), std::move(newSortTupleItems));
+ auto newSettings = ReplaceSetting(options, {}, "sort", newSortTuple, ctx.Expr);
+ output = ctx.Expr.ChangeChild(*input, 0, std::move(newSettings));
+ return IGraphTransformer::TStatus::Repeat;
+ }
+ }
+
+ input->SetTypeAnn(ctx.Expr.MakeType<TListExprType>(resultStructType));
+ return ctx.Types.SetColumnOrder(*input, resultColumnOrder, ctx.Expr);
+}
+
+} // namespace NTypeAnnImpl
+}
diff --git a/ydb/library/yql/core/type_ann/type_ann_pg.h b/ydb/library/yql/core/type_ann/type_ann_pg.h
new file mode 100644
index 00000000000..9034916df84
--- /dev/null
+++ b/ydb/library/yql/core/type_ann/type_ann_pg.h
@@ -0,0 +1,34 @@
+#pragma once
+
+#include "type_ann_impl.h"
+
+#include <ydb/library/yql/ast/yql_expr.h>
+#include <ydb/library/yql/ast/yql_expr_types.h>
+
+namespace NYql {
+namespace NTypeAnnImpl {
+
+IGraphTransformer::TStatus PgStarWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx);
+IGraphTransformer::TStatus PgCallWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TExtContext& ctx);
+IGraphTransformer::TStatus FromPgWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx);
+IGraphTransformer::TStatus ToPgWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx);
+IGraphTransformer::TStatus PgOpWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TExtContext& ctx);
+IGraphTransformer::TStatus PgWindowCallWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx);
+IGraphTransformer::TStatus PgAggWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TExtContext& ctx);
+IGraphTransformer::TStatus PgQualifiedStarWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx);
+IGraphTransformer::TStatus PgColumnRefWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx);
+IGraphTransformer::TStatus PgResultItemWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx);
+IGraphTransformer::TStatus PgWhereWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx);
+IGraphTransformer::TStatus PgSortWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx);
+IGraphTransformer::TStatus PgWindowWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx);
+IGraphTransformer::TStatus PgAnonWindowWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx);
+IGraphTransformer::TStatus PgConstWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx);
+IGraphTransformer::TStatus PgInternal0Wrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx);
+IGraphTransformer::TStatus PgCastWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx);
+IGraphTransformer::TStatus PgAggregationTraitsWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx);
+IGraphTransformer::TStatus PgTypeWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx);
+IGraphTransformer::TStatus PgSetItemWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TExtContext& ctx);
+IGraphTransformer::TStatus PgSelectWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TExtContext& ctx);
+
+} // namespace NTypeAnnImpl
+} // namespace NYql