diff options
author | vvvv <vvvv@yandex-team.ru> | 2022-03-24 19:04:55 +0300 |
---|---|---|
committer | vvvv <vvvv@yandex-team.ru> | 2022-03-24 19:04:55 +0300 |
commit | a6a11e07f1f3d6a0151440a67319641ece7e068a (patch) | |
tree | adcf853cbdd91b978fda7e5d0c38187a59b2600f | |
parent | 5b38bc8a727d799624f5da8f3065d11981508402 (diff) | |
download | ydb-a6a11e07f1f3d6a0151440a67319641ece7e068a.tar.gz |
YQL-13710 cleanup current context after switch in Condense/Condense1/WideCondense
ref:f272d0444db432cbea099c3d28ea5905511dff3e
11 files changed, 169 insertions, 62 deletions
diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_combine.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_combine.cpp index 61400db1974..ce646cf6b11 100644 --- a/ydb/library/yql/minikql/comp_nodes/mkql_combine.cpp +++ b/ydb/library/yql/minikql/comp_nodes/mkql_combine.cpp @@ -93,11 +93,7 @@ public: return false; } - auto& allocState = *TlsAllocState; - if (allocState.CurrentContext) { - TAllocState::CleanupPAllocList(allocState.CurrentPAllocList); - } - + CleanupCurrentContext(); return true; } diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_condense.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_condense.cpp index c179abab492..33f5e9f1e10 100644 --- a/ydb/library/yql/minikql/comp_nodes/mkql_condense.cpp +++ b/ydb/library/yql/minikql/comp_nodes/mkql_condense.cpp @@ -11,9 +11,9 @@ namespace NMiniKQL { namespace { -template <bool Interruptable> -class TCondenseFlowWrapper : public TStatefulFlowCodegeneratorNode<TCondenseFlowWrapper<Interruptable>> { - typedef TStatefulFlowCodegeneratorNode<TCondenseFlowWrapper<Interruptable>> TBaseComputation; +template <bool Interruptable, bool UseCtx> +class TCondenseFlowWrapper : public TStatefulFlowCodegeneratorNode<TCondenseFlowWrapper<Interruptable, UseCtx>> { + typedef TStatefulFlowCodegeneratorNode<TCondenseFlowWrapper<Interruptable, UseCtx>> TBaseComputation; public: TCondenseFlowWrapper( TComputationMutables& mutables, @@ -36,6 +36,11 @@ public: if (state.IsInvalid()) { state = NUdf::TUnboxedValuePod(); State->SetValue(ctx, InitState->GetValue(ctx)); + } else if (UseCtx && state.IsEmbedded()) { + CleanupCurrentContext(); + state = NUdf::TUnboxedValuePod(); + State->SetValue(ctx, InitState->GetValue(ctx)); + State->SetValue(ctx, UpdateState->GetValue(ctx)); } while (true) { @@ -58,8 +63,12 @@ public: if (reset.template Get<bool>()) { auto result = State->GetValue(ctx); - State->SetValue(ctx, InitState->GetValue(ctx)); - State->SetValue(ctx, UpdateState->GetValue(ctx)); + if (UseCtx) { + state = NUdf::TUnboxedValuePod(0); + } else { + State->SetValue(ctx, InitState->GetValue(ctx)); + State->SetValue(ctx, UpdateState->GetValue(ctx)); + } return result.Release(); } } @@ -167,9 +176,9 @@ private: IComputationNode* const UpdateState; }; -template <bool Interruptable> -class TCondenseWrapper : public TCustomValueCodegeneratorNode<TCondenseWrapper<Interruptable>> { - typedef TCustomValueCodegeneratorNode<TCondenseWrapper<Interruptable>> TBaseComputation; +template <bool Interruptable, bool UseCtx> +class TCondenseWrapper : public TCustomValueCodegeneratorNode<TCondenseWrapper<Interruptable, UseCtx>> { + typedef TCustomValueCodegeneratorNode<TCondenseWrapper<Interruptable, UseCtx>> TBaseComputation; public: class TValue : public TComputationValue<TValue> { public: @@ -211,6 +220,11 @@ public: if (ESqueezeState::Idle == State.Stage) { State.Stage = ESqueezeState::Work; State.State->SetValue(Ctx, State.InitState->GetValue(Ctx)); + } else if (UseCtx && ESqueezeState::NeedInit == State.Stage) { + CleanupCurrentContext(); + State.Stage = ESqueezeState::Work; + State.State->SetValue(Ctx, State.InitState->GetValue(Ctx)); + State.State->SetValue(Ctx, State.UpdateState->GetValue(Ctx)); } for (;;) { @@ -231,8 +245,13 @@ public: if (reset.template Get<bool>()) { result = State.State->GetValue(Ctx); - State.State->SetValue(Ctx, State.InitState->GetValue(Ctx)); - State.State->SetValue(Ctx, State.UpdateState->GetValue(Ctx)); + if (UseCtx) { + State.Stage = ESqueezeState::NeedInit; + } else { + State.State->SetValue(Ctx, State.InitState->GetValue(Ctx)); + State.State->SetValue(Ctx, State.UpdateState->GetValue(Ctx)); + } + return NUdf::EFetchStatus::Ok; } } @@ -441,9 +460,8 @@ private: } -IComputationNode* WrapCondense(TCallable& callable, const TComputationNodeFactoryContext& ctx) { - MKQL_ENSURE(callable.GetInputsCount() == 6, "Expected 6 args"); - +template <bool UseCtx> +IComputationNode* WrapCondenseImpl(TCallable& callable, const TComputationNodeFactoryContext& ctx) { const auto stream = LocateNode(ctx.NodeLocator, callable, 0); const auto initState = LocateNode(ctx.NodeLocator, callable, 1); const auto outSwitch = LocateNode(ctx.NodeLocator, callable, 4); @@ -459,19 +477,35 @@ IComputationNode* WrapCondense(TCallable& callable, const TComputationNodeFactor if (type->IsFlow()) { const auto kind = GetValueRepresentation(AS_TYPE(TFlowType, type)->GetItemType()); if (isOptional) { - return new TCondenseFlowWrapper<true>(ctx.Mutables, kind, stream, item, state, outSwitch, initState, updateState); + return new TCondenseFlowWrapper<true, UseCtx>(ctx.Mutables, kind, stream, item, state, outSwitch, initState, updateState); } else { - return new TCondenseFlowWrapper<false>(ctx.Mutables, kind, stream, item, state, outSwitch, initState, updateState); + return new TCondenseFlowWrapper<false, UseCtx>(ctx.Mutables, kind, stream, item, state, outSwitch, initState, updateState); } } else { if (isOptional) { - return new TCondenseWrapper<true>(ctx.Mutables, stream, item, state, outSwitch, initState, updateState); + return new TCondenseWrapper<true, UseCtx>(ctx.Mutables, stream, item, state, outSwitch, initState, updateState); } else { - return new TCondenseWrapper<false>(ctx.Mutables, stream, item, state, outSwitch, initState, updateState); + return new TCondenseWrapper<false, UseCtx>(ctx.Mutables, stream, item, state, outSwitch, initState, updateState); } } } +IComputationNode* WrapCondense(TCallable& callable, const TComputationNodeFactoryContext& ctx) { + MKQL_ENSURE(callable.GetInputsCount() == 6 || callable.GetInputsCount() == 7, "Expected 6 or 7 args"); + + bool useCtx = false; + if (callable.GetInputsCount() >= 7) { + useCtx = AS_VALUE(TDataLiteral, callable.GetInput(6))->AsValue().Get<bool>(); + } + + if (useCtx) { + return WrapCondenseImpl<true>(callable, ctx); + } else { + return WrapCondenseImpl<false>(callable, ctx); + } + +} + IComputationNode* WrapSqueeze(TCallable& callable, const TComputationNodeFactoryContext& ctx) { MKQL_ENSURE(callable.GetInputsCount() == 9, "Expected 9 args"); @@ -495,7 +529,7 @@ IComputationNode* WrapSqueeze(TCallable& callable, const TComputationNodeFactory } const auto stateType = hasSaveLoad ? callable.GetInput(6).GetStaticType() : nullptr; - return new TCondenseWrapper<false>(ctx.Mutables, stream, item, state, nullptr, initState, updateState, inSave, outSave, inLoad, outLoad, stateType); + return new TCondenseWrapper<false, false>(ctx.Mutables, stream, item, state, nullptr, initState, updateState, inSave, outSave, inLoad, outLoad, stateType); } } diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_condense1.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_condense1.cpp index 2f5ac894325..694aa85808d 100644 --- a/ydb/library/yql/minikql/comp_nodes/mkql_condense1.cpp +++ b/ydb/library/yql/minikql/comp_nodes/mkql_condense1.cpp @@ -11,9 +11,9 @@ namespace NMiniKQL { namespace { -template <bool Interruptable> -class TCondense1FlowWrapper : public TStatefulFlowCodegeneratorNode<TCondense1FlowWrapper<Interruptable>> { - typedef TStatefulFlowCodegeneratorNode<TCondense1FlowWrapper<Interruptable>> TBaseComputation; +template <bool Interruptable, bool UseCtx> +class TCondense1FlowWrapper : public TStatefulFlowCodegeneratorNode<TCondense1FlowWrapper<Interruptable, UseCtx>> { + typedef TStatefulFlowCodegeneratorNode<TCondense1FlowWrapper<Interruptable, UseCtx>> TBaseComputation; public: TCondense1FlowWrapper( TComputationMutables& mutables, @@ -33,6 +33,12 @@ public: return static_cast<const NUdf::TUnboxedValuePod&>(state); } + if (UseCtx && state.IsEmbedded()) { + CleanupCurrentContext(); + state = NUdf::TUnboxedValuePod(); + State->SetValue(ctx, InitState->GetValue(ctx)); + } + while (true) { auto item = Flow->GetValue(ctx); if (item.IsYield()) { @@ -57,7 +63,12 @@ public: if (reset.template Get<bool>()) { auto result = State->GetValue(ctx); - State->SetValue(ctx, InitState->GetValue(ctx)); + if (UseCtx) { + state = NUdf::TUnboxedValuePod(0); + } else { + State->SetValue(ctx, InitState->GetValue(ctx)); + } + return result.Release(); } } @@ -180,9 +191,9 @@ private: IComputationNode* const UpdateState; }; -template <bool Interruptable> -class TCondense1Wrapper : public TCustomValueCodegeneratorNode<TCondense1Wrapper<Interruptable>> { - typedef TCustomValueCodegeneratorNode<TCondense1Wrapper<Interruptable>> TBaseComputation; +template <bool Interruptable, bool UseCtx> +class TCondense1Wrapper : public TCustomValueCodegeneratorNode<TCondense1Wrapper<Interruptable, UseCtx>> { + typedef TCustomValueCodegeneratorNode<TCondense1Wrapper<Interruptable, UseCtx>> TBaseComputation; public: class TValue : public TComputationValue<TValue> { public: @@ -221,6 +232,12 @@ public: if (ESqueezeState::Finished == State.Stage) return NUdf::EFetchStatus::Finish; + if (UseCtx && State.Stage == ESqueezeState::NeedInit) { + CleanupCurrentContext(); + State.Stage = ESqueezeState::Work; + State.State->SetValue(Ctx, State.InitState->GetValue(Ctx)); + } + for (;;) { const auto status = Stream.Fetch(State.Item->RefValue(Ctx)); if (status == NUdf::EFetchStatus::Yield) { @@ -243,7 +260,12 @@ public: if (reset.template Get<bool>()) { result = State.State->GetValue(Ctx); - State.State->SetValue(Ctx, State.InitState->GetValue(Ctx)); + if (UseCtx) { + State.Stage = ESqueezeState::NeedInit; + } else { + State.State->SetValue(Ctx, State.InitState->GetValue(Ctx)); + } + return NUdf::EFetchStatus::Ok; } } @@ -472,9 +494,8 @@ private: } -IComputationNode* WrapCondense1(TCallable& callable, const TComputationNodeFactoryContext& ctx) { - MKQL_ENSURE(callable.GetInputsCount() == 6, "Expected 6 args"); - +template <bool UseCtx> +IComputationNode* WrapCondense1Impl(TCallable& callable, const TComputationNodeFactoryContext& ctx) { const auto stream = LocateNode(ctx.NodeLocator, callable, 0); const auto initState = LocateNode(ctx.NodeLocator, callable, 2); const auto outSwitch = LocateNode(ctx.NodeLocator, callable, 4); @@ -490,19 +511,33 @@ IComputationNode* WrapCondense1(TCallable& callable, const TComputationNodeFacto if (type->IsFlow()) { const auto kind = GetValueRepresentation(AS_TYPE(TFlowType, type)->GetItemType()); if (isOptional) { - return new TCondense1FlowWrapper<true>(ctx.Mutables, kind, stream, item, state, outSwitch, initState, updateState); + return new TCondense1FlowWrapper<true, UseCtx>(ctx.Mutables, kind, stream, item, state, outSwitch, initState, updateState); } else { - return new TCondense1FlowWrapper<false>(ctx.Mutables, kind, stream, item, state, outSwitch, initState, updateState); + return new TCondense1FlowWrapper<false, UseCtx>(ctx.Mutables, kind, stream, item, state, outSwitch, initState, updateState); } } else { if (isOptional) { - return new TCondense1Wrapper<true>(ctx.Mutables, stream, item, state, outSwitch, initState, updateState); + return new TCondense1Wrapper<true, UseCtx>(ctx.Mutables, stream, item, state, outSwitch, initState, updateState); } else { - return new TCondense1Wrapper<false>(ctx.Mutables, stream, item, state, outSwitch, initState, updateState); + return new TCondense1Wrapper<false, UseCtx>(ctx.Mutables, stream, item, state, outSwitch, initState, updateState); } } } +IComputationNode* WrapCondense1(TCallable& callable, const TComputationNodeFactoryContext& ctx) { + MKQL_ENSURE(callable.GetInputsCount() == 6 || callable.GetInputsCount() == 7, "Expected 6 or 7 args"); + bool useCtx = false; + if (callable.GetInputsCount() >= 7) { + useCtx = AS_VALUE(TDataLiteral, callable.GetInput(6))->AsValue().Get<bool>(); + } + + if (useCtx) { + return WrapCondense1Impl<true>(callable, ctx); + } else { + return WrapCondense1Impl<false>(callable, ctx); + } +} + IComputationNode* WrapSqueeze1(TCallable& callable, const TComputationNodeFactoryContext& ctx) { MKQL_ENSURE(callable.GetInputsCount() == 9, "Expected 9 args"); @@ -526,7 +561,7 @@ IComputationNode* WrapSqueeze1(TCallable& callable, const TComputationNodeFactor } const auto stateType = hasSaveLoad ? callable.GetInput(6).GetStaticType() : nullptr; - return new TCondense1Wrapper<false>(ctx.Mutables, stream, item, state, nullptr, initState, updateState, inSave, outSave, inLoad, outLoad, stateType); + return new TCondense1Wrapper<false, false>(ctx.Mutables, stream, item, state, nullptr, initState, updateState, inSave, outSave, inLoad, outLoad, stateType); } } diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_squeeze_state.h b/ydb/library/yql/minikql/comp_nodes/mkql_squeeze_state.h index 6896c9f2a95..42d4d7c6623 100644 --- a/ydb/library/yql/minikql/comp_nodes/mkql_squeeze_state.h +++ b/ydb/library/yql/minikql/comp_nodes/mkql_squeeze_state.h @@ -9,7 +9,8 @@ namespace NMiniKQL { enum class ESqueezeState : ui8 { Idle = 0, Work, - Finished + Finished, + NeedInit }; struct TSqueezeState { diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_wide_combine.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_wide_combine.cpp index ded74a611b2..778a594b958 100644 --- a/ydb/library/yql/minikql/comp_nodes/mkql_wide_combine.cpp +++ b/ydb/library/yql/minikql/comp_nodes/mkql_wide_combine.cpp @@ -176,11 +176,7 @@ public: Tongue = Storage.back().data(); } - auto& allocState = *TlsAllocState; - if (allocState.CurrentContext) { - TAllocState::CleanupPAllocList(allocState.CurrentPAllocList); - } - + CleanupCurrentContext(); return true; } diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_wide_condense.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_wide_condense.cpp index 6473ad4b44d..118a9e7c1c5 100644 --- a/ydb/library/yql/minikql/comp_nodes/mkql_wide_condense.cpp +++ b/ydb/library/yql/minikql/comp_nodes/mkql_wide_condense.cpp @@ -13,9 +13,9 @@ using NYql::EnsureDynamicCast; namespace { -template <bool Interruptable> -class TWideCondense1Wrapper : public TStatefulWideFlowCodegeneratorNode<TWideCondense1Wrapper<Interruptable>> { -using TBaseComputation = TStatefulWideFlowCodegeneratorNode<TWideCondense1Wrapper<Interruptable>>; +template <bool Interruptable, bool UseCtx> +class TWideCondense1Wrapper : public TStatefulWideFlowCodegeneratorNode<TWideCondense1Wrapper<Interruptable, UseCtx>> { +using TBaseComputation = TStatefulWideFlowCodegeneratorNode<TWideCondense1Wrapper<Interruptable, UseCtx>>; public: TWideCondense1Wrapper(TComputationMutables& mutables, IComputationWideFlowNode* flow, TComputationExternalNodePtrVector&& items, TComputationNodePtrVector&& initState, @@ -32,6 +32,10 @@ public: if (state.IsFinish()) { return EFetchResult::Finish; } else if (state.HasValue() && state.Get<bool>()) { + if (UseCtx) { + CleanupCurrentContext(); + } + state = NUdf::TUnboxedValuePod(false); for (ui32 i = 0U; i < State.size(); ++i) State[i]->SetValue(ctx, InitState[i]->GetValue(ctx)); @@ -269,11 +273,28 @@ IComputationNode* WrapWideCondense1(TCallable& callable, const TComputationNodeF std::generate_n(std::back_inserter(state), outputWidth, [&](){ return LocateExternalNode(ctx.NodeLocator, callable, ++index); } ); - if (const auto wide = dynamic_cast<IComputationWideFlowNode*>(flow)) { - if (isOptional) { - return new TWideCondense1Wrapper<true>(ctx.Mutables, wide, std::move(items), std::move(initState), std::move(state), outSwitch, std::move(updateState)); - } else { - return new TWideCondense1Wrapper<false>(ctx.Mutables, wide, std::move(items), std::move(initState), std::move(state), outSwitch, std::move(updateState)); + index = 2 + inputWidth + 3 * outputWidth; + bool useCtx = false; + if (index < callable.GetInputsCount()) { + useCtx = AS_VALUE(TDataLiteral, callable.GetInput(index))->AsValue().Get<bool>(); + ++index; + } + + if (useCtx) { + if (const auto wide = dynamic_cast<IComputationWideFlowNode*>(flow)) { + if (isOptional) { + return new TWideCondense1Wrapper<true, true>(ctx.Mutables, wide, std::move(items), std::move(initState), std::move(state), outSwitch, std::move(updateState)); + } else { + return new TWideCondense1Wrapper<false, true>(ctx.Mutables, wide, std::move(items), std::move(initState), std::move(state), outSwitch, std::move(updateState)); + } + } + } else { + if (const auto wide = dynamic_cast<IComputationWideFlowNode*>(flow)) { + if (isOptional) { + return new TWideCondense1Wrapper<true, false>(ctx.Mutables, wide, std::move(items), std::move(initState), std::move(state), outSwitch, std::move(updateState)); + } else { + return new TWideCondense1Wrapper<false, false>(ctx.Mutables, wide, std::move(items), std::move(initState), std::move(state), outSwitch, std::move(updateState)); + } } } diff --git a/ydb/library/yql/minikql/computation/mkql_computation_node_impl.cpp b/ydb/library/yql/minikql/computation/mkql_computation_node_impl.cpp index 15777c32880..f010db6bf87 100644 --- a/ydb/library/yql/minikql/computation/mkql_computation_node_impl.cpp +++ b/ydb/library/yql/minikql/computation/mkql_computation_node_impl.cpp @@ -324,5 +324,12 @@ const IComputationNode* GetCommonSource(const IComputationNode* first, const ICo return common; } +void CleanupCurrentContext() { + auto& allocState = *TlsAllocState; + if (allocState.CurrentContext) { + TAllocState::CleanupPAllocList(allocState.CurrentPAllocList); + } +} + } } diff --git a/ydb/library/yql/minikql/computation/mkql_computation_node_impl.h b/ydb/library/yql/minikql/computation/mkql_computation_node_impl.h index 0a63ca9d349..01d170cf119 100644 --- a/ydb/library/yql/minikql/computation/mkql_computation_node_impl.h +++ b/ydb/library/yql/minikql/computation/mkql_computation_node_impl.h @@ -1084,6 +1084,7 @@ std::optional<size_t> IsPasstrought(const IComputationNode* root, const TComputa TPasstroughtMap MergePasstroughtMaps(const TPasstroughtMap& lhs, const TPasstroughtMap& rhs); void ApplyChanges(const NUdf::TUnboxedValue& value, NUdf::IApplyContext& applyCtx); +void CleanupCurrentContext(); } } diff --git a/ydb/library/yql/minikql/mkql_program_builder.cpp b/ydb/library/yql/minikql/mkql_program_builder.cpp index 8b98460e6f5..3d8e454beb4 100644 --- a/ydb/library/yql/minikql/mkql_program_builder.cpp +++ b/ydb/library/yql/minikql/mkql_program_builder.cpp @@ -574,7 +574,7 @@ TRuntimeNode TProgramBuilder::Reduce(TRuntimeNode list, TRuntimeNode state1, TRuntimeNode TProgramBuilder::Condense(TRuntimeNode flow, TRuntimeNode state, const TBinaryLambda& switcher, - const TBinaryLambda& handler) { + const TBinaryLambda& handler, bool useCtx) { const auto flowType = flow.GetStaticType(); if (flowType->IsList()) { @@ -601,12 +601,17 @@ TRuntimeNode TProgramBuilder::Condense(TRuntimeNode flow, TRuntimeNode state, callableBuilder.Add(stateArg); callableBuilder.Add(outSwitch); callableBuilder.Add(newState); + if (useCtx) { + MKQL_ENSURE(RuntimeVersion >= 30U, "Too old runtime version"); + callableBuilder.Add(NewDataLiteral<bool>(useCtx)); + } + return TRuntimeNode(callableBuilder.Build(), false); } TRuntimeNode TProgramBuilder::Condense1(TRuntimeNode flow, const TUnaryLambda& init, const TBinaryLambda& switcher, - const TBinaryLambda& handler) { + const TBinaryLambda& handler, bool useCtx) { const auto flowType = flow.GetStaticType(); if (flowType->IsList()) { @@ -635,6 +640,11 @@ TRuntimeNode TProgramBuilder::Condense1(TRuntimeNode flow, const TUnaryLambda& i callableBuilder.Add(stateArg); callableBuilder.Add(outSwitch); callableBuilder.Add(newState); + if (useCtx) { + MKQL_ENSURE(RuntimeVersion >= 30U, "Too old runtime version"); + callableBuilder.Add(NewDataLiteral<bool>(useCtx)); + } + return TRuntimeNode(callableBuilder.Build(), false); } @@ -4409,7 +4419,7 @@ TRuntimeNode TProgramBuilder::WideLastCombiner(TRuntimeNode flow, const TWideLam return TRuntimeNode(callableBuilder.Build(), false); } -TRuntimeNode TProgramBuilder::WideCondense1(TRuntimeNode flow, const TWideLambda& init, const TWideSwitchLambda& switcher, const TBinaryWideLambda& update) { +TRuntimeNode TProgramBuilder::WideCondense1(TRuntimeNode flow, const TWideLambda& init, const TWideSwitchLambda& switcher, const TBinaryWideLambda& update, bool useCtx) { if constexpr (RuntimeVersion < 18U) { THROW yexception() << "Runtime version (" << RuntimeVersion << ") too old for " << __func__; } @@ -4444,6 +4454,11 @@ TRuntimeNode TProgramBuilder::WideCondense1(TRuntimeNode flow, const TWideLambda std::for_each(stateArgs.cbegin(), stateArgs.cend(), std::bind(&TCallableBuilder::Add, std::ref(callableBuilder), std::placeholders::_1)); callableBuilder.Add(chop); std::for_each(next.cbegin(), next.cend(), std::bind(&TCallableBuilder::Add, std::ref(callableBuilder), std::placeholders::_1)); + if (useCtx) { + MKQL_ENSURE(RuntimeVersion >= 30U, "Too old runtime version"); + callableBuilder.Add(NewDataLiteral<bool>(useCtx)); + } + return TRuntimeNode(callableBuilder.Build(), false); } diff --git a/ydb/library/yql/minikql/mkql_program_builder.h b/ydb/library/yql/minikql/mkql_program_builder.h index 4412523dced..cf86862078b 100644 --- a/ydb/library/yql/minikql/mkql_program_builder.h +++ b/ydb/library/yql/minikql/mkql_program_builder.h @@ -311,10 +311,10 @@ public: const TBinaryLambda& handler3); TRuntimeNode Condense(TRuntimeNode stream, TRuntimeNode state, const TBinaryLambda& switcher, - const TBinaryLambda& handler); + const TBinaryLambda& handler, bool useCtx = false); TRuntimeNode Condense1(TRuntimeNode stream, const TUnaryLambda& init, const TBinaryLambda& switcher, - const TBinaryLambda& handler); + const TBinaryLambda& handler, bool useCtx = false); TRuntimeNode Squeeze(TRuntimeNode stream, TRuntimeNode state, const TBinaryLambda& handler, const TUnaryLambda& save = {}, @@ -372,7 +372,7 @@ public: TRuntimeNode WideCombiner(TRuntimeNode flow, ui64 memLimit, const TWideLambda& keyExtractor, const TBinaryWideLambda& init, const TTernaryWideLambda& update, const TBinaryWideLambda& finish); TRuntimeNode WideLastCombiner(TRuntimeNode flow, const TWideLambda& keyExtractor, const TBinaryWideLambda& init, const TTernaryWideLambda& update, const TBinaryWideLambda& finish); - TRuntimeNode WideCondense1(TRuntimeNode stream, const TWideLambda& init, const TWideSwitchLambda& switcher, const TBinaryWideLambda& handler); + TRuntimeNode WideCondense1(TRuntimeNode stream, const TWideLambda& init, const TWideSwitchLambda& switcher, const TBinaryWideLambda& handler, bool useCtx = false); TRuntimeNode Length(TRuntimeNode listOrDict); TRuntimeNode Iterator(TRuntimeNode list, const TArrayRef<const TRuntimeNode>& dependentNodes); diff --git a/ydb/library/yql/providers/common/mkql/yql_provider_mkql.cpp b/ydb/library/yql/providers/common/mkql/yql_provider_mkql.cpp index e085f8e97e0..2fc40f294cc 100644 --- a/ydb/library/yql/providers/common/mkql/yql_provider_mkql.cpp +++ b/ydb/library/yql/providers/common/mkql/yql_provider_mkql.cpp @@ -675,7 +675,8 @@ TMkqlCommonCallableCompiler::TShared::TShared() { [&](TRuntimeNode::TList items, TRuntimeNode::TList state) { items.insert(items.cend(), state.cbegin(), state.cend()); return MkqlBuildWideLambda(*node.Child(3), ctx, items); - }); + }, + HasContextFuncs(*node.Child(1)) || HasContextFuncs(*node.Child(3))); }); AddCallable("WideCombiner", [](const TExprNode& node, TMkqlBuildContext& ctx) { @@ -1045,7 +1046,7 @@ TMkqlCommonCallableCompiler::TShared::TShared() { }, [&](TRuntimeNode item, TRuntimeNode state) { return MkqlBuildLambda(*node.Child(3), ctx, {item, state}); - }); + }, HasContextFuncs(*node.Child(3))); }); AddCallable("Condense1", [](const TExprNode& node, TMkqlBuildContext& ctx) { @@ -1059,7 +1060,7 @@ TMkqlCommonCallableCompiler::TShared::TShared() { }, [&](TRuntimeNode item, TRuntimeNode state) { return MkqlBuildLambda(*node.Child(3), ctx, {item, state}); - }); + }, HasContextFuncs(*node.Child(1)) || HasContextFuncs(*node.Child(3))); }); AddCallable("Squeeze", [](const TExprNode& node, TMkqlBuildContext& ctx) { |