diff options
author | mfilitov <mfilitov@yandex-team.com> | 2024-12-12 12:54:18 +0300 |
---|---|---|
committer | mfilitov <mfilitov@yandex-team.com> | 2024-12-12 13:13:35 +0300 |
commit | a0ab79ee3d99df566509c5cb1e42129017424703 (patch) | |
tree | 655e4b876d150c020abe8dd749b6b447ef393387 | |
parent | 560023d550a203f58c64cdf9844dac139f54a368 (diff) | |
download | ydb-a0ab79ee3d99df566509c5cb1e42129017424703.tar.gz |
initial implementation of memory preallocation in gracejoin
Память преаллоцируется перед выполнением Join(). Если память преаллоцировать не удалось из-за ее нехватки, то включается спиллинг
commit_hash:eccddfcf1304b4d64d819ced90744a163924addb
3 files changed, 75 insertions, 18 deletions
diff --git a/yql/essentials/minikql/comp_nodes/mkql_grace_join.cpp b/yql/essentials/minikql/comp_nodes/mkql_grace_join.cpp index 08986bc8fd..871321786b 100644 --- a/yql/essentials/minikql/comp_nodes/mkql_grace_join.cpp +++ b/yql/essentials/minikql/comp_nodes/mkql_grace_join.cpp @@ -643,12 +643,15 @@ private: } void SwitchMode(EOperatingMode mode, TComputationContext& ctx) { + LogMemoryUsage(); switch(mode) { case EOperatingMode::InMemory: { + YQL_LOG(INFO) << (const void *)&*JoinedTablePtr << "# switching Memory mode to InMemory"; MKQL_ENSURE(false, "Internal logic error"); break; } case EOperatingMode::Spilling: { + YQL_LOG(INFO) << (const void *)&*JoinedTablePtr << "# switching Memory mode to Spilling"; MKQL_ENSURE(EOperatingMode::InMemory == Mode, "Internal logic error"); auto spiller = ctx.SpillerFactory->CreateSpiller(); RightPacker->TablePtr->InitializeBucketSpillers(spiller); @@ -656,6 +659,7 @@ private: break; } case EOperatingMode::ProcessSpilled: { + YQL_LOG(INFO) << (const void *)&*JoinedTablePtr << "# switching Memory mode to ProcessSpilled"; SpilledBucketsJoinOrder.reserve(GraceJoin::NumberOfBuckets); for (ui32 i = 0; i < GraceJoin::NumberOfBuckets; ++i) SpilledBucketsJoinOrder.push_back(i); @@ -843,9 +847,6 @@ private: if (isYield == EFetchResult::One) return isYield; if (IsSpillingAllowed && ctx.SpillerFactory && IsSwitchToSpillingModeCondition()) { - LogMemoryUsage(); - YQL_LOG(INFO) << (const void *)&*JoinedTablePtr << "# switching Memory mode to Spilling"; - SwitchMode(EOperatingMode::Spilling, ctx); return EFetchResult::Yield; } @@ -861,14 +862,18 @@ private: << " HaveLeft " << *HaveMoreLeftRows << " LeftPacked " << LeftPacker->TuplesBatchPacked << " LeftBatch " << LeftPacker->BatchSize << " HaveRight " << *HaveMoreRightRows << " RightPacked " << RightPacker->TuplesBatchPacked << " RightBatch " << RightPacker->BatchSize ; + + auto& leftTable = *LeftPacker->TablePtr; + auto& rightTable = SelfJoinSameKeys_ ? *LeftPacker->TablePtr : *RightPacker->TablePtr; + if (IsSpillingAllowed && ctx.SpillerFactory && !JoinedTablePtr->TryToPreallocateMemoryForJoin(leftTable, rightTable, JoinKind, *HaveMoreLeftRows, *HaveMoreRightRows)) { + SwitchMode(EOperatingMode::Spilling, ctx); + return EFetchResult::Yield; + } + *PartialJoinCompleted = true; LeftPacker->StartTime = std::chrono::system_clock::now(); RightPacker->StartTime = std::chrono::system_clock::now(); - if ( SelfJoinSameKeys_ ) { - JoinedTablePtr->Join(*LeftPacker->TablePtr, *LeftPacker->TablePtr, JoinKind, *HaveMoreLeftRows, *HaveMoreRightRows); - } else { - JoinedTablePtr->Join(*LeftPacker->TablePtr, *RightPacker->TablePtr, JoinKind, *HaveMoreLeftRows, *HaveMoreRightRows); - } + JoinedTablePtr->Join(leftTable, rightTable, JoinKind, *HaveMoreLeftRows, *HaveMoreRightRows); JoinedTablePtr->ResetIterator(); LeftPacker->EndTime = std::chrono::system_clock::now(); RightPacker->EndTime = std::chrono::system_clock::now(); @@ -945,7 +950,6 @@ EFetchResult DoCalculateWithSpilling(TComputationContext& ctx, NUdf::TUnboxedVal } if (!IsReadyForSpilledDataProcessing()) return EFetchResult::Yield; - YQL_LOG(INFO) << (const void *)&*JoinedTablePtr << "# switching to ProcessSpilled"; SwitchMode(EOperatingMode::ProcessSpilled, ctx); return EFetchResult::Finish; } diff --git a/yql/essentials/minikql/comp_nodes/mkql_grace_join_imp.cpp b/yql/essentials/minikql/comp_nodes/mkql_grace_join_imp.cpp index f9b19fdfbc..ae7c89daef 100644 --- a/yql/essentials/minikql/comp_nodes/mkql_grace_join_imp.cpp +++ b/yql/essentials/minikql/comp_nodes/mkql_grace_join_imp.cpp @@ -320,6 +320,63 @@ void ResizeHashTable(KeysHashTable &t, ui64 newSlots){ } +bool IsTablesSwapRequired(ui64 tuplesNum1, ui64 tuplesNum2, bool table1Batch, bool table2Batch) { + return tuplesNum2 > tuplesNum1 && !table1Batch || table2Batch; +} + +ui64 ComputeJoinSlotsSizeForBucket(const TTableBucket& bucket, const TTableBucketStats& bucketStats, ui64 headerSize, bool tableHasKeyStringColumns, bool tableHasKeyIColumns) { + ui64 tuplesNum = bucketStats.TuplesNum; + + ui64 avgStringsSize = (3 * (bucket.KeyIntVals.size() - tuplesNum * headerSize) ) / ( 2 * tuplesNum + 1) + 1; + ui64 slotSize = headerSize + 1; // Header [Short Strings] SlotIdx + if (tableHasKeyStringColumns || tableHasKeyIColumns) { + slotSize = slotSize + avgStringsSize; + } + + return slotSize; +} + +ui64 ComputeNumberOfSlots(ui64 tuplesNum) { + return (3 * tuplesNum + 1) | 1; +} + +bool TTable::TryToPreallocateMemoryForJoin(TTable & t1, TTable & t2, EJoinKind joinKind, bool hasMoreLeftTuples, bool hasMoreRightTuples) { + // If the batch is final or the only one, then the buckets are processed sequentially, the memory for the hash tables is freed immediately after processing. + // So, no preallocation is required. + if (!hasMoreLeftTuples && !hasMoreRightTuples) return true; + + for (ui64 bucket = 0; bucket < GraceJoin::NumberOfBuckets; bucket++) { + ui64 tuplesNum1 = t1.TableBucketsStats[bucket].TuplesNum; + ui64 tuplesNum2 = t2.TableBucketsStats[bucket].TuplesNum; + + TTable& tableForPreallocation = IsTablesSwapRequired(tuplesNum1, tuplesNum2, hasMoreLeftTuples || LeftTableBatch_, hasMoreRightTuples || RightTableBatch_) ? t1 : t2; + if (!tableForPreallocation.TableBucketsStats[bucket].TuplesNum || tableForPreallocation.TableBuckets[bucket].NSlots) continue; + + TTableBucket& bucketForPreallocation = tableForPreallocation.TableBuckets[bucket]; + const TTableBucketStats& bucketForPreallocationStats = tableForPreallocation.TableBucketsStats[bucket]; + + const auto nSlots = ComputeJoinSlotsSizeForBucket(bucketForPreallocation, bucketForPreallocationStats, tableForPreallocation.HeaderSize, + tableForPreallocation.NumberOfKeyStringColumns != 0, tableForPreallocation.NumberOfKeyIColumns != 0); + const auto slotSize = ComputeNumberOfSlots(tableForPreallocation.TableBucketsStats[bucket].TuplesNum); + + try { + bucketForPreallocation.JoinSlots.reserve(nSlots*slotSize); + } catch (TMemoryLimitExceededException) { + for (ui64 i = 0; i < bucket; ++i) { + GraceJoin::TTableBucket * b1 = &JoinTable1->TableBuckets[i]; + b1->JoinSlots.resize(0); + b1->JoinSlots.shrink_to_fit(); + GraceJoin::TTableBucket * b2 = &JoinTable2->TableBuckets[i]; + b2->JoinSlots.resize(0); + b2->JoinSlots.shrink_to_fit(); + } + return false; + } + } + + return true; +} + // Joins two tables and returns join result in joined table. Tuples of joined table could be received by // joined table iterator @@ -368,7 +425,7 @@ void TTable::Join( TTable & t1, TTable & t2, EJoinKind joinKind, bool hasMoreLef bool table2HasKeyStringColumns = (JoinTable2->NumberOfKeyStringColumns != 0); bool table1HasKeyIColumns = (JoinTable1->NumberOfKeyIColumns != 0); bool table2HasKeyIColumns = (JoinTable2->NumberOfKeyIColumns != 0); - bool swapTables = tuplesNum2 > tuplesNum1 && !table1Batch || table2Batch; + bool swapTables = IsTablesSwapRequired(tuplesNum1, tuplesNum2, table1Batch, table2Batch); if (swapTables) { @@ -402,13 +459,7 @@ void TTable::Join( TTable & t1, TTable & t2, EJoinKind joinKind, bool hasMoreLef if (tuplesNum1 == 0 && (hasMoreRightTuples || hasMoreLeftTuples || !bucketStats2->HashtableMatches)) continue; - ui64 slotSize = headerSize2 + 1; // Header [Short Strings] SlotIdx - - ui64 avgStringsSize = ( 3 * (bucket2->KeyIntVals.size() - tuplesNum2 * headerSize2) ) / ( 2 * tuplesNum2 + 1) + 1; - - if (table2HasKeyStringColumns || table2HasKeyIColumns ) { - slotSize = slotSize + avgStringsSize; - } + ui64 slotSize = ComputeJoinSlotsSizeForBucket(*bucket2, *bucketStats2, headerSize2, table2HasKeyStringColumns, table2HasKeyIColumns); ui64 &nSlots = bucket2->NSlots; auto &joinSlots = bucket2->JoinSlots; @@ -417,7 +468,7 @@ void TTable::Join( TTable & t1, TTable & t2, EJoinKind joinKind, bool hasMoreLef Y_DEBUG_ABORT_UNLESS(bucketStats2->SlotSize == 0 || bucketStats2->SlotSize == slotSize); if (!nSlots) { - nSlots = (3 * tuplesNum2 + 1) | 1; + nSlots = ComputeNumberOfSlots(tuplesNum2); joinSlots.resize(nSlots*slotSize, 0); bloomFilter.Resize(tuplesNum2); initHashTable = true; diff --git a/yql/essentials/minikql/comp_nodes/mkql_grace_join_imp.h b/yql/essentials/minikql/comp_nodes/mkql_grace_join_imp.h index a4846926d1..d6b9a54aca 100644 --- a/yql/essentials/minikql/comp_nodes/mkql_grace_join_imp.h +++ b/yql/essentials/minikql/comp_nodes/mkql_grace_join_imp.h @@ -346,6 +346,8 @@ public: // Returns value of next tuple. Returs true if there are more tuples bool NextTuple(TupleData& td); + bool TryToPreallocateMemoryForJoin(TTable & t1, TTable & t2, EJoinKind joinKind, bool hasMoreLeftTuples, bool hasMoreRightTuples); + // Joins two tables and stores join result in table data. Tuples of joined table could be received by // joined table iterator. Life time of t1, t2 should be greater than lifetime of joined table // hasMoreLeftTuples, hasMoreRightTuples is true if join is partial and more rows are coming. For final batch hasMoreLeftTuples = false, hasMoreRightTuples = false |