diff options
author | vvvv <vvvv@yandex-team.ru> | 2022-03-22 02:29:42 +0300 |
---|---|---|
committer | vvvv <vvvv@yandex-team.ru> | 2022-03-22 02:29:42 +0300 |
commit | 0bc06f620d7059dff18ef16863d5df80985c9212 (patch) | |
tree | 2a8a1ec113e0c7ad5c9c0c42f67cf875bad70870 | |
parent | d9764c900cb7a7ffd5167dd33d7068b483765a2f (diff) | |
download | ydb-0bc06f620d7059dff18ef16863d5df80985c9212.tar.gz |
YQL-14534 introduced WithContext (scalar & flow), integration of PgResolved call with current context
ref:b184dc6228fedaa26e9aff60154217626e0c5407
13 files changed, 298 insertions, 29 deletions
diff --git a/ydb/library/yql/core/peephole_opt/yql_opt_peephole_physical.cpp b/ydb/library/yql/core/peephole_opt/yql_opt_peephole_physical.cpp index e8befae258..2758a199a2 100644 --- a/ydb/library/yql/core/peephole_opt/yql_opt_peephole_physical.cpp +++ b/ydb/library/yql/core/peephole_opt/yql_opt_peephole_physical.cpp @@ -2224,10 +2224,6 @@ TExprNode::TPtr ExpandListHas(const TExprNode::TPtr& input, TExprContext& ctx) { return RewriteSearchByKeyForTypesMismatch<true, true>(input, ctx); } -TExprNode::TPtr ExpandCurrentStream(const TExprNode::TPtr& input, TExprContext& ctx) { - return ctx.NewCallable(input->Pos(), "Void", {}); -} - template <bool Flat, bool List> TExprNode::TPtr ExpandContainerIf(const TExprNode::TPtr& input, TExprContext& ctx) { YQL_CLOG(DEBUG, CorePeepHole) << "Expand " << input->Content(); @@ -5774,8 +5770,7 @@ struct TPeepHoleRules { {"RangeEmpty", &ExpandRangeEmpty}, {"AsRange", &ExpandAsRange}, {"RangeFor", &ExpandRangeFor}, - {"ToFlow", &DropToFlowDeps}, - {"CurrentStream", &ExpandCurrentStream} + {"ToFlow", &DropToFlowDeps} }; static constexpr std::initializer_list<TPeepHoleOptimizerMap::value_type> SimplifyStageRulesInit = { diff --git a/ydb/library/yql/core/type_ann/type_ann_core.cpp b/ydb/library/yql/core/type_ann/type_ann_core.cpp index 9a55fb46fa..83566cfcf6 100644 --- a/ydb/library/yql/core/type_ann/type_ann_core.cpp +++ b/ydb/library/yql/core/type_ann/type_ann_core.cpp @@ -3195,13 +3195,40 @@ namespace NTypeAnnImpl { return IGraphTransformer::TStatus::Ok; } - IGraphTransformer::TStatus CurrentStreamWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx) { - Y_UNUSED(output); - if (!EnsureArgsCount(*input, 0, ctx.Expr)) { + IGraphTransformer::TStatus WithContextWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx) { + if (!EnsureArgsCount(*input, 2, ctx.Expr)) { return IGraphTransformer::TStatus::Error; } - input->SetTypeAnn(ctx.Expr.MakeType<TVoidExprType>()); + if (!EnsureAtom(input->Head(), ctx.Expr)) { + return IGraphTransformer::TStatus::Error; + } + + if (input->Head().Content() != "Agg" && input->Head().Content() != "WinAgg") { + ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(input->Head().Pos()), TStringBuilder() << + "Unexpected context type: " << input->Head().Content())); + return IGraphTransformer::TStatus::Error; + } + + if (!EnsureComputable(input->Tail(), ctx.Expr)) { + return IGraphTransformer::TStatus::Error; + } + + if (input->Tail().GetTypeAnn()->GetKind() == ETypeAnnotationKind::Stream) { + output = ctx.Expr.Builder(input->Pos()) + .Callable("FromFlow") + .Callable("WithContext") + .Add(0, input->HeadPtr()) + .Callable(1, "ToFlow") + .Add(0, input->TailPtr()) + .Seal() + .Seal() + .Seal() + .Build(); + return IGraphTransformer::TStatus::Repeat; + } + + input->SetTypeAnn(input->Tail().GetTypeAnn()); return IGraphTransformer::TStatus::Ok; } @@ -8800,7 +8827,7 @@ template <NKikimr::NUdf::EDataSlot DataSlot> }; IGraphTransformer::TStatus PgCallWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TExtContext& ctx) { - bool isResolved = input->Content() == "PgResolvedCall"; + bool isResolved = input->Content().StartsWith("PgResolvedCall"); if (!EnsureMinArgsCount(*input, isResolved ? 2 : 1, ctx.Expr)) { return IGraphTransformer::TStatus::Error; } @@ -9731,7 +9758,7 @@ template <NKikimr::NUdf::EDataSlot DataSlot> finishLambda = ctx.Expr.Builder(input->Pos()) .Lambda() .Param("state") - .Callable("PgResolvedCall") + .Callable("PgResolvedCallCtx") .Atom(0, NPg::LookupProc(aggDesc.FinalFuncId).Name) .Atom(1, ToString(aggDesc.FinalFuncId)) .Arg(2, "state") @@ -9771,7 +9798,7 @@ template <NKikimr::NUdf::EDataSlot DataSlot> initLambda = ctx.Expr.Builder(input->Pos()) .Lambda() .Param("row") - .Callable("PgResolvedCall") + .Callable("PgResolvedCallCtx") .Atom(0, transFuncDesc.Name) .Atom(1, ToString(aggDesc.TransFuncId)) .Add(2, initValue) @@ -9787,7 +9814,7 @@ template <NKikimr::NUdf::EDataSlot DataSlot> .Param("row") .Param("state") .Callable("Coalesce") - .Callable(0, "PgResolvedCall") + .Callable(0, "PgResolvedCallCtx") .Atom(0, transFuncDesc.Name) .Atom(1, ToString(aggDesc.TransFuncId)) .Callable(2, "Coalesce") @@ -9821,7 +9848,7 @@ template <NKikimr::NUdf::EDataSlot DataSlot> .Arg(0, "state") .Seal() .Callable(1, "Coalesce") - .Callable(0, "PgResolvedCall") + .Callable(0, "PgResolvedCallCtx") .Atom(0, transFuncDesc.Name) .Atom(1, ToString(aggDesc.TransFuncId)) .Arg(2, "state") @@ -13677,7 +13704,7 @@ template <NKikimr::NUdf::EDataSlot DataSlot> Functions["FromFlow"] = &FromFlowWrapper; Functions["BuildTablePath"] = &BuildTablePathWrapper; Functions["WithOptionalArgs"] = &WithOptionalArgsWrapper; - Functions["CurrentStream"] = &CurrentStreamWrapper; + Functions["WithContext"] = &WithContextWrapper; Functions["DecimalDiv"] = &DecimalBinaryWrapper; Functions["DecimalMod"] = &DecimalBinaryWrapper; @@ -13745,6 +13772,7 @@ template <NKikimr::NUdf::EDataSlot DataSlot> ExtFunctions["PgAggWindowCall"] = &PgAggWrapper; ExtFunctions["PgCall"] = &PgCallWrapper; ExtFunctions["PgResolvedCall"] = &PgCallWrapper; + ExtFunctions["PgResolvedCallCtx"] = &PgCallWrapper; ExtFunctions["PgOp"] = &PgOpWrapper; ExtFunctions["PgResolvedOp"] = &PgOpWrapper; ExtFunctions["PgSelect"] = &PgSelectWrapper; diff --git a/ydb/library/yql/minikql/comp_nodes/CMakeLists.txt b/ydb/library/yql/minikql/comp_nodes/CMakeLists.txt index 63a0842e2d..2094ec7b18 100644 --- a/ydb/library/yql/minikql/comp_nodes/CMakeLists.txt +++ b/ydb/library/yql/minikql/comp_nodes/CMakeLists.txt @@ -128,5 +128,6 @@ target_sources(yql-minikql-comp_nodes PRIVATE ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_wide_condense.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_wide_filter.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_wide_map.cpp + ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_withcontext.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_zip.cpp ) diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_factory.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_factory.cpp index ae2b84405b..7c1696c290 100644 --- a/ydb/library/yql/minikql/comp_nodes/mkql_factory.cpp +++ b/ydb/library/yql/minikql/comp_nodes/mkql_factory.cpp @@ -96,6 +96,7 @@ #include "mkql_wide_condense.h" #include "mkql_wide_filter.h" #include "mkql_wide_map.h" +#include "mkql_withcontext.h" #include "mkql_zip.h" #include <ydb/library/yql/minikql/computation/mkql_computation_node_codegen.h> @@ -184,6 +185,7 @@ struct TCallableComputationNodeBuilderFuncMapFiller { {"Not", &WrapNot}, {"Zip", &WrapZip<false>}, {"ZipAll", &WrapZip<true>}, + {"WithContext", &WrapWithContext}, {"Reduce", &WrapReduce}, {"Length", &WrapLength}, {"Iterable", &WrapIterable}, diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_withcontext.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_withcontext.cpp new file mode 100644 index 0000000000..c9a5e86a96 --- /dev/null +++ b/ydb/library/yql/minikql/comp_nodes/mkql_withcontext.cpp @@ -0,0 +1,139 @@ +#include "mkql_withcontext.h" + +#include <ydb/library/yql/minikql/mkql_node_cast.h> +#include <ydb/library/yql/minikql/computation/mkql_computation_node_impl.h> +#include <ydb/library/yql/minikql/computation/mkql_computation_pg.h> +#include <ydb/library/yql/minikql/computation/mkql_computation_node_holders.h> + +#include <util/generic/scope.h> + +namespace NKikimr { +namespace NMiniKQL { + +namespace { + +class TWithContextWrapper : public TMutableComputationNode<TWithContextWrapper> { + typedef TMutableComputationNode<TWithContextWrapper> TBaseComputation; +public: + TWithContextWrapper(TComputationMutables& mutables, const std::string_view& contextType, IComputationNode* arg) + : TBaseComputation(mutables) + , ContextType(contextType) + , Arg(arg) + {} + + NUdf::TUnboxedValuePod DoCalculate(TComputationContext& compCtx) const { + auto prev = TlsAllocState->CurrentContext; + TlsAllocState->CurrentContext = PgInitializeContext(ContextType); + Y_DEFER { + PgDestroyContext(ContextType, TlsAllocState->CurrentContext); + TlsAllocState->CurrentContext = prev; + }; + + TPAllocScope scope; + return Arg->GetValue(compCtx).Release(); + } + +private: + void RegisterDependencies() const final { + this->DependsOn(Arg); + } + + IComputationNode* const Arg; + const std::string_view ContextType; +}; + +struct TState : public TComputationValue<TState> { + + TState(TMemoryUsageInfo* memInfo, const std::string_view& contextType) + : TComputationValue(memInfo) + , ContextType(contextType) + { + Ctx = PgInitializeContext(ContextType); + Scope.Detach(); + } + + void Attach() { + Scope.Attach(); + PrevContext = TlsAllocState->CurrentContext; + TlsAllocState->CurrentContext = Ctx; + } + + void Detach() { + Scope.Detach(); + TlsAllocState->CurrentContext = PrevContext; + } + + void Cleanup() { + if (Ctx) { + PgDestroyContext(ContextType, Ctx); + Ctx = nullptr; + Scope.Cleanup(); + } + } + + ~TState() { + Cleanup(); + } + + const std::string_view ContextType; + void* Ctx = nullptr; + TPAllocScope Scope; + void* PrevContext = nullptr; +}; + +class TWithContextFlowWrapper : public TStatefulFlowComputationNode<TWithContextFlowWrapper> { + typedef TStatefulFlowComputationNode<TWithContextFlowWrapper> TBaseComputation; +public: + TWithContextFlowWrapper(TComputationMutables& mutables, const std::string_view& contextType, + EValueRepresentation kind, IComputationNode* flow) + : TBaseComputation(mutables, flow, kind, EValueRepresentation::Any) + , Flow(flow) + , ContextType(contextType) + {} + + NUdf::TUnboxedValue DoCalculate(NUdf::TUnboxedValue& stateValue, TComputationContext& ctx) const { + if (!stateValue.HasValue()) { + stateValue = ctx.HolderFactory.Create<TState>(ContextType); + } + + TState& state = *static_cast<TState*>(stateValue.AsBoxed().Get()); + state.Attach(); + Y_DEFER { + state.Detach(); + }; + + auto item = Flow->GetValue(ctx); + if (item.IsFinish()) { + state.Cleanup(); + } + + return item; + } + +private: + void RegisterDependencies() const final { + this->FlowDependsOn(Flow); + } + + IComputationNode* const Flow; + const std::string_view ContextType; +}; + + +} + +IComputationNode* WrapWithContext(TCallable& callable, const TComputationNodeFactoryContext& ctx) { + const auto contextTypeData = AS_VALUE(TDataLiteral, callable.GetInput(0)); + auto contextType = contextTypeData->AsValue().AsStringRef(); + auto arg = LocateNode(ctx.NodeLocator, callable, 1); + if (callable.GetInput(1).GetStaticType()->IsFlow()) { + const auto type = callable.GetType()->GetReturnType(); + return new TWithContextFlowWrapper(ctx.Mutables, contextType, GetValueRepresentation(type), arg); + } else { + MKQL_ENSURE(!callable.GetInput(1).GetStaticType()->IsStream(), "Stream is not expected here"); + return new TWithContextWrapper(ctx.Mutables, contextType, arg); + } +} + +} +} diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_withcontext.h b/ydb/library/yql/minikql/comp_nodes/mkql_withcontext.h new file mode 100644 index 0000000000..0d2ab1523f --- /dev/null +++ b/ydb/library/yql/minikql/comp_nodes/mkql_withcontext.h @@ -0,0 +1,10 @@ +#pragma once +#include <ydb/library/yql/minikql/computation/mkql_computation_node.h> + +namespace NKikimr { +namespace NMiniKQL { + +IComputationNode* WrapWithContext(TCallable& callable, const TComputationNodeFactoryContext& ctx); + +} +} diff --git a/ydb/library/yql/minikql/computation/mkql_computation_pg.h b/ydb/library/yql/minikql/computation/mkql_computation_pg.h new file mode 100644 index 0000000000..fbb4ae7302 --- /dev/null +++ b/ydb/library/yql/minikql/computation/mkql_computation_pg.h @@ -0,0 +1,11 @@ +#pragma once +#include <string_view> + +namespace NKikimr { +namespace NMiniKQL { + +void* PgInitializeContext(const std::string_view& contextType); +void PgDestroyContext(const std::string_view& contextType, void* ctx); + +} // namespace MiniKQL +} // namespace NKikimr diff --git a/ydb/library/yql/minikql/mkql_alloc.h b/ydb/library/yql/minikql/mkql_alloc.h index 791d9132ad..733f6502de 100644 --- a/ydb/library/yql/minikql/mkql_alloc.h +++ b/ydb/library/yql/minikql/mkql_alloc.h @@ -57,6 +57,7 @@ struct TAllocState : public TAlignedPagePool TListEntry OffloadedBlocksRoot; TListEntry GlobalPAllocList; TListEntry* CurrentPAllocList; + void* CurrentContext = nullptr; ::NKikimr::NUdf::TBoxedValueLink Root; @@ -81,7 +82,7 @@ public: } ~TPAllocScope() { - TAllocState::CleanupPAllocList(&PAllocList); + Cleanup(); Detach(); } @@ -100,6 +101,10 @@ public: } } + void Cleanup() { + TAllocState::CleanupPAllocList(&PAllocList); + } + private: TAllocState::TListEntry PAllocList; TAllocState::TListEntry* Prev = nullptr; diff --git a/ydb/library/yql/minikql/mkql_program_builder.cpp b/ydb/library/yql/minikql/mkql_program_builder.cpp index e1bc8ed11d..d41c29302f 100644 --- a/ydb/library/yql/minikql/mkql_program_builder.cpp +++ b/ydb/library/yql/minikql/mkql_program_builder.cpp @@ -5030,13 +5030,15 @@ TRuntimeNode TProgramBuilder::PgConst(TPgType* pgType, const std::string_view& v return TRuntimeNode(callableBuilder.Build(), false); } -TRuntimeNode TProgramBuilder::PgResolvedCall(const std::string_view& name, ui32 id, const TArrayRef<const TRuntimeNode>& args, +TRuntimeNode TProgramBuilder::PgResolvedCall(bool useContext, const std::string_view& name, + ui32 id, const TArrayRef<const TRuntimeNode>& args, TType* returnType) { if constexpr (RuntimeVersion < 30U) { THROW yexception() << "Runtime version (" << RuntimeVersion << ") too old for " << __func__; } TCallableBuilder callableBuilder(Env, __func__, returnType); + callableBuilder.Add(NewDataLiteral(useContext)); callableBuilder.Add(NewDataLiteral<NUdf::EDataSlot::String>(name)); callableBuilder.Add(NewDataLiteral(id)); for (const auto arg : args) { @@ -5076,6 +5078,17 @@ TRuntimeNode TProgramBuilder::ToPg(TRuntimeNode input, TType* returnType) { return TRuntimeNode(callableBuilder.Build(), false); } +TRuntimeNode TProgramBuilder::WithContext(const std::string_view& contextType, TRuntimeNode input) { + if constexpr (RuntimeVersion < 30U) { + THROW yexception() << "Runtime version (" << RuntimeVersion << ") too old for " << __func__; + } + + TCallableBuilder callableBuilder(Env, __func__, input.GetStaticType()); + callableBuilder.Add(NewDataLiteral<NUdf::EDataSlot::String>(contextType)); + callableBuilder.Add(input); + return TRuntimeNode(callableBuilder.Build(), false); +} + bool CanExportType(TType* type, const TTypeEnvironment& env) { if (type->GetKind() == TType::EKind::Type) { return false; // Type of Type diff --git a/ydb/library/yql/minikql/mkql_program_builder.h b/ydb/library/yql/minikql/mkql_program_builder.h index 9f95236701..eeba821332 100644 --- a/ydb/library/yql/minikql/mkql_program_builder.h +++ b/ydb/library/yql/minikql/mkql_program_builder.h @@ -624,10 +624,12 @@ public: typedef TRuntimeNode (TProgramBuilder::*NarrowFunctionMethod)(TRuntimeNode, const TNarrowLambda&); TRuntimeNode PgConst(TPgType* pgType, const std::string_view& value); - TRuntimeNode PgResolvedCall(const std::string_view& name, ui32 id, const TArrayRef<const TRuntimeNode>& args, TType* returnType); + TRuntimeNode PgResolvedCall(bool useContext, const std::string_view& name, ui32 id, + const TArrayRef<const TRuntimeNode>& args, TType* returnType); TRuntimeNode PgCast(TRuntimeNode input, TType* returnType); TRuntimeNode FromPg(TRuntimeNode input, TType* returnType); TRuntimeNode ToPg(TRuntimeNode input, TType* returnType); + TRuntimeNode WithContext(const std::string_view& contextType, TRuntimeNode input); protected: TRuntimeNode Invoke(const std::string_view& funcName, TType* resultType, const TArrayRef<const TRuntimeNode>& args); diff --git a/ydb/library/yql/parser/pg_wrapper/comp_factory.cpp b/ydb/library/yql/parser/pg_wrapper/comp_factory.cpp index e1810a03ec..f0f7c2acad 100644 --- a/ydb/library/yql/parser/pg_wrapper/comp_factory.cpp +++ b/ydb/library/yql/parser/pg_wrapper/comp_factory.cpp @@ -13,6 +13,8 @@ #define TypeName PG_TypeName #define SortBy PG_SortBy +#define Sort PG_Sort +#define Unique PG_Unique #undef SIZEOF_SIZE_T extern "C" { #include "postgres.h" @@ -21,6 +23,7 @@ extern "C" { #include "utils/fmgrprotos.h" #include "utils/builtins.h" #include "utils/memutils.h" +#include "nodes/execnodes.h" #include "lib/stringinfo.h" #include "thread_inits.h" #undef Abs @@ -28,6 +31,8 @@ extern "C" { #undef Max #undef TypeName #undef SortBy +#undef Sort +#undef Unique #undef LOG #undef INFO #undef NOTICE @@ -290,10 +295,11 @@ private: class TPgResolvedCall : public TMutableComputationNode<TPgResolvedCall> { typedef TMutableComputationNode<TPgResolvedCall> TBaseComputation; public: - TPgResolvedCall(TComputationMutables& mutables, const std::string_view& name, ui32 id, + TPgResolvedCall(TComputationMutables& mutables, bool useContext, const std::string_view& name, ui32 id, TComputationNodePtrVector&& argNodes, TVector<TType*>&& argTypes) : TBaseComputation(mutables) , StateIndex(mutables.CurValueIndex++) + , UseContext(useContext) , Name(name) , Id(id) , ArgNodes(std::move(argNodes)) @@ -335,6 +341,10 @@ public: auto& state = GetState(compCtx); auto& callInfo = state.CallInfo.Ref(); + if (UseContext) { + callInfo.context = (Node*)TlsAllocState->CurrentContext; + } + callInfo.isnull = false; for (ui32 i = 0; i < ArgNodes.size(); ++i) { auto value = ArgNodes[i]->GetValue(compCtx); @@ -355,7 +365,11 @@ public: } inputArgs.Detach(); - TPAllocScope call; + TMaybe<TPAllocScope> call; + if (!callInfo.context) { + call.ConstructInPlace(); + } + PG_TRY(); { auto ret = FInfo.fn_addr(&callInfo); @@ -404,6 +418,7 @@ private: } const ui32 StateIndex; + const bool UseContext; const std::string_view Name; const ui32 Id; FmgrInfo FInfo; @@ -751,18 +766,20 @@ TComputationNodeFactory GetPgFactory() { } if (name == "PgResolvedCall") { - const auto nameData = AS_VALUE(TDataLiteral, callable.GetInput(0)); - const auto idData = AS_VALUE(TDataLiteral, callable.GetInput(1)); + const auto useContextData = AS_VALUE(TDataLiteral, callable.GetInput(0)); + const auto nameData = AS_VALUE(TDataLiteral, callable.GetInput(1)); + const auto idData = AS_VALUE(TDataLiteral, callable.GetInput(2)); + auto useContext = useContextData->AsValue().Get<bool>(); auto name = nameData->AsValue().AsStringRef(); - ui32 id = idData->AsValue().Get<ui32>(); + auto id = idData->AsValue().Get<ui32>(); TComputationNodePtrVector argNodes; TVector<TType*> argTypes; - for (ui32 i = 2; i < callable.GetInputsCount(); ++i) { + for (ui32 i = 3; i < callable.GetInputsCount(); ++i) { argNodes.emplace_back(LocateNode(ctx.NodeLocator, callable, i)); argTypes.emplace_back(callable.GetInput(i).GetStaticType()); } - return new TPgResolvedCall(ctx.Mutables, name, id, std::move(argNodes), std::move(argTypes)); + return new TPgResolvedCall(ctx.Mutables, useContext, name, id, std::move(argNodes), std::move(argTypes)); } if (name == "PgCast") { @@ -1570,6 +1587,37 @@ NUdf::TUnboxedValue PGUnpackImpl(const TPgType* type, TStringBuf& buf) { } } +void* PgInitializeContext(const std::string_view& contextType) { + if (contextType == "Agg") { + auto ctx = (AggState*)MKQLAllocWithSize(sizeof(AggState)); + Zero(*ctx); + *(NodeTag*)ctx = T_AggState; + ctx->curaggcontext = (ExprContext*)MKQLAllocWithSize(sizeof(ExprContext)); + Zero(*ctx->curaggcontext); + ctx->curaggcontext->ecxt_per_tuple_memory = TMkqlPgAdapter::Instance(); + return ctx; + } else if (contextType == "WinAgg") { + auto ctx = (WindowAggState*)MKQLAllocWithSize(sizeof(WindowAggState)); + Zero(*ctx); + *(NodeTag*)ctx = T_WindowAggState; + ctx->curaggcontext = TMkqlPgAdapter::Instance(); + return ctx; + } else { + ythrow yexception() << "Unsupported context type: " << contextType; + } +} + +void PgDestroyContext(const std::string_view& contextType, void* ctx) { + if (contextType == "Agg") { + MKQLFreeWithSize(((AggState*)ctx)->curaggcontext, sizeof(ExprContext)); + MKQLFreeWithSize(ctx, sizeof(AggState)); + } else if (contextType == "WinAgg") { + MKQLFreeWithSize(ctx, sizeof(WindowAggState)); + } else { + Y_FAIL("Unsupported context type"); + } +} + } // namespace NMiniKQL } // namespace NKikimr diff --git a/ydb/library/yql/providers/common/mkql/yql_provider_mkql.cpp b/ydb/library/yql/providers/common/mkql/yql_provider_mkql.cpp index 01cedd1cdb..285c7ed232 100644 --- a/ydb/library/yql/providers/common/mkql/yql_provider_mkql.cpp +++ b/ydb/library/yql/providers/common/mkql/yql_provider_mkql.cpp @@ -2242,7 +2242,7 @@ TMkqlCommonCallableCompiler::TShared::TShared() { return ctx.ProgramBuilder.PgConst(type, node.Head().Content()); }); - AddCallable("PgResolvedCall", [](const TExprNode& node, TMkqlBuildContext& ctx) { + AddCallable({"PgResolvedCall","PgResolvedCallCtx" }, [](const TExprNode& node, TMkqlBuildContext& ctx) { auto name = node.Head().Content(); auto id = FromString<ui32>(node.Child(1)->Content()); std::vector<TRuntimeNode> args; @@ -2252,7 +2252,7 @@ TMkqlCommonCallableCompiler::TShared::TShared() { } auto returnType = BuildType(node, *node.GetTypeAnn(), ctx.ProgramBuilder); - return ctx.ProgramBuilder.PgResolvedCall(name, id, args, returnType); + return ctx.ProgramBuilder.PgResolvedCall(node.IsCallable("PgResolvedCallCtx"), name, id, args, returnType); }); AddCallable("PgResolvedOp", [](const TExprNode& node, TMkqlBuildContext& ctx) { @@ -2266,7 +2266,7 @@ TMkqlCommonCallableCompiler::TShared::TShared() { } auto returnType = BuildType(node, *node.GetTypeAnn(), ctx.ProgramBuilder); - return ctx.ProgramBuilder.PgResolvedCall(procName, procId, args, returnType); + return ctx.ProgramBuilder.PgResolvedCall(false, procName, procId, args, returnType); }); AddCallable("PgCast", [](const TExprNode& node, TMkqlBuildContext& ctx) { @@ -2287,6 +2287,11 @@ TMkqlCommonCallableCompiler::TShared::TShared() { return ctx.ProgramBuilder.ToPg(input, returnType); }); + AddCallable("WithContext", [](const TExprNode& node, TMkqlBuildContext& ctx) { + auto input = MkqlBuildExpr(*node.Child(1), ctx); + return ctx.ProgramBuilder.WithContext(node.Child(0)->Content(), input); + }); + AddCallable("QueueCreate", [](const TExprNode& node, TMkqlBuildContext& ctx) { const auto initCapacity = MkqlBuildExpr(*node.Child(1), ctx); const auto initSize = MkqlBuildExpr(*node.Child(2), ctx); diff --git a/ydb/library/yql/sql/pg_dummy/pg_sql_dummy.cpp b/ydb/library/yql/sql/pg_dummy/pg_sql_dummy.cpp index 5fa30e6021..ae594de09d 100644 --- a/ydb/library/yql/sql/pg_dummy/pg_sql_dummy.cpp +++ b/ydb/library/yql/sql/pg_dummy/pg_sql_dummy.cpp @@ -87,6 +87,16 @@ NUdf::TUnboxedValue PGUnpackImpl(const TPgType* type, TStringBuf& buf) { throw yexception() << "PG types are not supported"; } +void* PgInitializeContext(const std::string_view& contextType) { + Y_UNUSED(contextType); + return nullptr; +} + +void PgDestroyContext(const std::string_view& contextType, void* ctx) { + Y_UNUSED(contextType); + Y_UNUSED(ctx); +} + } // namespace NMiniKQL } // namespace NKikimr |