aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorvvvv <vvvv@yandex-team.ru>2022-03-24 19:04:55 +0300
committervvvv <vvvv@yandex-team.ru>2022-03-24 19:04:55 +0300
commita6a11e07f1f3d6a0151440a67319641ece7e068a (patch)
treeadcf853cbdd91b978fda7e5d0c38187a59b2600f
parent5b38bc8a727d799624f5da8f3065d11981508402 (diff)
downloadydb-a6a11e07f1f3d6a0151440a67319641ece7e068a.tar.gz
YQL-13710 cleanup current context after switch in Condense/Condense1/WideCondense
ref:f272d0444db432cbea099c3d28ea5905511dff3e
-rw-r--r--ydb/library/yql/minikql/comp_nodes/mkql_combine.cpp6
-rw-r--r--ydb/library/yql/minikql/comp_nodes/mkql_condense.cpp70
-rw-r--r--ydb/library/yql/minikql/comp_nodes/mkql_condense1.cpp67
-rw-r--r--ydb/library/yql/minikql/comp_nodes/mkql_squeeze_state.h3
-rw-r--r--ydb/library/yql/minikql/comp_nodes/mkql_wide_combine.cpp6
-rw-r--r--ydb/library/yql/minikql/comp_nodes/mkql_wide_condense.cpp37
-rw-r--r--ydb/library/yql/minikql/computation/mkql_computation_node_impl.cpp7
-rw-r--r--ydb/library/yql/minikql/computation/mkql_computation_node_impl.h1
-rw-r--r--ydb/library/yql/minikql/mkql_program_builder.cpp21
-rw-r--r--ydb/library/yql/minikql/mkql_program_builder.h6
-rw-r--r--ydb/library/yql/providers/common/mkql/yql_provider_mkql.cpp7
11 files changed, 169 insertions, 62 deletions
diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_combine.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_combine.cpp
index 61400db1974..ce646cf6b11 100644
--- a/ydb/library/yql/minikql/comp_nodes/mkql_combine.cpp
+++ b/ydb/library/yql/minikql/comp_nodes/mkql_combine.cpp
@@ -93,11 +93,7 @@ public:
return false;
}
- auto& allocState = *TlsAllocState;
- if (allocState.CurrentContext) {
- TAllocState::CleanupPAllocList(allocState.CurrentPAllocList);
- }
-
+ CleanupCurrentContext();
return true;
}
diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_condense.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_condense.cpp
index c179abab492..33f5e9f1e10 100644
--- a/ydb/library/yql/minikql/comp_nodes/mkql_condense.cpp
+++ b/ydb/library/yql/minikql/comp_nodes/mkql_condense.cpp
@@ -11,9 +11,9 @@ namespace NMiniKQL {
namespace {
-template <bool Interruptable>
-class TCondenseFlowWrapper : public TStatefulFlowCodegeneratorNode<TCondenseFlowWrapper<Interruptable>> {
- typedef TStatefulFlowCodegeneratorNode<TCondenseFlowWrapper<Interruptable>> TBaseComputation;
+template <bool Interruptable, bool UseCtx>
+class TCondenseFlowWrapper : public TStatefulFlowCodegeneratorNode<TCondenseFlowWrapper<Interruptable, UseCtx>> {
+ typedef TStatefulFlowCodegeneratorNode<TCondenseFlowWrapper<Interruptable, UseCtx>> TBaseComputation;
public:
TCondenseFlowWrapper(
TComputationMutables& mutables,
@@ -36,6 +36,11 @@ public:
if (state.IsInvalid()) {
state = NUdf::TUnboxedValuePod();
State->SetValue(ctx, InitState->GetValue(ctx));
+ } else if (UseCtx && state.IsEmbedded()) {
+ CleanupCurrentContext();
+ state = NUdf::TUnboxedValuePod();
+ State->SetValue(ctx, InitState->GetValue(ctx));
+ State->SetValue(ctx, UpdateState->GetValue(ctx));
}
while (true) {
@@ -58,8 +63,12 @@ public:
if (reset.template Get<bool>()) {
auto result = State->GetValue(ctx);
- State->SetValue(ctx, InitState->GetValue(ctx));
- State->SetValue(ctx, UpdateState->GetValue(ctx));
+ if (UseCtx) {
+ state = NUdf::TUnboxedValuePod(0);
+ } else {
+ State->SetValue(ctx, InitState->GetValue(ctx));
+ State->SetValue(ctx, UpdateState->GetValue(ctx));
+ }
return result.Release();
}
}
@@ -167,9 +176,9 @@ private:
IComputationNode* const UpdateState;
};
-template <bool Interruptable>
-class TCondenseWrapper : public TCustomValueCodegeneratorNode<TCondenseWrapper<Interruptable>> {
- typedef TCustomValueCodegeneratorNode<TCondenseWrapper<Interruptable>> TBaseComputation;
+template <bool Interruptable, bool UseCtx>
+class TCondenseWrapper : public TCustomValueCodegeneratorNode<TCondenseWrapper<Interruptable, UseCtx>> {
+ typedef TCustomValueCodegeneratorNode<TCondenseWrapper<Interruptable, UseCtx>> TBaseComputation;
public:
class TValue : public TComputationValue<TValue> {
public:
@@ -211,6 +220,11 @@ public:
if (ESqueezeState::Idle == State.Stage) {
State.Stage = ESqueezeState::Work;
State.State->SetValue(Ctx, State.InitState->GetValue(Ctx));
+ } else if (UseCtx && ESqueezeState::NeedInit == State.Stage) {
+ CleanupCurrentContext();
+ State.Stage = ESqueezeState::Work;
+ State.State->SetValue(Ctx, State.InitState->GetValue(Ctx));
+ State.State->SetValue(Ctx, State.UpdateState->GetValue(Ctx));
}
for (;;) {
@@ -231,8 +245,13 @@ public:
if (reset.template Get<bool>()) {
result = State.State->GetValue(Ctx);
- State.State->SetValue(Ctx, State.InitState->GetValue(Ctx));
- State.State->SetValue(Ctx, State.UpdateState->GetValue(Ctx));
+ if (UseCtx) {
+ State.Stage = ESqueezeState::NeedInit;
+ } else {
+ State.State->SetValue(Ctx, State.InitState->GetValue(Ctx));
+ State.State->SetValue(Ctx, State.UpdateState->GetValue(Ctx));
+ }
+
return NUdf::EFetchStatus::Ok;
}
}
@@ -441,9 +460,8 @@ private:
}
-IComputationNode* WrapCondense(TCallable& callable, const TComputationNodeFactoryContext& ctx) {
- MKQL_ENSURE(callable.GetInputsCount() == 6, "Expected 6 args");
-
+template <bool UseCtx>
+IComputationNode* WrapCondenseImpl(TCallable& callable, const TComputationNodeFactoryContext& ctx) {
const auto stream = LocateNode(ctx.NodeLocator, callable, 0);
const auto initState = LocateNode(ctx.NodeLocator, callable, 1);
const auto outSwitch = LocateNode(ctx.NodeLocator, callable, 4);
@@ -459,19 +477,35 @@ IComputationNode* WrapCondense(TCallable& callable, const TComputationNodeFactor
if (type->IsFlow()) {
const auto kind = GetValueRepresentation(AS_TYPE(TFlowType, type)->GetItemType());
if (isOptional) {
- return new TCondenseFlowWrapper<true>(ctx.Mutables, kind, stream, item, state, outSwitch, initState, updateState);
+ return new TCondenseFlowWrapper<true, UseCtx>(ctx.Mutables, kind, stream, item, state, outSwitch, initState, updateState);
} else {
- return new TCondenseFlowWrapper<false>(ctx.Mutables, kind, stream, item, state, outSwitch, initState, updateState);
+ return new TCondenseFlowWrapper<false, UseCtx>(ctx.Mutables, kind, stream, item, state, outSwitch, initState, updateState);
}
} else {
if (isOptional) {
- return new TCondenseWrapper<true>(ctx.Mutables, stream, item, state, outSwitch, initState, updateState);
+ return new TCondenseWrapper<true, UseCtx>(ctx.Mutables, stream, item, state, outSwitch, initState, updateState);
} else {
- return new TCondenseWrapper<false>(ctx.Mutables, stream, item, state, outSwitch, initState, updateState);
+ return new TCondenseWrapper<false, UseCtx>(ctx.Mutables, stream, item, state, outSwitch, initState, updateState);
}
}
}
+IComputationNode* WrapCondense(TCallable& callable, const TComputationNodeFactoryContext& ctx) {
+ MKQL_ENSURE(callable.GetInputsCount() == 6 || callable.GetInputsCount() == 7, "Expected 6 or 7 args");
+
+ bool useCtx = false;
+ if (callable.GetInputsCount() >= 7) {
+ useCtx = AS_VALUE(TDataLiteral, callable.GetInput(6))->AsValue().Get<bool>();
+ }
+
+ if (useCtx) {
+ return WrapCondenseImpl<true>(callable, ctx);
+ } else {
+ return WrapCondenseImpl<false>(callable, ctx);
+ }
+
+}
+
IComputationNode* WrapSqueeze(TCallable& callable, const TComputationNodeFactoryContext& ctx) {
MKQL_ENSURE(callable.GetInputsCount() == 9, "Expected 9 args");
@@ -495,7 +529,7 @@ IComputationNode* WrapSqueeze(TCallable& callable, const TComputationNodeFactory
}
const auto stateType = hasSaveLoad ? callable.GetInput(6).GetStaticType() : nullptr;
- return new TCondenseWrapper<false>(ctx.Mutables, stream, item, state, nullptr, initState, updateState, inSave, outSave, inLoad, outLoad, stateType);
+ return new TCondenseWrapper<false, false>(ctx.Mutables, stream, item, state, nullptr, initState, updateState, inSave, outSave, inLoad, outLoad, stateType);
}
}
diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_condense1.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_condense1.cpp
index 2f5ac894325..694aa85808d 100644
--- a/ydb/library/yql/minikql/comp_nodes/mkql_condense1.cpp
+++ b/ydb/library/yql/minikql/comp_nodes/mkql_condense1.cpp
@@ -11,9 +11,9 @@ namespace NMiniKQL {
namespace {
-template <bool Interruptable>
-class TCondense1FlowWrapper : public TStatefulFlowCodegeneratorNode<TCondense1FlowWrapper<Interruptable>> {
- typedef TStatefulFlowCodegeneratorNode<TCondense1FlowWrapper<Interruptable>> TBaseComputation;
+template <bool Interruptable, bool UseCtx>
+class TCondense1FlowWrapper : public TStatefulFlowCodegeneratorNode<TCondense1FlowWrapper<Interruptable, UseCtx>> {
+ typedef TStatefulFlowCodegeneratorNode<TCondense1FlowWrapper<Interruptable, UseCtx>> TBaseComputation;
public:
TCondense1FlowWrapper(
TComputationMutables& mutables,
@@ -33,6 +33,12 @@ public:
return static_cast<const NUdf::TUnboxedValuePod&>(state);
}
+ if (UseCtx && state.IsEmbedded()) {
+ CleanupCurrentContext();
+ state = NUdf::TUnboxedValuePod();
+ State->SetValue(ctx, InitState->GetValue(ctx));
+ }
+
while (true) {
auto item = Flow->GetValue(ctx);
if (item.IsYield()) {
@@ -57,7 +63,12 @@ public:
if (reset.template Get<bool>()) {
auto result = State->GetValue(ctx);
- State->SetValue(ctx, InitState->GetValue(ctx));
+ if (UseCtx) {
+ state = NUdf::TUnboxedValuePod(0);
+ } else {
+ State->SetValue(ctx, InitState->GetValue(ctx));
+ }
+
return result.Release();
}
}
@@ -180,9 +191,9 @@ private:
IComputationNode* const UpdateState;
};
-template <bool Interruptable>
-class TCondense1Wrapper : public TCustomValueCodegeneratorNode<TCondense1Wrapper<Interruptable>> {
- typedef TCustomValueCodegeneratorNode<TCondense1Wrapper<Interruptable>> TBaseComputation;
+template <bool Interruptable, bool UseCtx>
+class TCondense1Wrapper : public TCustomValueCodegeneratorNode<TCondense1Wrapper<Interruptable, UseCtx>> {
+ typedef TCustomValueCodegeneratorNode<TCondense1Wrapper<Interruptable, UseCtx>> TBaseComputation;
public:
class TValue : public TComputationValue<TValue> {
public:
@@ -221,6 +232,12 @@ public:
if (ESqueezeState::Finished == State.Stage)
return NUdf::EFetchStatus::Finish;
+ if (UseCtx && State.Stage == ESqueezeState::NeedInit) {
+ CleanupCurrentContext();
+ State.Stage = ESqueezeState::Work;
+ State.State->SetValue(Ctx, State.InitState->GetValue(Ctx));
+ }
+
for (;;) {
const auto status = Stream.Fetch(State.Item->RefValue(Ctx));
if (status == NUdf::EFetchStatus::Yield) {
@@ -243,7 +260,12 @@ public:
if (reset.template Get<bool>()) {
result = State.State->GetValue(Ctx);
- State.State->SetValue(Ctx, State.InitState->GetValue(Ctx));
+ if (UseCtx) {
+ State.Stage = ESqueezeState::NeedInit;
+ } else {
+ State.State->SetValue(Ctx, State.InitState->GetValue(Ctx));
+ }
+
return NUdf::EFetchStatus::Ok;
}
}
@@ -472,9 +494,8 @@ private:
}
-IComputationNode* WrapCondense1(TCallable& callable, const TComputationNodeFactoryContext& ctx) {
- MKQL_ENSURE(callable.GetInputsCount() == 6, "Expected 6 args");
-
+template <bool UseCtx>
+IComputationNode* WrapCondense1Impl(TCallable& callable, const TComputationNodeFactoryContext& ctx) {
const auto stream = LocateNode(ctx.NodeLocator, callable, 0);
const auto initState = LocateNode(ctx.NodeLocator, callable, 2);
const auto outSwitch = LocateNode(ctx.NodeLocator, callable, 4);
@@ -490,19 +511,33 @@ IComputationNode* WrapCondense1(TCallable& callable, const TComputationNodeFacto
if (type->IsFlow()) {
const auto kind = GetValueRepresentation(AS_TYPE(TFlowType, type)->GetItemType());
if (isOptional) {
- return new TCondense1FlowWrapper<true>(ctx.Mutables, kind, stream, item, state, outSwitch, initState, updateState);
+ return new TCondense1FlowWrapper<true, UseCtx>(ctx.Mutables, kind, stream, item, state, outSwitch, initState, updateState);
} else {
- return new TCondense1FlowWrapper<false>(ctx.Mutables, kind, stream, item, state, outSwitch, initState, updateState);
+ return new TCondense1FlowWrapper<false, UseCtx>(ctx.Mutables, kind, stream, item, state, outSwitch, initState, updateState);
}
} else {
if (isOptional) {
- return new TCondense1Wrapper<true>(ctx.Mutables, stream, item, state, outSwitch, initState, updateState);
+ return new TCondense1Wrapper<true, UseCtx>(ctx.Mutables, stream, item, state, outSwitch, initState, updateState);
} else {
- return new TCondense1Wrapper<false>(ctx.Mutables, stream, item, state, outSwitch, initState, updateState);
+ return new TCondense1Wrapper<false, UseCtx>(ctx.Mutables, stream, item, state, outSwitch, initState, updateState);
}
}
}
+IComputationNode* WrapCondense1(TCallable& callable, const TComputationNodeFactoryContext& ctx) {
+ MKQL_ENSURE(callable.GetInputsCount() == 6 || callable.GetInputsCount() == 7, "Expected 6 or 7 args");
+ bool useCtx = false;
+ if (callable.GetInputsCount() >= 7) {
+ useCtx = AS_VALUE(TDataLiteral, callable.GetInput(6))->AsValue().Get<bool>();
+ }
+
+ if (useCtx) {
+ return WrapCondense1Impl<true>(callable, ctx);
+ } else {
+ return WrapCondense1Impl<false>(callable, ctx);
+ }
+}
+
IComputationNode* WrapSqueeze1(TCallable& callable, const TComputationNodeFactoryContext& ctx) {
MKQL_ENSURE(callable.GetInputsCount() == 9, "Expected 9 args");
@@ -526,7 +561,7 @@ IComputationNode* WrapSqueeze1(TCallable& callable, const TComputationNodeFactor
}
const auto stateType = hasSaveLoad ? callable.GetInput(6).GetStaticType() : nullptr;
- return new TCondense1Wrapper<false>(ctx.Mutables, stream, item, state, nullptr, initState, updateState, inSave, outSave, inLoad, outLoad, stateType);
+ return new TCondense1Wrapper<false, false>(ctx.Mutables, stream, item, state, nullptr, initState, updateState, inSave, outSave, inLoad, outLoad, stateType);
}
}
diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_squeeze_state.h b/ydb/library/yql/minikql/comp_nodes/mkql_squeeze_state.h
index 6896c9f2a95..42d4d7c6623 100644
--- a/ydb/library/yql/minikql/comp_nodes/mkql_squeeze_state.h
+++ b/ydb/library/yql/minikql/comp_nodes/mkql_squeeze_state.h
@@ -9,7 +9,8 @@ namespace NMiniKQL {
enum class ESqueezeState : ui8 {
Idle = 0,
Work,
- Finished
+ Finished,
+ NeedInit
};
struct TSqueezeState {
diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_wide_combine.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_wide_combine.cpp
index ded74a611b2..778a594b958 100644
--- a/ydb/library/yql/minikql/comp_nodes/mkql_wide_combine.cpp
+++ b/ydb/library/yql/minikql/comp_nodes/mkql_wide_combine.cpp
@@ -176,11 +176,7 @@ public:
Tongue = Storage.back().data();
}
- auto& allocState = *TlsAllocState;
- if (allocState.CurrentContext) {
- TAllocState::CleanupPAllocList(allocState.CurrentPAllocList);
- }
-
+ CleanupCurrentContext();
return true;
}
diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_wide_condense.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_wide_condense.cpp
index 6473ad4b44d..118a9e7c1c5 100644
--- a/ydb/library/yql/minikql/comp_nodes/mkql_wide_condense.cpp
+++ b/ydb/library/yql/minikql/comp_nodes/mkql_wide_condense.cpp
@@ -13,9 +13,9 @@ using NYql::EnsureDynamicCast;
namespace {
-template <bool Interruptable>
-class TWideCondense1Wrapper : public TStatefulWideFlowCodegeneratorNode<TWideCondense1Wrapper<Interruptable>> {
-using TBaseComputation = TStatefulWideFlowCodegeneratorNode<TWideCondense1Wrapper<Interruptable>>;
+template <bool Interruptable, bool UseCtx>
+class TWideCondense1Wrapper : public TStatefulWideFlowCodegeneratorNode<TWideCondense1Wrapper<Interruptable, UseCtx>> {
+using TBaseComputation = TStatefulWideFlowCodegeneratorNode<TWideCondense1Wrapper<Interruptable, UseCtx>>;
public:
TWideCondense1Wrapper(TComputationMutables& mutables, IComputationWideFlowNode* flow,
TComputationExternalNodePtrVector&& items, TComputationNodePtrVector&& initState,
@@ -32,6 +32,10 @@ public:
if (state.IsFinish()) {
return EFetchResult::Finish;
} else if (state.HasValue() && state.Get<bool>()) {
+ if (UseCtx) {
+ CleanupCurrentContext();
+ }
+
state = NUdf::TUnboxedValuePod(false);
for (ui32 i = 0U; i < State.size(); ++i)
State[i]->SetValue(ctx, InitState[i]->GetValue(ctx));
@@ -269,11 +273,28 @@ IComputationNode* WrapWideCondense1(TCallable& callable, const TComputationNodeF
std::generate_n(std::back_inserter(state), outputWidth, [&](){ return LocateExternalNode(ctx.NodeLocator, callable, ++index); } );
- if (const auto wide = dynamic_cast<IComputationWideFlowNode*>(flow)) {
- if (isOptional) {
- return new TWideCondense1Wrapper<true>(ctx.Mutables, wide, std::move(items), std::move(initState), std::move(state), outSwitch, std::move(updateState));
- } else {
- return new TWideCondense1Wrapper<false>(ctx.Mutables, wide, std::move(items), std::move(initState), std::move(state), outSwitch, std::move(updateState));
+ index = 2 + inputWidth + 3 * outputWidth;
+ bool useCtx = false;
+ if (index < callable.GetInputsCount()) {
+ useCtx = AS_VALUE(TDataLiteral, callable.GetInput(index))->AsValue().Get<bool>();
+ ++index;
+ }
+
+ if (useCtx) {
+ if (const auto wide = dynamic_cast<IComputationWideFlowNode*>(flow)) {
+ if (isOptional) {
+ return new TWideCondense1Wrapper<true, true>(ctx.Mutables, wide, std::move(items), std::move(initState), std::move(state), outSwitch, std::move(updateState));
+ } else {
+ return new TWideCondense1Wrapper<false, true>(ctx.Mutables, wide, std::move(items), std::move(initState), std::move(state), outSwitch, std::move(updateState));
+ }
+ }
+ } else {
+ if (const auto wide = dynamic_cast<IComputationWideFlowNode*>(flow)) {
+ if (isOptional) {
+ return new TWideCondense1Wrapper<true, false>(ctx.Mutables, wide, std::move(items), std::move(initState), std::move(state), outSwitch, std::move(updateState));
+ } else {
+ return new TWideCondense1Wrapper<false, false>(ctx.Mutables, wide, std::move(items), std::move(initState), std::move(state), outSwitch, std::move(updateState));
+ }
}
}
diff --git a/ydb/library/yql/minikql/computation/mkql_computation_node_impl.cpp b/ydb/library/yql/minikql/computation/mkql_computation_node_impl.cpp
index 15777c32880..f010db6bf87 100644
--- a/ydb/library/yql/minikql/computation/mkql_computation_node_impl.cpp
+++ b/ydb/library/yql/minikql/computation/mkql_computation_node_impl.cpp
@@ -324,5 +324,12 @@ const IComputationNode* GetCommonSource(const IComputationNode* first, const ICo
return common;
}
+void CleanupCurrentContext() {
+ auto& allocState = *TlsAllocState;
+ if (allocState.CurrentContext) {
+ TAllocState::CleanupPAllocList(allocState.CurrentPAllocList);
+ }
+}
+
}
}
diff --git a/ydb/library/yql/minikql/computation/mkql_computation_node_impl.h b/ydb/library/yql/minikql/computation/mkql_computation_node_impl.h
index 0a63ca9d349..01d170cf119 100644
--- a/ydb/library/yql/minikql/computation/mkql_computation_node_impl.h
+++ b/ydb/library/yql/minikql/computation/mkql_computation_node_impl.h
@@ -1084,6 +1084,7 @@ std::optional<size_t> IsPasstrought(const IComputationNode* root, const TComputa
TPasstroughtMap MergePasstroughtMaps(const TPasstroughtMap& lhs, const TPasstroughtMap& rhs);
void ApplyChanges(const NUdf::TUnboxedValue& value, NUdf::IApplyContext& applyCtx);
+void CleanupCurrentContext();
}
}
diff --git a/ydb/library/yql/minikql/mkql_program_builder.cpp b/ydb/library/yql/minikql/mkql_program_builder.cpp
index 8b98460e6f5..3d8e454beb4 100644
--- a/ydb/library/yql/minikql/mkql_program_builder.cpp
+++ b/ydb/library/yql/minikql/mkql_program_builder.cpp
@@ -574,7 +574,7 @@ TRuntimeNode TProgramBuilder::Reduce(TRuntimeNode list, TRuntimeNode state1,
TRuntimeNode TProgramBuilder::Condense(TRuntimeNode flow, TRuntimeNode state,
const TBinaryLambda& switcher,
- const TBinaryLambda& handler) {
+ const TBinaryLambda& handler, bool useCtx) {
const auto flowType = flow.GetStaticType();
if (flowType->IsList()) {
@@ -601,12 +601,17 @@ TRuntimeNode TProgramBuilder::Condense(TRuntimeNode flow, TRuntimeNode state,
callableBuilder.Add(stateArg);
callableBuilder.Add(outSwitch);
callableBuilder.Add(newState);
+ if (useCtx) {
+ MKQL_ENSURE(RuntimeVersion >= 30U, "Too old runtime version");
+ callableBuilder.Add(NewDataLiteral<bool>(useCtx));
+ }
+
return TRuntimeNode(callableBuilder.Build(), false);
}
TRuntimeNode TProgramBuilder::Condense1(TRuntimeNode flow, const TUnaryLambda& init,
const TBinaryLambda& switcher,
- const TBinaryLambda& handler) {
+ const TBinaryLambda& handler, bool useCtx) {
const auto flowType = flow.GetStaticType();
if (flowType->IsList()) {
@@ -635,6 +640,11 @@ TRuntimeNode TProgramBuilder::Condense1(TRuntimeNode flow, const TUnaryLambda& i
callableBuilder.Add(stateArg);
callableBuilder.Add(outSwitch);
callableBuilder.Add(newState);
+ if (useCtx) {
+ MKQL_ENSURE(RuntimeVersion >= 30U, "Too old runtime version");
+ callableBuilder.Add(NewDataLiteral<bool>(useCtx));
+ }
+
return TRuntimeNode(callableBuilder.Build(), false);
}
@@ -4409,7 +4419,7 @@ TRuntimeNode TProgramBuilder::WideLastCombiner(TRuntimeNode flow, const TWideLam
return TRuntimeNode(callableBuilder.Build(), false);
}
-TRuntimeNode TProgramBuilder::WideCondense1(TRuntimeNode flow, const TWideLambda& init, const TWideSwitchLambda& switcher, const TBinaryWideLambda& update) {
+TRuntimeNode TProgramBuilder::WideCondense1(TRuntimeNode flow, const TWideLambda& init, const TWideSwitchLambda& switcher, const TBinaryWideLambda& update, bool useCtx) {
if constexpr (RuntimeVersion < 18U) {
THROW yexception() << "Runtime version (" << RuntimeVersion << ") too old for " << __func__;
}
@@ -4444,6 +4454,11 @@ TRuntimeNode TProgramBuilder::WideCondense1(TRuntimeNode flow, const TWideLambda
std::for_each(stateArgs.cbegin(), stateArgs.cend(), std::bind(&TCallableBuilder::Add, std::ref(callableBuilder), std::placeholders::_1));
callableBuilder.Add(chop);
std::for_each(next.cbegin(), next.cend(), std::bind(&TCallableBuilder::Add, std::ref(callableBuilder), std::placeholders::_1));
+ if (useCtx) {
+ MKQL_ENSURE(RuntimeVersion >= 30U, "Too old runtime version");
+ callableBuilder.Add(NewDataLiteral<bool>(useCtx));
+ }
+
return TRuntimeNode(callableBuilder.Build(), false);
}
diff --git a/ydb/library/yql/minikql/mkql_program_builder.h b/ydb/library/yql/minikql/mkql_program_builder.h
index 4412523dced..cf86862078b 100644
--- a/ydb/library/yql/minikql/mkql_program_builder.h
+++ b/ydb/library/yql/minikql/mkql_program_builder.h
@@ -311,10 +311,10 @@ public:
const TBinaryLambda& handler3);
TRuntimeNode Condense(TRuntimeNode stream, TRuntimeNode state,
const TBinaryLambda& switcher,
- const TBinaryLambda& handler);
+ const TBinaryLambda& handler, bool useCtx = false);
TRuntimeNode Condense1(TRuntimeNode stream, const TUnaryLambda& init,
const TBinaryLambda& switcher,
- const TBinaryLambda& handler);
+ const TBinaryLambda& handler, bool useCtx = false);
TRuntimeNode Squeeze(TRuntimeNode stream, TRuntimeNode state,
const TBinaryLambda& handler,
const TUnaryLambda& save = {},
@@ -372,7 +372,7 @@ public:
TRuntimeNode WideCombiner(TRuntimeNode flow, ui64 memLimit, const TWideLambda& keyExtractor, const TBinaryWideLambda& init, const TTernaryWideLambda& update, const TBinaryWideLambda& finish);
TRuntimeNode WideLastCombiner(TRuntimeNode flow, const TWideLambda& keyExtractor, const TBinaryWideLambda& init, const TTernaryWideLambda& update, const TBinaryWideLambda& finish);
- TRuntimeNode WideCondense1(TRuntimeNode stream, const TWideLambda& init, const TWideSwitchLambda& switcher, const TBinaryWideLambda& handler);
+ TRuntimeNode WideCondense1(TRuntimeNode stream, const TWideLambda& init, const TWideSwitchLambda& switcher, const TBinaryWideLambda& handler, bool useCtx = false);
TRuntimeNode Length(TRuntimeNode listOrDict);
TRuntimeNode Iterator(TRuntimeNode list, const TArrayRef<const TRuntimeNode>& dependentNodes);
diff --git a/ydb/library/yql/providers/common/mkql/yql_provider_mkql.cpp b/ydb/library/yql/providers/common/mkql/yql_provider_mkql.cpp
index e085f8e97e0..2fc40f294cc 100644
--- a/ydb/library/yql/providers/common/mkql/yql_provider_mkql.cpp
+++ b/ydb/library/yql/providers/common/mkql/yql_provider_mkql.cpp
@@ -675,7 +675,8 @@ TMkqlCommonCallableCompiler::TShared::TShared() {
[&](TRuntimeNode::TList items, TRuntimeNode::TList state) {
items.insert(items.cend(), state.cbegin(), state.cend());
return MkqlBuildWideLambda(*node.Child(3), ctx, items);
- });
+ },
+ HasContextFuncs(*node.Child(1)) || HasContextFuncs(*node.Child(3)));
});
AddCallable("WideCombiner", [](const TExprNode& node, TMkqlBuildContext& ctx) {
@@ -1045,7 +1046,7 @@ TMkqlCommonCallableCompiler::TShared::TShared() {
},
[&](TRuntimeNode item, TRuntimeNode state) {
return MkqlBuildLambda(*node.Child(3), ctx, {item, state});
- });
+ }, HasContextFuncs(*node.Child(3)));
});
AddCallable("Condense1", [](const TExprNode& node, TMkqlBuildContext& ctx) {
@@ -1059,7 +1060,7 @@ TMkqlCommonCallableCompiler::TShared::TShared() {
},
[&](TRuntimeNode item, TRuntimeNode state) {
return MkqlBuildLambda(*node.Child(3), ctx, {item, state});
- });
+ }, HasContextFuncs(*node.Child(1)) || HasContextFuncs(*node.Child(3)));
});
AddCallable("Squeeze", [](const TExprNode& node, TMkqlBuildContext& ctx) {