aboutsummaryrefslogtreecommitdiffstats
path: root/yql/essentials/minikql
diff options
context:
space:
mode:
authormfilitov <mfilitov@yandex-team.com>2024-12-09 16:03:19 +0300
committermfilitov <mfilitov@yandex-team.com>2024-12-09 16:28:58 +0300
commit1b8df998edbde3b65541908dc34e7f59b17c4302 (patch)
treedc35abc99585d474f6e889942d03d78fab9a3cb5 /yql/essentials/minikql
parentcd83d367ca7edd078dd5eb9a97fda32212b5f3e8 (diff)
downloadydb-1b8df998edbde3b65541908dc34e7f59b17c4302.tar.gz
After spilling join inmemory buckets first
теперь при спиллинге сначала будут джоиниться бакеты, которые сейчас есть в памяти, а только потом уже загружаться новые бакеты commit_hash:bb66673affba8f5f65eb7ab79ce6d09b26f77ec2
Diffstat (limited to 'yql/essentials/minikql')
-rw-r--r--yql/essentials/minikql/comp_nodes/mkql_grace_join.cpp69
-rw-r--r--yql/essentials/minikql/comp_nodes/mkql_grace_join_imp.cpp38
-rw-r--r--yql/essentials/minikql/comp_nodes/mkql_grace_join_imp.h20
3 files changed, 70 insertions, 57 deletions
diff --git a/yql/essentials/minikql/comp_nodes/mkql_grace_join.cpp b/yql/essentials/minikql/comp_nodes/mkql_grace_join.cpp
index 8b8dc02001..08986bc8fd 100644
--- a/yql/essentials/minikql/comp_nodes/mkql_grace_join.cpp
+++ b/yql/essentials/minikql/comp_nodes/mkql_grace_join.cpp
@@ -656,6 +656,15 @@ private:
break;
}
case EOperatingMode::ProcessSpilled: {
+ SpilledBucketsJoinOrder.reserve(GraceJoin::NumberOfBuckets);
+ for (ui32 i = 0; i < GraceJoin::NumberOfBuckets; ++i) SpilledBucketsJoinOrder.push_back(i);
+
+ std::sort(SpilledBucketsJoinOrder.begin(), SpilledBucketsJoinOrder.end(), [&](ui32 lhs, ui32 rhs) {
+ auto lhs_in_memory = LeftPacker->TablePtr->IsBucketInMemory(lhs) + RightPacker->TablePtr->IsBucketInMemory(lhs);
+ auto rhs_in_memory = LeftPacker->TablePtr->IsBucketInMemory(rhs) + RightPacker->TablePtr->IsBucketInMemory(rhs);
+
+ return lhs_in_memory > rhs_in_memory;
+ });
MKQL_ENSURE(EOperatingMode::Spilling == Mode, "Internal logic error");
break;
}
@@ -871,8 +880,22 @@ private:
}
bool TryToReduceMemoryAndWait() {
- bool isWaitingLeftForReduce = LeftPacker->TablePtr->TryToReduceMemoryAndWait();
- bool isWaitingRightForReduce = RightPacker->TablePtr->TryToReduceMemoryAndWait();
+ if (!IsSpillingFinished()) return true;
+ i32 largestBucketsPairIndex = 0;
+ ui64 largestBucketsPairSize = 0;
+ for (ui32 bucket = 0; bucket < GraceJoin::NumberOfBuckets; ++bucket) {
+
+ ui64 leftBucketSize = LeftPacker->TablePtr->GetSizeOfBucket(bucket);
+ ui64 rightBucketSize = RightPacker->TablePtr->GetSizeOfBucket(bucket);
+ ui64 totalSize = leftBucketSize + rightBucketSize;
+ if (totalSize > largestBucketsPairSize) {
+ largestBucketsPairSize = totalSize;
+ largestBucketsPairIndex = bucket;
+ }
+ }
+
+ bool isWaitingLeftForReduce = LeftPacker->TablePtr->TryToReduceMemoryAndWait(largestBucketsPairIndex);
+ bool isWaitingRightForReduce = RightPacker->TablePtr->TryToReduceMemoryAndWait(largestBucketsPairIndex);
return isWaitingLeftForReduce || isWaitingRightForReduce;
}
@@ -930,54 +953,56 @@ EFetchResult DoCalculateWithSpilling(TComputationContext& ctx, NUdf::TUnboxedVal
}
EFetchResult ProcessSpilledData(TComputationContext&, NUdf::TUnboxedValue*const* output) {
- while (NextBucketToJoin != GraceJoin::NumberOfBuckets) {
+ while (SpilledBucketsJoinOrderCurrentIndex != GraceJoin::NumberOfBuckets) {
UpdateSpilling();
if (IsRestoringSpilledBuckets()) return EFetchResult::Yield;
- if (LeftPacker->TablePtr->IsSpilledBucketWaitingForExtraction(NextBucketToJoin)) {
- LeftPacker->TablePtr->PrepareBucket(NextBucketToJoin);
+ ui32 nextBucketToJoin = SpilledBucketsJoinOrder[SpilledBucketsJoinOrderCurrentIndex];
+
+ if (LeftPacker->TablePtr->IsSpilledBucketWaitingForExtraction(nextBucketToJoin)) {
+ LeftPacker->TablePtr->PrepareBucket(nextBucketToJoin);
}
- if (RightPacker->TablePtr->IsSpilledBucketWaitingForExtraction(NextBucketToJoin)) {
- RightPacker->TablePtr->PrepareBucket(NextBucketToJoin);
+ if (RightPacker->TablePtr->IsSpilledBucketWaitingForExtraction(nextBucketToJoin)) {
+ RightPacker->TablePtr->PrepareBucket(nextBucketToJoin);
}
- if (!LeftPacker->TablePtr->IsBucketInMemory(NextBucketToJoin)) {
- LeftPacker->TablePtr->StartLoadingBucket(NextBucketToJoin);
+ if (!LeftPacker->TablePtr->IsBucketInMemory(nextBucketToJoin)) {
+ LeftPacker->TablePtr->StartLoadingBucket(nextBucketToJoin);
}
- if (!RightPacker->TablePtr->IsBucketInMemory(NextBucketToJoin)) {
- RightPacker->TablePtr->StartLoadingBucket(NextBucketToJoin);
+ if (!RightPacker->TablePtr->IsBucketInMemory(nextBucketToJoin)) {
+ RightPacker->TablePtr->StartLoadingBucket(nextBucketToJoin);
}
- if (LeftPacker->TablePtr->IsBucketInMemory(NextBucketToJoin) && RightPacker->TablePtr->IsBucketInMemory(NextBucketToJoin)) {
+ if (LeftPacker->TablePtr->IsBucketInMemory(nextBucketToJoin) && RightPacker->TablePtr->IsBucketInMemory(nextBucketToJoin)) {
if (*PartialJoinCompleted) {
- while (JoinedTablePtr->NextJoinedData(LeftPacker->JoinTupleData, RightPacker->JoinTupleData, NextBucketToJoin + 1)) {
+ while (JoinedTablePtr->NextJoinedData(LeftPacker->JoinTupleData, RightPacker->JoinTupleData, nextBucketToJoin + 1)) {
UnpackJoinedData(output);
return EFetchResult::One;
}
LeftPacker->TuplesBatchPacked = 0;
- LeftPacker->TablePtr->ClearBucket(NextBucketToJoin); // Clear content of returned bucket
- LeftPacker->TablePtr->ShrinkBucket(NextBucketToJoin);
+ LeftPacker->TablePtr->ClearBucket(nextBucketToJoin); // Clear content of returned bucket
+ LeftPacker->TablePtr->ShrinkBucket(nextBucketToJoin);
RightPacker->TuplesBatchPacked = 0;
- RightPacker->TablePtr->ClearBucket(NextBucketToJoin); // Clear content of returned bucket
- RightPacker->TablePtr->ShrinkBucket(NextBucketToJoin);
+ RightPacker->TablePtr->ClearBucket(nextBucketToJoin); // Clear content of returned bucket
+ RightPacker->TablePtr->ShrinkBucket(nextBucketToJoin);
JoinedTablePtr->Clear();
JoinedTablePtr->ResetIterator();
*PartialJoinCompleted = false;
- NextBucketToJoin++;
+ SpilledBucketsJoinOrderCurrentIndex++;
} else {
*PartialJoinCompleted = true;
LeftPacker->StartTime = std::chrono::system_clock::now();
RightPacker->StartTime = std::chrono::system_clock::now();
if ( SelfJoinSameKeys_ ) {
- JoinedTablePtr->Join(*LeftPacker->TablePtr, *LeftPacker->TablePtr, JoinKind, *HaveMoreLeftRows, *HaveMoreRightRows, NextBucketToJoin, NextBucketToJoin+1);
+ JoinedTablePtr->Join(*LeftPacker->TablePtr, *LeftPacker->TablePtr, JoinKind, *HaveMoreLeftRows, *HaveMoreRightRows, nextBucketToJoin, nextBucketToJoin+1);
} else {
- JoinedTablePtr->Join(*LeftPacker->TablePtr, *RightPacker->TablePtr, JoinKind, *HaveMoreLeftRows, *HaveMoreRightRows, NextBucketToJoin, NextBucketToJoin+1);
+ JoinedTablePtr->Join(*LeftPacker->TablePtr, *RightPacker->TablePtr, JoinKind, *HaveMoreLeftRows, *HaveMoreRightRows, nextBucketToJoin, nextBucketToJoin+1);
}
JoinedTablePtr->ResetIterator();
@@ -1016,9 +1041,9 @@ private:
bool IsSpillingFinalized = false;
- ui32 NextBucketToJoin = 0;
-
NYql::NUdf::TCounter CounterOutputRows_;
+ ui32 SpilledBucketsJoinOrderCurrentIndex = 0;
+ std::vector<ui32> SpilledBucketsJoinOrder;
};
class TGraceJoinWrapper : public TStatefulWideFlowCodegeneratorNode<TGraceJoinWrapper> {
diff --git a/yql/essentials/minikql/comp_nodes/mkql_grace_join_imp.cpp b/yql/essentials/minikql/comp_nodes/mkql_grace_join_imp.cpp
index af35ed549f..f9b19fdfbc 100644
--- a/yql/essentials/minikql/comp_nodes/mkql_grace_join_imp.cpp
+++ b/yql/essentials/minikql/comp_nodes/mkql_grace_join_imp.cpp
@@ -489,7 +489,7 @@ void TTable::Join( TTable & t1, TTable & t2, EJoinKind joinKind, bool hasMoreLef
// slotSize, slotIdx and strPos is only for hashtable (table2)
ui64 bloomHits = 0;
ui64 bloomLookups = 0;
-
+
for (ui64 keysValSize = headerSize1; it1 != bucket1->KeyIntVals.end(); it1 += keysValSize, ++tuple1Idx ) {
if ( table1HasKeyStringColumns || table1HasKeyIColumns ) {
@@ -630,7 +630,7 @@ void TTable::Join( TTable & t1, TTable & t2, EJoinKind joinKind, bool hasMoreLef
HasMoreRightTuples_ = hasMoreRightTuples;
TuplesFound_ += tuplesFound;
-
+
}
inline void TTable::GetTupleData(ui32 bucketNum, ui32 tupleId, TupleData & td) {
@@ -772,7 +772,7 @@ inline bool TTable::AddKeysToHashTable(KeysHashTable& t, ui64* keys, NYql::NUdf:
continue;
if (NumberOfKeyIColumns > 0) {
- if (!CompareIColumns(
+ if (!CompareIColumns(
(char *) (slotStringsStart),
(char *) (keys + HeaderSize ),
iColumns,
@@ -903,22 +903,10 @@ ui64 TTable::GetSizeOfBucket(ui64 bucket) const {
+ TableBuckets[bucket].InterfaceOffsets.size() * sizeof(ui32);
}
-bool TTable::TryToReduceMemoryAndWait() {
- i32 largestBucketIndex = 0;
- ui64 largestBucketSize = 0;
- for (ui32 bucket = 0; bucket < NumberOfBuckets; ++bucket) {
- if (TableBucketsSpillers[bucket].IsProcessingSpilling()) return true;
-
- ui64 bucketSize = GetSizeOfBucket(bucket);
- if (bucketSize > largestBucketSize) {
- largestBucketSize = bucketSize;
- largestBucketIndex = bucket;
- }
- }
-
- if (largestBucketSize < SpillingSizeLimit/NumberOfBuckets) return false;
- if (const auto &tbs = TableBucketsStats[largestBucketIndex]; tbs.HashtableMatches) {
- auto &tb = TableBuckets[largestBucketIndex];
+bool TTable::TryToReduceMemoryAndWait(ui64 bucket) {
+ if (GetSizeOfBucket(bucket) < SpillingSizeLimit/NumberOfBuckets) return false;
+ if (const auto &tbs = TableBucketsStats[bucket]; tbs.HashtableMatches) {
+ auto &tb = TableBuckets[bucket];
if (tb.JoinSlots.size()) {
const auto slotSize = tbs.SlotSize;
@@ -946,10 +934,10 @@ bool TTable::TryToReduceMemoryAndWait() {
tb.JoinSlots.shrink_to_fit();
}
}
- TableBucketsSpillers[largestBucketIndex].SpillBucket(std::move(TableBuckets[largestBucketIndex]));
- TableBuckets[largestBucketIndex] = TTableBucket{};
+ TableBucketsSpillers[bucket].SpillBucket(std::move(TableBuckets[bucket]));
+ TableBuckets[bucket] = TTableBucket{};
- return TableBucketsSpillers[largestBucketIndex].IsProcessingSpilling();
+ return TableBucketsSpillers[bucket].IsProcessingSpilling();
}
void TTable::UpdateSpilling() {
@@ -987,7 +975,7 @@ void TTable::FinalizeSpilling() {
TableBucketsSpillers[bucket].Finalize();
TableBucketsSpillers[bucket].SpillBucket(std::move(TableBuckets[bucket]));
TableBuckets[bucket] = TTableBucket{};
-
+
}
}
}
@@ -1288,7 +1276,7 @@ void TTableBucketSpiller::ProcessBucketRestoration() {
case ENextVectorToProcess::InterfaceOffsets:
if (StateUi32Adapter.IsDataReady()) {
AppendVector(CurrentBucket.InterfaceOffsets, StateUi32Adapter.ExtractVector());
-
+
SpilledBucketsCount--;
if (SpilledBucketsCount == 0) {
NextVectorToProcess = ENextVectorToProcess::None;
@@ -1296,7 +1284,7 @@ void TTableBucketSpiller::ProcessBucketRestoration() {
} else {
NextVectorToProcess = ENextVectorToProcess::KeyAndVals;
}
-
+
break;
}
diff --git a/yql/essentials/minikql/comp_nodes/mkql_grace_join_imp.h b/yql/essentials/minikql/comp_nodes/mkql_grace_join_imp.h
index b374f269eb..a4846926d1 100644
--- a/yql/essentials/minikql/comp_nodes/mkql_grace_join_imp.h
+++ b/yql/essentials/minikql/comp_nodes/mkql_grace_join_imp.h
@@ -13,7 +13,7 @@ namespace GraceJoin {
class TTableBucketSpiller;
#define GRACEJOIN_DEBUG DEBUG
#define GRACEJOIN_TRACE TRACE
-
+
const ui64 BitsForNumberOfBuckets = 5; // 2^5 = 32
const ui64 BucketsMask = (0x00000001 << BitsForNumberOfBuckets) - 1;
const ui64 NumberOfBuckets = (0x00000001 << BitsForNumberOfBuckets); // Number of hashed keys buckets to distribute incoming tables tuples
@@ -108,7 +108,7 @@ class TBloomfilter {
/*
Table data stored in buckets. Table columns are interpreted either as integers, strings or some interface-based type,
-providing IHash, IEquate, IPack and IUnpack functions.
+providing IHash, IEquate, IPack and IUnpack functions.
External clients should transform (pack) data into appropriate presentation.
Key columns always first, following int columns, string columns and interface-based columns.
@@ -135,7 +135,7 @@ struct JoinTuplesIds {
// To store keys values when making join only for unique keys (any join attribute)
struct KeysHashTable {
ui64 SlotSize = 0; // Slot size in hash table
- ui64 NSlots = 0; // Total number of slots in table
+ ui64 NSlots = 0; // Total number of slots in table
ui64 FillCount = 0; // Number of ui64 slots which are filled
std::vector<ui64, TMKQLAllocator<ui64>> Table; // Table to store keys data in particular slots
std::vector<ui64, TMKQLAllocator<ui64>> SpillData; // Vector to store long data which cannot be fit in single hash table slot.
@@ -148,7 +148,7 @@ struct TTableBucket {
std::vector<ui32, TMKQLAllocator<ui32>> StringsOffsets; // Vector to store strings values sizes (offsets in StringsValues are calculated) for particular tuple.
std::vector<char, TMKQLAllocator<char>> InterfaceValues; // Vector to store types to work through external-provided IHash, IEquate interfaces
std::vector<ui32, TMKQLAllocator<ui32>> InterfaceOffsets; // Vector to store sizes of columns to work through IHash, IEquate interfaces
- std::vector<JoinTuplesIds, TMKQLAllocator<JoinTuplesIds>> JoinIds; // Results of join operations stored as index of tuples in buckets
+ std::vector<JoinTuplesIds, TMKQLAllocator<JoinTuplesIds>> JoinIds; // Results of join operations stored as index of tuples in buckets
// of two tables with the same number
std::vector<ui32, TMKQLAllocator<ui32>> LeftIds; // Left-side ids missing in other table
@@ -181,7 +181,7 @@ struct TColTypeInterface {
NYql::NUdf::IHash::TPtr HashI = nullptr; // Interface to calculate hash of column value
NYql::NUdf::IEquate::TPtr EquateI = nullptr; // Interface to compare two column values
std::shared_ptr<TValuePacker> Packer; // Class to pack and unpack column values
- const THolderFactory& HolderFactory; // To use during unpacking
+ const THolderFactory& HolderFactory; // To use during unpacking
};
// Class that spills bucket data.
@@ -269,7 +269,7 @@ class TTable {
ui64 NumberOfDataIntColumns = 0; //Number of integer data columns in the Table
ui64 NumberOfDataStringColumns = 0; // Number of strings data columns in the Table
ui64 NumberOfDataIColumns = 0; // Number of interface - provided data columns
-
+
TColTypeInterface * ColInterfaces = nullptr; // Array of interfaces to work with corresponding columns data
@@ -285,7 +285,7 @@ class TTable {
ui64 HeaderSize = HashSize + NullsBitmapSize_ + NumberOfKeyIntColumns + NumberOfKeyIColumns + TotalStringsSize; // Header of all tuples size
ui64 BytesInKeyIntColumns = sizeof(ui64) * NumberOfKeyIntColumns;
-
+
// Table data is partitioned in buckets based on key value
std::vector<TTableBucket> TableBuckets;
// Statistics for buckets. Total number of tuples inside a single bucket and offsets.
@@ -365,7 +365,7 @@ public:
ui64 GetSizeOfBucket(ui64 bucket) const;
// This functions wind the largest bucket and spills it to the disk.
- bool TryToReduceMemoryAndWait();
+ bool TryToReduceMemoryAndWait(ui64 bucket);
// Update state of spilling. Must be called during each DoCalculate.
void UpdateSpilling();
@@ -406,7 +406,7 @@ public:
// Creates new table with key columns and data columns
TTable(ui64 numberOfKeyIntColumns = 0, ui64 numberOfKeyStringColumns = 0,
ui64 numberOfDataIntColumns = 0, ui64 numberOfDataStringColumns = 0,
- ui64 numberOfKeyIColumns = 0, ui64 numberOfDataIColumns = 0,
+ ui64 numberOfKeyIColumns = 0, ui64 numberOfDataIColumns = 0,
ui64 nullsBitmapSize = 1, TColTypeInterface * colInterfaces = nullptr, bool isAny = false);
enum class EAddTupleResult { Added, Unmatched, AnyMatch };
@@ -414,7 +414,7 @@ public:
// stringsSizes - sizes of strings columns. Indexes of null-value columns
// in the form of bit array should be first values of intColumns.
EAddTupleResult AddTuple(ui64* intColumns, char** stringColumns, ui32* stringsSizes, NYql::NUdf::TUnboxedValue * iColumns = nullptr, const TTable &other = {});
-
+
~TTable();
ui64 InitHashTableCount_ = 0;