diff options
author | mfilitov <mfilitov@yandex-team.com> | 2025-01-10 13:35:51 +0300 |
---|---|---|
committer | mfilitov <mfilitov@yandex-team.com> | 2025-01-10 14:26:36 +0300 |
commit | b830866dd04884b8eec69bab3c745c7c25ecb3f4 (patch) | |
tree | 57a7d7f3028fa56d51b03ee3a9d6ec0765b11534 | |
parent | 1fcf8fe3af79804f1afd9cab433e99f995f3b322 (diff) | |
download | ydb-b830866dd04884b8eec69bab3c745c7c25ecb3f4.tar.gz |
Added more cases for switching to spilling and for flushing data
Fixes for tpch10000 q4 crashing with memLimit.
Changes:
1. The MemLimit exception is now handled in WideCombiner: if there is not enough memory while the hash table is growing, WideCombiner flushes and sends all the accumulated data to WideLastCombiner.
2. The exception raised when a bucket runs out of memory after the state has been split is now handled: if the bucket's hash table fails to grow, the bucket is sent to spilling.
commit_hash:0e19a47b070954414cc67d34f09ba8cb74a21fd1
-rw-r--r-- | yql/essentials/minikql/comp_nodes/mkql_wide_combine.cpp | 47 |
1 files changed, 31 insertions, 16 deletions
diff --git a/yql/essentials/minikql/comp_nodes/mkql_wide_combine.cpp b/yql/essentials/minikql/comp_nodes/mkql_wide_combine.cpp index 1407d63cdc..4c3bf3a059 100644 --- a/yql/essentials/minikql/comp_nodes/mkql_wide_combine.cpp +++ b/yql/essentials/minikql/comp_nodes/mkql_wide_combine.cpp @@ -31,6 +31,10 @@ extern TStatKey Combine_MaxRowsCount; namespace { +bool HasMemoryForProcessing() { + return !TlsAllocState->IsMemoryYellowZoneEnabled(); +} + struct TMyValueEqual { TMyValueEqual(const TKeyTypes& types) : Types(types) @@ -244,7 +248,7 @@ private: return KeyWidth + StateWidth; } public: - TState(TMemoryUsageInfo* memInfo, ui32 keyWidth, ui32 stateWidth, const THashFunc& hash, const TEqualsFunc& equal, bool allowOutOfMemory = false) + TState(TMemoryUsageInfo* memInfo, ui32 keyWidth, ui32 stateWidth, const THashFunc& hash, const TEqualsFunc& equal, bool allowOutOfMemory = true) : TBase(memInfo), KeyWidth(keyWidth), StateWidth(stateWidth), AllowOutOfMemory(allowOutOfMemory), States(hash, equal, CountRowsOnPage) { CurrentPage = &Storage.emplace_back(RowSize() * CountRowsOnPage, NUdf::TUnboxedValuePod()); CurrentPosition = 0; @@ -282,6 +286,7 @@ public: if (isNew) { GrowStates(); } + IsOutOfMemory = IsOutOfMemory || !HasMemoryForProcessing(); return isNew; } @@ -298,10 +303,6 @@ public: } } - bool CheckIsOutOfMemory() const { - return IsOutOfMemory; - } - template<bool SkipYields> bool ReadMore() { if constexpr (SkipYields) { @@ -320,6 +321,7 @@ public: CurrentPosition = 0; Tongue = CurrentPage->data(); StoredDataSize = 0; + IsOutOfMemory = false; CleanupCurrentContext(); return true; @@ -352,13 +354,13 @@ public: NUdf::TUnboxedValuePod* Tongue = nullptr; NUdf::TUnboxedValuePod* Throat = nullptr; i64 StoredDataSize = 0; + bool IsOutOfMemory = false; NYql::NUdf::TCounter CounterOutputRows_; private: std::optional<TStorageIterator> ExtractIt; const ui32 KeyWidth, StateWidth; const bool AllowOutOfMemory; - bool IsOutOfMemory = false; ui64 CurrentPosition = 0; TRow* 
CurrentPage = nullptr; TStorage Storage; @@ -484,7 +486,7 @@ public: ETasteResult TasteIt() { if (GetMode() == EOperatingMode::InMemory) { bool isNew = InMemoryProcessingState.TasteIt(); - if (InMemoryProcessingState.CheckIsOutOfMemory()) { + if (InMemoryProcessingState.IsOutOfMemory) { StateWantsToSpill = true; } Throat = InMemoryProcessingState.Throat; @@ -653,8 +655,12 @@ private: static_cast<NUdf::TUnboxedValue&>(processingState.Throat[i - KeyWidth]) = std::move(keyAndState[i]); } - if (InMemoryBucketsCount && !HasMemoryForProcessing() && IsSpillingWhileStateSplitAllowed()) { + if (InMemoryBucketsCount && !HasMemoryForProcessing() && IsSpillingWhileStateSplitAllowed() || processingState.IsOutOfMemory) { ui32 bucketNumToSpill = GetLargestInMemoryBucketNumber(); + if (processingState.IsOutOfMemory) { + bucketNumToSpill = bucketId; + processingState.IsOutOfMemory = false; + } SplitStateSpillingBucket = bucketNumToSpill; @@ -861,7 +867,7 @@ private: for (auto &b: SpilledBuckets) { b.SpilledState = std::make_unique<TWideUnboxedValuesSpillerAdapter>(spiller, KeyAndStateType, 5_MB); b.SpilledData = std::make_unique<TWideUnboxedValuesSpillerAdapter>(spiller, UsedInputItemType, 5_MB); - b.InMemoryProcessingState = std::make_unique<TState>(MemInfo, KeyWidth, KeyAndStateType->GetElementsCount() - KeyWidth, Hasher, Equal); + b.InMemoryProcessingState = std::make_unique<TState>(MemInfo, KeyWidth, KeyAndStateType->GetElementsCount() - KeyWidth, Hasher, Equal, false); } break; } @@ -889,10 +895,6 @@ private: Mode = mode; } - bool HasMemoryForProcessing() const { - return !TlsAllocState->IsMemoryYellowZoneEnabled(); - } - bool IsSwitchToSpillingModeCondition() const { return !HasMemoryForProcessing() || TlsAllocState->GetMaximumLimitValueReached(); } @@ -942,6 +944,7 @@ private: llvm::PointerType* PtrValueType; llvm::IntegerType* StatusType; llvm::IntegerType* StoredType; + llvm::IntegerType* BoolType; protected: using TBase::Context; public: @@ -951,6 +954,7 @@ public: 
result.emplace_back(PtrValueType); //tongue result.emplace_back(PtrValueType); //throat result.emplace_back(StoredType); //StoredDataSize + result.emplace_back(BoolType); //IsOutOfMemory result.emplace_back(Type::getInt32Ty(Context)); //size result.emplace_back(Type::getInt32Ty(Context)); //size return result; @@ -972,12 +976,17 @@ public: return ConstantInt::get(Type::getInt32Ty(Context), TBase::GetFieldsCount() + 3); } + llvm::Constant* GetIsOutOfMemory() { + return ConstantInt::get(Type::getInt32Ty(Context), TBase::GetFieldsCount() + 4); + } + TLLVMFieldsStructureState(llvm::LLVMContext& context) : TBase(context) , ValueType(Type::getInt128Ty(Context)) , PtrValueType(PointerType::getUnqual(ValueType)) , StatusType(Type::getInt32Ty(Context)) - , StoredType(Type::getInt64Ty(Context)) { + , StoredType(Type::getInt64Ty(Context)) + , BoolType(Type::getInt1Ty(Context)) { } }; @@ -1048,7 +1057,7 @@ public: Nodes.ExtractKey(ctx, fields, static_cast<NUdf::TUnboxedValue*>(ptr->Tongue)); Nodes.ProcessItem(ctx, ptr->TasteIt() ? 
nullptr : static_cast<NUdf::TUnboxedValue*>(ptr->Tongue), static_cast<NUdf::TUnboxedValue*>(ptr->Throat)); - } while (!ctx.template CheckAdjustedMemLimit<TrackRss>(MemLimit, initUsage - ptr->StoredDataSize)); + } while (!ctx.template CheckAdjustedMemLimit<TrackRss>(MemLimit, initUsage - ptr->StoredDataSize) && !ptr->IsOutOfMemory); ptr->PushStat(ctx.Stats); } @@ -1328,7 +1337,13 @@ public: } const auto check = CheckAdjustedMemLimit<TrackRss>(MemLimit, totalUsed, ctx, block); - BranchInst::Create(done, loop, check, block); + + const auto isOutOfMemoryPtr = GetElementPtrInst::CreateInBounds(stateType, stateArg, { stateFields.This(), stateFields.GetIsOutOfMemory() }, "is_out_of_memory_ptr", block); + const auto isOutOfMemory = new LoadInst(Type::getInt1Ty(context), isOutOfMemoryPtr, "is_out_of_memory", block); + const auto checkIsOutOfMemory = CmpInst::Create(Instruction::ICmp, ICmpInst::ICMP_EQ, isOutOfMemory, ConstantInt::getTrue(context), "check_is_out_of_memory", block); + + const auto any = BinaryOperator::CreateOr(check, checkIsOutOfMemory, "any", block); + BranchInst::Create(done, loop, any, block); block = done; |