aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authormfilitov <mfilitov@yandex-team.com>2025-01-30 16:03:55 +0300
committermfilitov <mfilitov@yandex-team.com>2025-01-30 16:24:09 +0300
commit84ee27dd08dbee487d76317a243f3eb1633c1b84 (patch)
tree8229b63ac69b8235447edb3de71308f85cded2ec
parente0a1b4cfd271b5ede39825cf60d93f619c8d1c58 (diff)
downloadydb-84ee27dd08dbee487d76317a243f3eb1633c1b84.tar.gz
flush widecombiner earlier in case of low memory
Fixes for tpch10000 q4 crashing with memLimit. Changes: 1. MemLimit exception is now handled in WideCombiner. Now, if there is not enough memory during hashtable grow, WideCombiner will flush and send all the accumulated data to WideLastCombiner. 2. WideCombiner will also flush all the data in case if allocator is in YellowZone. In this case data will be flushed and hashtable will be recreated. commit_hash:bf2dcd3b3ddbfb08c39c5a698f10bbd0f4ad14bd
-rw-r--r--yql/essentials/minikql/comp_nodes/mkql_wide_combine.cpp72
1 files changed, 45 insertions, 27 deletions
diff --git a/yql/essentials/minikql/comp_nodes/mkql_wide_combine.cpp b/yql/essentials/minikql/comp_nodes/mkql_wide_combine.cpp
index b755ab1551..e14f3c5082 100644
--- a/yql/essentials/minikql/comp_nodes/mkql_wide_combine.cpp
+++ b/yql/essentials/minikql/comp_nodes/mkql_wide_combine.cpp
@@ -31,6 +31,10 @@ extern TStatKey Combine_MaxRowsCount;
namespace {
+bool HasMemoryForProcessing() {
+ return !TlsAllocState->IsMemoryYellowZoneEnabled();
+}
+
struct TMyValueEqual {
TMyValueEqual(const TKeyTypes& types)
: Types(types)
@@ -244,11 +248,12 @@ private:
return KeyWidth + StateWidth;
}
public:
- TState(TMemoryUsageInfo* memInfo, ui32 keyWidth, ui32 stateWidth, const THashFunc& hash, const TEqualsFunc& equal, bool allowOutOfMemory = false)
- : TBase(memInfo), KeyWidth(keyWidth), StateWidth(stateWidth), AllowOutOfMemory(allowOutOfMemory), States(hash, equal, CountRowsOnPage) {
+ TState(TMemoryUsageInfo* memInfo, ui32 keyWidth, ui32 stateWidth, const THashFunc& hash, const TEqualsFunc& equal, bool allowOutOfMemory = true)
+ : TBase(memInfo), KeyWidth(keyWidth), StateWidth(stateWidth), AllowOutOfMemory(allowOutOfMemory), Hash(hash), Equal(equal) {
CurrentPage = &Storage.emplace_back(RowSize() * CountRowsOnPage, NUdf::TUnboxedValuePod());
CurrentPosition = 0;
Tongue = CurrentPage->data();
+ States = std::make_unique<TStates>(Hash, Equal, CountRowsOnPage);
}
~TState() {
@@ -261,7 +266,7 @@ public:
ExtractIt.reset();
Storage.clear();
- States.Clear();
+ States->Clear();
CleanupCurrentContext();
}
@@ -269,7 +274,7 @@ public:
bool TasteIt() {
Y_ABORT_UNLESS(!ExtractIt);
bool isNew = false;
- auto itInsert = States.Insert(Tongue, isNew);
+ auto itInsert = States->Insert(Tongue, isNew);
if (isNew) {
CurrentPosition += RowSize();
if (CurrentPosition == CurrentPage->size()) {
@@ -278,16 +283,17 @@ public:
}
Tongue = CurrentPage->data() + CurrentPosition;
}
- Throat = States.GetKey(itInsert) + KeyWidth;
+ Throat = States->GetKey(itInsert) + KeyWidth;
if (isNew) {
GrowStates();
}
+ IsOutOfMemory = IsOutOfMemory || (!HasMemoryForProcessing() && States->GetSize() > 1000);
return isNew;
}
void GrowStates() {
try {
- States.CheckGrow();
+ States->CheckGrow();
} catch (TMemoryLimitExceededException) {
YQL_LOG(INFO) << "State failed to grow";
if (IsOutOfMemory || !AllowOutOfMemory) {
@@ -298,10 +304,6 @@ public:
}
}
- bool CheckIsOutOfMemory() const {
- return IsOutOfMemory;
- }
-
template<bool SkipYields>
bool ReadMore() {
if constexpr (SkipYields) {
@@ -309,38 +311,44 @@ public:
return true;
}
- if (!States.Empty())
+ if (!States->Empty())
return false;
{
TStorage localStorage;
std::swap(localStorage, Storage);
}
+
+ if (IsOutOfMemory) {
+ States = std::make_unique<TStates>(Hash, Equal, CountRowsOnPage);
+ }
+
CurrentPage = &Storage.emplace_back(RowSize() * CountRowsOnPage, NUdf::TUnboxedValuePod());
CurrentPosition = 0;
Tongue = CurrentPage->data();
StoredDataSize = 0;
+ IsOutOfMemory = false;
CleanupCurrentContext();
return true;
}
void PushStat(IStatsRegistry* stats) const {
- if (!States.Empty()) {
- MKQL_SET_MAX_STAT(stats, Combine_MaxRowsCount, static_cast<i64>(States.GetSize()));
+ if (!States->Empty()) {
+ MKQL_SET_MAX_STAT(stats, Combine_MaxRowsCount, static_cast<i64>(States->GetSize()));
MKQL_INC_STAT(stats, Combine_FlushesCount);
}
}
NUdf::TUnboxedValuePod* Extract() {
if (!ExtractIt) {
- ExtractIt.emplace(Storage, RowSize(), States.GetSize());
+ ExtractIt.emplace(Storage, RowSize(), States->GetSize());
} else {
ExtractIt->Next();
}
if (!ExtractIt->IsValid()) {
ExtractIt.reset();
- States.Clear();
+ States->Clear();
return nullptr;
}
NUdf::TUnboxedValuePod* result = ExtractIt->GetValuePtr();
@@ -352,17 +360,19 @@ public:
NUdf::TUnboxedValuePod* Tongue = nullptr;
NUdf::TUnboxedValuePod* Throat = nullptr;
i64 StoredDataSize = 0;
+ bool IsOutOfMemory = false;
NYql::NUdf::TCounter CounterOutputRows_;
private:
std::optional<TStorageIterator> ExtractIt;
const ui32 KeyWidth, StateWidth;
const bool AllowOutOfMemory;
- bool IsOutOfMemory = false;
ui64 CurrentPosition = 0;
TRow* CurrentPage = nullptr;
TStorage Storage;
- TStates States;
+ std::unique_ptr<TStates> States;
+ const THashFunc Hash;
+ const TEqualsFunc Equal;
};
class TSpillingSupportState : public TComputationValue<TSpillingSupportState> {
@@ -484,7 +494,7 @@ public:
ETasteResult TasteIt() {
if (GetMode() == EOperatingMode::InMemory) {
bool isNew = InMemoryProcessingState.TasteIt();
- if (InMemoryProcessingState.CheckIsOutOfMemory()) {
+ if (InMemoryProcessingState.IsOutOfMemory) {
StateWantsToSpill = true;
}
Throat = InMemoryProcessingState.Throat;
@@ -861,7 +871,7 @@ private:
for (auto &b: SpilledBuckets) {
b.SpilledState = std::make_unique<TWideUnboxedValuesSpillerAdapter>(spiller, KeyAndStateType, 5_MB);
b.SpilledData = std::make_unique<TWideUnboxedValuesSpillerAdapter>(spiller, UsedInputItemType, 5_MB);
- b.InMemoryProcessingState = std::make_unique<TState>(MemInfo, KeyWidth, KeyAndStateType->GetElementsCount() - KeyWidth, Hasher, Equal);
+ b.InMemoryProcessingState = std::make_unique<TState>(MemInfo, KeyWidth, KeyAndStateType->GetElementsCount() - KeyWidth, Hasher, Equal, false);
}
break;
}
@@ -889,10 +899,6 @@ private:
Mode = mode;
}
- bool HasMemoryForProcessing() const {
- return !TlsAllocState->IsMemoryYellowZoneEnabled();
- }
-
bool IsSwitchToSpillingModeCondition() const {
return !HasMemoryForProcessing() || TlsAllocState->GetMaximumLimitValueReached();
}
@@ -942,6 +948,7 @@ private:
llvm::PointerType* PtrValueType;
llvm::IntegerType* StatusType;
llvm::IntegerType* StoredType;
+ llvm::IntegerType* BoolType;
protected:
using TBase::Context;
public:
@@ -951,6 +958,7 @@ public:
result.emplace_back(PtrValueType); //tongue
result.emplace_back(PtrValueType); //throat
result.emplace_back(StoredType); //StoredDataSize
+ result.emplace_back(BoolType); //IsOutOfMemory
result.emplace_back(Type::getInt32Ty(Context)); //size
result.emplace_back(Type::getInt32Ty(Context)); //size
return result;
@@ -972,13 +980,17 @@ public:
return ConstantInt::get(Type::getInt32Ty(Context), TBase::GetFieldsCount() + 3);
}
+ llvm::Constant* GetIsOutOfMemory() {
+ return ConstantInt::get(Type::getInt32Ty(Context), TBase::GetFieldsCount() + 4);
+ }
+
TLLVMFieldsStructureState(llvm::LLVMContext& context)
: TBase(context)
, ValueType(Type::getInt128Ty(Context))
, PtrValueType(PointerType::getUnqual(ValueType))
, StatusType(Type::getInt32Ty(Context))
- , StoredType(Type::getInt64Ty(Context)) {
-
+ , StoredType(Type::getInt64Ty(Context))
+ , BoolType(Type::getInt1Ty(Context)) {
}
};
#endif
@@ -1048,7 +1060,7 @@ public:
Nodes.ExtractKey(ctx, fields, static_cast<NUdf::TUnboxedValue*>(ptr->Tongue));
Nodes.ProcessItem(ctx, ptr->TasteIt() ? nullptr : static_cast<NUdf::TUnboxedValue*>(ptr->Tongue), static_cast<NUdf::TUnboxedValue*>(ptr->Throat));
- } while (!ctx.template CheckAdjustedMemLimit<TrackRss>(MemLimit, initUsage - ptr->StoredDataSize));
+ } while (!ctx.template CheckAdjustedMemLimit<TrackRss>(MemLimit, initUsage - ptr->StoredDataSize) && !ptr->IsOutOfMemory);
ptr->PushStat(ctx.Stats);
}
@@ -1328,7 +1340,13 @@ public:
}
const auto check = CheckAdjustedMemLimit<TrackRss>(MemLimit, totalUsed, ctx, block);
- BranchInst::Create(done, loop, check, block);
+
+ const auto isOutOfMemoryPtr = GetElementPtrInst::CreateInBounds(stateType, stateArg, { stateFields.This(), stateFields.GetIsOutOfMemory() }, "is_out_of_memory_ptr", block);
+ const auto isOutOfMemory = new LoadInst(Type::getInt1Ty(context), isOutOfMemoryPtr, "is_out_of_memory", block);
+ const auto checkIsOutOfMemory = CmpInst::Create(Instruction::ICmp, ICmpInst::ICMP_EQ, isOutOfMemory, ConstantInt::getTrue(context), "check_is_out_of_memory", block);
+
+ const auto any = BinaryOperator::CreateOr(check, checkIsOutOfMemory, "any", block);
+ BranchInst::Create(done, loop, any, block);
block = done;