diff options
author | vvvv <vvvv@ydb.tech> | 2023-05-11 11:02:33 +0300 |
---|---|---|
committer | vvvv <vvvv@ydb.tech> | 2023-05-11 11:02:33 +0300 |
commit | bbc9f26922e5aebda5ba85c50c00c3423245bd7b (patch) | |
tree | 9acaf8202a33323632a84c718ecb56a7480ed7ad | |
parent | cdafe4800200f0bf59e607b0ee20d0cb9064e953 (diff) | |
download | ydb-bbc9f26922e5aebda5ba85c50c00c3423245bd7b.tar.gz |
fixed Pg aggregations with init values
-rw-r--r-- | ydb/library/yql/core/type_ann/type_ann_core.cpp | 1 | ||||
-rw-r--r-- | ydb/library/yql/core/type_ann/type_ann_pg.cpp | 43 | ||||
-rw-r--r-- | ydb/library/yql/core/type_ann/type_ann_pg.h | 1 | ||||
-rw-r--r-- | ydb/library/yql/minikql/mkql_program_builder.cpp | 14 | ||||
-rw-r--r-- | ydb/library/yql/minikql/mkql_program_builder.h | 1 | ||||
-rw-r--r-- | ydb/library/yql/minikql/mkql_runtime_version.h | 2 | ||||
-rw-r--r-- | ydb/library/yql/parser/pg_wrapper/comp_factory.cpp | 56 | ||||
-rw-r--r-- | ydb/library/yql/providers/common/mkql/yql_provider_mkql.cpp | 12 |
8 files changed, 126 insertions, 4 deletions
diff --git a/ydb/library/yql/core/type_ann/type_ann_core.cpp b/ydb/library/yql/core/type_ann/type_ann_core.cpp index 7e734738532..d2998399056 100644 --- a/ydb/library/yql/core/type_ann/type_ann_core.cpp +++ b/ydb/library/yql/core/type_ann/type_ann_core.cpp @@ -11965,6 +11965,7 @@ template <NKikimr::NUdf::EDataSlot DataSlot> Functions["FromPg"] = &FromPgWrapper; Functions["ToPg"] = &ToPgWrapper; + Functions["PgClone"] = &PgCloneWrapper; ExtFunctions["PgAgg"] = &PgAggWrapper; ExtFunctions["PgAggWindowCall"] = &PgAggWrapper; ExtFunctions["PgCall"] = &PgCallWrapper; diff --git a/ydb/library/yql/core/type_ann/type_ann_pg.cpp b/ydb/library/yql/core/type_ann/type_ann_pg.cpp index 89392486334..54895428c3d 100644 --- a/ydb/library/yql/core/type_ann/type_ann_pg.cpp +++ b/ydb/library/yql/core/type_ann/type_ann_pg.cpp @@ -284,6 +284,33 @@ IGraphTransformer::TStatus ToPgWrapper(const TExprNode::TPtr& input, TExprNode:: return IGraphTransformer::TStatus::Ok; } +IGraphTransformer::TStatus PgCloneWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx) { + if (!EnsureDependsOnTail(*input, ctx.Expr, 1)) { + return IGraphTransformer::TStatus::Error; + } + + if (IsNull(input->Head())) { + output = input->HeadPtr(); + return IGraphTransformer::TStatus::Repeat; + } + + auto type = input->Head().GetTypeAnn(); + ui32 argType; + bool convertToPg; + if (!ExtractPgType(type, argType, convertToPg, input->Head().Pos(), ctx.Expr)) { + return IGraphTransformer::TStatus::Error; + } + + if (convertToPg) { + input->ChildRef(0) = ctx.Expr.NewCallable(input->Head().Pos(), "ToPg", { input->ChildPtr(0) }); + return IGraphTransformer::TStatus::Repeat; + } + + auto result = ctx.Expr.MakeType<TPgExprType>(argType); + input->SetTypeAnn(result); + return IGraphTransformer::TStatus::Ok; +} + IGraphTransformer::TStatus PgOpWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TExtContext& ctx) { bool isResolved = input->IsCallable("PgResolvedOp"); if (!EnsureMinArgsCount(*input, isResolved ? 3 : 2, ctx.Expr)) { @@ -1150,12 +1177,18 @@ IGraphTransformer::TStatus PgAggregationTraitsWrapper(const TExprNode::TPtr& inp initLambda = ctx.Expr.Builder(input->Pos()) .Lambda() .Param("row") + .Param("parent") .Callable("PgResolvedCallCtx") .Atom(0, transFuncDesc.Name) .Atom(1, ToString(aggDesc.TransFuncId)) .List(2) .Seal() - .Add(3, initValue) + .Callable(3, "PgClone") + .Add(0, initValue) + .Callable(1, "DependsOn") + .Arg(0, "parent") + .Seal() + .Seal() .Apply(4, lambda) .With(0, "row") .Seal() @@ -1167,6 +1200,7 @@ IGraphTransformer::TStatus PgAggregationTraitsWrapper(const TExprNode::TPtr& inp .Lambda() .Param("row") .Param("state") + .Param("parent") .Callable("Coalesce") .Callable(0, "PgResolvedCallCtx") .Atom(0, transFuncDesc.Name) @@ -1175,7 +1209,12 @@ IGraphTransformer::TStatus PgAggregationTraitsWrapper(const TExprNode::TPtr& inp .Seal() .Callable(3, "Coalesce") .Arg(0, "state") - .Add(1, initValue) + .Callable(1, "PgClone") + .Add(0, initValue) + .Callable(1, "DependsOn") + .Arg(0, "parent") + .Seal() + .Seal() .Seal() .Apply(4, lambda) .With(0, "row") diff --git a/ydb/library/yql/core/type_ann/type_ann_pg.h b/ydb/library/yql/core/type_ann/type_ann_pg.h index a699db1cf5d..86c9b533476 100644 --- a/ydb/library/yql/core/type_ann/type_ann_pg.h +++ b/ydb/library/yql/core/type_ann/type_ann_pg.h @@ -23,6 +23,7 @@ IGraphTransformer::TStatus PgCallWrapper(const TExprNode::TPtr& input, TExprNode IGraphTransformer::TStatus PgBoolOpWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx); IGraphTransformer::TStatus FromPgWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx); IGraphTransformer::TStatus ToPgWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx); +IGraphTransformer::TStatus PgCloneWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx); IGraphTransformer::TStatus PgOpWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TExtContext& ctx); IGraphTransformer::TStatus PgWindowCallWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TExtContext& ctx); IGraphTransformer::TStatus PgAggWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TExtContext& ctx); diff --git a/ydb/library/yql/minikql/mkql_program_builder.cpp b/ydb/library/yql/minikql/mkql_program_builder.cpp index 8670090ea89..e9fc8d5bf8d 100644 --- a/ydb/library/yql/minikql/mkql_program_builder.cpp +++ b/ydb/library/yql/minikql/mkql_program_builder.cpp @@ -5462,6 +5462,20 @@ TRuntimeNode TProgramBuilder::ToPg(TRuntimeNode input, TType* returnType) { return TRuntimeNode(callableBuilder.Build(), false); } +TRuntimeNode TProgramBuilder::PgClone(TRuntimeNode input, const TArrayRef<const TRuntimeNode>& dependentNodes) { + if constexpr (RuntimeVersion < 38U) { + THROW yexception() << "Runtime version (" << RuntimeVersion << ") too old for " << __func__; + } + + TCallableBuilder callableBuilder(Env, __func__, input.GetStaticType()); + callableBuilder.Add(input); + for (const auto& node : dependentNodes) { + callableBuilder.Add(node); + } + + return TRuntimeNode(callableBuilder.Build(), false); +} + TRuntimeNode TProgramBuilder::WithContext(TRuntimeNode input, const std::string_view& contextType) { if constexpr (RuntimeVersion < 30U) { THROW yexception() << "Runtime version (" << RuntimeVersion << ") too old for " << __func__; diff --git a/ydb/library/yql/minikql/mkql_program_builder.h b/ydb/library/yql/minikql/mkql_program_builder.h index cb1bedc770c..3f611aad9db 100644 --- a/ydb/library/yql/minikql/mkql_program_builder.h +++ b/ydb/library/yql/minikql/mkql_program_builder.h @@ -685,6 +685,7 @@ public: TRuntimeNode PgCast(TRuntimeNode input, TType* returnType, TRuntimeNode typeMod = {}); TRuntimeNode FromPg(TRuntimeNode input, TType* returnType); TRuntimeNode ToPg(TRuntimeNode input, TType* returnType); + TRuntimeNode PgClone(TRuntimeNode input, const TArrayRef<const TRuntimeNode>& dependentNodes); TRuntimeNode WithContext(TRuntimeNode input, const std::string_view& contextType); TRuntimeNode PgInternal0(TType* returnType); TRuntimeNode PgArray(const TArrayRef<const TRuntimeNode>& args, TType* returnType); diff --git a/ydb/library/yql/minikql/mkql_runtime_version.h b/ydb/library/yql/minikql/mkql_runtime_version.h index 257553f36e1..ce2f556a826 100644 --- a/ydb/library/yql/minikql/mkql_runtime_version.h +++ b/ydb/library/yql/minikql/mkql_runtime_version.h @@ -24,7 +24,7 @@ namespace NMiniKQL { // 1. Bump this version every time incompatible runtime nodes are introduced. // 2. Make sure you provide runtime node generation for previous runtime versions. #ifndef MKQL_RUNTIME_VERSION -#define MKQL_RUNTIME_VERSION 37U +#define MKQL_RUNTIME_VERSION 38U #endif // History: diff --git a/ydb/library/yql/parser/pg_wrapper/comp_factory.cpp b/ydb/library/yql/parser/pg_wrapper/comp_factory.cpp index 55c7e29c02e..bc694c4b311 100644 --- a/ydb/library/yql/parser/pg_wrapper/comp_factory.cpp +++ b/ydb/library/yql/parser/pg_wrapper/comp_factory.cpp @@ -1528,6 +1528,43 @@ private: bool MultiDims = false; }; +template <bool PassByValue, bool IsCString> +class TPgClone : public TMutableComputationNode<TPgClone<PassByValue, IsCString>> { + typedef TMutableComputationNode<TPgClone<PassByValue, IsCString>> TBaseComputation; +public: + TPgClone(TComputationMutables& mutables, IComputationNode* input, TComputationNodePtrVector&& dependentNodes) + : TBaseComputation(mutables) + , Input(input) + , DependentNodes(std::move(dependentNodes)) + { + } + + NUdf::TUnboxedValuePod DoCalculate(TComputationContext& compCtx) const { + auto value = Input->GetValue(compCtx); + if constexpr (PassByValue) { + return value.Release(); + } + + auto datum = PointerDatumFromPod(value); + if constexpr (IsCString) { + return PointerDatumToPod((Datum)MakeCString(TStringBuf((const char*)datum))); + } else { + return PointerDatumToPod((Datum)MakeVar(GetVarBuf((const text*)datum))); + } + } + +private: + void RegisterDependencies() const final { + this->DependsOn(Input); + for (auto arg : DependentNodes) { + this->DependsOn(arg); + } + } + + IComputationNode* const Input; + TComputationNodePtrVector DependentNodes; +}; + struct TFromPgExec { TFromPgExec(ui32 sourceId) : SourceId(sourceId) @@ -2009,6 +2046,25 @@ TComputationNodeFactory GetPgFactory() { return new TPgArray(ctx.Mutables, std::move(argNodes), std::move(argTypes), arrayTypeId); } + if (name == "PgClone") { + auto input = LocateNode(ctx.NodeLocator, callable, 0); + TComputationNodePtrVector dependentNodes; + for (ui32 i = 1; i < callable.GetInputsCount(); ++i) { + dependentNodes.emplace_back(LocateNode(ctx.NodeLocator, callable, i)); + } + + auto returnType = callable.GetType()->GetReturnType(); + auto typeId = AS_TYPE(TPgType, returnType)->GetTypeId(); + const auto& desc = NPg::LookupType(typeId); + if (desc.PassByValue) { + return new TPgClone<true, false>(ctx.Mutables, input, std::move(dependentNodes)); + } else if (desc.TypeLen == -1) { + return new TPgClone<false, false>(ctx.Mutables, input, std::move(dependentNodes)); + } else { + return new TPgClone<false, true>(ctx.Mutables, input, std::move(dependentNodes)); + } + } + return nullptr; }; } diff --git a/ydb/library/yql/providers/common/mkql/yql_provider_mkql.cpp b/ydb/library/yql/providers/common/mkql/yql_provider_mkql.cpp index 55d8b8e23f9..fa583b41107 100644 --- a/ydb/library/yql/providers/common/mkql/yql_provider_mkql.cpp +++ b/ydb/library/yql/providers/common/mkql/yql_provider_mkql.cpp @@ -610,7 +610,7 @@ TMkqlCommonCallableCompiler::TShared::TShared() { {"Now", &TProgramBuilder::Now}, {"CurrentUtcDate", &TProgramBuilder::CurrentUtcDate}, {"CurrentUtcDatetime", &TProgramBuilder::CurrentUtcDatetime}, - {"CurrentUtcTimestamp", &TProgramBuilder::CurrentUtcTimestamp} + {"CurrentUtcTimestamp", &TProgramBuilder::CurrentUtcTimestamp}, }); AddSimpleCallables({ @@ -2503,6 +2503,16 @@ TMkqlCommonCallableCompiler::TShared::TShared() { return ctx.ProgramBuilder.BlockToPg(input, returnType); }); + AddCallable("PgClone", [](const TExprNode& node, TMkqlBuildContext& ctx) { + auto input = MkqlBuildExpr(*node.Child(0), ctx); + TVector<TRuntimeNode> dependentNodes; + for (ui32 i = 1; i < node.ChildrenSize(); ++i) { + dependentNodes.push_back(MkqlBuildExpr(*node.Child(i), ctx)); + } + + return ctx.ProgramBuilder.PgClone(input, dependentNodes); + }); + AddCallable("WithContext", [](const TExprNode& node, TMkqlBuildContext& ctx) { auto input = MkqlBuildExpr(*node.Child(0), ctx); return ctx.ProgramBuilder.WithContext(input, node.Child(1)->Content()); |