aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authora-romanov <Anton.Romanov@ydb.tech>2023-11-30 12:24:45 +0300
committera-romanov <Anton.Romanov@ydb.tech>2023-11-30 13:13:04 +0300
commit3ead31aa7b26ef9e65aa73bbd2282f9119793a29 (patch)
tree8e91fb63b9f3568e42381e07d7d447d20c56727d
parent8951ddf780e02616cdb2ec54a02bc354e8507c0f (diff)
downloadydb-3ead31aa7b26ef9e65aa73bbd2282f9119793a29.tar.gz
KIKIMR-19512 Option for ignore yields in WideCombiner.
-rw-r--r--ydb/library/yql/minikql/comp_nodes/mkql_wide_combine.cpp94
-rw-r--r--ydb/library/yql/minikql/comp_nodes/ut/mkql_wide_combine_ut.cpp146
-rw-r--r--ydb/library/yql/minikql/mkql_program_builder.cpp13
-rw-r--r--ydb/library/yql/minikql/mkql_program_builder.h2
-rw-r--r--ydb/library/yql/minikql/mkql_runtime_version.h2
-rw-r--r--ydb/library/yql/providers/common/mkql/yql_provider_mkql.cpp4
6 files changed, 229 insertions, 32 deletions
diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_wide_combine.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_wide_combine.cpp
index 73c9077fb49..a0ee20997f1 100644
--- a/ydb/library/yql/minikql/comp_nodes/mkql_wide_combine.cpp
+++ b/ydb/library/yql/minikql/comp_nodes/mkql_wide_combine.cpp
@@ -5,6 +5,7 @@
#include <ydb/library/yql/minikql/computation/mkql_llvm_base.h>
#include <ydb/library/yql/minikql/mkql_node_builder.h>
#include <ydb/library/yql/minikql/mkql_node_cast.h>
+#include <ydb/library/yql/minikql/mkql_runtime_version.h>
#include <ydb/library/yql/minikql/mkql_stats_registry.h>
#include <ydb/library/yql/minikql/defs.h>
#include <ydb/library/yql/utils/cast.h>
@@ -244,7 +245,13 @@ public:
return isNew;
}
- bool IsEmpty() {
+ template<bool SkipYields>
+ bool ReadMore() {
+ if constexpr (SkipYields) {
+ if (EFetchResult::Yield == InputStatus)
+ return true;
+ }
+
if (!States.Empty())
return false;
@@ -337,13 +344,13 @@ public:
};
#endif
-template <bool TrackRss>
-class TWideCombinerWrapper: public TStatefulWideFlowCodegeneratorNode<TWideCombinerWrapper<TrackRss>>
+template <bool TrackRss, bool SkipYields>
+class TWideCombinerWrapper: public TStatefulWideFlowCodegeneratorNode<TWideCombinerWrapper<TrackRss, SkipYields>>
#ifndef MKQL_DISABLE_CODEGEN
, public ICodegeneratorRootNode
#endif
{
-using TBaseComputation = TStatefulWideFlowCodegeneratorNode<TWideCombinerWrapper<TrackRss>>;
+using TBaseComputation = TStatefulWideFlowCodegeneratorNode<TWideCombinerWrapper<TrackRss, SkipYields>>;
public:
TWideCombinerWrapper(TComputationMutables& mutables, IComputationWideFlowNode* flow, TCombinerNodes&& nodes, TKeyTypes&& keyTypes, ui64 memLimit)
: TBaseComputation(mutables, flow, EValueRepresentation::Boxed)
@@ -360,13 +367,16 @@ public:
}
while (const auto ptr = static_cast<TState*>(state.AsBoxed().Get())) {
- if (ptr->IsEmpty()) {
+ if (ptr->ReadMore<SkipYields>()) {
switch (ptr->InputStatus) {
case EFetchResult::One:
break;
case EFetchResult::Yield:
ptr->InputStatus = EFetchResult::One;
- return EFetchResult::Yield;
+ if constexpr (SkipYields)
+ break;
+ else
+ return EFetchResult::Yield;
case EFetchResult::Finish:
return EFetchResult::Finish;
}
@@ -381,8 +391,16 @@ public:
fields[i] = &Nodes.ItemNodes[i]->RefValue(ctx);
ptr->InputStatus = Flow->FetchValues(ctx, fields);
- if (EFetchResult::One != ptr->InputStatus) {
- break;
+ if constexpr (SkipYields) {
+ if (EFetchResult::Yield == ptr->InputStatus) {
+ return EFetchResult::Yield;
+ } else if (EFetchResult::Finish == ptr->InputStatus) {
+ break;
+ }
+ } else {
+ if (EFetchResult::One != ptr->InputStatus) {
+ break;
+ }
}
Nodes.ExtractKey(ctx, fields, static_cast<NUdf::TUnboxedValue*>(ptr->Tongue));
@@ -441,15 +459,15 @@ public:
const auto over = BasicBlock::Create(context, "over", ctx.Func);
const auto result = PHINode::Create(statusType, 3U, "result", over);
- const auto isEmptyFunc = ConstantInt::get(Type::getInt64Ty(context), GetMethodPtr(&TState::IsEmpty));
- const auto isEmptyFuncType = FunctionType::get(Type::getInt1Ty(context), { statePtrType }, false);
- const auto isEmptyFuncPtr = CastInst::Create(Instruction::IntToPtr, isEmptyFunc, PointerType::getUnqual(isEmptyFuncType), "empty_func", block);
- const auto empty = CallInst::Create(isEmptyFuncType, isEmptyFuncPtr, { stateArg }, "empty", block);
+ const auto readMoreFunc = ConstantInt::get(Type::getInt64Ty(context), GetMethodPtr(&TState::ReadMore<SkipYields>));
+ const auto readMoreFuncType = FunctionType::get(Type::getInt1Ty(context), { statePtrType }, false);
+ const auto readMoreFuncPtr = CastInst::Create(Instruction::IntToPtr, readMoreFunc, PointerType::getUnqual(readMoreFuncType), "read_more_func", block);
+ const auto readMore = CallInst::Create(readMoreFuncType, readMoreFuncPtr, { stateArg }, "read_more", block);
const auto next = BasicBlock::Create(context, "next", ctx.Func);
const auto full = BasicBlock::Create(context, "full", ctx.Func);
- BranchInst::Create(next, full, empty, block);
+ BranchInst::Create(next, full, readMore, block);
{
block = next;
@@ -473,9 +491,15 @@ public:
block = rest;
new StoreInst(ConstantInt::get(last->getType(), static_cast<i32>(EFetchResult::One)), statusPtr, block);
- result->addIncoming(last, block);
+ if constexpr (SkipYields) {
+ new StoreInst(ConstantInt::get(statusType, static_cast<i32>(EFetchResult::One)), statusPtr, block);
- BranchInst::Create(over, block);
+ BranchInst::Create(pull, block);
+ } else {
+ result->addIncoming(last, block);
+
+ BranchInst::Create(over, block);
+ }
block = pull;
@@ -487,9 +511,22 @@ public:
const auto getres = GetNodeValues(Flow, ctx, block);
- const auto special = CmpInst::Create(Instruction::ICmp, ICmpInst::ICMP_SLE, getres.first, ConstantInt::get(getres.first->getType(), static_cast<i32>(EFetchResult::Yield)), "special", block);
+ if constexpr (SkipYields) {
+ const auto save = BasicBlock::Create(context, "save", ctx.Func);
- BranchInst::Create(done, good, special, block);
+ const auto way = SwitchInst::Create(getres.first, good, 2U, block);
+ way->addCase(ConstantInt::get(statusType, static_cast<i32>(EFetchResult::Yield)), save);
+ way->addCase(ConstantInt::get(statusType, static_cast<i32>(EFetchResult::Finish)), done);
+
+ block = save;
+
+ new StoreInst(ConstantInt::get(statusType, static_cast<i32>(EFetchResult::Yield)), statusPtr, block);
+ result->addIncoming(ConstantInt::get(statusType, static_cast<i32>(EFetchResult::Yield)), block);
+ BranchInst::Create(over, block);
+ } else {
+ const auto special = CmpInst::Create(Instruction::ICmp, ICmpInst::ICMP_SLE, getres.first, ConstantInt::get(getres.first->getType(), static_cast<i32>(EFetchResult::Yield)), "special", block);
+ BranchInst::Create(done, good, special, block);
+ }
block = good;
@@ -1128,11 +1165,24 @@ IComputationNode* WrapWideCombinerT(TCallable& callable, const TComputationNodeF
if constexpr (Last)
return new TWideLastCombinerWrapper(ctx.Mutables, wide, std::move(nodes), std::move(keyTypes));
else {
- const auto memLimit = AS_VALUE(TDataLiteral, callable.GetInput(1U))->AsValue().Get<ui64>();
- if (EGraphPerProcess::Single == ctx.GraphPerProcess)
- return new TWideCombinerWrapper<true>(ctx.Mutables, wide, std::move(nodes), std::move(keyTypes), memLimit);
- else
- return new TWideCombinerWrapper<false>(ctx.Mutables, wide, std::move(nodes), std::move(keyTypes), memLimit);
+ if constexpr (RuntimeVersion < 46U) {
+ const auto memLimit = AS_VALUE(TDataLiteral, callable.GetInput(1U))->AsValue().Get<ui64>();
+ if (EGraphPerProcess::Single == ctx.GraphPerProcess)
+ return new TWideCombinerWrapper<true, false>(ctx.Mutables, wide, std::move(nodes), std::move(keyTypes), memLimit);
+ else
+ return new TWideCombinerWrapper<false, false>(ctx.Mutables, wide, std::move(nodes), std::move(keyTypes), memLimit);
+ } else {
+ if (const auto memLimit = AS_VALUE(TDataLiteral, callable.GetInput(1U))->AsValue().Get<i64>(); memLimit >= 0)
+ if (EGraphPerProcess::Single == ctx.GraphPerProcess)
+ return new TWideCombinerWrapper<true, false>(ctx.Mutables, wide, std::move(nodes), std::move(keyTypes), ui64(memLimit));
+ else
+ return new TWideCombinerWrapper<false, false>(ctx.Mutables, wide, std::move(nodes), std::move(keyTypes), ui64(memLimit));
+ else
+ if (EGraphPerProcess::Single == ctx.GraphPerProcess)
+ return new TWideCombinerWrapper<true, true>(ctx.Mutables, wide, std::move(nodes), std::move(keyTypes), ui64(-memLimit));
+ else
+ return new TWideCombinerWrapper<false, true>(ctx.Mutables, wide, std::move(nodes), std::move(keyTypes), ui64(-memLimit));
+ }
}
}
diff --git a/ydb/library/yql/minikql/comp_nodes/ut/mkql_wide_combine_ut.cpp b/ydb/library/yql/minikql/comp_nodes/ut/mkql_wide_combine_ut.cpp
index 8e49947c0d1..1984d7bde28 100644
--- a/ydb/library/yql/minikql/comp_nodes/ut/mkql_wide_combine_ut.cpp
+++ b/ydb/library/yql/minikql/comp_nodes/ut/mkql_wide_combine_ut.cpp
@@ -1,5 +1,8 @@
#include "mkql_computation_node_ut.h"
+
+#include <ydb/library/yql/minikql/mkql_node_cast.h>
#include <ydb/library/yql/minikql/mkql_runtime_version.h>
+#include <ydb/library/yql/minikql/mkql_string_util.h>
#include <ydb/library/yql/minikql/computation/mkql_computation_node_holders.h>
@@ -10,9 +13,112 @@
namespace NKikimr {
namespace NMiniKQL {
namespace {
-const auto border = 9124596000000000ULL;
+
+constexpr auto border = 9124596000000000ULL;
+constexpr ui64 g_Yield = std::numeric_limits<ui64>::max();
+constexpr ui64 g_TestYieldStreamData[] = {0, 1, 0, 2, g_Yield, 0, g_Yield, 1, 2, 0, 1, 3, 0, g_Yield, 1, 2};
+
+class TTestStreamWrapper: public TMutableComputationNode<TTestStreamWrapper> {
+using TBaseComputation = TMutableComputationNode<TTestStreamWrapper>;
+public:
+ class TStreamValue : public TComputationValue<TStreamValue> {
+ public:
+ using TBase = TComputationValue<TStreamValue>;
+
+ TStreamValue(TMemoryUsageInfo* memInfo, TComputationContext& compCtx)
+ : TBase(memInfo), CompCtx(compCtx)
+ {}
+ private:
+ NUdf::EFetchStatus Fetch(NUdf::TUnboxedValue& result) override {
+ constexpr auto size = Y_ARRAY_SIZE(g_TestYieldStreamData);
+ if (Index == size) {
+ return NUdf::EFetchStatus::Finish;
+ }
+
+ const auto val = g_TestYieldStreamData[Index];
+ if (g_Yield == val) {
+ ++Index;
+ return NUdf::EFetchStatus::Yield;
+ }
+
+ NUdf::TUnboxedValue* items = nullptr;
+ result = CompCtx.HolderFactory.CreateDirectArrayHolder(2, items);
+ items[0] = NUdf::TUnboxedValuePod(val);
+ items[1] = NUdf::TUnboxedValuePod(MakeString(ToString(val)));
+
+ ++Index;
+ return NUdf::EFetchStatus::Ok;
+ }
+
+ private:
+ TComputationContext& CompCtx;
+ ui64 Index = 0;
+ };
+
+ TTestStreamWrapper(TComputationMutables& mutables)
+ : TBaseComputation(mutables)
+ {}
+
+ NUdf::TUnboxedValuePod DoCalculate(TComputationContext& ctx) const {
+ return ctx.HolderFactory.Create<TStreamValue>(ctx);
+ }
+private:
+ void RegisterDependencies() const final {}
+};
+
+IComputationNode* WrapTestStream(const TComputationNodeFactoryContext& ctx) {
+ return new TTestStreamWrapper(ctx.Mutables);
+}
+
+TComputationNodeFactory GetNodeFactory() {
+ return [](TCallable& callable, const TComputationNodeFactoryContext& ctx) -> IComputationNode* {
+ if (callable.GetType()->GetName() == "TestYieldStream") {
+ return WrapTestStream(ctx);
+ }
+ return GetBuiltinFactory()(callable, ctx);
+ };
}
+template <bool LLVM>
+TRuntimeNode MakeStream(TSetup<LLVM>& setup) {
+ TProgramBuilder& pb = *setup.PgmBuilder;
+
+ TCallableBuilder callableBuilder(*setup.Env, "TestYieldStream",
+ pb.NewStreamType(
+ pb.NewStructType({
+ {TStringBuf("a"), pb.NewDataType(NUdf::EDataSlot::Uint64)},
+ {TStringBuf("b"), pb.NewDataType(NUdf::EDataSlot::String)}
+ })
+ )
+ );
+
+ return TRuntimeNode(callableBuilder.Build(), false);
+}
+
+template <bool OverFlow>
+TRuntimeNode Combine(TProgramBuilder& pb, TRuntimeNode stream, std::function<TRuntimeNode(TRuntimeNode, TRuntimeNode)> finishLambda) {
+ const auto keyExtractor = [&](TRuntimeNode item) {
+ return pb.Member(item, "a");
+ };
+ const auto init = [&](TRuntimeNode /*key*/, TRuntimeNode item) {
+ return item;
+ };
+ const auto update = [&](TRuntimeNode /*key*/, TRuntimeNode item, TRuntimeNode state) {
+ const auto a = pb.Add(pb.Member(item, "a"), pb.Member(state, "a"));
+ const auto b = pb.Concat(pb.Member(item, "b"), pb.Member(state, "b"));
+ return pb.NewStruct({
+ {TStringBuf("a"), a},
+ {TStringBuf("b"), b},
+ });
+ };
+
+ return OverFlow ?
+ pb.FromFlow(pb.CombineCore(pb.ToFlow(stream), keyExtractor, init, update, finishLambda, 64ul << 20)):
+ pb.CombineCore(stream, keyExtractor, init, update, finishLambda, 64ul << 20);
+}
+
+} // unnamed
+
#if !defined(MKQL_RUNTIME_VERSION) || MKQL_RUNTIME_VERSION >= 18u
Y_UNIT_TEST_SUITE(TMiniKQLWideCombinerTest) {
Y_UNIT_TEST_LLVM(TestLongStringsRefCounting) {
@@ -55,7 +161,7 @@ Y_UNIT_TEST_SUITE(TMiniKQLWideCombinerTest) {
const auto list = pb.NewList(tupleType, {data1, data2, data3, data4, data5, data6, data7, data8, data9});
const auto pgmReturn = pb.Collect(pb.NarrowMap(pb.WideCombiner(pb.ExpandMap(pb.ToFlow(list),
- [&](TRuntimeNode item) -> TRuntimeNode::TList { return {pb.Nth(item, 0U), pb.Nth(item, 1U)}; }), 0ULL,
+ [&](TRuntimeNode item) -> TRuntimeNode::TList { return {pb.Nth(item, 0U), pb.Nth(item, 1U)}; }), -100000LL,
[&](TRuntimeNode::TList items) -> TRuntimeNode::TList { return {items.front()}; },
[&](TRuntimeNode::TList keys, TRuntimeNode::TList items) -> TRuntimeNode::TList {
return {pb.NewOptional(items.back()), pb.NewOptional(keys.front()), pb.NewEmptyOptional(optionalType), pb.NewEmptyOptional(optionalType)};
@@ -131,7 +237,7 @@ Y_UNIT_TEST_SUITE(TMiniKQLWideCombinerTest) {
const auto list = pb.NewList(tupleType, {data1, data2, data3, data4, data5, data6, data7, data8, data9});
const auto pgmReturn = pb.Collect(pb.NarrowMap(pb.WideCombiner(pb.ExpandMap(pb.ToFlow(list),
- [&](TRuntimeNode item) -> TRuntimeNode::TList { return {pb.Nth(item, 0U), pb.Nth(item, 1U)}; }), 0ULL,
+ [&](TRuntimeNode item) -> TRuntimeNode::TList { return {pb.Nth(item, 0U), pb.Nth(item, 1U)}; }), -1000000LL,
[&](TRuntimeNode::TList items) -> TRuntimeNode::TList { return {items.front()}; },
[&](TRuntimeNode::TList keys, TRuntimeNode::TList items) -> TRuntimeNode::TList {
return {items.back(), keys.front(), items.back(), items.front()};
@@ -203,7 +309,7 @@ Y_UNIT_TEST_SUITE(TMiniKQLWideCombinerTest) {
const auto landmine = pb.NewDataLiteral<NUdf::EDataSlot::String>("ACHTUNG MINEN!");
const auto pgmReturn = pb.Collect(pb.NarrowMap(pb.WideCombiner(pb.ExpandMap(pb.ToFlow(list),
- [&](TRuntimeNode item) -> TRuntimeNode::TList { return {pb.Nth(item, 0U), pb.Unwrap(pb.Nth(item, 1U), landmine, __FILE__, __LINE__, 0), pb.Nth(item, 2U)}; }), 0ULL,
+ [&](TRuntimeNode item) -> TRuntimeNode::TList { return {pb.Nth(item, 0U), pb.Unwrap(pb.Nth(item, 1U), landmine, __FILE__, __LINE__, 0), pb.Nth(item, 2U)}; }), -1000000LL,
[&](TRuntimeNode::TList items) -> TRuntimeNode::TList { return {items.front()}; },
[&](TRuntimeNode::TList keys, TRuntimeNode::TList items) -> TRuntimeNode::TList {
return {items.back(), keys.front(), empty, empty};
@@ -321,6 +427,38 @@ Y_UNIT_TEST_SUITE(TMiniKQLWideCombinerTest) {
UNIT_ASSERT(!iterator.Next(item));
UNIT_ASSERT(!iterator.Next(item));
}
+#if !defined(MKQL_RUNTIME_VERSION) || MKQL_RUNTIME_VERSION >= 46u
+ Y_UNIT_TEST_LLVM(TestHasLimitButPasstroughtYields) {
+ TSetup<LLVM> setup(GetNodeFactory());
+ TProgramBuilder& pb = *setup.PgmBuilder;
+
+ const auto stream = MakeStream<LLVM>(setup);
+ const auto pgmReturn = pb.FromFlow(pb.NarrowMap(pb.WideCombiner(pb.ExpandMap(pb.ToFlow(stream),
+ [&](TRuntimeNode item) -> TRuntimeNode::TList { return {pb.Member(item, "a"), pb.Member(item, "b")}; }), -123456789LL,
+ [&](TRuntimeNode::TList items) -> TRuntimeNode::TList { return {items.front()}; },
+ [&](TRuntimeNode::TList, TRuntimeNode::TList items) -> TRuntimeNode::TList { return items; },
+ [&](TRuntimeNode::TList, TRuntimeNode::TList items, TRuntimeNode::TList state) -> TRuntimeNode::TList { return {state.front(), pb.AggrConcat(state.back(), items.back())}; },
+ [&](TRuntimeNode::TList, TRuntimeNode::TList state) -> TRuntimeNode::TList { return state; }),
+ [&](TRuntimeNode::TList items) { return items.back(); }
+ ));
+ const auto graph = setup.BuildGraph(pgmReturn);
+ const auto streamVal = graph->GetValue();
+ NUdf::TUnboxedValue result;
+ UNIT_ASSERT_EQUAL(streamVal.Fetch(result), NUdf::EFetchStatus::Yield);
+ UNIT_ASSERT_EQUAL(streamVal.Fetch(result), NUdf::EFetchStatus::Yield);
+ UNIT_ASSERT_EQUAL(streamVal.Fetch(result), NUdf::EFetchStatus::Yield);
+ UNIT_ASSERT_EQUAL(streamVal.Fetch(result), NUdf::EFetchStatus::Ok);
+ UNIT_ASSERT_VALUES_EQUAL(TStringBuf(result.AsStringRef()), "00000");
+ UNIT_ASSERT_EQUAL(streamVal.Fetch(result), NUdf::EFetchStatus::Ok);
+ UNIT_ASSERT_VALUES_EQUAL(TStringBuf(result.AsStringRef()), "1111");
+ UNIT_ASSERT_EQUAL(streamVal.Fetch(result), NUdf::EFetchStatus::Ok);
+ UNIT_ASSERT_VALUES_EQUAL(TStringBuf(result.AsStringRef()), "222");
+ UNIT_ASSERT_EQUAL(streamVal.Fetch(result), NUdf::EFetchStatus::Ok);
+ UNIT_ASSERT_VALUES_EQUAL(TStringBuf(result.AsStringRef()), "3");
+ UNIT_ASSERT_EQUAL(streamVal.Fetch(result), NUdf::EFetchStatus::Finish);
+ UNIT_ASSERT_EQUAL(streamVal.Fetch(result), NUdf::EFetchStatus::Finish);
+ }
+#endif
}
Y_UNIT_TEST_SUITE(TMiniKQLWideCombinerPerfTest) {
diff --git a/ydb/library/yql/minikql/mkql_program_builder.cpp b/ydb/library/yql/minikql/mkql_program_builder.cpp
index 367305c09fb..4fce599bdb9 100644
--- a/ydb/library/yql/minikql/mkql_program_builder.cpp
+++ b/ydb/library/yql/minikql/mkql_program_builder.cpp
@@ -4711,11 +4711,17 @@ TRuntimeNode TProgramBuilder::CommonJoinCore(TRuntimeNode flow, EJoinKind joinKi
return TRuntimeNode(callableBuilder.Build(), false);
}
-TRuntimeNode TProgramBuilder::WideCombiner(TRuntimeNode flow, ui64 memLimit, const TWideLambda& extractor, const TBinaryWideLambda& init, const TTernaryWideLambda& update, const TBinaryWideLambda& finish) {
+TRuntimeNode TProgramBuilder::WideCombiner(TRuntimeNode flow, i64 memLimit, const TWideLambda& extractor, const TBinaryWideLambda& init, const TTernaryWideLambda& update, const TBinaryWideLambda& finish) {
if constexpr (RuntimeVersion < 18U) {
THROW yexception() << "Runtime version (" << RuntimeVersion << ") too old for " << __func__;
}
+ if (memLimit < 0) {
+ if constexpr (RuntimeVersion < 46U) {
+ THROW yexception() << "Runtime version (" << RuntimeVersion << ") too old for " << __func__ << " with limit " << memLimit;
+ }
+ }
+
const auto wideComponents = GetWideComponents(AS_TYPE(TFlowType, flow.GetStaticType()));
TRuntimeNode::TList itemArgs;
@@ -4755,7 +4761,10 @@ TRuntimeNode TProgramBuilder::WideCombiner(TRuntimeNode flow, ui64 memLimit, con
TCallableBuilder callableBuilder(Env, __func__, NewFlowType(NewMultiType(tupleItems)));
callableBuilder.Add(flow);
- callableBuilder.Add(NewDataLiteral(memLimit));
+ if constexpr (RuntimeVersion < 46U)
+ callableBuilder.Add(NewDataLiteral(ui64(memLimit)));
+ else
+ callableBuilder.Add(NewDataLiteral(memLimit));
callableBuilder.Add(NewDataLiteral(ui32(keyArgs.size())));
callableBuilder.Add(NewDataLiteral(ui32(stateArgs.size())));
std::for_each(itemArgs.cbegin(), itemArgs.cend(), std::bind(&TCallableBuilder::Add, std::ref(callableBuilder), std::placeholders::_1));
diff --git a/ydb/library/yql/minikql/mkql_program_builder.h b/ydb/library/yql/minikql/mkql_program_builder.h
index 16aab5a7f30..c207a4386c9 100644
--- a/ydb/library/yql/minikql/mkql_program_builder.h
+++ b/ydb/library/yql/minikql/mkql_program_builder.h
@@ -418,7 +418,7 @@ public:
TRuntimeNode WideTakeWhileInclusive(TRuntimeNode flow, const TNarrowLambda& handler);
TRuntimeNode WideSkipWhileInclusive(TRuntimeNode flow, const TNarrowLambda& handler);
- TRuntimeNode WideCombiner(TRuntimeNode flow, ui64 memLimit, const TWideLambda& keyExtractor, const TBinaryWideLambda& init, const TTernaryWideLambda& update, const TBinaryWideLambda& finish);
+ TRuntimeNode WideCombiner(TRuntimeNode flow, i64 memLimit, const TWideLambda& keyExtractor, const TBinaryWideLambda& init, const TTernaryWideLambda& update, const TBinaryWideLambda& finish);
TRuntimeNode WideLastCombiner(TRuntimeNode flow, const TWideLambda& keyExtractor, const TBinaryWideLambda& init, const TTernaryWideLambda& update, const TBinaryWideLambda& finish);
TRuntimeNode WideCondense1(TRuntimeNode stream, const TWideLambda& init, const TWideSwitchLambda& switcher, const TBinaryWideLambda& handler, bool useCtx = false);
diff --git a/ydb/library/yql/minikql/mkql_runtime_version.h b/ydb/library/yql/minikql/mkql_runtime_version.h
index a5650b04fc2..0a90b23d022 100644
--- a/ydb/library/yql/minikql/mkql_runtime_version.h
+++ b/ydb/library/yql/minikql/mkql_runtime_version.h
@@ -24,7 +24,7 @@ namespace NMiniKQL {
// 1. Bump this version every time incompatible runtime nodes are introduced.
// 2. Make sure you provide runtime node generation for previous runtime versions.
#ifndef MKQL_RUNTIME_VERSION
-#define MKQL_RUNTIME_VERSION 45U
+#define MKQL_RUNTIME_VERSION 46U
#endif
// History:
diff --git a/ydb/library/yql/providers/common/mkql/yql_provider_mkql.cpp b/ydb/library/yql/providers/common/mkql/yql_provider_mkql.cpp
index 4bc9a93c77c..5bcba572472 100644
--- a/ydb/library/yql/providers/common/mkql/yql_provider_mkql.cpp
+++ b/ydb/library/yql/providers/common/mkql/yql_provider_mkql.cpp
@@ -733,8 +733,8 @@ TMkqlCommonCallableCompiler::TShared::TShared() {
AddCallable("WideCombiner", [](const TExprNode& node, TMkqlBuildContext& ctx) {
const auto flow = MkqlBuildExpr(node.Head(), ctx);
- ui64 memLimit = 0ULL;
- const bool withLimit = TryFromString<ui64>(node.Child(1U)->Content(), memLimit);
+ i64 memLimit = 0LL;
+ const bool withLimit = TryFromString<i64>(node.Child(1U)->Content(), memLimit);
const auto keyExtractor = [&](TRuntimeNode::TList items) {
return MkqlBuildWideLambda(*node.Child(2U), ctx, items);