diff options
author | aakulaga <aakulaga@ydb.tech> | 2023-06-08 15:13:31 +0300 |
---|---|---|
committer | aakulaga <aakulaga@ydb.tech> | 2023-06-08 15:13:31 +0300 |
commit | d2704d925a4e0569a9ed1e8ce9b4287bbd9503b4 (patch) | |
tree | cfe08d99db82b56ced7e868d0eba7d8f3c1b43a5 | |
parent | 51b3066edda71a1c65129e7e5f85aac77a4138b2 (diff) | |
download | ydb-d2704d925a4e0569a9ed1e8ce9b4287bbd9503b4.tar.gz |
Any join attribute added
Any join attribute added
First version of Any
4 files changed, 386 insertions, 59 deletions
diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_grace_join.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_grace_join.cpp index 0e968459c2e..89aa7b057e1 100644 --- a/ydb/library/yql/minikql/comp_nodes/mkql_grace_join.cpp +++ b/ydb/library/yql/minikql/comp_nodes/mkql_grace_join.cpp @@ -69,16 +69,19 @@ struct TGraceJoinPacker { ui64 TotalIntColumnsNum = 0; // Total number of int columns ui64 TotalStrColumnsNum = 0; // Total number of string columns ui64 TotalIColumnsNum = 0; // Total number of interface-based columns - ui64 KeyIntColumnsNum = 0; // Total number of key int columns + ui64 KeyIntColumnsNum = 0; // Total number of key int columns in original table + ui64 PackedKeyIntColumnsNum = 0; // Length of ui64 array containing data of all key int columns after packing ui64 KeyStrColumnsNum = 0; // Total number of key string columns ui64 KeyIColumnsNum = 0; // Total number of interface-based columns ui64 DataIntColumnsNum = TotalIntColumnsNum - KeyIntColumnsNum; + ui64 PackedDataIntColumnsNum = 0; // Length of ui64 array containing data of all non-key int columns after packing ui64 DataStrColumnsNum = TotalStrColumnsNum - KeyStrColumnsNum; ui64 DataIColumnsNum = TotalIColumnsNum - KeyIColumnsNum; std::vector<GraceJoin::TColTypeInterface> ColumnInterfaces; + bool IsAny; // Flag to support any join attribute inline void Pack() ; // Packs new tuple from TupleHolder and TuplePtrs to TupleIntVals, TupleStrSizes, TupleStrings inline void UnPack(); // Unpacks packed values from TupleIntVals, TupleStrSizes, TupleStrings into TupleHolder and TuplePtrs - TGraceJoinPacker(const std::vector<TType*>& columnTypes, const std::vector<ui32>& keyColumns, const THolderFactory& holderFactory); + TGraceJoinPacker(const std::vector<TType*>& columnTypes, const std::vector<ui32>& keyColumns, const THolderFactory& holderFactory, bool isAny); }; @@ -198,10 +201,13 @@ void TGraceJoinPacker::Pack() { NYql::NUdf::TUnboxedValue value = *TuplePtrs[pi.ColumnIdx]; if (!value) { // Null value - 
ui64 currNullsIdx = i / (sizeof(ui64) * 8); - ui64 remShift = ( i - currNullsIdx * (sizeof(ui64) * 8) ); + ui64 currNullsIdx = (i + 1) / (sizeof(ui64) * 8); + ui64 remShift = ( (i + 1) - currNullsIdx * (sizeof(ui64) * 8) ); ui64 bitMask = (0x1) << remShift; TupleIntVals[currNullsIdx] |= bitMask; + if (pi.IsKeyColumn) { + TupleIntVals[0] |= (0x1); + } continue; } TType* type = pi.MKQLType; @@ -309,8 +315,8 @@ void TGraceJoinPacker::UnPack() { value = NYql::NUdf::TUnboxedValue(); continue; } - ui64 currNullsIdx = i / (sizeof(ui64) * 8); - ui64 remShift = ( i - currNullsIdx * (sizeof(ui64) * 8) ); + ui64 currNullsIdx = (i + 1) / (sizeof(ui64) * 8); + ui64 remShift = ( (i + 1) - currNullsIdx * (sizeof(ui64) * 8) ); ui64 bitMask = (0x1) << remShift; if ( TupleIntVals[currNullsIdx] & bitMask ) { value = NYql::NUdf::TUnboxedValue(); @@ -411,9 +417,10 @@ void TGraceJoinPacker::UnPack() { } -TGraceJoinPacker::TGraceJoinPacker(const std::vector<TType *> & columnTypes, const std::vector<ui32>& keyColumns, const THolderFactory& holderFactory) : +TGraceJoinPacker::TGraceJoinPacker(const std::vector<TType *> & columnTypes, const std::vector<ui32>& keyColumns, const THolderFactory& holderFactory, bool isAny) : ColumnTypes(columnTypes) - , HolderFactory(holderFactory) { + , HolderFactory(holderFactory) + , IsAny(isAny) { ui64 nColumns = ColumnTypes.size(); ui64 nKeyColumns = keyColumns.size(); @@ -467,7 +474,7 @@ TGraceJoinPacker::TGraceJoinPacker(const std::vector<TType *> & columnTypes, con DataStrColumnsNum = TotalStrColumnsNum - KeyStrColumnsNum; DataIColumnsNum = TotalIColumnsNum - KeyIColumnsNum; - NullsBitmapSize = (nColumns / (8 * sizeof(ui64)) + 1) ; + NullsBitmapSize = ( (nColumns + 1)/ (8 * sizeof(ui64)) + 1) ; TupleIntVals.resize(2 * totalIntColumnsNum + NullsBitmapSize); TupleStrings.resize(totalStrColumnsNum); @@ -533,8 +540,8 @@ TGraceJoinPacker::TGraceJoinPacker(const std::vector<TType *> & columnTypes, con currIdx++; } - KeyIntColumnsNum = (keyIntOffset + 
sizeof(ui64) - 1 ) / sizeof(ui64) - NullsBitmapSize; - DataIntColumnsNum = (currIntOffset + sizeof(ui64) - 1) / sizeof(ui64) - KeyIntColumnsNum - NullsBitmapSize; + PackedKeyIntColumnsNum = (keyIntOffset + sizeof(ui64) - 1 ) / sizeof(ui64) - NullsBitmapSize; + PackedDataIntColumnsNum = (currIntOffset + sizeof(ui64) - 1) / sizeof(ui64) - PackedKeyIntColumnsNum - NullsBitmapSize; GraceJoin::TColTypeInterface * cti_p = nullptr; @@ -542,7 +549,9 @@ TGraceJoinPacker::TGraceJoinPacker(const std::vector<TType *> & columnTypes, con cti_p = ColumnInterfaces.data(); } - TablePtr = std::make_unique<GraceJoin::TTable>(KeyIntColumnsNum, KeyStrColumnsNum, DataIntColumnsNum, DataStrColumnsNum, KeyIColumnsNum, DataIColumnsNum, cti_p ); + TablePtr = std::make_unique<GraceJoin::TTable>( + PackedKeyIntColumnsNum, KeyStrColumnsNum, PackedDataIntColumnsNum, + DataStrColumnsNum, KeyIColumnsNum, DataIColumnsNum, NullsBitmapSize, cti_p, IsAny ); } @@ -553,12 +562,12 @@ public: TGraceJoinState(TMemoryUsageInfo* memInfo, IComputationWideFlowNode* flowLeft, IComputationWideFlowNode* flowRight, - EJoinKind joinKind, const std::vector<ui32>& leftKeyColumns, const std::vector<ui32>& rightKeyColumns, + EJoinKind joinKind, EAnyJoinSettings anyJoinSettings, const std::vector<ui32>& leftKeyColumns, const std::vector<ui32>& rightKeyColumns, const std::vector<ui32>& leftRenames, const std::vector<ui32>& rightRenames, const std::vector<TType*>& leftColumnsTypes, const std::vector<TType*>& rightColumnsTypes, const THolderFactory & holderFactory) : TBase(memInfo) - , LeftPacker(std::make_unique<TGraceJoinPacker>(leftColumnsTypes, leftKeyColumns, holderFactory)) - , RightPacker(std::make_unique<TGraceJoinPacker>(rightColumnsTypes, rightKeyColumns, holderFactory)) + , LeftPacker(std::make_unique<TGraceJoinPacker>(leftColumnsTypes, leftKeyColumns, holderFactory, (anyJoinSettings == EAnyJoinSettings::Left || anyJoinSettings == EAnyJoinSettings::Both))) + , 
RightPacker(std::make_unique<TGraceJoinPacker>(rightColumnsTypes, rightKeyColumns, holderFactory, (anyJoinSettings == EAnyJoinSettings::Right || anyJoinSettings == EAnyJoinSettings::Both))) , JoinedTablePtr(std::make_unique<GraceJoin::TTable>()) , JoinCompleted(std::make_unique<bool>(false)) , PartialJoinCompleted(std::make_unique<bool>(false)) @@ -568,6 +577,7 @@ public: , FlowLeft(flowLeft) , FlowRight(flowRight) , JoinKind(joinKind) + , AnyJoinSettings_(anyJoinSettings) , LeftKeyColumns(leftKeyColumns) , RightKeyColumns(rightKeyColumns) , LeftRenames(leftRenames) @@ -583,6 +593,7 @@ private: IComputationWideFlowNode* const FlowRight; const EJoinKind JoinKind; + const EAnyJoinSettings AnyJoinSettings_; const std::vector<ui32> LeftKeyColumns; const std::vector<ui32> RightKeyColumns; const std::vector<ui32> LeftRenames; @@ -604,7 +615,7 @@ class TGraceJoinWrapper : public TStatefulWideFlowCodegeneratorNode<TGraceJoinWr public: TGraceJoinWrapper(TComputationMutables& mutables, IComputationWideFlowNode* flowLeft, IComputationWideFlowNode* flowRight, - EJoinKind joinKind, std::vector<ui32>&& leftKeyColumns, std::vector<ui32>&& rightKeyColumns, + EJoinKind joinKind, EAnyJoinSettings anyJoinSettings, std::vector<ui32>&& leftKeyColumns, std::vector<ui32>&& rightKeyColumns, std::vector<ui32>&& leftRenames, std::vector<ui32>&& rightRenames, std::vector<TType*>&& leftColumnsTypes, std::vector<TType*>&& rightColumnsTypes, std::vector<EValueRepresentation>&& outputRepresentations) @@ -612,6 +623,7 @@ class TGraceJoinWrapper : public TStatefulWideFlowCodegeneratorNode<TGraceJoinWr , FlowLeft(flowLeft) , FlowRight(flowRight) , JoinKind(joinKind) + , AnyJoinSettings_(anyJoinSettings) , LeftKeyColumns(std::move(leftKeyColumns)) , RightKeyColumns(std::move(rightKeyColumns)) , LeftRenames(std::move(leftRenames)) @@ -718,7 +730,7 @@ class TGraceJoinWrapper : public TStatefulWideFlowCodegeneratorNode<TGraceJoinWr void MakeState(TComputationContext& ctx, NUdf::TUnboxedValue& state) 
const { state = ctx.HolderFactory.Create<TGraceJoinState>( - FlowLeft, FlowRight, JoinKind, LeftKeyColumns, RightKeyColumns, + FlowLeft, FlowRight, JoinKind, AnyJoinSettings_, LeftKeyColumns, RightKeyColumns, LeftRenames, RightRenames, LeftColumnsTypes, RightColumnsTypes, ctx.HolderFactory); } @@ -726,6 +738,7 @@ class TGraceJoinWrapper : public TStatefulWideFlowCodegeneratorNode<TGraceJoinWr IComputationWideFlowNode *const FlowLeft; IComputationWideFlowNode *const FlowRight; const EJoinKind JoinKind; + const EAnyJoinSettings AnyJoinSettings_; const std::vector<ui32> LeftKeyColumns; const std::vector<ui32> RightKeyColumns; const std::vector<ui32> LeftRenames; @@ -880,6 +893,10 @@ IComputationNode* WrapGraceJoin(TCallable& callable, const TComputationNodeFacto const auto rightRenamesNode = AS_VALUE(TTupleLiteral, callable.GetInput(6)); const ui32 rawJoinKind = AS_VALUE(TDataLiteral, joinKindNode)->AsValue().Get<ui32>(); +// const EAnyJoinSettings anyJoinSettings = GetAnyJoinSettings(AS_VALUE(TDataLiteral, callable.GetInput(8))->AsValue().Get<ui32>()); + + const EAnyJoinSettings anyJoinSettings = EAnyJoinSettings::None; + const auto flowLeft = dynamic_cast<IComputationWideFlowNode*> (LocateNode(ctx.NodeLocator, callable, 0)); const auto flowRight = dynamic_cast<IComputationWideFlowNode*> (LocateNode(ctx.NodeLocator, callable, 1)); @@ -915,7 +932,7 @@ IComputationNode* WrapGraceJoin(TCallable& callable, const TComputationNodeFacto } return new TGraceJoinWrapper( - ctx.Mutables, flowLeft, flowRight, GetJoinKind(rawJoinKind), + ctx.Mutables, flowLeft, flowRight, GetJoinKind(rawJoinKind), anyJoinSettings, std::move(leftKeyColumns), std::move(rightKeyColumns), std::move(leftRenames), std::move(rightRenames), std::move(leftColumnsTypes), std::move(rightColumnsTypes), std::move(outputRepresentations)); diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_grace_join_imp.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_grace_join_imp.cpp index 0d94e0ba62b..89c82d99d6d 100644 --- 
a/ydb/library/yql/minikql/comp_nodes/mkql_grace_join_imp.cpp +++ b/ydb/library/yql/minikql/comp_nodes/mkql_grace_join_imp.cpp @@ -20,7 +20,7 @@ void TTable::AddTuple( ui64 * intColumns, char ** stringColumns, ui32 * strings TotalPacked++; TempTuple.clear(); - TempTuple.insert(TempTuple.end(), intColumns, intColumns + NullsBitmapSize + NumberOfKeyIntColumns); + TempTuple.insert(TempTuple.end(), intColumns, intColumns + NullsBitmapSize_ + NumberOfKeyIntColumns); if ( NumberOfKeyIColumns > 0 ) { for (ui32 i = 0; i < NumberOfKeyIColumns; i++) { @@ -69,6 +69,7 @@ void TTable::AddTuple( ui64 * intColumns, char ** stringColumns, ui32 * strings } +/* ui64 nullsBitmapIdx = NumberOfKeyColumns / (sizeof(ui64) * 8); ui64 remBits = (nullsBitmapIdx + 1) * sizeof(ui64) * 8 - NumberOfKeyColumns; @@ -80,6 +81,13 @@ void TTable::AddTuple( ui64 * intColumns, char ** stringColumns, ui32 * strings TempTuple[nullsBitmapIdx] = 0; nullsBitmapIdx++; } +*/ + + TempTuple[0] &= (0x1); // Setting only nulls in key bit, all other bits are ignored for key hash + for (ui32 i = 1; i < NullsBitmapSize_; i ++) { + TempTuple[i] = 0; + } + XXH64_hash_t hash = XXH64(TempTuple.data(), TempTuple.size() * sizeof(ui64), 0); @@ -87,18 +95,31 @@ void TTable::AddTuple( ui64 * intColumns, char ** stringColumns, ui32 * strings ui64 bucket = hash & BucketsMask; - TableBuckets[bucket].TuplesNum++; + std::vector<ui64, TMKQLAllocator<ui64>> & keyIntVals = TableBuckets[bucket].KeyIntVals; std::vector<ui32, TMKQLAllocator<ui32>> & stringsOffsets = TableBuckets[bucket].StringsOffsets; std::vector<ui64, TMKQLAllocator<ui64>> & dataIntVals = TableBuckets[bucket].DataIntVals; std::vector<char, TMKQLAllocator<char>> & stringVals = TableBuckets[bucket].StringsValues; + KeysHashTable & kh = TableBuckets[bucket].AnyHashTable; ui32 offset = keyIntVals.size(); // Offset of tuple inside the keyIntVals vector keyIntVals.push_back(hash); - keyIntVals.insert(keyIntVals.end(), intColumns, intColumns + NullsBitmapSize); - 
keyIntVals.insert(keyIntVals.end(), TempTuple.begin() + NullsBitmapSize, TempTuple.end()); + keyIntVals.insert(keyIntVals.end(), intColumns, intColumns + NullsBitmapSize_); + keyIntVals.insert(keyIntVals.end(), TempTuple.begin() + NullsBitmapSize_, TempTuple.end()); + + + + if (IsAny_) { + if ( !AddKeysToHashTable(kh, keyIntVals.begin() + offset) ) { + keyIntVals.resize(offset); + return; + } + } + + + TableBuckets[bucket].TuplesNum++; if (NumberOfStringColumns || NumberOfIColumns ) { stringsOffsets.push_back(offset); // Adding offset to tuple in keyIntVals vector @@ -125,7 +146,7 @@ void TTable::AddTuple( ui64 * intColumns, char ** stringColumns, ui32 * strings // Adding data values - ui64 * dataColumns = intColumns + NullsBitmapSize + NumberOfKeyIntColumns; + ui64 * dataColumns = intColumns + NullsBitmapSize_ + NumberOfKeyIntColumns; dataIntVals.insert(dataIntVals.end(), dataColumns, dataColumns + NumberOfDataIntColumns); // Adding strings values for data columns @@ -241,6 +262,30 @@ inline bool CompareIColumns( const ui32* stringSizes1, const char * vals1, return true; } +// Resizes KeysHashTable to new slots, keeps old content. +void ResizeHashTable(KeysHashTable &t, ui64 newSlots){ + + std::vector<ui64> newTable(newSlots * t.SlotSize , 0); + for ( auto it = t.Table.begin(); it != t.Table.end(); it += t.SlotSize ) { + if ( *it == 0) + continue; + ui64 hash = *it; + ui64 newSlotNum = hash % (newSlots); + auto newIt = newTable.begin() + t.SlotSize * newSlotNum; + while (*newIt != 0) { + newIt += t.SlotSize; + if (newIt >= newTable.end()) { + newIt = newTable.begin(); + } + } + std::copy_n(it, t.SlotSize, newIt); + } + t.NSlots = newSlots; + t.Table = std::move(newTable); + +} + + // Joins two tables and returns join result in joined table. 
Tuples of joined table could be received by // joined table iterator void TTable::Join( TTable & t1, TTable & t2, EJoinKind joinKind, bool hasMoreLeftTuples, bool hasMoreRightTuples ) { @@ -271,6 +316,8 @@ void TTable::Join( TTable & t1, TTable & t2, EJoinKind joinKind, bool hasMoreLef ui64 tuplesFound = 0; ui64 leftIdsMatch = 0; ui64 rightIdsMatch = 0; + ui64 t2AnySkipped = 0; + ui64 t1AnySkipped = 0; std::vector<ui64, TMKQLAllocator<ui64, EMemorySubPool::Temporary>> joinSlots, spillSlots, slotToIdx; std::vector<ui32, TMKQLAllocator<ui32, EMemorySubPool::Temporary>> stringsOffsets1, stringsOffsets2; @@ -288,10 +335,13 @@ void TTable::Join( TTable & t1, TTable & t2, EJoinKind joinKind, bool hasMoreLef TTableBucket * bucket1 = &JoinTable1->TableBuckets[bucket]; TTableBucket * bucket2 = &JoinTable2->TableBuckets[bucket]; + KeysHashTable& kh1 = bucket1->AnyHashTable; + KeysHashTable& kh2 = bucket2->AnyHashTable; + ui64 headerSize1 = JoinTable1->HeaderSize; ui64 headerSize2 = JoinTable2->HeaderSize; - ui64 nullsSize1 = JoinTable1->NullsBitmapSize; - ui64 nullsSize2 = JoinTable2->NullsBitmapSize; + ui64 nullsSize1 = JoinTable1->NullsBitmapSize_; + ui64 nullsSize2 = JoinTable2->NullsBitmapSize_; ui64 numberOfKeyIntColumns1 = JoinTable1->NumberOfKeyIntColumns; ui64 keyIntOffset1 = HashSize + nullsSize1; ui64 keyIntOffset2 = HashSize + nullsSize2; @@ -314,6 +364,7 @@ void TTable::Join( TTable & t1, TTable & t2, EJoinKind joinKind, bool hasMoreLef slotSize = slotSize + avgStringsSize; } + ui64 nSlots = 3 * bucket2->TuplesNum + 1; joinSlots.clear(); spillSlots.clear(); @@ -321,18 +372,27 @@ void TTable::Join( TTable & t1, TTable & t2, EJoinKind joinKind, bool hasMoreLef joinSlots.resize(nSlots*slotSize, 0); slotToIdx.resize(nSlots, 0); + kh1.NSlots = nSlots; + kh1.SlotSize = slotSize; + + + kh2.NSlots = nSlots; + kh2.SlotSize = slotSize; + ui32 tuple2Idx = 0; auto it2 = bucket2->KeyIntVals.begin(); while (it2 != bucket2->KeyIntVals.end() ) { + ui64 keysValSize; if ( 
JoinTable2->NumberOfKeyStringColumns > 0 || JoinTable2->NumberOfKeyIColumns > 0) { keysValSize = headerSize2 + *(it2 + headerSize2 - 1) ; } else { keysValSize = headerSize2; } + ui64 hash = *it2; ui64 * nullsPtr = it2+1; - if (!HasBitSet(nullsPtr, JoinTable1->NumberOfKeyColumns)) + if (!HasBitSet(nullsPtr, 1)) { ui64 slotNum = hash % nSlots; @@ -375,9 +435,10 @@ void TTable::Join( TTable & t1, TTable & t2, EJoinKind joinKind, bool hasMoreLef keysValSize = headerSize1; } + ui64 hash = *it1; ui64 * nullsPtr = it1+1; - if (HasBitSet(nullsPtr, JoinTable1->NumberOfKeyColumns)) + if (HasBitSet(nullsPtr, 1)) { it1 += keysValSize; tuple1Idx ++; @@ -430,7 +491,8 @@ void TTable::Join( TTable & t1, TTable & t2, EJoinKind joinKind, bool hasMoreLef if ( JoinTable1->NumberOfKeyStringColumns == 0) { stringsMatch = true; } else { - if (headerMatch && std::equal( it1 + headerSize1, it1 + headerSize1 + JoinTable1->NumberOfKeyStringColumns, slotStringsStart )) { + ui64 stringsSize = *(it1 + headerSize1 - 1); + if (headerMatch && std::equal( it1 + headerSize1, it1 + headerSize1 + stringsSize, slotStringsStart )) { stringsMatch = true; } } @@ -533,6 +595,7 @@ void TTable::Join( TTable & t1, TTable & t2, EJoinKind joinKind, bool hasMoreLef HasMoreLeftTuples_ = hasMoreLeftTuples; HasMoreRightTuples_ = hasMoreRightTuples; + } inline void TTable::GetTupleData(ui32 bucketNum, ui32 tupleId, TupleData & td) { @@ -556,14 +619,14 @@ inline void TTable::GetTupleData(ui32 bucketNum, ui32 tupleId, TupleData & td) { } - for ( ui64 i = 0; i < NumberOfKeyIntColumns + NullsBitmapSize; ++i) { + for ( ui64 i = 0; i < NumberOfKeyIntColumns + NullsBitmapSize_; ++i) { td.IntColumns[i] = tb.KeyIntVals[keyIntsOffset + HashSize + i]; } dataIntsOffset = NumberOfDataIntColumns * tupleId; for ( ui64 i = 0; i < NumberOfDataIntColumns; ++i) { - td.IntColumns[NumberOfKeyIntColumns + NullsBitmapSize + i] = tb.DataIntVals[dataIntsOffset + i]; + td.IntColumns[NumberOfKeyIntColumns + NullsBitmapSize_ + i] = 
tb.DataIntVals[dataIntsOffset + i]; } char *strPtr = nullptr; @@ -636,6 +699,101 @@ inline bool TTable::HasJoinedTupleId(TTable *joinedTable, ui32 &tupleId2) { } } + + +inline bool TTable::AddKeysToHashTable(KeysHashTable& t, ui64* keys) { + + if (t.NSlots == 0) { + t.SlotSize = HeaderSize + NumberOfKeyStringColumns * 2; + t.Table.resize(DefaultTuplesNum * t.SlotSize, 0); + t.NSlots = DefaultTuplesNum; + } + + if ( ( (t.NSlots - t.FillCount) * 100 ) / t.NSlots < 50 ) { + ResizeHashTable(t, 2 * t.NSlots); + } + + if ( HasBitSet(keys + HashSize, 1)) // Keys with null value + return false; + + ui64 hash = *keys; + ui64 slot = hash % t.NSlots; + auto it = t.Table.begin() + slot * t.SlotSize; + + ui64 keyIntOffset = HashSize + NullsBitmapSize_; + ui64 keysSize = HeaderSize; + ui64 keyStringsSize = 0; + if ( NumberOfKeyStringColumns > 0 || NumberOfKeyIColumns > 0) { + keyStringsSize = *(keys + HeaderSize - 1); + keysSize = HeaderSize + keyStringsSize; + } + + + while (*it != 0) { + + while (*it == hash) { + + ui64 storedStringsSize = 0; + ui64 storedKeysSize = HeaderSize; + if ( NumberOfKeyStringColumns > 0 || NumberOfKeyIColumns > 0) { + storedStringsSize = *(it + HeaderSize - 1); + storedKeysSize = HeaderSize + storedStringsSize; + } + + bool headerMatch = false; + bool stringsMatch = false; + bool iValuesMatch = false; + headerMatch = std::equal(it + keyIntOffset, it + HeaderSize, keys + keyIntOffset); + if (!headerMatch) { + break; + } + + if ( headerMatch && !(NumberOfKeyStringColumns > 0 || NumberOfKeyIColumns > 0) ) { + return false; + } + + if (storedStringsSize != keyStringsSize) { + break; + } + + ui64 * stringsStart; + if (storedKeysSize < t.SlotSize) { + stringsStart = it + HeaderSize; + } else { + ui64 spillOffset = *(it + HeaderSize); + stringsStart = t.SpillData.begin() + spillOffset; + } + + stringsMatch = std::equal(keys + HeaderSize, keys + HeaderSize + keyStringsSize, stringsStart ); + + if ( headerMatch && stringsMatch ) { + return false; + } + + 
break; + + } + + it += t.SlotSize; + if (it >= t.Table.end()) { + it = t.Table.begin(); + } + } + + if (keysSize > t.SlotSize) { + ui64 spillDataOffset = t.SpillData.size(); + t.SpillData.insert(t.SpillData.end(), keys + HeaderSize, keys + keysSize); + std::copy_n(keys, HeaderSize, it); + *(it + HeaderSize) = spillDataOffset; + } else { + std::copy_n(keys, keysSize, it); + } + + t.FillCount++; + return true; + +} + inline bool HasRightIdMatch(ui64 currId, ui64 & rightIdIter, const std::vector<ui32, TMKQLAllocator<ui32>> & rightIds) { if (rightIdIter >= rightIds.size()) return false; @@ -993,6 +1151,7 @@ void TTable::Clear() { tb.InterfaceOffsets.clear(); tb.JoinIds.clear(); tb.RightIds.clear(); +// tb.AnyHashTable = KeysHashTable{0, 0, 0, {}, {}}; } @@ -1001,7 +1160,8 @@ void TTable::Clear() { // Creates new table with key columns and data columns TTable::TTable( ui64 numberOfKeyIntColumns, ui64 numberOfKeyStringColumns, ui64 numberOfDataIntColumns, ui64 numberOfDataStringColumns, - ui64 numberOfKeyIColumns, ui64 numberOfDataIColumns, TColTypeInterface * colInterfaces ) : + ui64 numberOfKeyIColumns, ui64 numberOfDataIColumns, + ui64 nullsBitmapSize, TColTypeInterface * colInterfaces, bool isAny ) : NumberOfKeyIntColumns(numberOfKeyIntColumns), NumberOfKeyStringColumns(numberOfKeyStringColumns), @@ -1009,7 +1169,9 @@ TTable::TTable( ui64 numberOfKeyIntColumns, ui64 numberOfKeyStringColumns, NumberOfDataStringColumns(numberOfDataStringColumns), NumberOfKeyIColumns(numberOfKeyIColumns), NumberOfDataIColumns(numberOfDataIColumns), - ColInterfaces(colInterfaces) { + NullsBitmapSize_(nullsBitmapSize), + ColInterfaces(colInterfaces), + IsAny_(isAny) { NumberOfKeyColumns = NumberOfKeyIntColumns + NumberOfKeyStringColumns + NumberOfKeyIColumns; NumberOfDataColumns = NumberOfDataIntColumns + NumberOfDataStringColumns + NumberOfDataIColumns; @@ -1019,11 +1181,10 @@ TTable::TTable( ui64 numberOfKeyIntColumns, ui64 numberOfKeyStringColumns, BytesInKeyIntColumns = 
NumberOfKeyIntColumns * sizeof(ui64); - NullsBitmapSize = NumberOfColumns / (8 * sizeof(ui64)) + 1; TotalStringsSize = (numberOfKeyStringColumns > 0 || NumberOfKeyIColumns > 0 ) ? 1 : 0; - HeaderSize = HashSize + NullsBitmapSize + NumberOfKeyIntColumns + NumberOfKeyIColumns + TotalStringsSize; + HeaderSize = HashSize + NullsBitmapSize_ + NumberOfKeyIntColumns + NumberOfKeyIColumns + TotalStringsSize; TableBuckets.resize(NumberOfBuckets); diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_grace_join_imp.h b/ydb/library/yql/minikql/comp_nodes/mkql_grace_join_imp.h index 7725f038496..70cf0a16f79 100644 --- a/ydb/library/yql/minikql/comp_nodes/mkql_grace_join_imp.h +++ b/ydb/library/yql/minikql/comp_nodes/mkql_grace_join_imp.h @@ -9,7 +9,7 @@ namespace NKikimr { namespace NMiniKQL { namespace GraceJoin { -const ui64 BitsForNumberOfBuckets = 8; // 2^8 = 256 +const ui64 BitsForNumberOfBuckets = 5; // 2^5 = 32 const ui64 BucketsMask = (0x00000001 << BitsForNumberOfBuckets) - 1; const ui64 NumberOfBuckets = (0x00000001 << BitsForNumberOfBuckets); // Number of hashed keys buckets to distribute incoming tables tuples const ui64 DefaultTuplesNum = 1000; // Default initial number of tuples in one bucket to allocate memory @@ -42,6 +42,14 @@ struct JoinTuplesIds { ui32 id2 = 0; // Identifier of second table tuple as index in bucket }; +// To store keys values when making join only for unique keys (any join attribute) +struct KeysHashTable { + ui64 SlotSize = 0; // Slot size in hash table + ui64 NSlots = 0; // Total number of slots in table + ui64 FillCount = 0; // Number of ui64 slots which are filled + std::vector<ui64> Table; // Table to store keys data in particular slots + std::vector<ui64> SpillData; // Vector to store long data which cannot be fit in single hash table slot. 
+}; struct TTableBucket { ui64 TuplesNum = 0; // Total number of tuples in bucket @@ -50,8 +58,7 @@ struct TTableBucket { std::vector<char, TMKQLAllocator<char>> StringsValues; // Vector to store data strings values std::vector<ui32, TMKQLAllocator<ui32>> StringsOffsets; // Vector to store strings values sizes (offsets in StringsValues are calculated) for particular tuple. std::vector<char, TMKQLAllocator<char>> InterfaceValues; // Vector to store types to work through external-provided IHash, IEquate interfaces - std::vector<ui32, TMKQLAllocator<ui32>> InterfaceOffsets; // Vector to store sizes of columns to work through IHash, IEquate interfaces - + std::vector<ui32, TMKQLAllocator<ui32>> InterfaceOffsets; // Vector to store sizes of columns to work through IHash, IEquate interfaces std::vector<JoinTuplesIds, TMKQLAllocator<JoinTuplesIds>> JoinIds; // Results of join operations stored as index of tuples in buckets // of two tables with the same number @@ -59,6 +66,7 @@ struct TTableBucket { std::set<ui32> AllLeftMatchedIds; // All row ids of left join table which have matching rows in right table. To process streaming join mode. std::set<ui32> AllRightMatchedIds; // All row ids of right join table which matching rows in left table. To process streaming join mode. + KeysHashTable AnyHashTable; // Hash table to process join only for unique keys (any join attribute) }; @@ -100,11 +108,11 @@ class TTable { ui64 NumberOfDataColumns = 0; // Number of data columns in the Table ui64 NumberOfStringColumns = 0; // Total number of String Columns ui64 NumberOfIColumns = 0; // Total number of interface-based columns - ui64 NullsBitmapSize = 1; // Default size of ui64 values used for null columns bitmap. + ui64 NullsBitmapSize_ = 1; // Default size of ui64 values used for null columns bitmap. // Every bit set means null value. Order of columns is equal to order in AddTuple call. - // First key int column is bit 0 in bit mask, second - bit 1, etc. 
Bit 0 is least significant in bitmask. + // First key int column is bit 1 in bit mask, second - bit 2, etc. Bit 0 is least significant in bitmask and tells if key columns contain nulls. ui64 TotalStringsSize = 0; // Bytes in tuple header reserved to store total strings size key tuple columns - ui64 HeaderSize = HashSize + NullsBitmapSize + NumberOfKeyIntColumns + NumberOfKeyIColumns + TotalStringsSize; // Header of all tuples size + ui64 HeaderSize = HashSize + NullsBitmapSize_ + NumberOfKeyIntColumns + NumberOfKeyIColumns + TotalStringsSize; // Header of all tuples size ui64 BytesInKeyIntColumns = sizeof(ui64) * NumberOfKeyIntColumns; @@ -145,6 +153,9 @@ class TTable { // True if current iterator of tuple in joinedTable has corresponding joined tuple in second table. Id of joined tuple in second table returns in tupleId2. inline bool HasJoinedTupleId(TTable* joinedTable, ui32& tupleId2); + // Adds keys to KeysHashTable, return true if added, false if equal key already added + inline bool AddKeysToHashTable(KeysHashTable& t, ui64* keys); + ui64 TotalPacked = 0; // Total number of packed tuples ui64 TotalUnpacked = 0; // Total number of unpacked tuples @@ -154,6 +165,8 @@ class TTable { bool HasMoreLeftTuples_ = false; // True if join is not completed, rows from left table are coming bool HasMoreRightTuples_ = false; // True if join is not completed, rows from right table are coming + bool IsAny_ = false; // True if key duplicates need to be removed from table (any join) + public: // Adds new tuple to the table. 
intColumns, stringColumns - data of columns, @@ -182,7 +195,8 @@ public: // Creates new table with key columns and data columns TTable(ui64 numberOfKeyIntColumns = 0, ui64 numberOfKeyStringColumns = 0, ui64 numberOfDataIntColumns = 0, ui64 numberOfDataStringColumns = 0, - ui64 numberOfKeyIColumns = 0, ui64 numberOfDataIColumns = 0, TColTypeInterface * colInterfaces = nullptr); + ui64 numberOfKeyIColumns = 0, ui64 numberOfDataIColumns = 0, + ui64 nullsBitmapSize = 1, TColTypeInterface * colInterfaces = nullptr, bool isAny = false); }; diff --git a/ydb/library/yql/minikql/comp_nodes/ut/mkql_grace_join_ut.cpp b/ydb/library/yql/minikql/comp_nodes/ut/mkql_grace_join_ut.cpp index 53d36b03bcd..a1939e7e786 100644 --- a/ydb/library/yql/minikql/comp_nodes/ut/mkql_grace_join_ut.cpp +++ b/ydb/library/yql/minikql/comp_nodes/ut/mkql_grace_join_ut.cpp @@ -199,15 +199,15 @@ Y_UNIT_TEST_SUITE(TMiniKQLGraceJoinImpTest) { - const ui64 BigTableTuples = 60000; - const ui64 SmallTableTuples = 15000; + const ui64 BigTableTuples = 600000; + const ui64 SmallTableTuples = 150000; const ui64 BigTupleSize = 32; std::chrono::steady_clock::time_point begin03 = std::chrono::steady_clock::now(); for ( ui64 i = 0; i < BigTableTuples; i++) { - tuple[1] = std::rand() / ( RAND_MAX / SmallTableTuples ); + tuple[1] = std::rand() % SmallTableTuples; tuple[2] = tuple[1]; bigTable.AddTuple(tuple, strVals, strSizes); } @@ -215,7 +215,7 @@ Y_UNIT_TEST_SUITE(TMiniKQLGraceJoinImpTest) { smallTable.AddTuple(tuple, bigStrVal, bigStrSize); for ( ui64 i = 0; i < SmallTableTuples + 1; i++) { - tuple[1] = std::rand() / ( RAND_MAX / SmallTableTuples ); + tuple[1] = std::rand() % SmallTableTuples; tuple[2] = tuple[1]; smallTable.AddTuple(tuple, strVals, strSizes); } @@ -233,7 +233,7 @@ Y_UNIT_TEST_SUITE(TMiniKQLGraceJoinImpTest) { for ( ui64 i = 0; i < BigTableTuples; i++) { - tuple[1] = std::rand() / ( RAND_MAX / SmallTableTuples ); + tuple[1] = std::rand() % SmallTableTuples; tuple[2] = tuple[1]; 
bigTable.AddTuple(tuple, strVals, strSizes); } @@ -241,7 +241,7 @@ Y_UNIT_TEST_SUITE(TMiniKQLGraceJoinImpTest) { smallTable.AddTuple(tuple, bigStrVal, bigStrSize); for ( ui64 i = 0; i < SmallTableTuples + 1; i++) { - tuple[1] = std::rand() / ( RAND_MAX / SmallTableTuples ); + tuple[1] = std::rand() % SmallTableTuples; tuple[2] = tuple[1]; smallTable.AddTuple(tuple, strVals, strSizes); } @@ -326,6 +326,137 @@ Y_UNIT_TEST_SUITE(TMiniKQLGraceJoinImpTest) { CTEST << Endl; + } +} + +Y_UNIT_TEST_SUITE(TMiniKQLGraceJoinAnyTest) { + Y_UNIT_TEST_LLVM(TestImp2) { + TSetup<LLVM> setup; + ui64 tuple[11] = {0,1,2,3,4,5,6,7,8,9,10}; + ui32 strSizes[2] = {4, 4}; + char * strVals[] = {(char *)"aaaaa", (char *)"bbbb"}; + + char * bigStrVal[] = {(char *)"aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa", + (char *)"bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb"}; + ui32 bigStrSize[2] = {151, 151}; + + + + GraceJoin::TTable bigTable (1,1,1,1,0,0,1, nullptr, true); + GraceJoin::TTable smallTable(1,1,1,1,0,0,1, nullptr, true); + GraceJoin::TTable joinTable (1,1,1,1,0,0,1, nullptr, true); + + std::chrono::steady_clock::time_point begin = std::chrono::steady_clock::now(); + + const ui64 TupleSize = 1024; + + ui64 bigTuple[TupleSize]; + + for (ui64 i = 0; i < TupleSize; i++) { + bigTuple[i] = std::rand() / ( RAND_MAX / 10000 ); + } + + ui64 milliseconds = 0; + + + + const ui64 BigTableTuples = 600000; + const ui64 SmallTableTuples = 150000; + const ui64 BigTupleSize = 32; + + std::chrono::steady_clock::time_point begin03 = std::chrono::steady_clock::now(); + + + for ( ui64 i = 0; i < BigTableTuples; i++) { + tuple[1] = i % SmallTableTuples; + tuple[2] = tuple[1]; + bigTable.AddTuple(tuple, strVals, strSizes); + } + + smallTable.AddTuple(tuple, bigStrVal, bigStrSize); + + for ( ui64 
i = 0; i < SmallTableTuples + 1; i++) { + tuple[1] = i; + tuple[2] = tuple[1]; + smallTable.AddTuple(tuple, strVals, strSizes); + } + + std::chrono::steady_clock::time_point end03 = std::chrono::steady_clock::now(); + milliseconds = std::chrono::duration_cast<std::chrono::milliseconds>(end03 - begin03).count(); + CTEST << "Time for hash = " << milliseconds << "[ms]" << Endl; + CTEST << "Adding tuples speed: " << (BigTupleSize * (BigTableTuples + SmallTableTuples) * 1000) / ( milliseconds * 1024 * 1024) << "MB/sec" << Endl; + CTEST << Endl; + + std::vector<ui64> vals1, vals2; + std::vector<char *> strVals1, strVals2; + std::vector<ui32> strSizes1, strSizes2; + GraceJoin::TupleData td1, td2; + vals1.resize(100); + vals2.resize(100); + strVals1.resize(100); + strVals2.resize(100); + strSizes1.resize(100); + strSizes2.resize(100); + td1.IntColumns = vals1.data(); + td1.StrColumns = strVals1.data(); + td1.StrSizes = strSizes1.data(); + td2.IntColumns = vals2.data(); + td2.StrColumns = strVals2.data(); + td2.StrSizes = strSizes2.data(); + + ui64 numBigTuples = 0; + bigTable.ResetIterator(); + + std::chrono::steady_clock::time_point begin04 = std::chrono::steady_clock::now(); + + while(bigTable.NextTuple(td1)) { numBigTuples++; } + + CTEST << "Num of big tuples 1: " << numBigTuples << Endl; + + std::chrono::steady_clock::time_point end04 = std::chrono::steady_clock::now(); + CTEST << "Time for get 1 = " << std::chrono::duration_cast<std::chrono::milliseconds>(end04 - begin04).count() << "[ms]" << Endl; + CTEST << Endl; + + numBigTuples = 0; + bigTable.ResetIterator(); + + std::chrono::steady_clock::time_point begin041 = std::chrono::steady_clock::now(); + + while(bigTable.NextTuple(td2)) { numBigTuples++; } + + CTEST << "Num of big tuples 2: " << numBigTuples << Endl; + + std::chrono::steady_clock::time_point end041 = std::chrono::steady_clock::now(); + CTEST << "Time for get 2 = " << std::chrono::duration_cast<std::chrono::milliseconds>(end041 - begin041).count() << 
"[ms]" << Endl; + CTEST << Endl; + + + std::chrono::steady_clock::time_point begin05 = std::chrono::steady_clock::now(); + + joinTable.Join(smallTable,bigTable); + + std::chrono::steady_clock::time_point end05 = std::chrono::steady_clock::now(); + CTEST << "Time for join = " << std::chrono::duration_cast<std::chrono::milliseconds>(end05 - begin05).count() << "[ms]" << Endl; + CTEST << Endl; + + joinTable.ResetIterator(); + ui64 numJoinedTuples = 0; + + + std::chrono::steady_clock::time_point begin042 = std::chrono::steady_clock::now(); + + while(joinTable.NextJoinedData(td1, td2)) { numJoinedTuples++; } + + CTEST << "Num of joined tuples : " << numJoinedTuples << Endl; + + std::chrono::steady_clock::time_point end042 = std::chrono::steady_clock::now(); + CTEST << "Time for get joined tuples: = " << std::chrono::duration_cast<std::chrono::milliseconds>(end042 - begin042).count() << "[ms]" << Endl; + CTEST << Endl; + + + std::chrono::steady_clock::time_point end = std::chrono::steady_clock::now(); + CTEST << "Time difference = " << std::chrono::duration_cast<std::chrono::milliseconds>(end - begin).count() << "[ms]" << Endl; + CTEST << Endl; } @@ -388,12 +519,14 @@ Y_UNIT_TEST_SUITE(TMiniKQLGraceJoinTest) { NUdf::TUnboxedValue tuple; UNIT_ASSERT(iterator.Next(tuple)); - UNBOXED_VALUE_STR_EQUAL(tuple.GetElement(0), "B"); - UNBOXED_VALUE_STR_EQUAL(tuple.GetElement(1), "X"); - UNIT_ASSERT(iterator.Next(tuple)); UNBOXED_VALUE_STR_EQUAL(tuple.GetElement(0), "C"); UNBOXED_VALUE_STR_EQUAL(tuple.GetElement(1), "Y"); UNIT_ASSERT(iterator.Next(tuple)); + UNBOXED_VALUE_STR_EQUAL(tuple.GetElement(0), "C"); + UNBOXED_VALUE_STR_EQUAL(tuple.GetElement(1), "Z"); + UNIT_ASSERT(iterator.Next(tuple)); + UNBOXED_VALUE_STR_EQUAL(tuple.GetElement(0), "B"); + UNBOXED_VALUE_STR_EQUAL(tuple.GetElement(1), "X"); UNIT_ASSERT(!iterator.Next(tuple)); } @@ -525,12 +658,14 @@ Y_UNIT_TEST_SUITE(TMiniKQLGraceJoinTest) { NUdf::TUnboxedValue tuple; UNIT_ASSERT(iterator.Next(tuple)); - 
UNBOXED_VALUE_STR_EQUAL(tuple.GetElement(0), "B"); - UNBOXED_VALUE_STR_EQUAL(tuple.GetElement(1), "X"); - UNIT_ASSERT(iterator.Next(tuple)); UNBOXED_VALUE_STR_EQUAL(tuple.GetElement(0), "C"); UNBOXED_VALUE_STR_EQUAL(tuple.GetElement(1), "Y"); UNIT_ASSERT(iterator.Next(tuple)); + UNBOXED_VALUE_STR_EQUAL(tuple.GetElement(0), "C"); + UNBOXED_VALUE_STR_EQUAL(tuple.GetElement(1), "Z"); + UNIT_ASSERT(iterator.Next(tuple)); + UNBOXED_VALUE_STR_EQUAL(tuple.GetElement(0), "B"); + UNBOXED_VALUE_STR_EQUAL(tuple.GetElement(1), "X"); UNIT_ASSERT(!iterator.Next(tuple)); } @@ -1263,6 +1398,9 @@ Y_UNIT_TEST_SUITE(TMiniKQLGraceJoinTest) { UNIT_ASSERT(iterator.Next(tuple)); + UNIT_ASSERT(!tuple.GetElement(0)); + UNBOXED_VALUE_STR_EQUAL(tuple.GetElement(1), "Z"); + UNIT_ASSERT(iterator.Next(tuple)); UNBOXED_VALUE_STR_EQUAL(tuple.GetElement(0), "B"); UNBOXED_VALUE_STR_EQUAL(tuple.GetElement(1), "X"); UNIT_ASSERT(iterator.Next(tuple)); @@ -1274,9 +1412,6 @@ Y_UNIT_TEST_SUITE(TMiniKQLGraceJoinTest) { UNIT_ASSERT(iterator.Next(tuple)); UNBOXED_VALUE_STR_EQUAL(tuple.GetElement(0), "C"); UNBOXED_VALUE_STR_EQUAL(tuple.GetElement(1), "Y"); - UNIT_ASSERT(iterator.Next(tuple)); - UNIT_ASSERT(!tuple.GetElement(0)); - UNBOXED_VALUE_STR_EQUAL(tuple.GetElement(1), "Z"); UNIT_ASSERT(!iterator.Next(tuple)); UNIT_ASSERT(!iterator.Next(tuple)); } @@ -1403,11 +1538,11 @@ Y_UNIT_TEST_SUITE(TMiniKQLGraceJoinTest) { NUdf::TUnboxedValue tuple; UNIT_ASSERT(iterator.Next(tuple)); - UNBOXED_VALUE_STR_EQUAL(tuple.GetElement(0), "C"); - UNIT_ASSERT(!tuple.GetElement(1)); - UNIT_ASSERT(iterator.Next(tuple)); UNBOXED_VALUE_STR_EQUAL(tuple.GetElement(0), "Z"); UNIT_ASSERT_VALUES_EQUAL(tuple.GetElement(1).Get<ui32>(), 3); + UNIT_ASSERT(iterator.Next(tuple)); + UNBOXED_VALUE_STR_EQUAL(tuple.GetElement(0), "C"); + UNIT_ASSERT(!tuple.GetElement(1)); UNIT_ASSERT(!iterator.Next(tuple)); UNIT_ASSERT(!iterator.Next(tuple)); } |