author     mfilitov <mfilitov@yandex-team.com>  2024-12-12 12:54:18 +0300
committer  mfilitov <mfilitov@yandex-team.com>  2024-12-12 13:13:35 +0300
commit     a0ab79ee3d99df566509c5cb1e42129017424703 (patch)
tree       655e4b876d150c020abe8dd749b6b447ef393387
parent     560023d550a203f58c64cdf9844dac139f54a368 (diff)
download   ydb-a0ab79ee3d99df566509c5cb1e42129017424703.tar.gz
initial implementation of memory preallocation in gracejoin
Memory is preallocated before Join() is executed. If the memory could not be preallocated because there is not enough of it, spilling is enabled. commit_hash:eccddfcf1304b4d64d819ced90744a163924addb
-rw-r--r--  yql/essentials/minikql/comp_nodes/mkql_grace_join.cpp      22
-rw-r--r--  yql/essentials/minikql/comp_nodes/mkql_grace_join_imp.cpp  69
-rw-r--r--  yql/essentials/minikql/comp_nodes/mkql_grace_join_imp.h     2
3 files changed, 75 insertions(+), 18 deletions(-)
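The commit message describes a try-preallocate-then-spill control flow: before Join() runs, the operator attempts to reserve the hash-table storage it will need and falls back to the spilling mode when the reservation fails. Below is a minimal standalone C++ sketch of that pattern; the TBucket and TryPreallocate names, the std::bad_alloc failure signal, and the main() driver are illustrative assumptions, not the actual YQL types or API.

#include <cstddef>
#include <cstdint>
#include <iostream>
#include <new>
#include <vector>

// Illustrative stand-in for the per-bucket slot storage; the real operator
// keeps considerably more state per bucket.
struct TBucket {
    std::vector<std::uint64_t> JoinSlots;
};

// Try to reserve the slot storage for every bucket up front.
// Returns false (after releasing what was already reserved) if an allocation
// fails, mirroring the "preallocate or switch to spilling" idea of the commit.
bool TryPreallocate(std::vector<TBucket>& buckets, std::size_t slotsPerBucket) {
    for (std::size_t i = 0; i < buckets.size(); ++i) {
        try {
            buckets[i].JoinSlots.reserve(slotsPerBucket);
        } catch (const std::bad_alloc&) {
            // Roll back the reservations made so far before giving up.
            for (std::size_t j = 0; j < i; ++j) {
                buckets[j].JoinSlots.clear();
                buckets[j].JoinSlots.shrink_to_fit();
            }
            return false;
        }
    }
    return true;
}

int main() {
    std::vector<TBucket> buckets(128);
    if (TryPreallocate(buckets, 1024)) {
        std::cout << "memory reserved, running the in-memory join\n";
        // ... Join() would run here ...
    } else {
        std::cout << "preallocation failed, switching to spilling\n";
        // ... the spilling mode would be enabled here ...
    }
}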
diff --git a/yql/essentials/minikql/comp_nodes/mkql_grace_join.cpp b/yql/essentials/minikql/comp_nodes/mkql_grace_join.cpp
index 08986bc8fd..871321786b 100644
--- a/yql/essentials/minikql/comp_nodes/mkql_grace_join.cpp
+++ b/yql/essentials/minikql/comp_nodes/mkql_grace_join.cpp
@@ -643,12 +643,15 @@ private:
}
void SwitchMode(EOperatingMode mode, TComputationContext& ctx) {
+ LogMemoryUsage();
switch(mode) {
case EOperatingMode::InMemory: {
+ YQL_LOG(INFO) << (const void *)&*JoinedTablePtr << "# switching Memory mode to InMemory";
MKQL_ENSURE(false, "Internal logic error");
break;
}
case EOperatingMode::Spilling: {
+ YQL_LOG(INFO) << (const void *)&*JoinedTablePtr << "# switching Memory mode to Spilling";
MKQL_ENSURE(EOperatingMode::InMemory == Mode, "Internal logic error");
auto spiller = ctx.SpillerFactory->CreateSpiller();
RightPacker->TablePtr->InitializeBucketSpillers(spiller);
@@ -656,6 +659,7 @@ private:
break;
}
case EOperatingMode::ProcessSpilled: {
+ YQL_LOG(INFO) << (const void *)&*JoinedTablePtr << "# switching Memory mode to ProcessSpilled";
SpilledBucketsJoinOrder.reserve(GraceJoin::NumberOfBuckets);
for (ui32 i = 0; i < GraceJoin::NumberOfBuckets; ++i) SpilledBucketsJoinOrder.push_back(i);
@@ -843,9 +847,6 @@ private:
if (isYield == EFetchResult::One)
return isYield;
if (IsSpillingAllowed && ctx.SpillerFactory && IsSwitchToSpillingModeCondition()) {
- LogMemoryUsage();
- YQL_LOG(INFO) << (const void *)&*JoinedTablePtr << "# switching Memory mode to Spilling";
-
SwitchMode(EOperatingMode::Spilling, ctx);
return EFetchResult::Yield;
}
@@ -861,14 +862,18 @@ private:
<< " HaveLeft " << *HaveMoreLeftRows << " LeftPacked " << LeftPacker->TuplesBatchPacked << " LeftBatch " << LeftPacker->BatchSize
<< " HaveRight " << *HaveMoreRightRows << " RightPacked " << RightPacker->TuplesBatchPacked << " RightBatch " << RightPacker->BatchSize
;
+
+ auto& leftTable = *LeftPacker->TablePtr;
+ auto& rightTable = SelfJoinSameKeys_ ? *LeftPacker->TablePtr : *RightPacker->TablePtr;
+ if (IsSpillingAllowed && ctx.SpillerFactory && !JoinedTablePtr->TryToPreallocateMemoryForJoin(leftTable, rightTable, JoinKind, *HaveMoreLeftRows, *HaveMoreRightRows)) {
+ SwitchMode(EOperatingMode::Spilling, ctx);
+ return EFetchResult::Yield;
+ }
+
*PartialJoinCompleted = true;
LeftPacker->StartTime = std::chrono::system_clock::now();
RightPacker->StartTime = std::chrono::system_clock::now();
- if ( SelfJoinSameKeys_ ) {
- JoinedTablePtr->Join(*LeftPacker->TablePtr, *LeftPacker->TablePtr, JoinKind, *HaveMoreLeftRows, *HaveMoreRightRows);
- } else {
- JoinedTablePtr->Join(*LeftPacker->TablePtr, *RightPacker->TablePtr, JoinKind, *HaveMoreLeftRows, *HaveMoreRightRows);
- }
+ JoinedTablePtr->Join(leftTable, rightTable, JoinKind, *HaveMoreLeftRows, *HaveMoreRightRows);
JoinedTablePtr->ResetIterator();
LeftPacker->EndTime = std::chrono::system_clock::now();
RightPacker->EndTime = std::chrono::system_clock::now();
@@ -945,7 +950,6 @@ EFetchResult DoCalculateWithSpilling(TComputationContext& ctx, NUdf::TUnboxedVal
}
if (!IsReadyForSpilledDataProcessing()) return EFetchResult::Yield;
- YQL_LOG(INFO) << (const void *)&*JoinedTablePtr << "# switching to ProcessSpilled";
SwitchMode(EOperatingMode::ProcessSpilled, ctx);
return EFetchResult::Finish;
}
diff --git a/yql/essentials/minikql/comp_nodes/mkql_grace_join_imp.cpp b/yql/essentials/minikql/comp_nodes/mkql_grace_join_imp.cpp
index f9b19fdfbc..ae7c89daef 100644
--- a/yql/essentials/minikql/comp_nodes/mkql_grace_join_imp.cpp
+++ b/yql/essentials/minikql/comp_nodes/mkql_grace_join_imp.cpp
@@ -320,6 +320,63 @@ void ResizeHashTable(KeysHashTable &t, ui64 newSlots){
}
+bool IsTablesSwapRequired(ui64 tuplesNum1, ui64 tuplesNum2, bool table1Batch, bool table2Batch) {
+ return (tuplesNum2 > tuplesNum1 && !table1Batch) || table2Batch;
+}
+
+ui64 ComputeJoinSlotsSizeForBucket(const TTableBucket& bucket, const TTableBucketStats& bucketStats, ui64 headerSize, bool tableHasKeyStringColumns, bool tableHasKeyIColumns) {
+ ui64 tuplesNum = bucketStats.TuplesNum;
+
+ ui64 avgStringsSize = (3 * (bucket.KeyIntVals.size() - tuplesNum * headerSize) ) / ( 2 * tuplesNum + 1) + 1;
+ ui64 slotSize = headerSize + 1; // Header [Short Strings] SlotIdx
+ if (tableHasKeyStringColumns || tableHasKeyIColumns) {
+ slotSize = slotSize + avgStringsSize;
+ }
+
+ return slotSize;
+}
+
+ui64 ComputeNumberOfSlots(ui64 tuplesNum) {
+ return (3 * tuplesNum + 1) | 1;
+}
+
+bool TTable::TryToPreallocateMemoryForJoin(TTable & t1, TTable & t2, EJoinKind joinKind, bool hasMoreLeftTuples, bool hasMoreRightTuples) {
+ // If the batch is final or the only one, the buckets are processed sequentially and the hash table memory is freed right after each bucket is processed.
+ // So no preallocation is required.
+ if (!hasMoreLeftTuples && !hasMoreRightTuples) return true;
+
+ for (ui64 bucket = 0; bucket < GraceJoin::NumberOfBuckets; bucket++) {
+ ui64 tuplesNum1 = t1.TableBucketsStats[bucket].TuplesNum;
+ ui64 tuplesNum2 = t2.TableBucketsStats[bucket].TuplesNum;
+
+ TTable& tableForPreallocation = IsTablesSwapRequired(tuplesNum1, tuplesNum2, hasMoreLeftTuples || LeftTableBatch_, hasMoreRightTuples || RightTableBatch_) ? t1 : t2;
+ if (!tableForPreallocation.TableBucketsStats[bucket].TuplesNum || tableForPreallocation.TableBuckets[bucket].NSlots) continue;
+
+ TTableBucket& bucketForPreallocation = tableForPreallocation.TableBuckets[bucket];
+ const TTableBucketStats& bucketForPreallocationStats = tableForPreallocation.TableBucketsStats[bucket];
+
+ const auto slotSize = ComputeJoinSlotsSizeForBucket(bucketForPreallocation, bucketForPreallocationStats, tableForPreallocation.HeaderSize,
+ tableForPreallocation.NumberOfKeyStringColumns != 0, tableForPreallocation.NumberOfKeyIColumns != 0);
+ const auto nSlots = ComputeNumberOfSlots(tableForPreallocation.TableBucketsStats[bucket].TuplesNum);
+
+ try {
+ bucketForPreallocation.JoinSlots.reserve(nSlots*slotSize);
+ } catch (const TMemoryLimitExceededException&) {
+ for (ui64 i = 0; i < bucket; ++i) {
+ GraceJoin::TTableBucket * b1 = &JoinTable1->TableBuckets[i];
+ b1->JoinSlots.resize(0);
+ b1->JoinSlots.shrink_to_fit();
+ GraceJoin::TTableBucket * b2 = &JoinTable2->TableBuckets[i];
+ b2->JoinSlots.resize(0);
+ b2->JoinSlots.shrink_to_fit();
+ }
+ return false;
+ }
+ }
+
+ return true;
+}
+
// Joins two tables and returns join result in joined table. Tuples of joined table could be received by
// joined table iterator
@@ -368,7 +425,7 @@ void TTable::Join( TTable & t1, TTable & t2, EJoinKind joinKind, bool hasMoreLef
bool table2HasKeyStringColumns = (JoinTable2->NumberOfKeyStringColumns != 0);
bool table1HasKeyIColumns = (JoinTable1->NumberOfKeyIColumns != 0);
bool table2HasKeyIColumns = (JoinTable2->NumberOfKeyIColumns != 0);
- bool swapTables = tuplesNum2 > tuplesNum1 && !table1Batch || table2Batch;
+ bool swapTables = IsTablesSwapRequired(tuplesNum1, tuplesNum2, table1Batch, table2Batch);
if (swapTables) {
@@ -402,13 +459,7 @@ void TTable::Join( TTable & t1, TTable & t2, EJoinKind joinKind, bool hasMoreLef
if (tuplesNum1 == 0 && (hasMoreRightTuples || hasMoreLeftTuples || !bucketStats2->HashtableMatches))
continue;
- ui64 slotSize = headerSize2 + 1; // Header [Short Strings] SlotIdx
-
- ui64 avgStringsSize = ( 3 * (bucket2->KeyIntVals.size() - tuplesNum2 * headerSize2) ) / ( 2 * tuplesNum2 + 1) + 1;
-
- if (table2HasKeyStringColumns || table2HasKeyIColumns ) {
- slotSize = slotSize + avgStringsSize;
- }
+ ui64 slotSize = ComputeJoinSlotsSizeForBucket(*bucket2, *bucketStats2, headerSize2, table2HasKeyStringColumns, table2HasKeyIColumns);
ui64 &nSlots = bucket2->NSlots;
auto &joinSlots = bucket2->JoinSlots;
@@ -417,7 +468,7 @@ void TTable::Join( TTable & t1, TTable & t2, EJoinKind joinKind, bool hasMoreLef
Y_DEBUG_ABORT_UNLESS(bucketStats2->SlotSize == 0 || bucketStats2->SlotSize == slotSize);
if (!nSlots) {
- nSlots = (3 * tuplesNum2 + 1) | 1;
+ nSlots = ComputeNumberOfSlots(tuplesNum2);
joinSlots.resize(nSlots*slotSize, 0);
bloomFilter.Resize(tuplesNum2);
initHashTable = true;
diff --git a/yql/essentials/minikql/comp_nodes/mkql_grace_join_imp.h b/yql/essentials/minikql/comp_nodes/mkql_grace_join_imp.h
index a4846926d1..d6b9a54aca 100644
--- a/yql/essentials/minikql/comp_nodes/mkql_grace_join_imp.h
+++ b/yql/essentials/minikql/comp_nodes/mkql_grace_join_imp.h
@@ -346,6 +346,8 @@ public:
// Returns value of next tuple. Returns true if there are more tuples
bool NextTuple(TupleData& td);
+ bool TryToPreallocateMemoryForJoin(TTable & t1, TTable & t2, EJoinKind joinKind, bool hasMoreLeftTuples, bool hasMoreRightTuples);
+
// Joins two tables and stores join result in table data. Tuples of joined table could be received by
// joined table iterator. Life time of t1, t2 should be greater than lifetime of joined table
// hasMoreLeftTuples, hasMoreRightTuples is true if join is partial and more rows are coming. For final batch hasMoreLeftTuples = false, hasMoreRightTuples = false
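
For reference, the sizing arithmetic factored out into ComputeJoinSlotsSizeForBucket and ComputeNumberOfSlots can be sanity-checked with a small standalone snippet. The helper bodies below copy the formulas from the diff, but the bucket numbers are made-up illustrative values, not measurements from the actual operator.

#include <cstdint>
#include <iostream>

// Illustrative copy of the slot-size formula from mkql_grace_join_imp.cpp.
std::uint64_t ComputeSlotSize(std::uint64_t keyIntValsSize, std::uint64_t tuplesNum,
                              std::uint64_t headerSize, bool hasStringOrIKeys) {
    // Average size of the short strings packed next to each tuple header.
    std::uint64_t avgStringsSize =
        (3 * (keyIntValsSize - tuplesNum * headerSize)) / (2 * tuplesNum + 1) + 1;
    std::uint64_t slotSize = headerSize + 1;   // Header [Short Strings] SlotIdx
    if (hasStringOrIKeys) slotSize += avgStringsSize;
    return slotSize;
}

std::uint64_t ComputeNumberOfSlots(std::uint64_t tuplesNum) {
    return (3 * tuplesNum + 1) | 1;            // always odd, roughly 3x the tuple count
}

int main() {
    // Made-up bucket: 1000 tuples, a 6-ui64 header, key strings occupying
    // 2000 extra ui64s in KeyIntVals on top of the headers.
    std::uint64_t tuplesNum = 1000, headerSize = 6;
    std::uint64_t keyIntValsSize = tuplesNum * headerSize + 2000;

    std::uint64_t slotSize = ComputeSlotSize(keyIntValsSize, tuplesNum, headerSize, true);
    std::uint64_t nSlots = ComputeNumberOfSlots(tuplesNum);

    // The product is what TryToPreallocateMemoryForJoin reserves for a bucket.
    std::cout << "slotSize = " << slotSize        // 10
              << ", nSlots = " << nSlots          // 3001
              << ", reserved ui64s = " << nSlots * slotSize << "\n";
}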