aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authormfilitov <mfilitov@yandex-team.com>2025-01-10 13:35:51 +0300
committermfilitov <mfilitov@yandex-team.com>2025-01-10 14:26:36 +0300
commitb830866dd04884b8eec69bab3c745c7c25ecb3f4 (patch)
tree57a7d7f3028fa56d51b03ee3a9d6ec0765b11534
parent1fcf8fe3af79804f1afd9cab433e99f995f3b322 (diff)
downloadydb-b830866dd04884b8eec69bab3c745c7c25ecb3f4.tar.gz
added more cases to switch to spilling and to flush data
Fixes for tpch10000 q4 crashing with memLimit. Changes: 1. MemLimit exception is now handled in WideCombiner. Now, if there is not enough memory during hashtable grow, WideCombiner will flush and send all the accumulated data to WideLastCombiner. 2. Handled the exception when there is not enough memory in the bucket after the state has been split. Now, if the hashtable grows unsuccessfully, the bucket is sent to spilling. commit_hash:0e19a47b070954414cc67d34f09ba8cb74a21fd1
-rw-r--r--yql/essentials/minikql/comp_nodes/mkql_wide_combine.cpp47
1 files changed, 31 insertions, 16 deletions
diff --git a/yql/essentials/minikql/comp_nodes/mkql_wide_combine.cpp b/yql/essentials/minikql/comp_nodes/mkql_wide_combine.cpp
index 1407d63cdc..4c3bf3a059 100644
--- a/yql/essentials/minikql/comp_nodes/mkql_wide_combine.cpp
+++ b/yql/essentials/minikql/comp_nodes/mkql_wide_combine.cpp
@@ -31,6 +31,10 @@ extern TStatKey Combine_MaxRowsCount;
namespace {
+bool HasMemoryForProcessing() {
+ return !TlsAllocState->IsMemoryYellowZoneEnabled();
+}
+
struct TMyValueEqual {
TMyValueEqual(const TKeyTypes& types)
: Types(types)
@@ -244,7 +248,7 @@ private:
return KeyWidth + StateWidth;
}
public:
- TState(TMemoryUsageInfo* memInfo, ui32 keyWidth, ui32 stateWidth, const THashFunc& hash, const TEqualsFunc& equal, bool allowOutOfMemory = false)
+ TState(TMemoryUsageInfo* memInfo, ui32 keyWidth, ui32 stateWidth, const THashFunc& hash, const TEqualsFunc& equal, bool allowOutOfMemory = true)
: TBase(memInfo), KeyWidth(keyWidth), StateWidth(stateWidth), AllowOutOfMemory(allowOutOfMemory), States(hash, equal, CountRowsOnPage) {
CurrentPage = &Storage.emplace_back(RowSize() * CountRowsOnPage, NUdf::TUnboxedValuePod());
CurrentPosition = 0;
@@ -282,6 +286,7 @@ public:
if (isNew) {
GrowStates();
}
+ IsOutOfMemory = IsOutOfMemory || !HasMemoryForProcessing();
return isNew;
}
@@ -298,10 +303,6 @@ public:
}
}
- bool CheckIsOutOfMemory() const {
- return IsOutOfMemory;
- }
-
template<bool SkipYields>
bool ReadMore() {
if constexpr (SkipYields) {
@@ -320,6 +321,7 @@ public:
CurrentPosition = 0;
Tongue = CurrentPage->data();
StoredDataSize = 0;
+ IsOutOfMemory = false;
CleanupCurrentContext();
return true;
@@ -352,13 +354,13 @@ public:
NUdf::TUnboxedValuePod* Tongue = nullptr;
NUdf::TUnboxedValuePod* Throat = nullptr;
i64 StoredDataSize = 0;
+ bool IsOutOfMemory = false;
NYql::NUdf::TCounter CounterOutputRows_;
private:
std::optional<TStorageIterator> ExtractIt;
const ui32 KeyWidth, StateWidth;
const bool AllowOutOfMemory;
- bool IsOutOfMemory = false;
ui64 CurrentPosition = 0;
TRow* CurrentPage = nullptr;
TStorage Storage;
@@ -484,7 +486,7 @@ public:
ETasteResult TasteIt() {
if (GetMode() == EOperatingMode::InMemory) {
bool isNew = InMemoryProcessingState.TasteIt();
- if (InMemoryProcessingState.CheckIsOutOfMemory()) {
+ if (InMemoryProcessingState.IsOutOfMemory) {
StateWantsToSpill = true;
}
Throat = InMemoryProcessingState.Throat;
@@ -653,8 +655,12 @@ private:
static_cast<NUdf::TUnboxedValue&>(processingState.Throat[i - KeyWidth]) = std::move(keyAndState[i]);
}
- if (InMemoryBucketsCount && !HasMemoryForProcessing() && IsSpillingWhileStateSplitAllowed()) {
+ if (InMemoryBucketsCount && !HasMemoryForProcessing() && IsSpillingWhileStateSplitAllowed() || processingState.IsOutOfMemory) {
ui32 bucketNumToSpill = GetLargestInMemoryBucketNumber();
+ if (processingState.IsOutOfMemory) {
+ bucketNumToSpill = bucketId;
+ processingState.IsOutOfMemory = false;
+ }
SplitStateSpillingBucket = bucketNumToSpill;
@@ -861,7 +867,7 @@ private:
for (auto &b: SpilledBuckets) {
b.SpilledState = std::make_unique<TWideUnboxedValuesSpillerAdapter>(spiller, KeyAndStateType, 5_MB);
b.SpilledData = std::make_unique<TWideUnboxedValuesSpillerAdapter>(spiller, UsedInputItemType, 5_MB);
- b.InMemoryProcessingState = std::make_unique<TState>(MemInfo, KeyWidth, KeyAndStateType->GetElementsCount() - KeyWidth, Hasher, Equal);
+ b.InMemoryProcessingState = std::make_unique<TState>(MemInfo, KeyWidth, KeyAndStateType->GetElementsCount() - KeyWidth, Hasher, Equal, false);
}
break;
}
@@ -889,10 +895,6 @@ private:
Mode = mode;
}
- bool HasMemoryForProcessing() const {
- return !TlsAllocState->IsMemoryYellowZoneEnabled();
- }
-
bool IsSwitchToSpillingModeCondition() const {
return !HasMemoryForProcessing() || TlsAllocState->GetMaximumLimitValueReached();
}
@@ -942,6 +944,7 @@ private:
llvm::PointerType* PtrValueType;
llvm::IntegerType* StatusType;
llvm::IntegerType* StoredType;
+ llvm::IntegerType* BoolType;
protected:
using TBase::Context;
public:
@@ -951,6 +954,7 @@ public:
result.emplace_back(PtrValueType); //tongue
result.emplace_back(PtrValueType); //throat
result.emplace_back(StoredType); //StoredDataSize
+ result.emplace_back(BoolType); //IsOutOfMemory
result.emplace_back(Type::getInt32Ty(Context)); //size
result.emplace_back(Type::getInt32Ty(Context)); //size
return result;
@@ -972,12 +976,17 @@ public:
return ConstantInt::get(Type::getInt32Ty(Context), TBase::GetFieldsCount() + 3);
}
+ llvm::Constant* GetIsOutOfMemory() {
+ return ConstantInt::get(Type::getInt32Ty(Context), TBase::GetFieldsCount() + 4);
+ }
+
TLLVMFieldsStructureState(llvm::LLVMContext& context)
: TBase(context)
, ValueType(Type::getInt128Ty(Context))
, PtrValueType(PointerType::getUnqual(ValueType))
, StatusType(Type::getInt32Ty(Context))
- , StoredType(Type::getInt64Ty(Context)) {
+ , StoredType(Type::getInt64Ty(Context))
+ , BoolType(Type::getInt1Ty(Context)) {
}
};
@@ -1048,7 +1057,7 @@ public:
Nodes.ExtractKey(ctx, fields, static_cast<NUdf::TUnboxedValue*>(ptr->Tongue));
Nodes.ProcessItem(ctx, ptr->TasteIt() ? nullptr : static_cast<NUdf::TUnboxedValue*>(ptr->Tongue), static_cast<NUdf::TUnboxedValue*>(ptr->Throat));
- } while (!ctx.template CheckAdjustedMemLimit<TrackRss>(MemLimit, initUsage - ptr->StoredDataSize));
+ } while (!ctx.template CheckAdjustedMemLimit<TrackRss>(MemLimit, initUsage - ptr->StoredDataSize) && !ptr->IsOutOfMemory);
ptr->PushStat(ctx.Stats);
}
@@ -1328,7 +1337,13 @@ public:
}
const auto check = CheckAdjustedMemLimit<TrackRss>(MemLimit, totalUsed, ctx, block);
- BranchInst::Create(done, loop, check, block);
+
+ const auto isOutOfMemoryPtr = GetElementPtrInst::CreateInBounds(stateType, stateArg, { stateFields.This(), stateFields.GetIsOutOfMemory() }, "is_out_of_memory_ptr", block);
+ const auto isOutOfMemory = new LoadInst(Type::getInt1Ty(context), isOutOfMemoryPtr, "is_out_of_memory", block);
+ const auto checkIsOutOfMemory = CmpInst::Create(Instruction::ICmp, ICmpInst::ICMP_EQ, isOutOfMemory, ConstantInt::getTrue(context), "check_is_out_of_memory", block);
+
+ const auto any = BinaryOperator::CreateOr(check, checkIsOutOfMemory, "any", block);
+ BranchInst::Create(done, loop, any, block);
block = done;