aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorvvvv <vvvv@yandex-team.ru>2022-03-22 02:29:42 +0300
committervvvv <vvvv@yandex-team.ru>2022-03-22 02:29:42 +0300
commit0bc06f620d7059dff18ef16863d5df80985c9212 (patch)
tree2a8a1ec113e0c7ad5c9c0c42f67cf875bad70870
parentd9764c900cb7a7ffd5167dd33d7068b483765a2f (diff)
downloadydb-0bc06f620d7059dff18ef16863d5df80985c9212.tar.gz
YQL-14534 introduced WithContext (scalar & flow), integration of PgResolved call with current context
ref:b184dc6228fedaa26e9aff60154217626e0c5407
-rw-r--r--ydb/library/yql/core/peephole_opt/yql_opt_peephole_physical.cpp7
-rw-r--r--ydb/library/yql/core/type_ann/type_ann_core.cpp48
-rw-r--r--ydb/library/yql/minikql/comp_nodes/CMakeLists.txt1
-rw-r--r--ydb/library/yql/minikql/comp_nodes/mkql_factory.cpp2
-rw-r--r--ydb/library/yql/minikql/comp_nodes/mkql_withcontext.cpp139
-rw-r--r--ydb/library/yql/minikql/comp_nodes/mkql_withcontext.h10
-rw-r--r--ydb/library/yql/minikql/computation/mkql_computation_pg.h11
-rw-r--r--ydb/library/yql/minikql/mkql_alloc.h7
-rw-r--r--ydb/library/yql/minikql/mkql_program_builder.cpp15
-rw-r--r--ydb/library/yql/minikql/mkql_program_builder.h4
-rw-r--r--ydb/library/yql/parser/pg_wrapper/comp_factory.cpp62
-rw-r--r--ydb/library/yql/providers/common/mkql/yql_provider_mkql.cpp11
-rw-r--r--ydb/library/yql/sql/pg_dummy/pg_sql_dummy.cpp10
13 files changed, 298 insertions, 29 deletions
diff --git a/ydb/library/yql/core/peephole_opt/yql_opt_peephole_physical.cpp b/ydb/library/yql/core/peephole_opt/yql_opt_peephole_physical.cpp
index e8befae258..2758a199a2 100644
--- a/ydb/library/yql/core/peephole_opt/yql_opt_peephole_physical.cpp
+++ b/ydb/library/yql/core/peephole_opt/yql_opt_peephole_physical.cpp
@@ -2224,10 +2224,6 @@ TExprNode::TPtr ExpandListHas(const TExprNode::TPtr& input, TExprContext& ctx) {
return RewriteSearchByKeyForTypesMismatch<true, true>(input, ctx);
}
-TExprNode::TPtr ExpandCurrentStream(const TExprNode::TPtr& input, TExprContext& ctx) {
- return ctx.NewCallable(input->Pos(), "Void", {});
-}
-
template <bool Flat, bool List>
TExprNode::TPtr ExpandContainerIf(const TExprNode::TPtr& input, TExprContext& ctx) {
YQL_CLOG(DEBUG, CorePeepHole) << "Expand " << input->Content();
@@ -5774,8 +5770,7 @@ struct TPeepHoleRules {
{"RangeEmpty", &ExpandRangeEmpty},
{"AsRange", &ExpandAsRange},
{"RangeFor", &ExpandRangeFor},
- {"ToFlow", &DropToFlowDeps},
- {"CurrentStream", &ExpandCurrentStream}
+ {"ToFlow", &DropToFlowDeps}
};
static constexpr std::initializer_list<TPeepHoleOptimizerMap::value_type> SimplifyStageRulesInit = {
diff --git a/ydb/library/yql/core/type_ann/type_ann_core.cpp b/ydb/library/yql/core/type_ann/type_ann_core.cpp
index 9a55fb46fa..83566cfcf6 100644
--- a/ydb/library/yql/core/type_ann/type_ann_core.cpp
+++ b/ydb/library/yql/core/type_ann/type_ann_core.cpp
@@ -3195,13 +3195,40 @@ namespace NTypeAnnImpl {
return IGraphTransformer::TStatus::Ok;
}
- IGraphTransformer::TStatus CurrentStreamWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx) {
- Y_UNUSED(output);
- if (!EnsureArgsCount(*input, 0, ctx.Expr)) {
+ IGraphTransformer::TStatus WithContextWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx) {
+ if (!EnsureArgsCount(*input, 2, ctx.Expr)) {
return IGraphTransformer::TStatus::Error;
}
- input->SetTypeAnn(ctx.Expr.MakeType<TVoidExprType>());
+ if (!EnsureAtom(input->Head(), ctx.Expr)) {
+ return IGraphTransformer::TStatus::Error;
+ }
+
+ if (input->Head().Content() != "Agg" && input->Head().Content() != "WinAgg") {
+ ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(input->Head().Pos()), TStringBuilder() <<
+ "Unexpected context type: " << input->Head().Content()));
+ return IGraphTransformer::TStatus::Error;
+ }
+
+ if (!EnsureComputable(input->Tail(), ctx.Expr)) {
+ return IGraphTransformer::TStatus::Error;
+ }
+
+ if (input->Tail().GetTypeAnn()->GetKind() == ETypeAnnotationKind::Stream) {
+ output = ctx.Expr.Builder(input->Pos())
+ .Callable("FromFlow")
+ .Callable("WithContext")
+ .Add(0, input->HeadPtr())
+ .Callable(1, "ToFlow")
+ .Add(0, input->TailPtr())
+ .Seal()
+ .Seal()
+ .Seal()
+ .Build();
+ return IGraphTransformer::TStatus::Repeat;
+ }
+
+ input->SetTypeAnn(input->Tail().GetTypeAnn());
return IGraphTransformer::TStatus::Ok;
}
@@ -8800,7 +8827,7 @@ template <NKikimr::NUdf::EDataSlot DataSlot>
};
IGraphTransformer::TStatus PgCallWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TExtContext& ctx) {
- bool isResolved = input->Content() == "PgResolvedCall";
+ bool isResolved = input->Content().StartsWith("PgResolvedCall");
if (!EnsureMinArgsCount(*input, isResolved ? 2 : 1, ctx.Expr)) {
return IGraphTransformer::TStatus::Error;
}
@@ -9731,7 +9758,7 @@ template <NKikimr::NUdf::EDataSlot DataSlot>
finishLambda = ctx.Expr.Builder(input->Pos())
.Lambda()
.Param("state")
- .Callable("PgResolvedCall")
+ .Callable("PgResolvedCallCtx")
.Atom(0, NPg::LookupProc(aggDesc.FinalFuncId).Name)
.Atom(1, ToString(aggDesc.FinalFuncId))
.Arg(2, "state")
@@ -9771,7 +9798,7 @@ template <NKikimr::NUdf::EDataSlot DataSlot>
initLambda = ctx.Expr.Builder(input->Pos())
.Lambda()
.Param("row")
- .Callable("PgResolvedCall")
+ .Callable("PgResolvedCallCtx")
.Atom(0, transFuncDesc.Name)
.Atom(1, ToString(aggDesc.TransFuncId))
.Add(2, initValue)
@@ -9787,7 +9814,7 @@ template <NKikimr::NUdf::EDataSlot DataSlot>
.Param("row")
.Param("state")
.Callable("Coalesce")
- .Callable(0, "PgResolvedCall")
+ .Callable(0, "PgResolvedCallCtx")
.Atom(0, transFuncDesc.Name)
.Atom(1, ToString(aggDesc.TransFuncId))
.Callable(2, "Coalesce")
@@ -9821,7 +9848,7 @@ template <NKikimr::NUdf::EDataSlot DataSlot>
.Arg(0, "state")
.Seal()
.Callable(1, "Coalesce")
- .Callable(0, "PgResolvedCall")
+ .Callable(0, "PgResolvedCallCtx")
.Atom(0, transFuncDesc.Name)
.Atom(1, ToString(aggDesc.TransFuncId))
.Arg(2, "state")
@@ -13677,7 +13704,7 @@ template <NKikimr::NUdf::EDataSlot DataSlot>
Functions["FromFlow"] = &FromFlowWrapper;
Functions["BuildTablePath"] = &BuildTablePathWrapper;
Functions["WithOptionalArgs"] = &WithOptionalArgsWrapper;
- Functions["CurrentStream"] = &CurrentStreamWrapper;
+ Functions["WithContext"] = &WithContextWrapper;
Functions["DecimalDiv"] = &DecimalBinaryWrapper;
Functions["DecimalMod"] = &DecimalBinaryWrapper;
@@ -13745,6 +13772,7 @@ template <NKikimr::NUdf::EDataSlot DataSlot>
ExtFunctions["PgAggWindowCall"] = &PgAggWrapper;
ExtFunctions["PgCall"] = &PgCallWrapper;
ExtFunctions["PgResolvedCall"] = &PgCallWrapper;
+ ExtFunctions["PgResolvedCallCtx"] = &PgCallWrapper;
ExtFunctions["PgOp"] = &PgOpWrapper;
ExtFunctions["PgResolvedOp"] = &PgOpWrapper;
ExtFunctions["PgSelect"] = &PgSelectWrapper;
diff --git a/ydb/library/yql/minikql/comp_nodes/CMakeLists.txt b/ydb/library/yql/minikql/comp_nodes/CMakeLists.txt
index 63a0842e2d..2094ec7b18 100644
--- a/ydb/library/yql/minikql/comp_nodes/CMakeLists.txt
+++ b/ydb/library/yql/minikql/comp_nodes/CMakeLists.txt
@@ -128,5 +128,6 @@ target_sources(yql-minikql-comp_nodes PRIVATE
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_wide_condense.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_wide_filter.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_wide_map.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_withcontext.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_zip.cpp
)
diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_factory.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_factory.cpp
index ae2b84405b..7c1696c290 100644
--- a/ydb/library/yql/minikql/comp_nodes/mkql_factory.cpp
+++ b/ydb/library/yql/minikql/comp_nodes/mkql_factory.cpp
@@ -96,6 +96,7 @@
#include "mkql_wide_condense.h"
#include "mkql_wide_filter.h"
#include "mkql_wide_map.h"
+#include "mkql_withcontext.h"
#include "mkql_zip.h"
#include <ydb/library/yql/minikql/computation/mkql_computation_node_codegen.h>
@@ -184,6 +185,7 @@ struct TCallableComputationNodeBuilderFuncMapFiller {
{"Not", &WrapNot},
{"Zip", &WrapZip<false>},
{"ZipAll", &WrapZip<true>},
+ {"WithContext", &WrapWithContext},
{"Reduce", &WrapReduce},
{"Length", &WrapLength},
{"Iterable", &WrapIterable},
diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_withcontext.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_withcontext.cpp
new file mode 100644
index 0000000000..c9a5e86a96
--- /dev/null
+++ b/ydb/library/yql/minikql/comp_nodes/mkql_withcontext.cpp
@@ -0,0 +1,139 @@
+#include "mkql_withcontext.h"
+
+#include <ydb/library/yql/minikql/mkql_node_cast.h>
+#include <ydb/library/yql/minikql/computation/mkql_computation_node_impl.h>
+#include <ydb/library/yql/minikql/computation/mkql_computation_pg.h>
+#include <ydb/library/yql/minikql/computation/mkql_computation_node_holders.h>
+
+#include <util/generic/scope.h>
+
+namespace NKikimr {
+namespace NMiniKQL {
+
+namespace {
+
+class TWithContextWrapper : public TMutableComputationNode<TWithContextWrapper> {
+ typedef TMutableComputationNode<TWithContextWrapper> TBaseComputation;
+public:
+ TWithContextWrapper(TComputationMutables& mutables, const std::string_view& contextType, IComputationNode* arg)
+ : TBaseComputation(mutables)
+ , ContextType(contextType)
+ , Arg(arg)
+ {}
+
+ NUdf::TUnboxedValuePod DoCalculate(TComputationContext& compCtx) const {
+ auto prev = TlsAllocState->CurrentContext;
+ TlsAllocState->CurrentContext = PgInitializeContext(ContextType);
+ Y_DEFER {
+ PgDestroyContext(ContextType, TlsAllocState->CurrentContext);
+ TlsAllocState->CurrentContext = prev;
+ };
+
+ TPAllocScope scope;
+ return Arg->GetValue(compCtx).Release();
+ }
+
+private:
+ void RegisterDependencies() const final {
+ this->DependsOn(Arg);
+ }
+
+ IComputationNode* const Arg;
+ const std::string_view ContextType;
+};
+
+struct TState : public TComputationValue<TState> {
+
+ TState(TMemoryUsageInfo* memInfo, const std::string_view& contextType)
+ : TComputationValue(memInfo)
+ , ContextType(contextType)
+ {
+ Ctx = PgInitializeContext(ContextType);
+ Scope.Detach();
+ }
+
+ void Attach() {
+ Scope.Attach();
+ PrevContext = TlsAllocState->CurrentContext;
+ TlsAllocState->CurrentContext = Ctx;
+ }
+
+ void Detach() {
+ Scope.Detach();
+ TlsAllocState->CurrentContext = PrevContext;
+ }
+
+ void Cleanup() {
+ if (Ctx) {
+ PgDestroyContext(ContextType, Ctx);
+ Ctx = nullptr;
+ Scope.Cleanup();
+ }
+ }
+
+ ~TState() {
+ Cleanup();
+ }
+
+ const std::string_view ContextType;
+ void* Ctx = nullptr;
+ TPAllocScope Scope;
+ void* PrevContext = nullptr;
+};
+
+class TWithContextFlowWrapper : public TStatefulFlowComputationNode<TWithContextFlowWrapper> {
+ typedef TStatefulFlowComputationNode<TWithContextFlowWrapper> TBaseComputation;
+public:
+ TWithContextFlowWrapper(TComputationMutables& mutables, const std::string_view& contextType,
+ EValueRepresentation kind, IComputationNode* flow)
+ : TBaseComputation(mutables, flow, kind, EValueRepresentation::Any)
+ , Flow(flow)
+ , ContextType(contextType)
+ {}
+
+ NUdf::TUnboxedValue DoCalculate(NUdf::TUnboxedValue& stateValue, TComputationContext& ctx) const {
+ if (!stateValue.HasValue()) {
+ stateValue = ctx.HolderFactory.Create<TState>(ContextType);
+ }
+
+ TState& state = *static_cast<TState*>(stateValue.AsBoxed().Get());
+ state.Attach();
+ Y_DEFER {
+ state.Detach();
+ };
+
+ auto item = Flow->GetValue(ctx);
+ if (item.IsFinish()) {
+ state.Cleanup();
+ }
+
+ return item;
+ }
+
+private:
+ void RegisterDependencies() const final {
+ this->FlowDependsOn(Flow);
+ }
+
+ IComputationNode* const Flow;
+ const std::string_view ContextType;
+};
+
+
+}
+
+IComputationNode* WrapWithContext(TCallable& callable, const TComputationNodeFactoryContext& ctx) {
+ const auto contextTypeData = AS_VALUE(TDataLiteral, callable.GetInput(0));
+ auto contextType = contextTypeData->AsValue().AsStringRef();
+ auto arg = LocateNode(ctx.NodeLocator, callable, 1);
+ if (callable.GetInput(1).GetStaticType()->IsFlow()) {
+ const auto type = callable.GetType()->GetReturnType();
+ return new TWithContextFlowWrapper(ctx.Mutables, contextType, GetValueRepresentation(type), arg);
+ } else {
+ MKQL_ENSURE(!callable.GetInput(1).GetStaticType()->IsStream(), "Stream is not expected here");
+ return new TWithContextWrapper(ctx.Mutables, contextType, arg);
+ }
+}
+
+}
+}
diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_withcontext.h b/ydb/library/yql/minikql/comp_nodes/mkql_withcontext.h
new file mode 100644
index 0000000000..0d2ab1523f
--- /dev/null
+++ b/ydb/library/yql/minikql/comp_nodes/mkql_withcontext.h
@@ -0,0 +1,10 @@
+#pragma once
+#include <ydb/library/yql/minikql/computation/mkql_computation_node.h>
+
+namespace NKikimr {
+namespace NMiniKQL {
+
+IComputationNode* WrapWithContext(TCallable& callable, const TComputationNodeFactoryContext& ctx);
+
+}
+}
diff --git a/ydb/library/yql/minikql/computation/mkql_computation_pg.h b/ydb/library/yql/minikql/computation/mkql_computation_pg.h
new file mode 100644
index 0000000000..fbb4ae7302
--- /dev/null
+++ b/ydb/library/yql/minikql/computation/mkql_computation_pg.h
@@ -0,0 +1,11 @@
+#pragma once
+#include <string_view>
+
+namespace NKikimr {
+namespace NMiniKQL {
+
+void* PgInitializeContext(const std::string_view& contextType);
+void PgDestroyContext(const std::string_view& contextType, void* ctx);
+
+} // namespace MiniKQL
+} // namespace NKikimr
diff --git a/ydb/library/yql/minikql/mkql_alloc.h b/ydb/library/yql/minikql/mkql_alloc.h
index 791d9132ad..733f6502de 100644
--- a/ydb/library/yql/minikql/mkql_alloc.h
+++ b/ydb/library/yql/minikql/mkql_alloc.h
@@ -57,6 +57,7 @@ struct TAllocState : public TAlignedPagePool
TListEntry OffloadedBlocksRoot;
TListEntry GlobalPAllocList;
TListEntry* CurrentPAllocList;
+ void* CurrentContext = nullptr;
::NKikimr::NUdf::TBoxedValueLink Root;
@@ -81,7 +82,7 @@ public:
}
~TPAllocScope() {
- TAllocState::CleanupPAllocList(&PAllocList);
+ Cleanup();
Detach();
}
@@ -100,6 +101,10 @@ public:
}
}
+ void Cleanup() {
+ TAllocState::CleanupPAllocList(&PAllocList);
+ }
+
private:
TAllocState::TListEntry PAllocList;
TAllocState::TListEntry* Prev = nullptr;
diff --git a/ydb/library/yql/minikql/mkql_program_builder.cpp b/ydb/library/yql/minikql/mkql_program_builder.cpp
index e1bc8ed11d..d41c29302f 100644
--- a/ydb/library/yql/minikql/mkql_program_builder.cpp
+++ b/ydb/library/yql/minikql/mkql_program_builder.cpp
@@ -5030,13 +5030,15 @@ TRuntimeNode TProgramBuilder::PgConst(TPgType* pgType, const std::string_view& v
return TRuntimeNode(callableBuilder.Build(), false);
}
-TRuntimeNode TProgramBuilder::PgResolvedCall(const std::string_view& name, ui32 id, const TArrayRef<const TRuntimeNode>& args,
+TRuntimeNode TProgramBuilder::PgResolvedCall(bool useContext, const std::string_view& name,
+ ui32 id, const TArrayRef<const TRuntimeNode>& args,
TType* returnType) {
if constexpr (RuntimeVersion < 30U) {
THROW yexception() << "Runtime version (" << RuntimeVersion << ") too old for " << __func__;
}
TCallableBuilder callableBuilder(Env, __func__, returnType);
+ callableBuilder.Add(NewDataLiteral(useContext));
callableBuilder.Add(NewDataLiteral<NUdf::EDataSlot::String>(name));
callableBuilder.Add(NewDataLiteral(id));
for (const auto arg : args) {
@@ -5076,6 +5078,17 @@ TRuntimeNode TProgramBuilder::ToPg(TRuntimeNode input, TType* returnType) {
return TRuntimeNode(callableBuilder.Build(), false);
}
+TRuntimeNode TProgramBuilder::WithContext(const std::string_view& contextType, TRuntimeNode input) {
+ if constexpr (RuntimeVersion < 30U) {
+ THROW yexception() << "Runtime version (" << RuntimeVersion << ") too old for " << __func__;
+ }
+
+ TCallableBuilder callableBuilder(Env, __func__, input.GetStaticType());
+ callableBuilder.Add(NewDataLiteral<NUdf::EDataSlot::String>(contextType));
+ callableBuilder.Add(input);
+ return TRuntimeNode(callableBuilder.Build(), false);
+}
+
bool CanExportType(TType* type, const TTypeEnvironment& env) {
if (type->GetKind() == TType::EKind::Type) {
return false; // Type of Type
diff --git a/ydb/library/yql/minikql/mkql_program_builder.h b/ydb/library/yql/minikql/mkql_program_builder.h
index 9f95236701..eeba821332 100644
--- a/ydb/library/yql/minikql/mkql_program_builder.h
+++ b/ydb/library/yql/minikql/mkql_program_builder.h
@@ -624,10 +624,12 @@ public:
typedef TRuntimeNode (TProgramBuilder::*NarrowFunctionMethod)(TRuntimeNode, const TNarrowLambda&);
TRuntimeNode PgConst(TPgType* pgType, const std::string_view& value);
- TRuntimeNode PgResolvedCall(const std::string_view& name, ui32 id, const TArrayRef<const TRuntimeNode>& args, TType* returnType);
+ TRuntimeNode PgResolvedCall(bool useContext, const std::string_view& name, ui32 id,
+ const TArrayRef<const TRuntimeNode>& args, TType* returnType);
TRuntimeNode PgCast(TRuntimeNode input, TType* returnType);
TRuntimeNode FromPg(TRuntimeNode input, TType* returnType);
TRuntimeNode ToPg(TRuntimeNode input, TType* returnType);
+ TRuntimeNode WithContext(const std::string_view& contextType, TRuntimeNode input);
protected:
TRuntimeNode Invoke(const std::string_view& funcName, TType* resultType, const TArrayRef<const TRuntimeNode>& args);
diff --git a/ydb/library/yql/parser/pg_wrapper/comp_factory.cpp b/ydb/library/yql/parser/pg_wrapper/comp_factory.cpp
index e1810a03ec..f0f7c2acad 100644
--- a/ydb/library/yql/parser/pg_wrapper/comp_factory.cpp
+++ b/ydb/library/yql/parser/pg_wrapper/comp_factory.cpp
@@ -13,6 +13,8 @@
#define TypeName PG_TypeName
#define SortBy PG_SortBy
+#define Sort PG_Sort
+#define Unique PG_Unique
#undef SIZEOF_SIZE_T
extern "C" {
#include "postgres.h"
@@ -21,6 +23,7 @@ extern "C" {
#include "utils/fmgrprotos.h"
#include "utils/builtins.h"
#include "utils/memutils.h"
+#include "nodes/execnodes.h"
#include "lib/stringinfo.h"
#include "thread_inits.h"
#undef Abs
@@ -28,6 +31,8 @@ extern "C" {
#undef Max
#undef TypeName
#undef SortBy
+#undef Sort
+#undef Unique
#undef LOG
#undef INFO
#undef NOTICE
@@ -290,10 +295,11 @@ private:
class TPgResolvedCall : public TMutableComputationNode<TPgResolvedCall> {
typedef TMutableComputationNode<TPgResolvedCall> TBaseComputation;
public:
- TPgResolvedCall(TComputationMutables& mutables, const std::string_view& name, ui32 id,
+ TPgResolvedCall(TComputationMutables& mutables, bool useContext, const std::string_view& name, ui32 id,
TComputationNodePtrVector&& argNodes, TVector<TType*>&& argTypes)
: TBaseComputation(mutables)
, StateIndex(mutables.CurValueIndex++)
+ , UseContext(useContext)
, Name(name)
, Id(id)
, ArgNodes(std::move(argNodes))
@@ -335,6 +341,10 @@ public:
auto& state = GetState(compCtx);
auto& callInfo = state.CallInfo.Ref();
+ if (UseContext) {
+ callInfo.context = (Node*)TlsAllocState->CurrentContext;
+ }
+
callInfo.isnull = false;
for (ui32 i = 0; i < ArgNodes.size(); ++i) {
auto value = ArgNodes[i]->GetValue(compCtx);
@@ -355,7 +365,11 @@ public:
}
inputArgs.Detach();
- TPAllocScope call;
+ TMaybe<TPAllocScope> call;
+ if (!callInfo.context) {
+ call.ConstructInPlace();
+ }
+
PG_TRY();
{
auto ret = FInfo.fn_addr(&callInfo);
@@ -404,6 +418,7 @@ private:
}
const ui32 StateIndex;
+ const bool UseContext;
const std::string_view Name;
const ui32 Id;
FmgrInfo FInfo;
@@ -751,18 +766,20 @@ TComputationNodeFactory GetPgFactory() {
}
if (name == "PgResolvedCall") {
- const auto nameData = AS_VALUE(TDataLiteral, callable.GetInput(0));
- const auto idData = AS_VALUE(TDataLiteral, callable.GetInput(1));
+ const auto useContextData = AS_VALUE(TDataLiteral, callable.GetInput(0));
+ const auto nameData = AS_VALUE(TDataLiteral, callable.GetInput(1));
+ const auto idData = AS_VALUE(TDataLiteral, callable.GetInput(2));
+ auto useContext = useContextData->AsValue().Get<bool>();
auto name = nameData->AsValue().AsStringRef();
- ui32 id = idData->AsValue().Get<ui32>();
+ auto id = idData->AsValue().Get<ui32>();
TComputationNodePtrVector argNodes;
TVector<TType*> argTypes;
- for (ui32 i = 2; i < callable.GetInputsCount(); ++i) {
+ for (ui32 i = 3; i < callable.GetInputsCount(); ++i) {
argNodes.emplace_back(LocateNode(ctx.NodeLocator, callable, i));
argTypes.emplace_back(callable.GetInput(i).GetStaticType());
}
- return new TPgResolvedCall(ctx.Mutables, name, id, std::move(argNodes), std::move(argTypes));
+ return new TPgResolvedCall(ctx.Mutables, useContext, name, id, std::move(argNodes), std::move(argTypes));
}
if (name == "PgCast") {
@@ -1570,6 +1587,37 @@ NUdf::TUnboxedValue PGUnpackImpl(const TPgType* type, TStringBuf& buf) {
}
}
+void* PgInitializeContext(const std::string_view& contextType) {
+ if (contextType == "Agg") {
+ auto ctx = (AggState*)MKQLAllocWithSize(sizeof(AggState));
+ Zero(*ctx);
+ *(NodeTag*)ctx = T_AggState;
+ ctx->curaggcontext = (ExprContext*)MKQLAllocWithSize(sizeof(ExprContext));
+ Zero(*ctx->curaggcontext);
+ ctx->curaggcontext->ecxt_per_tuple_memory = TMkqlPgAdapter::Instance();
+ return ctx;
+ } else if (contextType == "WinAgg") {
+ auto ctx = (WindowAggState*)MKQLAllocWithSize(sizeof(WindowAggState));
+ Zero(*ctx);
+ *(NodeTag*)ctx = T_WindowAggState;
+ ctx->curaggcontext = TMkqlPgAdapter::Instance();
+ return ctx;
+ } else {
+ ythrow yexception() << "Unsupported context type: " << contextType;
+ }
+}
+
+void PgDestroyContext(const std::string_view& contextType, void* ctx) {
+ if (contextType == "Agg") {
+ MKQLFreeWithSize(((AggState*)ctx)->curaggcontext, sizeof(ExprContext));
+ MKQLFreeWithSize(ctx, sizeof(AggState));
+ } else if (contextType == "WinAgg") {
+ MKQLFreeWithSize(ctx, sizeof(WindowAggState));
+ } else {
+ Y_FAIL("Unsupported context type");
+ }
+}
+
} // namespace NMiniKQL
} // namespace NKikimr
diff --git a/ydb/library/yql/providers/common/mkql/yql_provider_mkql.cpp b/ydb/library/yql/providers/common/mkql/yql_provider_mkql.cpp
index 01cedd1cdb..285c7ed232 100644
--- a/ydb/library/yql/providers/common/mkql/yql_provider_mkql.cpp
+++ b/ydb/library/yql/providers/common/mkql/yql_provider_mkql.cpp
@@ -2242,7 +2242,7 @@ TMkqlCommonCallableCompiler::TShared::TShared() {
return ctx.ProgramBuilder.PgConst(type, node.Head().Content());
});
- AddCallable("PgResolvedCall", [](const TExprNode& node, TMkqlBuildContext& ctx) {
+ AddCallable({"PgResolvedCall","PgResolvedCallCtx" }, [](const TExprNode& node, TMkqlBuildContext& ctx) {
auto name = node.Head().Content();
auto id = FromString<ui32>(node.Child(1)->Content());
std::vector<TRuntimeNode> args;
@@ -2252,7 +2252,7 @@ TMkqlCommonCallableCompiler::TShared::TShared() {
}
auto returnType = BuildType(node, *node.GetTypeAnn(), ctx.ProgramBuilder);
- return ctx.ProgramBuilder.PgResolvedCall(name, id, args, returnType);
+ return ctx.ProgramBuilder.PgResolvedCall(node.IsCallable("PgResolvedCallCtx"), name, id, args, returnType);
});
AddCallable("PgResolvedOp", [](const TExprNode& node, TMkqlBuildContext& ctx) {
@@ -2266,7 +2266,7 @@ TMkqlCommonCallableCompiler::TShared::TShared() {
}
auto returnType = BuildType(node, *node.GetTypeAnn(), ctx.ProgramBuilder);
- return ctx.ProgramBuilder.PgResolvedCall(procName, procId, args, returnType);
+ return ctx.ProgramBuilder.PgResolvedCall(false, procName, procId, args, returnType);
});
AddCallable("PgCast", [](const TExprNode& node, TMkqlBuildContext& ctx) {
@@ -2287,6 +2287,11 @@ TMkqlCommonCallableCompiler::TShared::TShared() {
return ctx.ProgramBuilder.ToPg(input, returnType);
});
+ AddCallable("WithContext", [](const TExprNode& node, TMkqlBuildContext& ctx) {
+ auto input = MkqlBuildExpr(*node.Child(1), ctx);
+ return ctx.ProgramBuilder.WithContext(node.Child(0)->Content(), input);
+ });
+
AddCallable("QueueCreate", [](const TExprNode& node, TMkqlBuildContext& ctx) {
const auto initCapacity = MkqlBuildExpr(*node.Child(1), ctx);
const auto initSize = MkqlBuildExpr(*node.Child(2), ctx);
diff --git a/ydb/library/yql/sql/pg_dummy/pg_sql_dummy.cpp b/ydb/library/yql/sql/pg_dummy/pg_sql_dummy.cpp
index 5fa30e6021..ae594de09d 100644
--- a/ydb/library/yql/sql/pg_dummy/pg_sql_dummy.cpp
+++ b/ydb/library/yql/sql/pg_dummy/pg_sql_dummy.cpp
@@ -87,6 +87,16 @@ NUdf::TUnboxedValue PGUnpackImpl(const TPgType* type, TStringBuf& buf) {
throw yexception() << "PG types are not supported";
}
+void* PgInitializeContext(const std::string_view& contextType) {
+ Y_UNUSED(contextType);
+ return nullptr;
+}
+
+void PgDestroyContext(const std::string_view& contextType, void* ctx) {
+ Y_UNUSED(contextType);
+ Y_UNUSED(ctx);
+}
+
} // namespace NMiniKQL
} // namespace NKikimr