diff options
| author | vvvv <[email protected]> | 2022-03-22 02:29:42 +0300 | 
|---|---|---|
| committer | vvvv <[email protected]> | 2022-03-22 02:29:42 +0300 | 
| commit | 0bc06f620d7059dff18ef16863d5df80985c9212 (patch) | |
| tree | 2a8a1ec113e0c7ad5c9c0c42f67cf875bad70870 | |
| parent | d9764c900cb7a7ffd5167dd33d7068b483765a2f (diff) | |
YQL-14534 introduced WithContext (scalar & flow), integration of PgResolved call with current context
ref:b184dc6228fedaa26e9aff60154217626e0c5407
13 files changed, 298 insertions, 29 deletions
| diff --git a/ydb/library/yql/core/peephole_opt/yql_opt_peephole_physical.cpp b/ydb/library/yql/core/peephole_opt/yql_opt_peephole_physical.cpp index e8befae2581..2758a199a21 100644 --- a/ydb/library/yql/core/peephole_opt/yql_opt_peephole_physical.cpp +++ b/ydb/library/yql/core/peephole_opt/yql_opt_peephole_physical.cpp @@ -2224,10 +2224,6 @@ TExprNode::TPtr ExpandListHas(const TExprNode::TPtr& input, TExprContext& ctx) {      return RewriteSearchByKeyForTypesMismatch<true, true>(input, ctx);  } -TExprNode::TPtr ExpandCurrentStream(const TExprNode::TPtr& input, TExprContext& ctx) { -    return ctx.NewCallable(input->Pos(), "Void", {}); -} -  template <bool Flat, bool List>  TExprNode::TPtr ExpandContainerIf(const TExprNode::TPtr& input, TExprContext& ctx) {      YQL_CLOG(DEBUG, CorePeepHole) << "Expand " << input->Content(); @@ -5774,8 +5770,7 @@ struct TPeepHoleRules {          {"RangeEmpty", &ExpandRangeEmpty},          {"AsRange", &ExpandAsRange},          {"RangeFor", &ExpandRangeFor}, -        {"ToFlow", &DropToFlowDeps}, -        {"CurrentStream", &ExpandCurrentStream} +        {"ToFlow", &DropToFlowDeps}      };      static constexpr std::initializer_list<TPeepHoleOptimizerMap::value_type> SimplifyStageRulesInit = { diff --git a/ydb/library/yql/core/type_ann/type_ann_core.cpp b/ydb/library/yql/core/type_ann/type_ann_core.cpp index 9a55fb46fab..83566cfcf64 100644 --- a/ydb/library/yql/core/type_ann/type_ann_core.cpp +++ b/ydb/library/yql/core/type_ann/type_ann_core.cpp @@ -3195,13 +3195,40 @@ namespace NTypeAnnImpl {          return IGraphTransformer::TStatus::Ok;      } -    IGraphTransformer::TStatus CurrentStreamWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx) { -        Y_UNUSED(output); -        if (!EnsureArgsCount(*input, 0, ctx.Expr)) { +    IGraphTransformer::TStatus WithContextWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx) { +        if (!EnsureArgsCount(*input, 2, ctx.Expr)) {              return IGraphTransformer::TStatus::Error;          } -        input->SetTypeAnn(ctx.Expr.MakeType<TVoidExprType>()); +        if (!EnsureAtom(input->Head(), ctx.Expr)) { +            return IGraphTransformer::TStatus::Error; +        } + +        if (input->Head().Content() != "Agg" && input->Head().Content() != "WinAgg") { +            ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(input->Head().Pos()), TStringBuilder() << +                "Unexpected context type: " << input->Head().Content())); +            return IGraphTransformer::TStatus::Error; +        } + +        if (!EnsureComputable(input->Tail(), ctx.Expr)) { +            return IGraphTransformer::TStatus::Error; +        } + +        if (input->Tail().GetTypeAnn()->GetKind() == ETypeAnnotationKind::Stream) { +            output = ctx.Expr.Builder(input->Pos()) +                .Callable("FromFlow") +                    .Callable("WithContext") +                        .Add(0, input->HeadPtr()) +                        .Callable(1, "ToFlow") +                            .Add(0, input->TailPtr()) +                        .Seal() +                    .Seal() +                .Seal() +                .Build(); +            return IGraphTransformer::TStatus::Repeat; +        } + +        input->SetTypeAnn(input->Tail().GetTypeAnn());          return IGraphTransformer::TStatus::Ok;      } @@ -8800,7 +8827,7 @@ template <NKikimr::NUdf::EDataSlot DataSlot>      };      IGraphTransformer::TStatus PgCallWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TExtContext& ctx) { -        bool isResolved = input->Content() == "PgResolvedCall"; +        bool isResolved = input->Content().StartsWith("PgResolvedCall");          if (!EnsureMinArgsCount(*input, isResolved ? 2 : 1, ctx.Expr)) {              return IGraphTransformer::TStatus::Error;          } @@ -9731,7 +9758,7 @@ template <NKikimr::NUdf::EDataSlot DataSlot>              finishLambda = ctx.Expr.Builder(input->Pos())                  .Lambda()                  .Param("state") -                .Callable("PgResolvedCall") +                .Callable("PgResolvedCallCtx")                      .Atom(0, NPg::LookupProc(aggDesc.FinalFuncId).Name)                      .Atom(1, ToString(aggDesc.FinalFuncId))                      .Arg(2, "state") @@ -9771,7 +9798,7 @@ template <NKikimr::NUdf::EDataSlot DataSlot>              initLambda = ctx.Expr.Builder(input->Pos())                  .Lambda()                      .Param("row") -                    .Callable("PgResolvedCall") +                    .Callable("PgResolvedCallCtx")                          .Atom(0, transFuncDesc.Name)                          .Atom(1, ToString(aggDesc.TransFuncId))                          .Add(2, initValue) @@ -9787,7 +9814,7 @@ template <NKikimr::NUdf::EDataSlot DataSlot>                      .Param("row")                      .Param("state")                      .Callable("Coalesce") -                        .Callable(0, "PgResolvedCall") +                        .Callable(0, "PgResolvedCallCtx")                              .Atom(0, transFuncDesc.Name)                              .Atom(1, ToString(aggDesc.TransFuncId))                              .Callable(2, "Coalesce") @@ -9821,7 +9848,7 @@ template <NKikimr::NUdf::EDataSlot DataSlot>                              .Arg(0, "state")                          .Seal()                          .Callable(1, "Coalesce") -                            .Callable(0, "PgResolvedCall") +                            .Callable(0, "PgResolvedCallCtx")                                  .Atom(0, transFuncDesc.Name)                                  .Atom(1, ToString(aggDesc.TransFuncId))                                  .Arg(2, "state") @@ -13677,7 +13704,7 @@ template <NKikimr::NUdf::EDataSlot DataSlot>          Functions["FromFlow"] = &FromFlowWrapper;          Functions["BuildTablePath"] = &BuildTablePathWrapper;          Functions["WithOptionalArgs"] = &WithOptionalArgsWrapper; -        Functions["CurrentStream"] = &CurrentStreamWrapper; +        Functions["WithContext"] = &WithContextWrapper;          Functions["DecimalDiv"] = &DecimalBinaryWrapper;          Functions["DecimalMod"] = &DecimalBinaryWrapper; @@ -13745,6 +13772,7 @@ template <NKikimr::NUdf::EDataSlot DataSlot>          ExtFunctions["PgAggWindowCall"] = &PgAggWrapper;          ExtFunctions["PgCall"] = &PgCallWrapper;          ExtFunctions["PgResolvedCall"] = &PgCallWrapper; +        ExtFunctions["PgResolvedCallCtx"] = &PgCallWrapper;          ExtFunctions["PgOp"] = &PgOpWrapper;          ExtFunctions["PgResolvedOp"] = &PgOpWrapper;          ExtFunctions["PgSelect"] = &PgSelectWrapper; diff --git a/ydb/library/yql/minikql/comp_nodes/CMakeLists.txt b/ydb/library/yql/minikql/comp_nodes/CMakeLists.txt index 63a0842e2d5..2094ec7b189 100644 --- a/ydb/library/yql/minikql/comp_nodes/CMakeLists.txt +++ b/ydb/library/yql/minikql/comp_nodes/CMakeLists.txt @@ -128,5 +128,6 @@ target_sources(yql-minikql-comp_nodes PRIVATE    ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_wide_condense.cpp    ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_wide_filter.cpp    ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_wide_map.cpp +  ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_withcontext.cpp    ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_zip.cpp  ) diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_factory.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_factory.cpp index ae2b84405b4..7c1696c290a 100644 --- a/ydb/library/yql/minikql/comp_nodes/mkql_factory.cpp +++ b/ydb/library/yql/minikql/comp_nodes/mkql_factory.cpp @@ -96,6 +96,7 @@  #include "mkql_wide_condense.h"  #include "mkql_wide_filter.h"  #include "mkql_wide_map.h" +#include "mkql_withcontext.h"  #include "mkql_zip.h"  #include <ydb/library/yql/minikql/computation/mkql_computation_node_codegen.h> @@ -184,6 +185,7 @@ struct TCallableComputationNodeBuilderFuncMapFiller {          {"Not", &WrapNot},          {"Zip", &WrapZip<false>},          {"ZipAll", &WrapZip<true>}, +        {"WithContext", &WrapWithContext},          {"Reduce", &WrapReduce},          {"Length", &WrapLength},          {"Iterable", &WrapIterable}, diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_withcontext.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_withcontext.cpp new file mode 100644 index 00000000000..c9a5e86a96d --- /dev/null +++ b/ydb/library/yql/minikql/comp_nodes/mkql_withcontext.cpp @@ -0,0 +1,139 @@ +#include "mkql_withcontext.h" + +#include <ydb/library/yql/minikql/mkql_node_cast.h> +#include <ydb/library/yql/minikql/computation/mkql_computation_node_impl.h> +#include <ydb/library/yql/minikql/computation/mkql_computation_pg.h> +#include <ydb/library/yql/minikql/computation/mkql_computation_node_holders.h> + +#include <util/generic/scope.h> + +namespace NKikimr { +namespace NMiniKQL { + +namespace { + +class TWithContextWrapper : public TMutableComputationNode<TWithContextWrapper> { +    typedef TMutableComputationNode<TWithContextWrapper> TBaseComputation; +public: +    TWithContextWrapper(TComputationMutables& mutables, const std::string_view& contextType, IComputationNode* arg) +        : TBaseComputation(mutables) +        , ContextType(contextType) +        , Arg(arg) +    {} + +    NUdf::TUnboxedValuePod DoCalculate(TComputationContext& compCtx) const { +        auto prev = TlsAllocState->CurrentContext; +        TlsAllocState->CurrentContext = PgInitializeContext(ContextType); +        Y_DEFER { +            PgDestroyContext(ContextType, TlsAllocState->CurrentContext); +            TlsAllocState->CurrentContext = prev; +        }; + +        TPAllocScope scope; +        return Arg->GetValue(compCtx).Release(); +    } + +private: +    void RegisterDependencies() const final { +        this->DependsOn(Arg); +    } + +    IComputationNode* const Arg; +    const std::string_view ContextType; +}; + +struct TState : public TComputationValue<TState> { + +    TState(TMemoryUsageInfo* memInfo, const std::string_view& contextType) +        : TComputationValue(memInfo) +        , ContextType(contextType) +    { +        Ctx = PgInitializeContext(ContextType); +        Scope.Detach(); +    } + +    void Attach() { +        Scope.Attach(); +        PrevContext = TlsAllocState->CurrentContext; +        TlsAllocState->CurrentContext = Ctx; +    } + +    void Detach() { +        Scope.Detach(); +        TlsAllocState->CurrentContext = PrevContext; +    } + +    void Cleanup() { +        if (Ctx) { +            PgDestroyContext(ContextType, Ctx); +            Ctx = nullptr; +            Scope.Cleanup(); +        } +    } + +    ~TState() { +        Cleanup(); +    } + +    const std::string_view ContextType; +    void* Ctx = nullptr; +    TPAllocScope Scope; +    void* PrevContext = nullptr; +}; + +class TWithContextFlowWrapper : public TStatefulFlowComputationNode<TWithContextFlowWrapper> { +    typedef TStatefulFlowComputationNode<TWithContextFlowWrapper> TBaseComputation; +public: +    TWithContextFlowWrapper(TComputationMutables& mutables, const std::string_view& contextType, +        EValueRepresentation kind, IComputationNode* flow) +        : TBaseComputation(mutables, flow, kind, EValueRepresentation::Any) +        , Flow(flow) +        , ContextType(contextType) +    {} + +    NUdf::TUnboxedValue DoCalculate(NUdf::TUnboxedValue& stateValue, TComputationContext& ctx) const { +        if (!stateValue.HasValue()) { +            stateValue = ctx.HolderFactory.Create<TState>(ContextType); +        } + +        TState& state = *static_cast<TState*>(stateValue.AsBoxed().Get()); +        state.Attach(); +        Y_DEFER { +            state.Detach(); +        }; + +        auto item = Flow->GetValue(ctx); +        if (item.IsFinish()) { +            state.Cleanup(); +        } + +        return item; +    } + +private: +    void RegisterDependencies() const final { +        this->FlowDependsOn(Flow); +    } + +    IComputationNode* const Flow; +    const std::string_view ContextType; +}; + + +} + +IComputationNode* WrapWithContext(TCallable& callable, const TComputationNodeFactoryContext& ctx) { +    const auto contextTypeData = AS_VALUE(TDataLiteral, callable.GetInput(0)); +    auto contextType = contextTypeData->AsValue().AsStringRef(); +    auto arg = LocateNode(ctx.NodeLocator, callable, 1); +    if (callable.GetInput(1).GetStaticType()->IsFlow()) { +        const auto type = callable.GetType()->GetReturnType(); +        return new TWithContextFlowWrapper(ctx.Mutables, contextType, GetValueRepresentation(type), arg); +    } else { +        MKQL_ENSURE(!callable.GetInput(1).GetStaticType()->IsStream(), "Stream is not expected here"); +        return new TWithContextWrapper(ctx.Mutables, contextType, arg); +    } +} + +} +} diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_withcontext.h b/ydb/library/yql/minikql/comp_nodes/mkql_withcontext.h new file mode 100644 index 00000000000..0d2ab1523f2 --- /dev/null +++ b/ydb/library/yql/minikql/comp_nodes/mkql_withcontext.h @@ -0,0 +1,10 @@ +#pragma once +#include <ydb/library/yql/minikql/computation/mkql_computation_node.h> + +namespace NKikimr { +namespace NMiniKQL { + +IComputationNode* WrapWithContext(TCallable& callable, const TComputationNodeFactoryContext& ctx); + +} +} diff --git a/ydb/library/yql/minikql/computation/mkql_computation_pg.h b/ydb/library/yql/minikql/computation/mkql_computation_pg.h new file mode 100644 index 00000000000..fbb4ae7302a --- /dev/null +++ b/ydb/library/yql/minikql/computation/mkql_computation_pg.h @@ -0,0 +1,11 @@ +#pragma once +#include <string_view> + +namespace NKikimr { +namespace NMiniKQL { + +void* PgInitializeContext(const std::string_view& contextType); +void PgDestroyContext(const std::string_view& contextType, void* ctx); + +} // namespace MiniKQL +} // namespace NKikimr diff --git a/ydb/library/yql/minikql/mkql_alloc.h b/ydb/library/yql/minikql/mkql_alloc.h index 791d9132add..733f6502dea 100644 --- a/ydb/library/yql/minikql/mkql_alloc.h +++ b/ydb/library/yql/minikql/mkql_alloc.h @@ -57,6 +57,7 @@ struct TAllocState : public TAlignedPagePool      TListEntry OffloadedBlocksRoot;      TListEntry GlobalPAllocList;      TListEntry* CurrentPAllocList; +    void* CurrentContext = nullptr;      ::NKikimr::NUdf::TBoxedValueLink Root; @@ -81,7 +82,7 @@ public:      }      ~TPAllocScope() { -        TAllocState::CleanupPAllocList(&PAllocList); +        Cleanup();          Detach();      } @@ -100,6 +101,10 @@ public:          }      } +    void Cleanup() { +        TAllocState::CleanupPAllocList(&PAllocList); +    } +  private:      TAllocState::TListEntry PAllocList;      TAllocState::TListEntry* Prev = nullptr; diff --git a/ydb/library/yql/minikql/mkql_program_builder.cpp b/ydb/library/yql/minikql/mkql_program_builder.cpp index e1bc8ed11d1..d41c29302fd 100644 --- a/ydb/library/yql/minikql/mkql_program_builder.cpp +++ b/ydb/library/yql/minikql/mkql_program_builder.cpp @@ -5030,13 +5030,15 @@ TRuntimeNode TProgramBuilder::PgConst(TPgType* pgType, const std::string_view& v      return TRuntimeNode(callableBuilder.Build(), false);  } -TRuntimeNode TProgramBuilder::PgResolvedCall(const std::string_view& name, ui32 id, const TArrayRef<const TRuntimeNode>& args, +TRuntimeNode TProgramBuilder::PgResolvedCall(bool useContext, const std::string_view& name, +    ui32 id, const TArrayRef<const TRuntimeNode>& args,      TType* returnType) {      if constexpr (RuntimeVersion < 30U) {          THROW yexception() << "Runtime version (" << RuntimeVersion << ") too old for " << __func__;      }      TCallableBuilder callableBuilder(Env, __func__, returnType); +    callableBuilder.Add(NewDataLiteral(useContext));      callableBuilder.Add(NewDataLiteral<NUdf::EDataSlot::String>(name));      callableBuilder.Add(NewDataLiteral(id));      for (const auto arg : args) { @@ -5076,6 +5078,17 @@ TRuntimeNode TProgramBuilder::ToPg(TRuntimeNode input, TType* returnType) {      return TRuntimeNode(callableBuilder.Build(), false);  } +TRuntimeNode TProgramBuilder::WithContext(const std::string_view& contextType, TRuntimeNode input) { +    if constexpr (RuntimeVersion < 30U) { +        THROW yexception() << "Runtime version (" << RuntimeVersion << ") too old for " << __func__; +    } + +    TCallableBuilder callableBuilder(Env, __func__, input.GetStaticType()); +    callableBuilder.Add(NewDataLiteral<NUdf::EDataSlot::String>(contextType)); +    callableBuilder.Add(input); +    return TRuntimeNode(callableBuilder.Build(), false); +} +  bool CanExportType(TType* type, const TTypeEnvironment& env) {      if (type->GetKind() == TType::EKind::Type) {          return false; // Type of Type diff --git a/ydb/library/yql/minikql/mkql_program_builder.h b/ydb/library/yql/minikql/mkql_program_builder.h index 9f952367019..eeba8213321 100644 --- a/ydb/library/yql/minikql/mkql_program_builder.h +++ b/ydb/library/yql/minikql/mkql_program_builder.h @@ -624,10 +624,12 @@ public:      typedef TRuntimeNode (TProgramBuilder::*NarrowFunctionMethod)(TRuntimeNode, const TNarrowLambda&);      TRuntimeNode PgConst(TPgType* pgType, const std::string_view& value); -    TRuntimeNode PgResolvedCall(const std::string_view& name, ui32 id, const TArrayRef<const TRuntimeNode>& args, TType* returnType); +    TRuntimeNode PgResolvedCall(bool useContext, const std::string_view& name, ui32 id, +        const TArrayRef<const TRuntimeNode>& args, TType* returnType);      TRuntimeNode PgCast(TRuntimeNode input, TType* returnType);      TRuntimeNode FromPg(TRuntimeNode input, TType* returnType);      TRuntimeNode ToPg(TRuntimeNode input, TType* returnType); +    TRuntimeNode WithContext(const std::string_view& contextType, TRuntimeNode input);  protected:      TRuntimeNode Invoke(const std::string_view& funcName, TType* resultType, const TArrayRef<const TRuntimeNode>& args); diff --git a/ydb/library/yql/parser/pg_wrapper/comp_factory.cpp b/ydb/library/yql/parser/pg_wrapper/comp_factory.cpp index e1810a03eca..f0f7c2acad4 100644 --- a/ydb/library/yql/parser/pg_wrapper/comp_factory.cpp +++ b/ydb/library/yql/parser/pg_wrapper/comp_factory.cpp @@ -13,6 +13,8 @@  #define TypeName PG_TypeName  #define SortBy PG_SortBy +#define Sort PG_Sort +#define Unique PG_Unique  #undef SIZEOF_SIZE_T  extern "C" {  #include "postgres.h" @@ -21,6 +23,7 @@ extern "C" {  #include "utils/fmgrprotos.h"  #include "utils/builtins.h"  #include "utils/memutils.h" +#include "nodes/execnodes.h"  #include "lib/stringinfo.h"  #include "thread_inits.h"  #undef Abs @@ -28,6 +31,8 @@ extern "C" {  #undef Max  #undef TypeName  #undef SortBy +#undef Sort +#undef Unique  #undef LOG  #undef INFO  #undef NOTICE @@ -290,10 +295,11 @@ private:  class TPgResolvedCall : public TMutableComputationNode<TPgResolvedCall> {      typedef TMutableComputationNode<TPgResolvedCall> TBaseComputation;  public: -    TPgResolvedCall(TComputationMutables& mutables, const std::string_view& name, ui32 id, +    TPgResolvedCall(TComputationMutables& mutables, bool useContext, const std::string_view& name, ui32 id,          TComputationNodePtrVector&& argNodes, TVector<TType*>&& argTypes)          : TBaseComputation(mutables)          , StateIndex(mutables.CurValueIndex++) +        , UseContext(useContext)          , Name(name)          , Id(id)          , ArgNodes(std::move(argNodes)) @@ -335,6 +341,10 @@ public:          auto& state = GetState(compCtx);          auto& callInfo = state.CallInfo.Ref(); +        if (UseContext) { +            callInfo.context = (Node*)TlsAllocState->CurrentContext; +        } +          callInfo.isnull = false;          for (ui32 i = 0; i < ArgNodes.size(); ++i) {              auto value = ArgNodes[i]->GetValue(compCtx); @@ -355,7 +365,11 @@ public:          }          inputArgs.Detach(); -        TPAllocScope call; +        TMaybe<TPAllocScope> call; +        if (!callInfo.context) { +            call.ConstructInPlace(); +        } +          PG_TRY();          {              auto ret = FInfo.fn_addr(&callInfo); @@ -404,6 +418,7 @@ private:      }      const ui32 StateIndex; +    const bool UseContext;      const std::string_view Name;      const ui32 Id;      FmgrInfo FInfo; @@ -751,18 +766,20 @@ TComputationNodeFactory GetPgFactory() {              }              if (name == "PgResolvedCall") { -                const auto nameData = AS_VALUE(TDataLiteral, callable.GetInput(0)); -                const auto idData = AS_VALUE(TDataLiteral, callable.GetInput(1)); +                const auto useContextData = AS_VALUE(TDataLiteral, callable.GetInput(0)); +                const auto nameData = AS_VALUE(TDataLiteral, callable.GetInput(1)); +                const auto idData = AS_VALUE(TDataLiteral, callable.GetInput(2)); +                auto useContext = useContextData->AsValue().Get<bool>();                  auto name = nameData->AsValue().AsStringRef(); -                ui32 id = idData->AsValue().Get<ui32>(); +                auto id = idData->AsValue().Get<ui32>();                  TComputationNodePtrVector argNodes;                  TVector<TType*> argTypes; -                for (ui32 i = 2; i < callable.GetInputsCount(); ++i) { +                for (ui32 i = 3; i < callable.GetInputsCount(); ++i) {                      argNodes.emplace_back(LocateNode(ctx.NodeLocator, callable, i));                      argTypes.emplace_back(callable.GetInput(i).GetStaticType());                  } -                return new TPgResolvedCall(ctx.Mutables, name, id, std::move(argNodes), std::move(argTypes)); +                return new TPgResolvedCall(ctx.Mutables, useContext, name, id, std::move(argNodes), std::move(argTypes));              }              if (name == "PgCast") { @@ -1570,6 +1587,37 @@ NUdf::TUnboxedValue PGUnpackImpl(const TPgType* type, TStringBuf& buf) {      }  } +void* PgInitializeContext(const std::string_view& contextType) { +    if (contextType == "Agg") { +        auto ctx = (AggState*)MKQLAllocWithSize(sizeof(AggState)); +        Zero(*ctx); +        *(NodeTag*)ctx = T_AggState; +        ctx->curaggcontext = (ExprContext*)MKQLAllocWithSize(sizeof(ExprContext)); +        Zero(*ctx->curaggcontext); +        ctx->curaggcontext->ecxt_per_tuple_memory = TMkqlPgAdapter::Instance(); +        return ctx; +    } else if (contextType == "WinAgg") { +        auto ctx = (WindowAggState*)MKQLAllocWithSize(sizeof(WindowAggState)); +        Zero(*ctx); +        *(NodeTag*)ctx = T_WindowAggState; +        ctx->curaggcontext = TMkqlPgAdapter::Instance(); +        return ctx; +    } else { +        ythrow yexception() << "Unsupported context type: " << contextType; +    } +} + +void PgDestroyContext(const std::string_view& contextType, void* ctx) { +    if (contextType == "Agg") { +        MKQLFreeWithSize(((AggState*)ctx)->curaggcontext, sizeof(ExprContext)); +        MKQLFreeWithSize(ctx, sizeof(AggState)); +    } else if (contextType == "WinAgg") { +        MKQLFreeWithSize(ctx, sizeof(WindowAggState)); +    } else { +        Y_FAIL("Unsupported context type"); +    } +} +  } // namespace NMiniKQL  } // namespace NKikimr diff --git a/ydb/library/yql/providers/common/mkql/yql_provider_mkql.cpp b/ydb/library/yql/providers/common/mkql/yql_provider_mkql.cpp index 01cedd1cdb1..285c7ed2321 100644 --- a/ydb/library/yql/providers/common/mkql/yql_provider_mkql.cpp +++ b/ydb/library/yql/providers/common/mkql/yql_provider_mkql.cpp @@ -2242,7 +2242,7 @@ TMkqlCommonCallableCompiler::TShared::TShared() {          return ctx.ProgramBuilder.PgConst(type, node.Head().Content());      }); -    AddCallable("PgResolvedCall", [](const TExprNode& node, TMkqlBuildContext& ctx) { +    AddCallable({"PgResolvedCall","PgResolvedCallCtx" }, [](const TExprNode& node, TMkqlBuildContext& ctx) {          auto name = node.Head().Content();          auto id = FromString<ui32>(node.Child(1)->Content());          std::vector<TRuntimeNode> args; @@ -2252,7 +2252,7 @@ TMkqlCommonCallableCompiler::TShared::TShared() {          }          auto returnType = BuildType(node, *node.GetTypeAnn(), ctx.ProgramBuilder); -        return ctx.ProgramBuilder.PgResolvedCall(name, id, args, returnType); +        return ctx.ProgramBuilder.PgResolvedCall(node.IsCallable("PgResolvedCallCtx"), name, id, args, returnType);      });      AddCallable("PgResolvedOp", [](const TExprNode& node, TMkqlBuildContext& ctx) { @@ -2266,7 +2266,7 @@ TMkqlCommonCallableCompiler::TShared::TShared() {          }          auto returnType = BuildType(node, *node.GetTypeAnn(), ctx.ProgramBuilder); -        return ctx.ProgramBuilder.PgResolvedCall(procName, procId, args, returnType); +        return ctx.ProgramBuilder.PgResolvedCall(false, procName, procId, args, returnType);      });      AddCallable("PgCast", [](const TExprNode& node, TMkqlBuildContext& ctx) { @@ -2287,6 +2287,11 @@ TMkqlCommonCallableCompiler::TShared::TShared() {          return ctx.ProgramBuilder.ToPg(input, returnType);      }); +    AddCallable("WithContext", [](const TExprNode& node, TMkqlBuildContext& ctx) { +        auto input = MkqlBuildExpr(*node.Child(1), ctx); +        return ctx.ProgramBuilder.WithContext(node.Child(0)->Content(), input); +    }); +      AddCallable("QueueCreate", [](const TExprNode& node, TMkqlBuildContext& ctx) {          const auto initCapacity = MkqlBuildExpr(*node.Child(1), ctx);          const auto initSize = MkqlBuildExpr(*node.Child(2), ctx); diff --git a/ydb/library/yql/sql/pg_dummy/pg_sql_dummy.cpp b/ydb/library/yql/sql/pg_dummy/pg_sql_dummy.cpp index 5fa30e60212..ae594de09dc 100644 --- a/ydb/library/yql/sql/pg_dummy/pg_sql_dummy.cpp +++ b/ydb/library/yql/sql/pg_dummy/pg_sql_dummy.cpp @@ -87,6 +87,16 @@ NUdf::TUnboxedValue PGUnpackImpl(const TPgType* type, TStringBuf& buf) {     throw yexception() << "PG types are not supported";  } +void* PgInitializeContext(const std::string_view& contextType) { +   Y_UNUSED(contextType); +   return nullptr; +} + +void PgDestroyContext(const std::string_view& contextType, void* ctx) { +   Y_UNUSED(contextType); +   Y_UNUSED(ctx); +} +  } // namespace NMiniKQL  } // namespace NKikimr | 
