diff options
author | mfilitov <mfilitov@yandex-team.com> | 2024-11-27 14:26:51 +0300 |
---|---|---|
committer | mfilitov <mfilitov@yandex-team.com> | 2024-11-27 14:42:03 +0300 |
commit | 2d4ccf80e7eefd304f113a94a6d1a1b2e9eb5d4c (patch) | |
tree | 197487e1342a7f1117337c495276817896771a05 /yql | |
parent | ffe056ac974135f1d74039e670e77c7104942e66 (diff) | |
download | ydb-2d4ccf80e7eefd304f113a94a6d1a1b2e9eb5d4c.tar.gz |
Fix memlimit with skip yields
В WideCombine неправильно работал MemLimit при включении skipYields. Память замерялась при заходе в DoCalculate и дальше при каждом Fetch проверялось, что разница между текущим значением и сохраненным изначально не превышает MemLimit.
При включении skipYields эта логика ломается, так как разница в памяти между заходом в DoCalculate и Fetch не учитывает данные, накопленные на предыдущей итерации, когда случился Yield, но комбайнер ничего не вернул.
Теперь разница накопленные на предыдущей итегации данные учитываются при проверке лимита
commit_hash:299c172834c0b3657ed0ff2ab4171a7f18f5126c
Diffstat (limited to 'yql')
-rw-r--r-- | yql/essentials/minikql/comp_nodes/mkql_wide_combine.cpp | 49 | ||||
-rw-r--r-- | yql/essentials/minikql/comp_nodes/ut/mkql_wide_combine_ut.cpp | 77 |
2 files changed, 107 insertions, 19 deletions
diff --git a/yql/essentials/minikql/comp_nodes/mkql_wide_combine.cpp b/yql/essentials/minikql/comp_nodes/mkql_wide_combine.cpp index 75e5e0dd6f..55fd5800b8 100644 --- a/yql/essentials/minikql/comp_nodes/mkql_wide_combine.cpp +++ b/yql/essentials/minikql/comp_nodes/mkql_wide_combine.cpp @@ -315,6 +315,7 @@ public: CurrentPage = &Storage.emplace_back(RowSize() * CountRowsOnPage, NUdf::TUnboxedValuePod()); CurrentPosition = 0; Tongue = CurrentPage->data(); + StoredDataSize = 0; CleanupCurrentContext(); return true; @@ -345,6 +346,7 @@ public: EFetchResult InputStatus = EFetchResult::One; NUdf::TUnboxedValuePod* Tongue = nullptr; NUdf::TUnboxedValuePod* Throat = nullptr; + i64 StoredDataSize = 0; private: std::optional<TStorageIterator> ExtractIt; @@ -904,6 +906,7 @@ private: llvm::IntegerType* ValueType; llvm::PointerType* PtrValueType; llvm::IntegerType* StatusType; + llvm::IntegerType* StoredType; protected: using TBase::Context; public: @@ -912,6 +915,7 @@ public: result.emplace_back(StatusType); //status result.emplace_back(PtrValueType); //tongue result.emplace_back(PtrValueType); //throat + result.emplace_back(StoredType); //StoredDataSize result.emplace_back(Type::getInt32Ty(Context)); //size result.emplace_back(Type::getInt32Ty(Context)); //size return result; @@ -929,11 +933,16 @@ public: return ConstantInt::get(Type::getInt32Ty(Context), TBase::GetFieldsCount() + 2); } + llvm::Constant* GetStored() { + return ConstantInt::get(Type::getInt32Ty(Context), TBase::GetFieldsCount() + 3); + } + TLLVMFieldsStructureState(llvm::LLVMContext& context) : TBase(context) , ValueType(Type::getInt128Ty(Context)) , PtrValueType(PointerType::getUnqual(ValueType)) - , StatusType(Type::getInt32Ty(Context)) { + , StatusType(Type::getInt32Ty(Context)) + , StoredType(Type::getInt64Ty(Context)) { } }; @@ -988,6 +997,10 @@ public: ptr->InputStatus = Flow->FetchValues(ctx, fields); if constexpr (SkipYields) { if (EFetchResult::Yield == ptr->InputStatus) { + if (MemLimit) { + const auto currentUsage = ctx.HolderFactory.GetMemoryUsed(); + ptr->StoredDataSize += currentUsage > initUsage ? currentUsage - initUsage : 0; + } return EFetchResult::Yield; } else if (EFetchResult::Finish == ptr->InputStatus) { break; @@ -1000,7 +1013,7 @@ public: Nodes.ExtractKey(ctx, fields, static_cast<NUdf::TUnboxedValue*>(ptr->Tongue)); Nodes.ProcessItem(ctx, ptr->TasteIt() ? nullptr : static_cast<NUdf::TUnboxedValue*>(ptr->Tongue), static_cast<NUdf::TUnboxedValue*>(ptr->Throat)); - } while (!ctx.template CheckAdjustedMemLimit<TrackRss>(MemLimit, initUsage)); + } while (!ctx.template CheckAdjustedMemLimit<TrackRss>(MemLimit, initUsage - ptr->StoredDataSize)); ptr->PushStat(ctx.Stats); } @@ -1019,6 +1032,7 @@ public: const auto valueType = Type::getInt128Ty(context); const auto ptrValueType = PointerType::getUnqual(valueType); const auto statusType = Type::getInt32Ty(context); + const auto storedType = Type::getInt64Ty(context); TLLVMFieldsStructureState stateFields(context); const auto stateType = StructType::get(context, stateFields.GetFieldsArray()); @@ -1113,6 +1127,28 @@ public: block = save; + if (MemLimit) { + const auto storedPtr = GetElementPtrInst::CreateInBounds(stateType, stateArg, { stateFields.This(), stateFields.GetStored() }, "stored_ptr", block); + const auto lastStored = new LoadInst(storedType, storedPtr, "last_stored", block); + const auto currentUsage = GetMemoryUsed(MemLimit, ctx, block); + + + const auto skipSavingUsed = BasicBlock::Create(context, "skip_saving_used", ctx.Func); + const auto saveUsed = BasicBlock::Create(context, "save_used", ctx.Func); + const auto check = CmpInst::Create(Instruction::ICmp, ICmpInst::ICMP_UGE, currentUsage, used, "check", block); + BranchInst::Create(saveUsed, skipSavingUsed, check, block); + + block = saveUsed; + + const auto usedMemory = BinaryOperator::CreateSub(GetMemoryUsed(MemLimit, ctx, block), used, "used_memory", block); + const auto inc = BinaryOperator::CreateAdd(lastStored, usedMemory, "inc", block); + new StoreInst(inc, storedPtr, block); + + BranchInst::Create(skipSavingUsed, block); + + block = skipSavingUsed; + } + new StoreInst(ConstantInt::get(statusType, static_cast<i32>(EFetchResult::Yield)), statusPtr, block); result->addIncoming(ConstantInt::get(statusType, static_cast<i32>(EFetchResult::Yield)), block); BranchInst::Create(over, block); @@ -1249,7 +1285,14 @@ public: block = test; - const auto check = CheckAdjustedMemLimit<TrackRss>(MemLimit, used, ctx, block); + auto totalUsed = used; + if (MemLimit) { + const auto storedPtr = GetElementPtrInst::CreateInBounds(stateType, stateArg, { stateFields.This(), stateFields.GetStored() }, "stored_ptr", block); + const auto lastStored = new LoadInst(storedType, storedPtr, "last_stored", block); + totalUsed = BinaryOperator::CreateSub(used, lastStored, "decr", block); + } + + const auto check = CheckAdjustedMemLimit<TrackRss>(MemLimit, totalUsed, ctx, block); BranchInst::Create(done, loop, check, block); block = done; diff --git a/yql/essentials/minikql/comp_nodes/ut/mkql_wide_combine_ut.cpp b/yql/essentials/minikql/comp_nodes/ut/mkql_wide_combine_ut.cpp index 904f38ac67..8faf061d3b 100644 --- a/yql/essentials/minikql/comp_nodes/ut/mkql_wide_combine_ut.cpp +++ b/yql/essentials/minikql/comp_nodes/ut/mkql_wide_combine_ut.cpp @@ -15,8 +15,13 @@ namespace NMiniKQL { namespace { constexpr auto border = 9124596000000000ULL; -constexpr ui64 g_Yield = std::numeric_limits<ui64>::max(); -constexpr ui64 g_TestYieldStreamData[] = {0, 1, 0, 2, g_Yield, 0, g_Yield, 1, 2, 0, 1, 3, 0, g_Yield, 1, 2}; + +struct TTestStreamParams { + static constexpr ui64 Yield = std::numeric_limits<ui64>::max(); + + ui64 StringSize = 1; + std::vector<ui64> TestYieldStreamData; +}; class TTestStreamWrapper: public TMutableComputationNode<TTestStreamWrapper> { using TBaseComputation = TMutableComputationNode<TTestStreamWrapper>; @@ -25,19 +30,19 @@ public: public: using TBase = TComputationValue<TStreamValue>; - TStreamValue(TMemoryUsageInfo* memInfo, TComputationContext& compCtx) - : TBase(memInfo), CompCtx(compCtx) + TStreamValue(TMemoryUsageInfo* memInfo, TComputationContext& compCtx, TTestStreamParams& params) + : TBase(memInfo), CompCtx(compCtx), Params(params) {} private: NUdf::EFetchStatus Fetch(NUdf::TUnboxedValue& result) override { - constexpr auto size = Y_ARRAY_SIZE(g_TestYieldStreamData); + auto size = Params.TestYieldStreamData.size(); if (Index == size) { return NUdf::EFetchStatus::Finish; } - const auto val = g_TestYieldStreamData[Index]; - if (g_Yield == val) { + const auto val = Params.TestYieldStreamData[Index]; + if (Params.Yield == val) { ++Index; return NUdf::EFetchStatus::Yield; } @@ -45,7 +50,7 @@ public: NUdf::TUnboxedValue* items = nullptr; result = CompCtx.HolderFactory.CreateDirectArrayHolder(2, items); items[0] = NUdf::TUnboxedValuePod(val); - items[1] = NUdf::TUnboxedValuePod(MakeString(ToString(val))); + items[1] = NUdf::TUnboxedValuePod(MakeString(ToString(val) * Params.StringSize)); ++Index; @@ -55,27 +60,31 @@ public: private: TComputationContext& CompCtx; ui64 Index = 0; + TTestStreamParams& Params; }; - TTestStreamWrapper(TComputationMutables& mutables) + TTestStreamWrapper(TComputationMutables& mutables, TTestStreamParams& params) : TBaseComputation(mutables) + , Params(params) {} NUdf::TUnboxedValuePod DoCalculate(TComputationContext& ctx) const { - return ctx.HolderFactory.Create<TStreamValue>(ctx); + return ctx.HolderFactory.Create<TStreamValue>(ctx, Params); } private: void RegisterDependencies() const final {} + + TTestStreamParams& Params; }; -IComputationNode* WrapTestStream(const TComputationNodeFactoryContext& ctx) { - return new TTestStreamWrapper(ctx.Mutables); +IComputationNode* WrapTestStream(const TComputationNodeFactoryContext& ctx, TTestStreamParams& params) { + return new TTestStreamWrapper(ctx.Mutables, params); } -TComputationNodeFactory GetNodeFactory() { - return [](TCallable& callable, const TComputationNodeFactoryContext& ctx) -> IComputationNode* { +TComputationNodeFactory GetNodeFactory(TTestStreamParams& params) { + return [¶ms](TCallable& callable, const TComputationNodeFactoryContext& ctx) -> IComputationNode* { if (callable.GetType()->GetName() == "TestYieldStream") { - return WrapTestStream(ctx); + return WrapTestStream(ctx, params); } return GetBuiltinFactory()(callable, ctx); }; @@ -456,7 +465,9 @@ Y_UNIT_TEST_SUITE(TMiniKQLWideCombinerTest) { } #if !defined(MKQL_RUNTIME_VERSION) || MKQL_RUNTIME_VERSION >= 46u Y_UNIT_TEST_LLVM(TestHasLimitButPasstroughtYields) { - TSetup<LLVM> setup(GetNodeFactory()); + TTestStreamParams params; + params.TestYieldStreamData = {0, 1, 0, 2, TTestStreamParams::Yield, 0, TTestStreamParams::Yield, 1, 2, 0, 1, 3, 0, TTestStreamParams::Yield, 1, 2}; + TSetup<LLVM> setup(GetNodeFactory(params)); TProgramBuilder& pb = *setup.PgmBuilder; const auto stream = MakeStream<LLVM>(setup); @@ -486,6 +497,40 @@ Y_UNIT_TEST_SUITE(TMiniKQLWideCombinerTest) { UNIT_ASSERT_EQUAL(streamVal.Fetch(result), NUdf::EFetchStatus::Finish); } #endif +#if !defined(MKQL_RUNTIME_VERSION) || MKQL_RUNTIME_VERSION >= 46u + Y_UNIT_TEST_LLVM(TestSkipYieldRespectsMemLimit) { + TTestStreamParams params; + params.StringSize = 50000; + params.TestYieldStreamData = {0, TTestStreamParams::Yield, 2, TTestStreamParams::Yield, 3, TTestStreamParams::Yield, 4}; + TSetup<LLVM> setup(GetNodeFactory(params)); + TProgramBuilder& pb = *setup.PgmBuilder; + + const auto stream = MakeStream<LLVM>(setup); + const auto pgmReturn = pb.FromFlow(pb.NarrowMap(pb.WideCombiner(pb.ExpandMap(pb.ToFlow(stream), + [&](TRuntimeNode item) -> TRuntimeNode::TList { return {pb.Member(item, "a"), pb.Member(item, "b")}; }), -100000LL, + [&](TRuntimeNode::TList items) -> TRuntimeNode::TList { return {items.front()}; }, + [&](TRuntimeNode::TList, TRuntimeNode::TList items) -> TRuntimeNode::TList { return items; }, + [&](TRuntimeNode::TList, TRuntimeNode::TList items, TRuntimeNode::TList state) -> TRuntimeNode::TList { return {state.front(), pb.AggrConcat(state.back(), items.back())}; }, + [&](TRuntimeNode::TList, TRuntimeNode::TList state) -> TRuntimeNode::TList { return state; }), + [&](TRuntimeNode::TList items) { return items.back(); } + )); + const auto graph = setup.BuildGraph(pgmReturn); + const auto streamVal = graph->GetValue(); + NUdf::TUnboxedValue result; + + // skip first 2 yields + UNIT_ASSERT_VALUES_EQUAL(streamVal.Fetch(result), NUdf::EFetchStatus::Yield); + UNIT_ASSERT_EQUAL(streamVal.Fetch(result), NUdf::EFetchStatus::Yield); + // return all the collected values + UNIT_ASSERT_EQUAL(streamVal.Fetch(result), NUdf::EFetchStatus::Ok); + UNIT_ASSERT_EQUAL(streamVal.Fetch(result), NUdf::EFetchStatus::Ok); + UNIT_ASSERT_EQUAL(streamVal.Fetch(result), NUdf::EFetchStatus::Ok); + UNIT_ASSERT_EQUAL(streamVal.Fetch(result), NUdf::EFetchStatus::Yield); + UNIT_ASSERT_EQUAL(streamVal.Fetch(result), NUdf::EFetchStatus::Ok); + UNIT_ASSERT_EQUAL(streamVal.Fetch(result), NUdf::EFetchStatus::Finish); + UNIT_ASSERT_EQUAL(streamVal.Fetch(result), NUdf::EFetchStatus::Finish); + } +#endif } Y_UNIT_TEST_SUITE(TMiniKQLWideCombinerPerfTest) { |