aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorvvvv <vvvv@ydb.tech>2023-05-11 11:02:33 +0300
committervvvv <vvvv@ydb.tech>2023-05-11 11:02:33 +0300
commitbbc9f26922e5aebda5ba85c50c00c3423245bd7b (patch)
tree9acaf8202a33323632a84c718ecb56a7480ed7ad
parentcdafe4800200f0bf59e607b0ee20d0cb9064e953 (diff)
downloadydb-bbc9f26922e5aebda5ba85c50c00c3423245bd7b.tar.gz
fixed Pg aggregations with init values
-rw-r--r--ydb/library/yql/core/type_ann/type_ann_core.cpp1
-rw-r--r--ydb/library/yql/core/type_ann/type_ann_pg.cpp43
-rw-r--r--ydb/library/yql/core/type_ann/type_ann_pg.h1
-rw-r--r--ydb/library/yql/minikql/mkql_program_builder.cpp14
-rw-r--r--ydb/library/yql/minikql/mkql_program_builder.h1
-rw-r--r--ydb/library/yql/minikql/mkql_runtime_version.h2
-rw-r--r--ydb/library/yql/parser/pg_wrapper/comp_factory.cpp56
-rw-r--r--ydb/library/yql/providers/common/mkql/yql_provider_mkql.cpp12
8 files changed, 126 insertions, 4 deletions
diff --git a/ydb/library/yql/core/type_ann/type_ann_core.cpp b/ydb/library/yql/core/type_ann/type_ann_core.cpp
index 7e734738532..d2998399056 100644
--- a/ydb/library/yql/core/type_ann/type_ann_core.cpp
+++ b/ydb/library/yql/core/type_ann/type_ann_core.cpp
@@ -11965,6 +11965,7 @@ template <NKikimr::NUdf::EDataSlot DataSlot>
Functions["FromPg"] = &FromPgWrapper;
Functions["ToPg"] = &ToPgWrapper;
+ Functions["PgClone"] = &PgCloneWrapper;
ExtFunctions["PgAgg"] = &PgAggWrapper;
ExtFunctions["PgAggWindowCall"] = &PgAggWrapper;
ExtFunctions["PgCall"] = &PgCallWrapper;
diff --git a/ydb/library/yql/core/type_ann/type_ann_pg.cpp b/ydb/library/yql/core/type_ann/type_ann_pg.cpp
index 89392486334..54895428c3d 100644
--- a/ydb/library/yql/core/type_ann/type_ann_pg.cpp
+++ b/ydb/library/yql/core/type_ann/type_ann_pg.cpp
@@ -284,6 +284,33 @@ IGraphTransformer::TStatus ToPgWrapper(const TExprNode::TPtr& input, TExprNode::
return IGraphTransformer::TStatus::Ok;
}
+IGraphTransformer::TStatus PgCloneWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx) {
+ if (!EnsureDependsOnTail(*input, ctx.Expr, 1)) {
+ return IGraphTransformer::TStatus::Error;
+ }
+
+ if (IsNull(input->Head())) {
+ output = input->HeadPtr();
+ return IGraphTransformer::TStatus::Repeat;
+ }
+
+ auto type = input->Head().GetTypeAnn();
+ ui32 argType;
+ bool convertToPg;
+ if (!ExtractPgType(type, argType, convertToPg, input->Head().Pos(), ctx.Expr)) {
+ return IGraphTransformer::TStatus::Error;
+ }
+
+ if (convertToPg) {
+ input->ChildRef(0) = ctx.Expr.NewCallable(input->Head().Pos(), "ToPg", { input->ChildPtr(0) });
+ return IGraphTransformer::TStatus::Repeat;
+ }
+
+ auto result = ctx.Expr.MakeType<TPgExprType>(argType);
+ input->SetTypeAnn(result);
+ return IGraphTransformer::TStatus::Ok;
+}
+
IGraphTransformer::TStatus PgOpWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TExtContext& ctx) {
bool isResolved = input->IsCallable("PgResolvedOp");
if (!EnsureMinArgsCount(*input, isResolved ? 3 : 2, ctx.Expr)) {
@@ -1150,12 +1177,18 @@ IGraphTransformer::TStatus PgAggregationTraitsWrapper(const TExprNode::TPtr& inp
initLambda = ctx.Expr.Builder(input->Pos())
.Lambda()
.Param("row")
+ .Param("parent")
.Callable("PgResolvedCallCtx")
.Atom(0, transFuncDesc.Name)
.Atom(1, ToString(aggDesc.TransFuncId))
.List(2)
.Seal()
- .Add(3, initValue)
+ .Callable(3, "PgClone")
+ .Add(0, initValue)
+ .Callable(1, "DependsOn")
+ .Arg(0, "parent")
+ .Seal()
+ .Seal()
.Apply(4, lambda)
.With(0, "row")
.Seal()
@@ -1167,6 +1200,7 @@ IGraphTransformer::TStatus PgAggregationTraitsWrapper(const TExprNode::TPtr& inp
.Lambda()
.Param("row")
.Param("state")
+ .Param("parent")
.Callable("Coalesce")
.Callable(0, "PgResolvedCallCtx")
.Atom(0, transFuncDesc.Name)
@@ -1175,7 +1209,12 @@ IGraphTransformer::TStatus PgAggregationTraitsWrapper(const TExprNode::TPtr& inp
.Seal()
.Callable(3, "Coalesce")
.Arg(0, "state")
- .Add(1, initValue)
+ .Callable(1, "PgClone")
+ .Add(0, initValue)
+ .Callable(1, "DependsOn")
+ .Arg(0, "parent")
+ .Seal()
+ .Seal()
.Seal()
.Apply(4, lambda)
.With(0, "row")
diff --git a/ydb/library/yql/core/type_ann/type_ann_pg.h b/ydb/library/yql/core/type_ann/type_ann_pg.h
index a699db1cf5d..86c9b533476 100644
--- a/ydb/library/yql/core/type_ann/type_ann_pg.h
+++ b/ydb/library/yql/core/type_ann/type_ann_pg.h
@@ -23,6 +23,7 @@ IGraphTransformer::TStatus PgCallWrapper(const TExprNode::TPtr& input, TExprNode
IGraphTransformer::TStatus PgBoolOpWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx);
IGraphTransformer::TStatus FromPgWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx);
IGraphTransformer::TStatus ToPgWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx);
+IGraphTransformer::TStatus PgCloneWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx);
IGraphTransformer::TStatus PgOpWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TExtContext& ctx);
IGraphTransformer::TStatus PgWindowCallWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TExtContext& ctx);
IGraphTransformer::TStatus PgAggWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TExtContext& ctx);
diff --git a/ydb/library/yql/minikql/mkql_program_builder.cpp b/ydb/library/yql/minikql/mkql_program_builder.cpp
index 8670090ea89..e9fc8d5bf8d 100644
--- a/ydb/library/yql/minikql/mkql_program_builder.cpp
+++ b/ydb/library/yql/minikql/mkql_program_builder.cpp
@@ -5462,6 +5462,20 @@ TRuntimeNode TProgramBuilder::ToPg(TRuntimeNode input, TType* returnType) {
return TRuntimeNode(callableBuilder.Build(), false);
}
+TRuntimeNode TProgramBuilder::PgClone(TRuntimeNode input, const TArrayRef<const TRuntimeNode>& dependentNodes) {
+ if constexpr (RuntimeVersion < 38U) {
+ THROW yexception() << "Runtime version (" << RuntimeVersion << ") too old for " << __func__;
+ }
+
+ TCallableBuilder callableBuilder(Env, __func__, input.GetStaticType());
+ callableBuilder.Add(input);
+ for (const auto& node : dependentNodes) {
+ callableBuilder.Add(node);
+ }
+
+ return TRuntimeNode(callableBuilder.Build(), false);
+}
+
TRuntimeNode TProgramBuilder::WithContext(TRuntimeNode input, const std::string_view& contextType) {
if constexpr (RuntimeVersion < 30U) {
THROW yexception() << "Runtime version (" << RuntimeVersion << ") too old for " << __func__;
diff --git a/ydb/library/yql/minikql/mkql_program_builder.h b/ydb/library/yql/minikql/mkql_program_builder.h
index cb1bedc770c..3f611aad9db 100644
--- a/ydb/library/yql/minikql/mkql_program_builder.h
+++ b/ydb/library/yql/minikql/mkql_program_builder.h
@@ -685,6 +685,7 @@ public:
TRuntimeNode PgCast(TRuntimeNode input, TType* returnType, TRuntimeNode typeMod = {});
TRuntimeNode FromPg(TRuntimeNode input, TType* returnType);
TRuntimeNode ToPg(TRuntimeNode input, TType* returnType);
+ TRuntimeNode PgClone(TRuntimeNode input, const TArrayRef<const TRuntimeNode>& dependentNodes);
TRuntimeNode WithContext(TRuntimeNode input, const std::string_view& contextType);
TRuntimeNode PgInternal0(TType* returnType);
TRuntimeNode PgArray(const TArrayRef<const TRuntimeNode>& args, TType* returnType);
diff --git a/ydb/library/yql/minikql/mkql_runtime_version.h b/ydb/library/yql/minikql/mkql_runtime_version.h
index 257553f36e1..ce2f556a826 100644
--- a/ydb/library/yql/minikql/mkql_runtime_version.h
+++ b/ydb/library/yql/minikql/mkql_runtime_version.h
@@ -24,7 +24,7 @@ namespace NMiniKQL {
// 1. Bump this version every time incompatible runtime nodes are introduced.
// 2. Make sure you provide runtime node generation for previous runtime versions.
#ifndef MKQL_RUNTIME_VERSION
-#define MKQL_RUNTIME_VERSION 37U
+#define MKQL_RUNTIME_VERSION 38U
#endif
// History:
diff --git a/ydb/library/yql/parser/pg_wrapper/comp_factory.cpp b/ydb/library/yql/parser/pg_wrapper/comp_factory.cpp
index 55c7e29c02e..bc694c4b311 100644
--- a/ydb/library/yql/parser/pg_wrapper/comp_factory.cpp
+++ b/ydb/library/yql/parser/pg_wrapper/comp_factory.cpp
@@ -1528,6 +1528,43 @@ private:
bool MultiDims = false;
};
+template <bool PassByValue, bool IsCString>
+class TPgClone : public TMutableComputationNode<TPgClone<PassByValue, IsCString>> {
+ typedef TMutableComputationNode<TPgClone<PassByValue, IsCString>> TBaseComputation;
+public:
+ TPgClone(TComputationMutables& mutables, IComputationNode* input, TComputationNodePtrVector&& dependentNodes)
+ : TBaseComputation(mutables)
+ , Input(input)
+ , DependentNodes(std::move(dependentNodes))
+ {
+ }
+
+ NUdf::TUnboxedValuePod DoCalculate(TComputationContext& compCtx) const {
+ auto value = Input->GetValue(compCtx);
+ if constexpr (PassByValue) {
+ return value.Release();
+ }
+
+ auto datum = PointerDatumFromPod(value);
+ if constexpr (IsCString) {
+ return PointerDatumToPod((Datum)MakeCString(TStringBuf((const char*)datum)));
+ } else {
+ return PointerDatumToPod((Datum)MakeVar(GetVarBuf((const text*)datum)));
+ }
+ }
+
+private:
+ void RegisterDependencies() const final {
+ this->DependsOn(Input);
+ for (auto arg : DependentNodes) {
+ this->DependsOn(arg);
+ }
+ }
+
+ IComputationNode* const Input;
+ TComputationNodePtrVector DependentNodes;
+};
+
struct TFromPgExec {
TFromPgExec(ui32 sourceId)
: SourceId(sourceId)
@@ -2009,6 +2046,25 @@ TComputationNodeFactory GetPgFactory() {
return new TPgArray(ctx.Mutables, std::move(argNodes), std::move(argTypes), arrayTypeId);
}
+ if (name == "PgClone") {
+ auto input = LocateNode(ctx.NodeLocator, callable, 0);
+ TComputationNodePtrVector dependentNodes;
+ for (ui32 i = 1; i < callable.GetInputsCount(); ++i) {
+ dependentNodes.emplace_back(LocateNode(ctx.NodeLocator, callable, i));
+ }
+
+ auto returnType = callable.GetType()->GetReturnType();
+ auto typeId = AS_TYPE(TPgType, returnType)->GetTypeId();
+ const auto& desc = NPg::LookupType(typeId);
+ if (desc.PassByValue) {
+ return new TPgClone<true, false>(ctx.Mutables, input, std::move(dependentNodes));
+ } else if (desc.TypeLen == -1) {
+ return new TPgClone<false, false>(ctx.Mutables, input, std::move(dependentNodes));
+ } else {
+ return new TPgClone<false, true>(ctx.Mutables, input, std::move(dependentNodes));
+ }
+ }
+
return nullptr;
};
}
diff --git a/ydb/library/yql/providers/common/mkql/yql_provider_mkql.cpp b/ydb/library/yql/providers/common/mkql/yql_provider_mkql.cpp
index 55d8b8e23f9..fa583b41107 100644
--- a/ydb/library/yql/providers/common/mkql/yql_provider_mkql.cpp
+++ b/ydb/library/yql/providers/common/mkql/yql_provider_mkql.cpp
@@ -610,7 +610,7 @@ TMkqlCommonCallableCompiler::TShared::TShared() {
{"Now", &TProgramBuilder::Now},
{"CurrentUtcDate", &TProgramBuilder::CurrentUtcDate},
{"CurrentUtcDatetime", &TProgramBuilder::CurrentUtcDatetime},
- {"CurrentUtcTimestamp", &TProgramBuilder::CurrentUtcTimestamp}
+ {"CurrentUtcTimestamp", &TProgramBuilder::CurrentUtcTimestamp},
});
AddSimpleCallables({
@@ -2503,6 +2503,16 @@ TMkqlCommonCallableCompiler::TShared::TShared() {
return ctx.ProgramBuilder.BlockToPg(input, returnType);
});
+ AddCallable("PgClone", [](const TExprNode& node, TMkqlBuildContext& ctx) {
+ auto input = MkqlBuildExpr(*node.Child(0), ctx);
+ TVector<TRuntimeNode> dependentNodes;
+ for (ui32 i = 1; i < node.ChildrenSize(); ++i) {
+ dependentNodes.push_back(MkqlBuildExpr(*node.Child(i), ctx));
+ }
+
+ return ctx.ProgramBuilder.PgClone(input, dependentNodes);
+ });
+
AddCallable("WithContext", [](const TExprNode& node, TMkqlBuildContext& ctx) {
auto input = MkqlBuildExpr(*node.Child(0), ctx);
return ctx.ProgramBuilder.WithContext(input, node.Child(1)->Content());