diff options
author | a-romanov <Anton.Romanov@ydb.tech> | 2023-11-30 12:24:45 +0300 |
---|---|---|
committer | a-romanov <Anton.Romanov@ydb.tech> | 2023-11-30 13:13:04 +0300 |
commit | 3ead31aa7b26ef9e65aa73bbd2282f9119793a29 (patch) | |
tree | 8e91fb63b9f3568e42381e07d7d447d20c56727d | |
parent | 8951ddf780e02616cdb2ec54a02bc354e8507c0f (diff) | |
download | ydb-3ead31aa7b26ef9e65aa73bbd2282f9119793a29.tar.gz |
KIKIMR-19512 Option for ignore yields in WideCombiner.
6 files changed, 229 insertions, 32 deletions
diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_wide_combine.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_wide_combine.cpp index 73c9077fb49..a0ee20997f1 100644 --- a/ydb/library/yql/minikql/comp_nodes/mkql_wide_combine.cpp +++ b/ydb/library/yql/minikql/comp_nodes/mkql_wide_combine.cpp @@ -5,6 +5,7 @@ #include <ydb/library/yql/minikql/computation/mkql_llvm_base.h> #include <ydb/library/yql/minikql/mkql_node_builder.h> #include <ydb/library/yql/minikql/mkql_node_cast.h> +#include <ydb/library/yql/minikql/mkql_runtime_version.h> #include <ydb/library/yql/minikql/mkql_stats_registry.h> #include <ydb/library/yql/minikql/defs.h> #include <ydb/library/yql/utils/cast.h> @@ -244,7 +245,13 @@ public: return isNew; } - bool IsEmpty() { + template<bool SkipYields> + bool ReadMore() { + if constexpr (SkipYields) { + if (EFetchResult::Yield == InputStatus) + return true; + } + if (!States.Empty()) return false; @@ -337,13 +344,13 @@ public: }; #endif -template <bool TrackRss> -class TWideCombinerWrapper: public TStatefulWideFlowCodegeneratorNode<TWideCombinerWrapper<TrackRss>> +template <bool TrackRss, bool SkipYields> +class TWideCombinerWrapper: public TStatefulWideFlowCodegeneratorNode<TWideCombinerWrapper<TrackRss, SkipYields>> #ifndef MKQL_DISABLE_CODEGEN , public ICodegeneratorRootNode #endif { -using TBaseComputation = TStatefulWideFlowCodegeneratorNode<TWideCombinerWrapper<TrackRss>>; +using TBaseComputation = TStatefulWideFlowCodegeneratorNode<TWideCombinerWrapper<TrackRss, SkipYields>>; public: TWideCombinerWrapper(TComputationMutables& mutables, IComputationWideFlowNode* flow, TCombinerNodes&& nodes, TKeyTypes&& keyTypes, ui64 memLimit) : TBaseComputation(mutables, flow, EValueRepresentation::Boxed) @@ -360,13 +367,16 @@ public: } while (const auto ptr = static_cast<TState*>(state.AsBoxed().Get())) { - if (ptr->IsEmpty()) { + if (ptr->ReadMore<SkipYields>()) { switch (ptr->InputStatus) { case EFetchResult::One: break; case EFetchResult::Yield: ptr->InputStatus = EFetchResult::One; - return EFetchResult::Yield; + if constexpr (SkipYields) + break; + else + return EFetchResult::Yield; case EFetchResult::Finish: return EFetchResult::Finish; } @@ -381,8 +391,16 @@ public: fields[i] = &Nodes.ItemNodes[i]->RefValue(ctx); ptr->InputStatus = Flow->FetchValues(ctx, fields); - if (EFetchResult::One != ptr->InputStatus) { - break; + if constexpr (SkipYields) { + if (EFetchResult::Yield == ptr->InputStatus) { + return EFetchResult::Yield; + } else if (EFetchResult::Finish == ptr->InputStatus) { + break; + } + } else { + if (EFetchResult::One != ptr->InputStatus) { + break; + } } Nodes.ExtractKey(ctx, fields, static_cast<NUdf::TUnboxedValue*>(ptr->Tongue)); @@ -441,15 +459,15 @@ public: const auto over = BasicBlock::Create(context, "over", ctx.Func); const auto result = PHINode::Create(statusType, 3U, "result", over); - const auto isEmptyFunc = ConstantInt::get(Type::getInt64Ty(context), GetMethodPtr(&TState::IsEmpty)); - const auto isEmptyFuncType = FunctionType::get(Type::getInt1Ty(context), { statePtrType }, false); - const auto isEmptyFuncPtr = CastInst::Create(Instruction::IntToPtr, isEmptyFunc, PointerType::getUnqual(isEmptyFuncType), "empty_func", block); - const auto empty = CallInst::Create(isEmptyFuncType, isEmptyFuncPtr, { stateArg }, "empty", block); + const auto readMoreFunc = ConstantInt::get(Type::getInt64Ty(context), GetMethodPtr(&TState::ReadMore<SkipYields>)); + const auto readMoreFuncType = FunctionType::get(Type::getInt1Ty(context), { statePtrType }, false); + const auto readMoreFuncPtr = CastInst::Create(Instruction::IntToPtr, readMoreFunc, PointerType::getUnqual(readMoreFuncType), "read_more_func", block); + const auto readMore = CallInst::Create(readMoreFuncType, readMoreFuncPtr, { stateArg }, "read_more", block); const auto next = BasicBlock::Create(context, "next", ctx.Func); const auto full = BasicBlock::Create(context, "full", ctx.Func); - BranchInst::Create(next, full, empty, block); + BranchInst::Create(next, full, readMore, block); { block = next; @@ -473,9 +491,15 @@ public: block = rest; new StoreInst(ConstantInt::get(last->getType(), static_cast<i32>(EFetchResult::One)), statusPtr, block); - result->addIncoming(last, block); + if constexpr (SkipYields) { + new StoreInst(ConstantInt::get(statusType, static_cast<i32>(EFetchResult::One)), statusPtr, block); - BranchInst::Create(over, block); + BranchInst::Create(pull, block); + } else { + result->addIncoming(last, block); + + BranchInst::Create(over, block); + } block = pull; @@ -487,9 +511,22 @@ public: const auto getres = GetNodeValues(Flow, ctx, block); - const auto special = CmpInst::Create(Instruction::ICmp, ICmpInst::ICMP_SLE, getres.first, ConstantInt::get(getres.first->getType(), static_cast<i32>(EFetchResult::Yield)), "special", block); + if constexpr (SkipYields) { + const auto save = BasicBlock::Create(context, "save", ctx.Func); - BranchInst::Create(done, good, special, block); + const auto way = SwitchInst::Create(getres.first, good, 2U, block); + way->addCase(ConstantInt::get(statusType, static_cast<i32>(EFetchResult::Yield)), save); + way->addCase(ConstantInt::get(statusType, static_cast<i32>(EFetchResult::Finish)), done); + + block = save; + + new StoreInst(ConstantInt::get(statusType, static_cast<i32>(EFetchResult::Yield)), statusPtr, block); + result->addIncoming(ConstantInt::get(statusType, static_cast<i32>(EFetchResult::Yield)), block); + BranchInst::Create(over, block); + } else { + const auto special = CmpInst::Create(Instruction::ICmp, ICmpInst::ICMP_SLE, getres.first, ConstantInt::get(getres.first->getType(), static_cast<i32>(EFetchResult::Yield)), "special", block); + BranchInst::Create(done, good, special, block); + } block = good; @@ -1128,11 +1165,24 @@ IComputationNode* WrapWideCombinerT(TCallable& callable, const TComputationNodeF if constexpr (Last) return new TWideLastCombinerWrapper(ctx.Mutables, wide, std::move(nodes), std::move(keyTypes)); else { - const auto memLimit = AS_VALUE(TDataLiteral, callable.GetInput(1U))->AsValue().Get<ui64>(); - if (EGraphPerProcess::Single == ctx.GraphPerProcess) - return new TWideCombinerWrapper<true>(ctx.Mutables, wide, std::move(nodes), std::move(keyTypes), memLimit); - else - return new TWideCombinerWrapper<false>(ctx.Mutables, wide, std::move(nodes), std::move(keyTypes), memLimit); + if constexpr (RuntimeVersion < 46U) { + const auto memLimit = AS_VALUE(TDataLiteral, callable.GetInput(1U))->AsValue().Get<ui64>(); + if (EGraphPerProcess::Single == ctx.GraphPerProcess) + return new TWideCombinerWrapper<true, false>(ctx.Mutables, wide, std::move(nodes), std::move(keyTypes), memLimit); + else + return new TWideCombinerWrapper<false, false>(ctx.Mutables, wide, std::move(nodes), std::move(keyTypes), memLimit); + } else { + if (const auto memLimit = AS_VALUE(TDataLiteral, callable.GetInput(1U))->AsValue().Get<i64>(); memLimit >= 0) + if (EGraphPerProcess::Single == ctx.GraphPerProcess) + return new TWideCombinerWrapper<true, false>(ctx.Mutables, wide, std::move(nodes), std::move(keyTypes), ui64(memLimit)); + else + return new TWideCombinerWrapper<false, false>(ctx.Mutables, wide, std::move(nodes), std::move(keyTypes), ui64(memLimit)); + else + if (EGraphPerProcess::Single == ctx.GraphPerProcess) + return new TWideCombinerWrapper<true, true>(ctx.Mutables, wide, std::move(nodes), std::move(keyTypes), ui64(-memLimit)); + else + return new TWideCombinerWrapper<false, true>(ctx.Mutables, wide, std::move(nodes), std::move(keyTypes), ui64(-memLimit)); + } } } diff --git a/ydb/library/yql/minikql/comp_nodes/ut/mkql_wide_combine_ut.cpp b/ydb/library/yql/minikql/comp_nodes/ut/mkql_wide_combine_ut.cpp index 8e49947c0d1..1984d7bde28 100644 --- a/ydb/library/yql/minikql/comp_nodes/ut/mkql_wide_combine_ut.cpp +++ b/ydb/library/yql/minikql/comp_nodes/ut/mkql_wide_combine_ut.cpp @@ -1,5 +1,8 @@ #include "mkql_computation_node_ut.h" + +#include <ydb/library/yql/minikql/mkql_node_cast.h> #include <ydb/library/yql/minikql/mkql_runtime_version.h> +#include <ydb/library/yql/minikql/mkql_string_util.h> #include <ydb/library/yql/minikql/computation/mkql_computation_node_holders.h> @@ -10,9 +13,112 @@ namespace NKikimr { namespace NMiniKQL { namespace { -const auto border = 9124596000000000ULL; + +constexpr auto border = 9124596000000000ULL; +constexpr ui64 g_Yield = std::numeric_limits<ui64>::max(); +constexpr ui64 g_TestYieldStreamData[] = {0, 1, 0, 2, g_Yield, 0, g_Yield, 1, 2, 0, 1, 3, 0, g_Yield, 1, 2}; + +class TTestStreamWrapper: public TMutableComputationNode<TTestStreamWrapper> { +using TBaseComputation = TMutableComputationNode<TTestStreamWrapper>; +public: + class TStreamValue : public TComputationValue<TStreamValue> { + public: + using TBase = TComputationValue<TStreamValue>; + + TStreamValue(TMemoryUsageInfo* memInfo, TComputationContext& compCtx) + : TBase(memInfo), CompCtx(compCtx) + {} + private: + NUdf::EFetchStatus Fetch(NUdf::TUnboxedValue& result) override { + constexpr auto size = Y_ARRAY_SIZE(g_TestYieldStreamData); + if (Index == size) { + return NUdf::EFetchStatus::Finish; + } + + const auto val = g_TestYieldStreamData[Index]; + if (g_Yield == val) { + ++Index; + return NUdf::EFetchStatus::Yield; + } + + NUdf::TUnboxedValue* items = nullptr; + result = CompCtx.HolderFactory.CreateDirectArrayHolder(2, items); + items[0] = NUdf::TUnboxedValuePod(val); + items[1] = NUdf::TUnboxedValuePod(MakeString(ToString(val))); + + ++Index; + return NUdf::EFetchStatus::Ok; + } + + private: + TComputationContext& CompCtx; + ui64 Index = 0; + }; + + TTestStreamWrapper(TComputationMutables& mutables) + : TBaseComputation(mutables) + {} + + NUdf::TUnboxedValuePod DoCalculate(TComputationContext& ctx) const { + return ctx.HolderFactory.Create<TStreamValue>(ctx); + } +private: + void RegisterDependencies() const final {} +}; + +IComputationNode* WrapTestStream(const TComputationNodeFactoryContext& ctx) { + return new TTestStreamWrapper(ctx.Mutables); +} + +TComputationNodeFactory GetNodeFactory() { + return [](TCallable& callable, const TComputationNodeFactoryContext& ctx) -> IComputationNode* { + if (callable.GetType()->GetName() == "TestYieldStream") { + return WrapTestStream(ctx); + } + return GetBuiltinFactory()(callable, ctx); + }; } +template <bool LLVM> +TRuntimeNode MakeStream(TSetup<LLVM>& setup) { + TProgramBuilder& pb = *setup.PgmBuilder; + + TCallableBuilder callableBuilder(*setup.Env, "TestYieldStream", + pb.NewStreamType( + pb.NewStructType({ + {TStringBuf("a"), pb.NewDataType(NUdf::EDataSlot::Uint64)}, + {TStringBuf("b"), pb.NewDataType(NUdf::EDataSlot::String)} + }) + ) + ); + + return TRuntimeNode(callableBuilder.Build(), false); +} + +template <bool OverFlow> +TRuntimeNode Combine(TProgramBuilder& pb, TRuntimeNode stream, std::function<TRuntimeNode(TRuntimeNode, TRuntimeNode)> finishLambda) { + const auto keyExtractor = [&](TRuntimeNode item) { + return pb.Member(item, "a"); + }; + const auto init = [&](TRuntimeNode /*key*/, TRuntimeNode item) { + return item; + }; + const auto update = [&](TRuntimeNode /*key*/, TRuntimeNode item, TRuntimeNode state) { + const auto a = pb.Add(pb.Member(item, "a"), pb.Member(state, "a")); + const auto b = pb.Concat(pb.Member(item, "b"), pb.Member(state, "b")); + return pb.NewStruct({ + {TStringBuf("a"), a}, + {TStringBuf("b"), b}, + }); + }; + + return OverFlow ? + pb.FromFlow(pb.CombineCore(pb.ToFlow(stream), keyExtractor, init, update, finishLambda, 64ul << 20)): + pb.CombineCore(stream, keyExtractor, init, update, finishLambda, 64ul << 20); +} + +} // unnamed + #if !defined(MKQL_RUNTIME_VERSION) || MKQL_RUNTIME_VERSION >= 18u Y_UNIT_TEST_SUITE(TMiniKQLWideCombinerTest) { Y_UNIT_TEST_LLVM(TestLongStringsRefCounting) { @@ -55,7 +161,7 @@ Y_UNIT_TEST_SUITE(TMiniKQLWideCombinerTest) { const auto list = pb.NewList(tupleType, {data1, data2, data3, data4, data5, data6, data7, data8, data9}); const auto pgmReturn = pb.Collect(pb.NarrowMap(pb.WideCombiner(pb.ExpandMap(pb.ToFlow(list), - [&](TRuntimeNode item) -> TRuntimeNode::TList { return {pb.Nth(item, 0U), pb.Nth(item, 1U)}; }), 0ULL, + [&](TRuntimeNode item) -> TRuntimeNode::TList { return {pb.Nth(item, 0U), pb.Nth(item, 1U)}; }), -100000LL, [&](TRuntimeNode::TList items) -> TRuntimeNode::TList { return {items.front()}; }, [&](TRuntimeNode::TList keys, TRuntimeNode::TList items) -> TRuntimeNode::TList { return {pb.NewOptional(items.back()), pb.NewOptional(keys.front()), pb.NewEmptyOptional(optionalType), pb.NewEmptyOptional(optionalType)}; @@ -131,7 +237,7 @@ Y_UNIT_TEST_SUITE(TMiniKQLWideCombinerTest) { const auto list = pb.NewList(tupleType, {data1, data2, data3, data4, data5, data6, data7, data8, data9}); const auto pgmReturn = pb.Collect(pb.NarrowMap(pb.WideCombiner(pb.ExpandMap(pb.ToFlow(list), - [&](TRuntimeNode item) -> TRuntimeNode::TList { return {pb.Nth(item, 0U), pb.Nth(item, 1U)}; }), 0ULL, + [&](TRuntimeNode item) -> TRuntimeNode::TList { return {pb.Nth(item, 0U), pb.Nth(item, 1U)}; }), -1000000LL, [&](TRuntimeNode::TList items) -> TRuntimeNode::TList { return {items.front()}; }, [&](TRuntimeNode::TList keys, TRuntimeNode::TList items) -> TRuntimeNode::TList { return {items.back(), keys.front(), items.back(), items.front()}; @@ -203,7 +309,7 @@ Y_UNIT_TEST_SUITE(TMiniKQLWideCombinerTest) { const auto landmine = pb.NewDataLiteral<NUdf::EDataSlot::String>("ACHTUNG MINEN!"); const auto pgmReturn = pb.Collect(pb.NarrowMap(pb.WideCombiner(pb.ExpandMap(pb.ToFlow(list), - [&](TRuntimeNode item) -> TRuntimeNode::TList { return {pb.Nth(item, 0U), pb.Unwrap(pb.Nth(item, 1U), landmine, __FILE__, __LINE__, 0), pb.Nth(item, 2U)}; }), 0ULL, + [&](TRuntimeNode item) -> TRuntimeNode::TList { return {pb.Nth(item, 0U), pb.Unwrap(pb.Nth(item, 1U), landmine, __FILE__, __LINE__, 0), pb.Nth(item, 2U)}; }), -1000000LL, [&](TRuntimeNode::TList items) -> TRuntimeNode::TList { return {items.front()}; }, [&](TRuntimeNode::TList keys, TRuntimeNode::TList items) -> TRuntimeNode::TList { return {items.back(), keys.front(), empty, empty}; @@ -321,6 +427,38 @@ Y_UNIT_TEST_SUITE(TMiniKQLWideCombinerTest) { UNIT_ASSERT(!iterator.Next(item)); UNIT_ASSERT(!iterator.Next(item)); } +#if !defined(MKQL_RUNTIME_VERSION) || MKQL_RUNTIME_VERSION >= 46u + Y_UNIT_TEST_LLVM(TestHasLimitButPasstroughtYields) { + TSetup<LLVM> setup(GetNodeFactory()); + TProgramBuilder& pb = *setup.PgmBuilder; + + const auto stream = MakeStream<LLVM>(setup); + const auto pgmReturn = pb.FromFlow(pb.NarrowMap(pb.WideCombiner(pb.ExpandMap(pb.ToFlow(stream), + [&](TRuntimeNode item) -> TRuntimeNode::TList { return {pb.Member(item, "a"), pb.Member(item, "b")}; }), -123456789LL, + [&](TRuntimeNode::TList items) -> TRuntimeNode::TList { return {items.front()}; }, + [&](TRuntimeNode::TList, TRuntimeNode::TList items) -> TRuntimeNode::TList { return items; }, + [&](TRuntimeNode::TList, TRuntimeNode::TList items, TRuntimeNode::TList state) -> TRuntimeNode::TList { return {state.front(), pb.AggrConcat(state.back(), items.back())}; }, + [&](TRuntimeNode::TList, TRuntimeNode::TList state) -> TRuntimeNode::TList { return state; }), + [&](TRuntimeNode::TList items) { return items.back(); } + )); + const auto graph = setup.BuildGraph(pgmReturn); + const auto streamVal = graph->GetValue(); + NUdf::TUnboxedValue result; + UNIT_ASSERT_EQUAL(streamVal.Fetch(result), NUdf::EFetchStatus::Yield); + UNIT_ASSERT_EQUAL(streamVal.Fetch(result), NUdf::EFetchStatus::Yield); + UNIT_ASSERT_EQUAL(streamVal.Fetch(result), NUdf::EFetchStatus::Yield); + UNIT_ASSERT_EQUAL(streamVal.Fetch(result), NUdf::EFetchStatus::Ok); + UNIT_ASSERT_VALUES_EQUAL(TStringBuf(result.AsStringRef()), "00000"); + UNIT_ASSERT_EQUAL(streamVal.Fetch(result), NUdf::EFetchStatus::Ok); + UNIT_ASSERT_VALUES_EQUAL(TStringBuf(result.AsStringRef()), "1111"); + UNIT_ASSERT_EQUAL(streamVal.Fetch(result), NUdf::EFetchStatus::Ok); + UNIT_ASSERT_VALUES_EQUAL(TStringBuf(result.AsStringRef()), "222"); + UNIT_ASSERT_EQUAL(streamVal.Fetch(result), NUdf::EFetchStatus::Ok); + UNIT_ASSERT_VALUES_EQUAL(TStringBuf(result.AsStringRef()), "3"); + UNIT_ASSERT_EQUAL(streamVal.Fetch(result), NUdf::EFetchStatus::Finish); + UNIT_ASSERT_EQUAL(streamVal.Fetch(result), NUdf::EFetchStatus::Finish); + } +#endif } Y_UNIT_TEST_SUITE(TMiniKQLWideCombinerPerfTest) { diff --git a/ydb/library/yql/minikql/mkql_program_builder.cpp b/ydb/library/yql/minikql/mkql_program_builder.cpp index 367305c09fb..4fce599bdb9 100644 --- a/ydb/library/yql/minikql/mkql_program_builder.cpp +++ b/ydb/library/yql/minikql/mkql_program_builder.cpp @@ -4711,11 +4711,17 @@ TRuntimeNode TProgramBuilder::CommonJoinCore(TRuntimeNode flow, EJoinKind joinKi return TRuntimeNode(callableBuilder.Build(), false); } -TRuntimeNode TProgramBuilder::WideCombiner(TRuntimeNode flow, ui64 memLimit, const TWideLambda& extractor, const TBinaryWideLambda& init, const TTernaryWideLambda& update, const TBinaryWideLambda& finish) { +TRuntimeNode TProgramBuilder::WideCombiner(TRuntimeNode flow, i64 memLimit, const TWideLambda& extractor, const TBinaryWideLambda& init, const TTernaryWideLambda& update, const TBinaryWideLambda& finish) { if constexpr (RuntimeVersion < 18U) { THROW yexception() << "Runtime version (" << RuntimeVersion << ") too old for " << __func__; } + if (memLimit < 0) { + if constexpr (RuntimeVersion < 46U) { + THROW yexception() << "Runtime version (" << RuntimeVersion << ") too old for " << __func__ << " with limit " << memLimit; + } + } + const auto wideComponents = GetWideComponents(AS_TYPE(TFlowType, flow.GetStaticType())); TRuntimeNode::TList itemArgs; @@ -4755,7 +4761,10 @@ TRuntimeNode TProgramBuilder::WideCombiner(TRuntimeNode flow, ui64 memLimit, con TCallableBuilder callableBuilder(Env, __func__, NewFlowType(NewMultiType(tupleItems))); callableBuilder.Add(flow); - callableBuilder.Add(NewDataLiteral(memLimit)); + if constexpr (RuntimeVersion < 46U) + callableBuilder.Add(NewDataLiteral(ui64(memLimit))); + else + callableBuilder.Add(NewDataLiteral(memLimit)); callableBuilder.Add(NewDataLiteral(ui32(keyArgs.size()))); callableBuilder.Add(NewDataLiteral(ui32(stateArgs.size()))); std::for_each(itemArgs.cbegin(), itemArgs.cend(), std::bind(&TCallableBuilder::Add, std::ref(callableBuilder), std::placeholders::_1)); diff --git a/ydb/library/yql/minikql/mkql_program_builder.h b/ydb/library/yql/minikql/mkql_program_builder.h index 16aab5a7f30..c207a4386c9 100644 --- a/ydb/library/yql/minikql/mkql_program_builder.h +++ b/ydb/library/yql/minikql/mkql_program_builder.h @@ -418,7 +418,7 @@ public: TRuntimeNode WideTakeWhileInclusive(TRuntimeNode flow, const TNarrowLambda& handler); TRuntimeNode WideSkipWhileInclusive(TRuntimeNode flow, const TNarrowLambda& handler); - TRuntimeNode WideCombiner(TRuntimeNode flow, ui64 memLimit, const TWideLambda& keyExtractor, const TBinaryWideLambda& init, const TTernaryWideLambda& update, const TBinaryWideLambda& finish); + TRuntimeNode WideCombiner(TRuntimeNode flow, i64 memLimit, const TWideLambda& keyExtractor, const TBinaryWideLambda& init, const TTernaryWideLambda& update, const TBinaryWideLambda& finish); TRuntimeNode WideLastCombiner(TRuntimeNode flow, const TWideLambda& keyExtractor, const TBinaryWideLambda& init, const TTernaryWideLambda& update, const TBinaryWideLambda& finish); TRuntimeNode WideCondense1(TRuntimeNode stream, const TWideLambda& init, const TWideSwitchLambda& switcher, const TBinaryWideLambda& handler, bool useCtx = false); diff --git a/ydb/library/yql/minikql/mkql_runtime_version.h b/ydb/library/yql/minikql/mkql_runtime_version.h index a5650b04fc2..0a90b23d022 100644 --- a/ydb/library/yql/minikql/mkql_runtime_version.h +++ b/ydb/library/yql/minikql/mkql_runtime_version.h @@ -24,7 +24,7 @@ namespace NMiniKQL { // 1. Bump this version every time incompatible runtime nodes are introduced. // 2. Make sure you provide runtime node generation for previous runtime versions. #ifndef MKQL_RUNTIME_VERSION -#define MKQL_RUNTIME_VERSION 45U +#define MKQL_RUNTIME_VERSION 46U #endif // History: diff --git a/ydb/library/yql/providers/common/mkql/yql_provider_mkql.cpp b/ydb/library/yql/providers/common/mkql/yql_provider_mkql.cpp index 4bc9a93c77c..5bcba572472 100644 --- a/ydb/library/yql/providers/common/mkql/yql_provider_mkql.cpp +++ b/ydb/library/yql/providers/common/mkql/yql_provider_mkql.cpp @@ -733,8 +733,8 @@ TMkqlCommonCallableCompiler::TShared::TShared() { AddCallable("WideCombiner", [](const TExprNode& node, TMkqlBuildContext& ctx) { const auto flow = MkqlBuildExpr(node.Head(), ctx); - ui64 memLimit = 0ULL; - const bool withLimit = TryFromString<ui64>(node.Child(1U)->Content(), memLimit); + i64 memLimit = 0LL; + const bool withLimit = TryFromString<i64>(node.Child(1U)->Content(), memLimit); const auto keyExtractor = [&](TRuntimeNode::TList items) { return MkqlBuildWideLambda(*node.Child(2U), ctx, items); |