diff options
author | mfilitov <mfilitov@yandex-team.com> | 2024-12-09 16:03:19 +0300 |
---|---|---|
committer | mfilitov <mfilitov@yandex-team.com> | 2024-12-09 16:28:58 +0300 |
commit | 1b8df998edbde3b65541908dc34e7f59b17c4302 (patch) | |
tree | dc35abc99585d474f6e889942d03d78fab9a3cb5 | |
parent | cd83d367ca7edd078dd5eb9a97fda32212b5f3e8 (diff) | |
download | ydb-1b8df998edbde3b65541908dc34e7f59b17c4302.tar.gz |
After spilling join inmemory buckets first
теперь при спиллинге сначала будут джоиниться бакеты, которые сейчас есть в памяти, а только потом уже загружаться новые бакеты
commit_hash:bb66673affba8f5f65eb7ab79ce6d09b26f77ec2
3 files changed, 70 insertions, 57 deletions
diff --git a/yql/essentials/minikql/comp_nodes/mkql_grace_join.cpp b/yql/essentials/minikql/comp_nodes/mkql_grace_join.cpp index 8b8dc02001..08986bc8fd 100644 --- a/yql/essentials/minikql/comp_nodes/mkql_grace_join.cpp +++ b/yql/essentials/minikql/comp_nodes/mkql_grace_join.cpp @@ -656,6 +656,15 @@ private: break; } case EOperatingMode::ProcessSpilled: { + SpilledBucketsJoinOrder.reserve(GraceJoin::NumberOfBuckets); + for (ui32 i = 0; i < GraceJoin::NumberOfBuckets; ++i) SpilledBucketsJoinOrder.push_back(i); + + std::sort(SpilledBucketsJoinOrder.begin(), SpilledBucketsJoinOrder.end(), [&](ui32 lhs, ui32 rhs) { + auto lhs_in_memory = LeftPacker->TablePtr->IsBucketInMemory(lhs) + RightPacker->TablePtr->IsBucketInMemory(lhs); + auto rhs_in_memory = LeftPacker->TablePtr->IsBucketInMemory(rhs) + RightPacker->TablePtr->IsBucketInMemory(rhs); + + return lhs_in_memory > rhs_in_memory; + }); MKQL_ENSURE(EOperatingMode::Spilling == Mode, "Internal logic error"); break; } @@ -871,8 +880,22 @@ private: } bool TryToReduceMemoryAndWait() { - bool isWaitingLeftForReduce = LeftPacker->TablePtr->TryToReduceMemoryAndWait(); - bool isWaitingRightForReduce = RightPacker->TablePtr->TryToReduceMemoryAndWait(); + if (!IsSpillingFinished()) return true; + i32 largestBucketsPairIndex = 0; + ui64 largestBucketsPairSize = 0; + for (ui32 bucket = 0; bucket < GraceJoin::NumberOfBuckets; ++bucket) { + + ui64 leftBucketSize = LeftPacker->TablePtr->GetSizeOfBucket(bucket); + ui64 rightBucketSize = RightPacker->TablePtr->GetSizeOfBucket(bucket); + ui64 totalSize = leftBucketSize + rightBucketSize; + if (totalSize > largestBucketsPairSize) { + largestBucketsPairSize = totalSize; + largestBucketsPairIndex = bucket; + } + } + + bool isWaitingLeftForReduce = LeftPacker->TablePtr->TryToReduceMemoryAndWait(largestBucketsPairIndex); + bool isWaitingRightForReduce = RightPacker->TablePtr->TryToReduceMemoryAndWait(largestBucketsPairIndex); return isWaitingLeftForReduce || isWaitingRightForReduce; } @@ -930,54 +953,56 @@ EFetchResult DoCalculateWithSpilling(TComputationContext& ctx, NUdf::TUnboxedVal } EFetchResult ProcessSpilledData(TComputationContext&, NUdf::TUnboxedValue*const* output) { - while (NextBucketToJoin != GraceJoin::NumberOfBuckets) { + while (SpilledBucketsJoinOrderCurrentIndex != GraceJoin::NumberOfBuckets) { UpdateSpilling(); if (IsRestoringSpilledBuckets()) return EFetchResult::Yield; - if (LeftPacker->TablePtr->IsSpilledBucketWaitingForExtraction(NextBucketToJoin)) { - LeftPacker->TablePtr->PrepareBucket(NextBucketToJoin); + ui32 nextBucketToJoin = SpilledBucketsJoinOrder[SpilledBucketsJoinOrderCurrentIndex]; + + if (LeftPacker->TablePtr->IsSpilledBucketWaitingForExtraction(nextBucketToJoin)) { + LeftPacker->TablePtr->PrepareBucket(nextBucketToJoin); } - if (RightPacker->TablePtr->IsSpilledBucketWaitingForExtraction(NextBucketToJoin)) { - RightPacker->TablePtr->PrepareBucket(NextBucketToJoin); + if (RightPacker->TablePtr->IsSpilledBucketWaitingForExtraction(nextBucketToJoin)) { + RightPacker->TablePtr->PrepareBucket(nextBucketToJoin); } - if (!LeftPacker->TablePtr->IsBucketInMemory(NextBucketToJoin)) { - LeftPacker->TablePtr->StartLoadingBucket(NextBucketToJoin); + if (!LeftPacker->TablePtr->IsBucketInMemory(nextBucketToJoin)) { + LeftPacker->TablePtr->StartLoadingBucket(nextBucketToJoin); } - if (!RightPacker->TablePtr->IsBucketInMemory(NextBucketToJoin)) { - RightPacker->TablePtr->StartLoadingBucket(NextBucketToJoin); + if (!RightPacker->TablePtr->IsBucketInMemory(nextBucketToJoin)) { + RightPacker->TablePtr->StartLoadingBucket(nextBucketToJoin); } - if (LeftPacker->TablePtr->IsBucketInMemory(NextBucketToJoin) && RightPacker->TablePtr->IsBucketInMemory(NextBucketToJoin)) { + if (LeftPacker->TablePtr->IsBucketInMemory(nextBucketToJoin) && RightPacker->TablePtr->IsBucketInMemory(nextBucketToJoin)) { if (*PartialJoinCompleted) { - while (JoinedTablePtr->NextJoinedData(LeftPacker->JoinTupleData, RightPacker->JoinTupleData, NextBucketToJoin + 1)) { + while (JoinedTablePtr->NextJoinedData(LeftPacker->JoinTupleData, RightPacker->JoinTupleData, nextBucketToJoin + 1)) { UnpackJoinedData(output); return EFetchResult::One; } LeftPacker->TuplesBatchPacked = 0; - LeftPacker->TablePtr->ClearBucket(NextBucketToJoin); // Clear content of returned bucket - LeftPacker->TablePtr->ShrinkBucket(NextBucketToJoin); + LeftPacker->TablePtr->ClearBucket(nextBucketToJoin); // Clear content of returned bucket + LeftPacker->TablePtr->ShrinkBucket(nextBucketToJoin); RightPacker->TuplesBatchPacked = 0; - RightPacker->TablePtr->ClearBucket(NextBucketToJoin); // Clear content of returned bucket - RightPacker->TablePtr->ShrinkBucket(NextBucketToJoin); + RightPacker->TablePtr->ClearBucket(nextBucketToJoin); // Clear content of returned bucket + RightPacker->TablePtr->ShrinkBucket(nextBucketToJoin); JoinedTablePtr->Clear(); JoinedTablePtr->ResetIterator(); *PartialJoinCompleted = false; - NextBucketToJoin++; + SpilledBucketsJoinOrderCurrentIndex++; } else { *PartialJoinCompleted = true; LeftPacker->StartTime = std::chrono::system_clock::now(); RightPacker->StartTime = std::chrono::system_clock::now(); if ( SelfJoinSameKeys_ ) { - JoinedTablePtr->Join(*LeftPacker->TablePtr, *LeftPacker->TablePtr, JoinKind, *HaveMoreLeftRows, *HaveMoreRightRows, NextBucketToJoin, NextBucketToJoin+1); + JoinedTablePtr->Join(*LeftPacker->TablePtr, *LeftPacker->TablePtr, JoinKind, *HaveMoreLeftRows, *HaveMoreRightRows, nextBucketToJoin, nextBucketToJoin+1); } else { - JoinedTablePtr->Join(*LeftPacker->TablePtr, *RightPacker->TablePtr, JoinKind, *HaveMoreLeftRows, *HaveMoreRightRows, NextBucketToJoin, NextBucketToJoin+1); + JoinedTablePtr->Join(*LeftPacker->TablePtr, *RightPacker->TablePtr, JoinKind, *HaveMoreLeftRows, *HaveMoreRightRows, nextBucketToJoin, nextBucketToJoin+1); } JoinedTablePtr->ResetIterator(); @@ -1016,9 +1041,9 @@ private: bool IsSpillingFinalized = false; - ui32 NextBucketToJoin = 0; - NYql::NUdf::TCounter CounterOutputRows_; + ui32 SpilledBucketsJoinOrderCurrentIndex = 0; + std::vector<ui32> SpilledBucketsJoinOrder; }; class TGraceJoinWrapper : public TStatefulWideFlowCodegeneratorNode<TGraceJoinWrapper> { diff --git a/yql/essentials/minikql/comp_nodes/mkql_grace_join_imp.cpp b/yql/essentials/minikql/comp_nodes/mkql_grace_join_imp.cpp index af35ed549f..f9b19fdfbc 100644 --- a/yql/essentials/minikql/comp_nodes/mkql_grace_join_imp.cpp +++ b/yql/essentials/minikql/comp_nodes/mkql_grace_join_imp.cpp @@ -489,7 +489,7 @@ void TTable::Join( TTable & t1, TTable & t2, EJoinKind joinKind, bool hasMoreLef // slotSize, slotIdx and strPos is only for hashtable (table2) ui64 bloomHits = 0; ui64 bloomLookups = 0; - + for (ui64 keysValSize = headerSize1; it1 != bucket1->KeyIntVals.end(); it1 += keysValSize, ++tuple1Idx ) { if ( table1HasKeyStringColumns || table1HasKeyIColumns ) { @@ -630,7 +630,7 @@ void TTable::Join( TTable & t1, TTable & t2, EJoinKind joinKind, bool hasMoreLef HasMoreRightTuples_ = hasMoreRightTuples; TuplesFound_ += tuplesFound; - + } inline void TTable::GetTupleData(ui32 bucketNum, ui32 tupleId, TupleData & td) { @@ -772,7 +772,7 @@ inline bool TTable::AddKeysToHashTable(KeysHashTable& t, ui64* keys, NYql::NUdf: continue; if (NumberOfKeyIColumns > 0) { - if (!CompareIColumns( + if (!CompareIColumns( (char *) (slotStringsStart), (char *) (keys + HeaderSize ), iColumns, @@ -903,22 +903,10 @@ ui64 TTable::GetSizeOfBucket(ui64 bucket) const { + TableBuckets[bucket].InterfaceOffsets.size() * sizeof(ui32); } -bool TTable::TryToReduceMemoryAndWait() { - i32 largestBucketIndex = 0; - ui64 largestBucketSize = 0; - for (ui32 bucket = 0; bucket < NumberOfBuckets; ++bucket) { - if (TableBucketsSpillers[bucket].IsProcessingSpilling()) return true; - - ui64 bucketSize = GetSizeOfBucket(bucket); - if (bucketSize > largestBucketSize) { - largestBucketSize = bucketSize; - largestBucketIndex = bucket; - } - } - - if (largestBucketSize < SpillingSizeLimit/NumberOfBuckets) return false; - if (const auto &tbs = TableBucketsStats[largestBucketIndex]; tbs.HashtableMatches) { - auto &tb = TableBuckets[largestBucketIndex]; +bool TTable::TryToReduceMemoryAndWait(ui64 bucket) { + if (GetSizeOfBucket(bucket) < SpillingSizeLimit/NumberOfBuckets) return false; + if (const auto &tbs = TableBucketsStats[bucket]; tbs.HashtableMatches) { + auto &tb = TableBuckets[bucket]; if (tb.JoinSlots.size()) { const auto slotSize = tbs.SlotSize; @@ -946,10 +934,10 @@ bool TTable::TryToReduceMemoryAndWait() { tb.JoinSlots.shrink_to_fit(); } } - TableBucketsSpillers[largestBucketIndex].SpillBucket(std::move(TableBuckets[largestBucketIndex])); - TableBuckets[largestBucketIndex] = TTableBucket{}; + TableBucketsSpillers[bucket].SpillBucket(std::move(TableBuckets[bucket])); + TableBuckets[bucket] = TTableBucket{}; - return TableBucketsSpillers[largestBucketIndex].IsProcessingSpilling(); + return TableBucketsSpillers[bucket].IsProcessingSpilling(); } void TTable::UpdateSpilling() { @@ -987,7 +975,7 @@ void TTable::FinalizeSpilling() { TableBucketsSpillers[bucket].Finalize(); TableBucketsSpillers[bucket].SpillBucket(std::move(TableBuckets[bucket])); TableBuckets[bucket] = TTableBucket{}; - + } } } @@ -1288,7 +1276,7 @@ void TTableBucketSpiller::ProcessBucketRestoration() { case ENextVectorToProcess::InterfaceOffsets: if (StateUi32Adapter.IsDataReady()) { AppendVector(CurrentBucket.InterfaceOffsets, StateUi32Adapter.ExtractVector()); - + SpilledBucketsCount--; if (SpilledBucketsCount == 0) { NextVectorToProcess = ENextVectorToProcess::None; @@ -1296,7 +1284,7 @@ void TTableBucketSpiller::ProcessBucketRestoration() { } else { NextVectorToProcess = ENextVectorToProcess::KeyAndVals; } - + break; } diff --git a/yql/essentials/minikql/comp_nodes/mkql_grace_join_imp.h b/yql/essentials/minikql/comp_nodes/mkql_grace_join_imp.h index b374f269eb..a4846926d1 100644 --- a/yql/essentials/minikql/comp_nodes/mkql_grace_join_imp.h +++ b/yql/essentials/minikql/comp_nodes/mkql_grace_join_imp.h @@ -13,7 +13,7 @@ namespace GraceJoin { class TTableBucketSpiller; #define GRACEJOIN_DEBUG DEBUG #define GRACEJOIN_TRACE TRACE - + const ui64 BitsForNumberOfBuckets = 5; // 2^5 = 32 const ui64 BucketsMask = (0x00000001 << BitsForNumberOfBuckets) - 1; const ui64 NumberOfBuckets = (0x00000001 << BitsForNumberOfBuckets); // Number of hashed keys buckets to distribute incoming tables tuples @@ -108,7 +108,7 @@ class TBloomfilter { /* Table data stored in buckets. Table columns are interpreted either as integers, strings or some interface-based type, -providing IHash, IEquate, IPack and IUnpack functions. +providing IHash, IEquate, IPack and IUnpack functions. External clients should transform (pack) data into appropriate presentation. Key columns always first, following int columns, string columns and interface-based columns. @@ -135,7 +135,7 @@ struct JoinTuplesIds { // To store keys values when making join only for unique keys (any join attribute) struct KeysHashTable { ui64 SlotSize = 0; // Slot size in hash table - ui64 NSlots = 0; // Total number of slots in table + ui64 NSlots = 0; // Total number of slots in table ui64 FillCount = 0; // Number of ui64 slots which are filled std::vector<ui64, TMKQLAllocator<ui64>> Table; // Table to store keys data in particular slots std::vector<ui64, TMKQLAllocator<ui64>> SpillData; // Vector to store long data which cannot be fit in single hash table slot. @@ -148,7 +148,7 @@ struct TTableBucket { std::vector<ui32, TMKQLAllocator<ui32>> StringsOffsets; // Vector to store strings values sizes (offsets in StringsValues are calculated) for particular tuple. std::vector<char, TMKQLAllocator<char>> InterfaceValues; // Vector to store types to work through external-provided IHash, IEquate interfaces std::vector<ui32, TMKQLAllocator<ui32>> InterfaceOffsets; // Vector to store sizes of columns to work through IHash, IEquate interfaces - std::vector<JoinTuplesIds, TMKQLAllocator<JoinTuplesIds>> JoinIds; // Results of join operations stored as index of tuples in buckets + std::vector<JoinTuplesIds, TMKQLAllocator<JoinTuplesIds>> JoinIds; // Results of join operations stored as index of tuples in buckets // of two tables with the same number std::vector<ui32, TMKQLAllocator<ui32>> LeftIds; // Left-side ids missing in other table @@ -181,7 +181,7 @@ struct TColTypeInterface { NYql::NUdf::IHash::TPtr HashI = nullptr; // Interface to calculate hash of column value NYql::NUdf::IEquate::TPtr EquateI = nullptr; // Interface to compare two column values std::shared_ptr<TValuePacker> Packer; // Class to pack and unpack column values - const THolderFactory& HolderFactory; // To use during unpacking + const THolderFactory& HolderFactory; // To use during unpacking }; // Class that spills bucket data. @@ -269,7 +269,7 @@ class TTable { ui64 NumberOfDataIntColumns = 0; //Number of integer data columns in the Table ui64 NumberOfDataStringColumns = 0; // Number of strings data columns in the Table ui64 NumberOfDataIColumns = 0; // Number of interface - provided data columns - + TColTypeInterface * ColInterfaces = nullptr; // Array of interfaces to work with corresponding columns data @@ -285,7 +285,7 @@ class TTable { ui64 HeaderSize = HashSize + NullsBitmapSize_ + NumberOfKeyIntColumns + NumberOfKeyIColumns + TotalStringsSize; // Header of all tuples size ui64 BytesInKeyIntColumns = sizeof(ui64) * NumberOfKeyIntColumns; - + // Table data is partitioned in buckets based on key value std::vector<TTableBucket> TableBuckets; // Statistics for buckets. Total number of tuples inside a single bucket and offsets. @@ -365,7 +365,7 @@ public: ui64 GetSizeOfBucket(ui64 bucket) const; // This functions wind the largest bucket and spills it to the disk. - bool TryToReduceMemoryAndWait(); + bool TryToReduceMemoryAndWait(ui64 bucket); // Update state of spilling. Must be called during each DoCalculate. void UpdateSpilling(); @@ -406,7 +406,7 @@ public: // Creates new table with key columns and data columns TTable(ui64 numberOfKeyIntColumns = 0, ui64 numberOfKeyStringColumns = 0, ui64 numberOfDataIntColumns = 0, ui64 numberOfDataStringColumns = 0, - ui64 numberOfKeyIColumns = 0, ui64 numberOfDataIColumns = 0, + ui64 numberOfKeyIColumns = 0, ui64 numberOfDataIColumns = 0, ui64 nullsBitmapSize = 1, TColTypeInterface * colInterfaces = nullptr, bool isAny = false); enum class EAddTupleResult { Added, Unmatched, AnyMatch }; @@ -414,7 +414,7 @@ public: // stringsSizes - sizes of strings columns. Indexes of null-value columns // in the form of bit array should be first values of intColumns. EAddTupleResult AddTuple(ui64* intColumns, char** stringColumns, ui32* stringsSizes, NYql::NUdf::TUnboxedValue * iColumns = nullptr, const TTable &other = {}); - + ~TTable(); ui64 InitHashTableCount_ = 0; |