aboutsummaryrefslogtreecommitdiffstats
path: root/yql
diff options
context:
space:
mode:
author: mfilitov <mfilitov@yandex-team.com> 2024-11-27 14:26:51 +0300
committer: mfilitov <mfilitov@yandex-team.com> 2024-11-27 14:42:03 +0300
commit: 2d4ccf80e7eefd304f113a94a6d1a1b2e9eb5d4c (patch)
tree: 197487e1342a7f1117337c495276817896771a05 /yql
parent: ffe056ac974135f1d74039e670e77c7104942e66 (diff)
download: ydb-2d4ccf80e7eefd304f113a94a6d1a1b2e9eb5d4c.tar.gz
Fix memlimit with skip yields
В WideCombine неправильно работал MemLimit при включении skipYields. Память замерялась при заходе в DoCalculate, и дальше при каждом Fetch проверялось, что разница между текущим значением и сохранённым изначально не превышает MemLimit. При включении skipYields эта логика ломается, так как разница в памяти между заходом в DoCalculate и Fetch не учитывает данные, накопленные на предыдущей итерации, когда случился Yield, но комбайнер ничего не вернул. Теперь данные, накопленные на предыдущей итерации, учитываются при проверке лимита. commit_hash:299c172834c0b3657ed0ff2ab4171a7f18f5126c
Diffstat (limited to 'yql')
-rw-r--r-- yql/essentials/minikql/comp_nodes/mkql_wide_combine.cpp 49
-rw-r--r-- yql/essentials/minikql/comp_nodes/ut/mkql_wide_combine_ut.cpp 77
2 files changed, 107 insertions, 19 deletions
diff --git a/yql/essentials/minikql/comp_nodes/mkql_wide_combine.cpp b/yql/essentials/minikql/comp_nodes/mkql_wide_combine.cpp
index 75e5e0dd6f..55fd5800b8 100644
--- a/yql/essentials/minikql/comp_nodes/mkql_wide_combine.cpp
+++ b/yql/essentials/minikql/comp_nodes/mkql_wide_combine.cpp
@@ -315,6 +315,7 @@ public:
CurrentPage = &Storage.emplace_back(RowSize() * CountRowsOnPage, NUdf::TUnboxedValuePod());
CurrentPosition = 0;
Tongue = CurrentPage->data();
+ StoredDataSize = 0;
CleanupCurrentContext();
return true;
@@ -345,6 +346,7 @@ public:
EFetchResult InputStatus = EFetchResult::One;
NUdf::TUnboxedValuePod* Tongue = nullptr;
NUdf::TUnboxedValuePod* Throat = nullptr;
+ i64 StoredDataSize = 0;
private:
std::optional<TStorageIterator> ExtractIt;
@@ -904,6 +906,7 @@ private:
llvm::IntegerType* ValueType;
llvm::PointerType* PtrValueType;
llvm::IntegerType* StatusType;
+ llvm::IntegerType* StoredType;
protected:
using TBase::Context;
public:
@@ -912,6 +915,7 @@ public:
result.emplace_back(StatusType); //status
result.emplace_back(PtrValueType); //tongue
result.emplace_back(PtrValueType); //throat
+ result.emplace_back(StoredType); //StoredDataSize
result.emplace_back(Type::getInt32Ty(Context)); //size
result.emplace_back(Type::getInt32Ty(Context)); //size
return result;
@@ -929,11 +933,16 @@ public:
return ConstantInt::get(Type::getInt32Ty(Context), TBase::GetFieldsCount() + 2);
}
+ llvm::Constant* GetStored() {
+ return ConstantInt::get(Type::getInt32Ty(Context), TBase::GetFieldsCount() + 3);
+ }
+
TLLVMFieldsStructureState(llvm::LLVMContext& context)
: TBase(context)
, ValueType(Type::getInt128Ty(Context))
, PtrValueType(PointerType::getUnqual(ValueType))
- , StatusType(Type::getInt32Ty(Context)) {
+ , StatusType(Type::getInt32Ty(Context))
+ , StoredType(Type::getInt64Ty(Context)) {
}
};
@@ -988,6 +997,10 @@ public:
ptr->InputStatus = Flow->FetchValues(ctx, fields);
if constexpr (SkipYields) {
if (EFetchResult::Yield == ptr->InputStatus) {
+ if (MemLimit) {
+ const auto currentUsage = ctx.HolderFactory.GetMemoryUsed();
+ ptr->StoredDataSize += currentUsage > initUsage ? currentUsage - initUsage : 0;
+ }
return EFetchResult::Yield;
} else if (EFetchResult::Finish == ptr->InputStatus) {
break;
@@ -1000,7 +1013,7 @@ public:
Nodes.ExtractKey(ctx, fields, static_cast<NUdf::TUnboxedValue*>(ptr->Tongue));
Nodes.ProcessItem(ctx, ptr->TasteIt() ? nullptr : static_cast<NUdf::TUnboxedValue*>(ptr->Tongue), static_cast<NUdf::TUnboxedValue*>(ptr->Throat));
- } while (!ctx.template CheckAdjustedMemLimit<TrackRss>(MemLimit, initUsage));
+ } while (!ctx.template CheckAdjustedMemLimit<TrackRss>(MemLimit, initUsage - ptr->StoredDataSize));
ptr->PushStat(ctx.Stats);
}
@@ -1019,6 +1032,7 @@ public:
const auto valueType = Type::getInt128Ty(context);
const auto ptrValueType = PointerType::getUnqual(valueType);
const auto statusType = Type::getInt32Ty(context);
+ const auto storedType = Type::getInt64Ty(context);
TLLVMFieldsStructureState stateFields(context);
const auto stateType = StructType::get(context, stateFields.GetFieldsArray());
@@ -1113,6 +1127,28 @@ public:
block = save;
+ if (MemLimit) {
+ const auto storedPtr = GetElementPtrInst::CreateInBounds(stateType, stateArg, { stateFields.This(), stateFields.GetStored() }, "stored_ptr", block);
+ const auto lastStored = new LoadInst(storedType, storedPtr, "last_stored", block);
+ const auto currentUsage = GetMemoryUsed(MemLimit, ctx, block);
+
+
+ const auto skipSavingUsed = BasicBlock::Create(context, "skip_saving_used", ctx.Func);
+ const auto saveUsed = BasicBlock::Create(context, "save_used", ctx.Func);
+ const auto check = CmpInst::Create(Instruction::ICmp, ICmpInst::ICMP_UGE, currentUsage, used, "check", block);
+ BranchInst::Create(saveUsed, skipSavingUsed, check, block);
+
+ block = saveUsed;
+
+ const auto usedMemory = BinaryOperator::CreateSub(GetMemoryUsed(MemLimit, ctx, block), used, "used_memory", block);
+ const auto inc = BinaryOperator::CreateAdd(lastStored, usedMemory, "inc", block);
+ new StoreInst(inc, storedPtr, block);
+
+ BranchInst::Create(skipSavingUsed, block);
+
+ block = skipSavingUsed;
+ }
+
new StoreInst(ConstantInt::get(statusType, static_cast<i32>(EFetchResult::Yield)), statusPtr, block);
result->addIncoming(ConstantInt::get(statusType, static_cast<i32>(EFetchResult::Yield)), block);
BranchInst::Create(over, block);
@@ -1249,7 +1285,14 @@ public:
block = test;
- const auto check = CheckAdjustedMemLimit<TrackRss>(MemLimit, used, ctx, block);
+ auto totalUsed = used;
+ if (MemLimit) {
+ const auto storedPtr = GetElementPtrInst::CreateInBounds(stateType, stateArg, { stateFields.This(), stateFields.GetStored() }, "stored_ptr", block);
+ const auto lastStored = new LoadInst(storedType, storedPtr, "last_stored", block);
+ totalUsed = BinaryOperator::CreateSub(used, lastStored, "decr", block);
+ }
+
+ const auto check = CheckAdjustedMemLimit<TrackRss>(MemLimit, totalUsed, ctx, block);
BranchInst::Create(done, loop, check, block);
block = done;
diff --git a/yql/essentials/minikql/comp_nodes/ut/mkql_wide_combine_ut.cpp b/yql/essentials/minikql/comp_nodes/ut/mkql_wide_combine_ut.cpp
index 904f38ac67..8faf061d3b 100644
--- a/yql/essentials/minikql/comp_nodes/ut/mkql_wide_combine_ut.cpp
+++ b/yql/essentials/minikql/comp_nodes/ut/mkql_wide_combine_ut.cpp
@@ -15,8 +15,13 @@ namespace NMiniKQL {
namespace {
constexpr auto border = 9124596000000000ULL;
-constexpr ui64 g_Yield = std::numeric_limits<ui64>::max();
-constexpr ui64 g_TestYieldStreamData[] = {0, 1, 0, 2, g_Yield, 0, g_Yield, 1, 2, 0, 1, 3, 0, g_Yield, 1, 2};
+
+struct TTestStreamParams {
+ static constexpr ui64 Yield = std::numeric_limits<ui64>::max();
+
+ ui64 StringSize = 1;
+ std::vector<ui64> TestYieldStreamData;
+};
class TTestStreamWrapper: public TMutableComputationNode<TTestStreamWrapper> {
using TBaseComputation = TMutableComputationNode<TTestStreamWrapper>;
@@ -25,19 +30,19 @@ public:
public:
using TBase = TComputationValue<TStreamValue>;
- TStreamValue(TMemoryUsageInfo* memInfo, TComputationContext& compCtx)
- : TBase(memInfo), CompCtx(compCtx)
+ TStreamValue(TMemoryUsageInfo* memInfo, TComputationContext& compCtx, TTestStreamParams& params)
+ : TBase(memInfo), CompCtx(compCtx), Params(params)
{}
private:
NUdf::EFetchStatus Fetch(NUdf::TUnboxedValue& result) override {
- constexpr auto size = Y_ARRAY_SIZE(g_TestYieldStreamData);
+ auto size = Params.TestYieldStreamData.size();
if (Index == size) {
return NUdf::EFetchStatus::Finish;
}
- const auto val = g_TestYieldStreamData[Index];
- if (g_Yield == val) {
+ const auto val = Params.TestYieldStreamData[Index];
+ if (Params.Yield == val) {
++Index;
return NUdf::EFetchStatus::Yield;
}
@@ -45,7 +50,7 @@ public:
NUdf::TUnboxedValue* items = nullptr;
result = CompCtx.HolderFactory.CreateDirectArrayHolder(2, items);
items[0] = NUdf::TUnboxedValuePod(val);
- items[1] = NUdf::TUnboxedValuePod(MakeString(ToString(val)));
+ items[1] = NUdf::TUnboxedValuePod(MakeString(ToString(val) * Params.StringSize));
++Index;
@@ -55,27 +60,31 @@ public:
private:
TComputationContext& CompCtx;
ui64 Index = 0;
+ TTestStreamParams& Params;
};
- TTestStreamWrapper(TComputationMutables& mutables)
+ TTestStreamWrapper(TComputationMutables& mutables, TTestStreamParams& params)
: TBaseComputation(mutables)
+ , Params(params)
{}
NUdf::TUnboxedValuePod DoCalculate(TComputationContext& ctx) const {
- return ctx.HolderFactory.Create<TStreamValue>(ctx);
+ return ctx.HolderFactory.Create<TStreamValue>(ctx, Params);
}
private:
void RegisterDependencies() const final {}
+
+ TTestStreamParams& Params;
};
-IComputationNode* WrapTestStream(const TComputationNodeFactoryContext& ctx) {
- return new TTestStreamWrapper(ctx.Mutables);
+IComputationNode* WrapTestStream(const TComputationNodeFactoryContext& ctx, TTestStreamParams& params) {
+ return new TTestStreamWrapper(ctx.Mutables, params);
}
-TComputationNodeFactory GetNodeFactory() {
- return [](TCallable& callable, const TComputationNodeFactoryContext& ctx) -> IComputationNode* {
+TComputationNodeFactory GetNodeFactory(TTestStreamParams& params) {
+ return [&params](TCallable& callable, const TComputationNodeFactoryContext& ctx) -> IComputationNode* {
if (callable.GetType()->GetName() == "TestYieldStream") {
- return WrapTestStream(ctx);
+ return WrapTestStream(ctx, params);
}
return GetBuiltinFactory()(callable, ctx);
};
@@ -456,7 +465,9 @@ Y_UNIT_TEST_SUITE(TMiniKQLWideCombinerTest) {
}
#if !defined(MKQL_RUNTIME_VERSION) || MKQL_RUNTIME_VERSION >= 46u
Y_UNIT_TEST_LLVM(TestHasLimitButPasstroughtYields) {
- TSetup<LLVM> setup(GetNodeFactory());
+ TTestStreamParams params;
+ params.TestYieldStreamData = {0, 1, 0, 2, TTestStreamParams::Yield, 0, TTestStreamParams::Yield, 1, 2, 0, 1, 3, 0, TTestStreamParams::Yield, 1, 2};
+ TSetup<LLVM> setup(GetNodeFactory(params));
TProgramBuilder& pb = *setup.PgmBuilder;
const auto stream = MakeStream<LLVM>(setup);
@@ -486,6 +497,40 @@ Y_UNIT_TEST_SUITE(TMiniKQLWideCombinerTest) {
UNIT_ASSERT_EQUAL(streamVal.Fetch(result), NUdf::EFetchStatus::Finish);
}
#endif
+#if !defined(MKQL_RUNTIME_VERSION) || MKQL_RUNTIME_VERSION >= 46u
+ Y_UNIT_TEST_LLVM(TestSkipYieldRespectsMemLimit) {
+ TTestStreamParams params;
+ params.StringSize = 50000;
+ params.TestYieldStreamData = {0, TTestStreamParams::Yield, 2, TTestStreamParams::Yield, 3, TTestStreamParams::Yield, 4};
+ TSetup<LLVM> setup(GetNodeFactory(params));
+ TProgramBuilder& pb = *setup.PgmBuilder;
+
+ const auto stream = MakeStream<LLVM>(setup);
+ const auto pgmReturn = pb.FromFlow(pb.NarrowMap(pb.WideCombiner(pb.ExpandMap(pb.ToFlow(stream),
+ [&](TRuntimeNode item) -> TRuntimeNode::TList { return {pb.Member(item, "a"), pb.Member(item, "b")}; }), -100000LL,
+ [&](TRuntimeNode::TList items) -> TRuntimeNode::TList { return {items.front()}; },
+ [&](TRuntimeNode::TList, TRuntimeNode::TList items) -> TRuntimeNode::TList { return items; },
+ [&](TRuntimeNode::TList, TRuntimeNode::TList items, TRuntimeNode::TList state) -> TRuntimeNode::TList { return {state.front(), pb.AggrConcat(state.back(), items.back())}; },
+ [&](TRuntimeNode::TList, TRuntimeNode::TList state) -> TRuntimeNode::TList { return state; }),
+ [&](TRuntimeNode::TList items) { return items.back(); }
+ ));
+ const auto graph = setup.BuildGraph(pgmReturn);
+ const auto streamVal = graph->GetValue();
+ NUdf::TUnboxedValue result;
+
+ // skip first 2 yields
+ UNIT_ASSERT_VALUES_EQUAL(streamVal.Fetch(result), NUdf::EFetchStatus::Yield);
+ UNIT_ASSERT_EQUAL(streamVal.Fetch(result), NUdf::EFetchStatus::Yield);
+ // return all the collected values
+ UNIT_ASSERT_EQUAL(streamVal.Fetch(result), NUdf::EFetchStatus::Ok);
+ UNIT_ASSERT_EQUAL(streamVal.Fetch(result), NUdf::EFetchStatus::Ok);
+ UNIT_ASSERT_EQUAL(streamVal.Fetch(result), NUdf::EFetchStatus::Ok);
+ UNIT_ASSERT_EQUAL(streamVal.Fetch(result), NUdf::EFetchStatus::Yield);
+ UNIT_ASSERT_EQUAL(streamVal.Fetch(result), NUdf::EFetchStatus::Ok);
+ UNIT_ASSERT_EQUAL(streamVal.Fetch(result), NUdf::EFetchStatus::Finish);
+ UNIT_ASSERT_EQUAL(streamVal.Fetch(result), NUdf::EFetchStatus::Finish);
+ }
+#endif
}
Y_UNIT_TEST_SUITE(TMiniKQLWideCombinerPerfTest) {