diff options
author | aakulaga <aakulaga@ydb.tech> | 2023-04-25 17:06:31 +0300 |
---|---|---|
committer | aakulaga <aakulaga@ydb.tech> | 2023-04-25 17:06:31 +0300 |
commit | 65f6a96c441ead6ae099f36f15473eaa3de654b5 (patch) | |
tree | 949faadb87e932e3b77d04358909ceebf6fc6f17 | |
parent | 4ef444a11e91cc3be7a395c0d212a2602f20cce6 (diff) | |
download | ydb-65f6a96c441ead6ae099f36f15473eaa3de654b5.tar.gz |
Fix stream mode for duplicates
Fix stream mode for duplicates
4 files changed, 162 insertions, 27 deletions
diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_grace_join.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_grace_join.cpp index c662dcf37b..f94ced2d5f 100644 --- a/ydb/library/yql/minikql/comp_nodes/mkql_grace_join.cpp +++ b/ydb/library/yql/minikql/comp_nodes/mkql_grace_join.cpp @@ -20,6 +20,7 @@ #include <chrono> #include <format> +#include <limits> namespace NKikimr { namespace NMiniKQL { @@ -48,6 +49,7 @@ struct TGraceJoinPacker { ui64 TuplesPacked = 0; // Total number of packed tuples ui64 TuplesBatchPacked = 0; // Number of tuples packed during current join batch ui64 TuplesUnpacked = 0; // Total number of unpacked tuples + ui64 BatchSize = PartialJoinBatchSize; // Batch size for partial table packing and join std::chrono::time_point<std::chrono::system_clock> StartTime; // Start time of execution std::chrono::time_point<std::chrono::system_clock> EndTime; // End time of execution std::vector<ui64> TupleIntVals; // Packed value of all fixed length values of table tuple. Keys columns should be packed first. @@ -564,7 +566,13 @@ public: , LeftKeyColumns(leftKeyColumns) , RightKeyColumns(rightKeyColumns) , LeftRenames(leftRenames) - , RightRenames(rightRenames) {} + , RightRenames(rightRenames) + { + if (JoinKind == EJoinKind::Full || JoinKind == EJoinKind::Exclusion ) { + LeftPacker->BatchSize = std::numeric_limits<ui64>::max(); + RightPacker->BatchSize = std::numeric_limits<ui64>::max(); + } + } private: IComputationWideFlowNode* const FlowLeft; IComputationWideFlowNode* const FlowRight; @@ -820,17 +828,17 @@ EFetchResult TGraceJoinState::FetchValues(TComputationContext& ctx, NUdf::TUnbox *HaveMoreRightRows = false; } - if (!*HaveMoreRightRows && !*PartialJoinCompleted && LeftPacker->TuplesBatchPacked >= PartialJoinBatchSize ) { + if (!*HaveMoreRightRows && !*PartialJoinCompleted && LeftPacker->TuplesBatchPacked >= LeftPacker->BatchSize ) { *PartialJoinCompleted = true; - JoinedTablePtr->Join(*LeftPacker->TablePtr, *RightPacker->TablePtr, JoinKind); + JoinedTablePtr->Join(*LeftPacker->TablePtr, *RightPacker->TablePtr, JoinKind, *HaveMoreLeftRows, *HaveMoreRightRows); JoinedTablePtr->ResetIterator(); } - if (!*HaveMoreLeftRows && !*PartialJoinCompleted && RightPacker->TuplesBatchPacked >= PartialJoinBatchSize ) { + if (!*HaveMoreLeftRows && !*PartialJoinCompleted && RightPacker->TuplesBatchPacked >= RightPacker->BatchSize ) { *PartialJoinCompleted = true; - JoinedTablePtr->Join(*LeftPacker->TablePtr, *RightPacker->TablePtr, JoinKind); + JoinedTablePtr->Join(*LeftPacker->TablePtr, *RightPacker->TablePtr, JoinKind, *HaveMoreLeftRows, *HaveMoreRightRows); JoinedTablePtr->ResetIterator(); } @@ -839,7 +847,7 @@ EFetchResult TGraceJoinState::FetchValues(TComputationContext& ctx, NUdf::TUnbox *PartialJoinCompleted = true; LeftPacker->StartTime = std::chrono::system_clock::now(); RightPacker->StartTime = std::chrono::system_clock::now(); - JoinedTablePtr->Join(*LeftPacker->TablePtr, *RightPacker->TablePtr, JoinKind); + JoinedTablePtr->Join(*LeftPacker->TablePtr, *RightPacker->TablePtr, JoinKind, *HaveMoreLeftRows, *HaveMoreRightRows); JoinedTablePtr->ResetIterator(); LeftPacker->EndTime = std::chrono::system_clock::now(); RightPacker->EndTime = std::chrono::system_clock::now(); diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_grace_join_imp.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_grace_join_imp.cpp index 76ea555487..31de921820 100644 --- a/ydb/library/yql/minikql/comp_nodes/mkql_grace_join_imp.cpp +++ b/ydb/library/yql/minikql/comp_nodes/mkql_grace_join_imp.cpp @@ -243,9 +243,16 @@ inline bool CompareIColumns( const ui32* stringSizes1, const char * vals1, // Joins two tables and returns join result in joined table. Tuples of joined table could be received by // joined table iterator -void TTable::Join( TTable & t1, TTable & t2, EJoinKind joinKind ) { +void TTable::Join( TTable & t1, TTable & t2, EJoinKind joinKind, bool hasMoreLeftTuples, bool hasMoreRightTuples ) { + + if ( hasMoreLeftTuples ) + LeftTableBatch_ = true; + + if( hasMoreRightTuples ) + RightTableBatch_ = true; + JoinTable1 = &t1; JoinTable2 = &t2; @@ -262,6 +269,9 @@ void TTable::Join( TTable & t1, TTable & t2, EJoinKind joinKind ) { ui64 tuplesFound = 0; + ui64 leftIdsMatch = 0; + ui64 rightIdsMatch = 0; + std::vector<ui64, TMKQLAllocator<ui64, EMemorySubPool::Temporary>> joinSlots, spillSlots, slotToIdx; std::vector<ui32, TMKQLAllocator<ui32, EMemorySubPool::Temporary>> stringsOffsets1, stringsOffsets2; ui64 reservedSize = 6 * (DefaultTupleBytes * DefaultTuplesNum) / sizeof(ui64); @@ -479,18 +489,42 @@ void TTable::Join( TTable & t1, TTable & t2, EJoinKind joinKind ) { TableBuckets[bucket].JoinIds.assign(joinResults.begin(), joinResults.end()); + + std::vector<ui32, TMKQLAllocator<ui32>> & rightIds = TableBuckets[bucket].RightIds; + std::vector<JoinTuplesIds, TMKQLAllocator<JoinTuplesIds>> & joinIds = TableBuckets[bucket].JoinIds; + std::set<ui32> & leftMatchedIds = TableBuckets[bucket].AllLeftMatchedIds; + std::set<ui32> & rightMatchedIds = TableBuckets[bucket].AllRightMatchedIds; + if ( JoinKind == EJoinKind::Full || JoinKind == EJoinKind::Exclusion ) { - std::vector<ui32, TMKQLAllocator<ui32>> & rightIds = TableBuckets[bucket].RightIds; - std::vector<JoinTuplesIds, TMKQLAllocator<JoinTuplesIds>> & joinIds = TableBuckets[bucket].JoinIds; rightIds.clear(); rightIds.reserve(joinIds.size()); for (const auto & id: joinIds) { rightIds.emplace_back(id.id2); + if (LeftTableBatch_ || RightTableBatch_) { + leftMatchedIds.insert(id.id1); + rightMatchedIds.insert(id.id2); + } } std::sort(rightIds.begin(), rightIds.end()); } + + if ( (JoinKind == EJoinKind::Left || JoinKind == EJoinKind::LeftOnly || JoinKind == EJoinKind::LeftSemi || + JoinKind == EJoinKind::Right || JoinKind == EJoinKind::RightOnly || JoinKind == EJoinKind::RightSemi ) && + (RightTableBatch_ || LeftTableBatch_) ) + { + for (auto & jid: joinIds ) { + leftMatchedIds.insert(jid.id1); + } + leftIdsMatch += leftMatchedIds.size(); + + } + + } + HasMoreLeftTuples_ = hasMoreLeftTuples; + HasMoreRightTuples_ = hasMoreRightTuples; + } inline void TTable::GetTupleData(ui32 bucketNum, ui32 tupleId, TupleData & td) { @@ -649,7 +683,7 @@ bool TTable::NextJoinedData( TupleData & td1, TupleData & td2) { } if ( JoinKind == EJoinKind::Left ) { - if(HasMoreTuples(JoinTable1->TableBuckets, JoinTable1->CurrIterBucket, JoinTable1->CurrIterIndex)) { + while (HasMoreTuples(JoinTable1->TableBuckets, JoinTable1->CurrIterBucket, JoinTable1->CurrIterIndex)) { ui32 tupleId2; if (HasJoinedTupleId(JoinTable1, tupleId2)) { @@ -661,6 +695,17 @@ bool TTable::NextJoinedData( TupleData & td1, TupleData & td2) { return true; } else { + if (RightTableBatch_ && HasMoreRightTuples_ ) { + JoinTable1->CurrIterIndex++; + continue; + } + + std::set<ui32> & leftMatchedIds = TableBuckets[JoinTable1->CurrIterBucket].AllLeftMatchedIds; + if ( leftMatchedIds.contains( (ui32) JoinTable1->CurrIterIndex)) { + JoinTable1->CurrIterIndex++; + continue; + } + JoinTable1->GetTupleData(CurrIterBucket, JoinTable1->CurrIterIndex, td1); td2.AllNulls = true; } @@ -673,7 +718,7 @@ bool TTable::NextJoinedData( TupleData & td1, TupleData & td2) { } if ( JoinKind == EJoinKind::Right ) { - if(HasMoreTuples(JoinTable1->TableBuckets, JoinTable1->CurrIterBucket, JoinTable1->CurrIterIndex)) { + while(HasMoreTuples(JoinTable1->TableBuckets, JoinTable1->CurrIterBucket, JoinTable1->CurrIterIndex)) { ui32 tupleId2; if (HasJoinedTupleId(JoinTable1, tupleId2)) { @@ -684,6 +729,18 @@ bool TTable::NextJoinedData( TupleData & td1, TupleData & td2) { if ( (CurrJoinIdsIterIndex == jids.size()) || ( JoinTable1->CurrIterIndex != jids[CurrJoinIdsIterIndex].id1) ) JoinTable1->CurrIterIndex++; return true; } else { + if (LeftTableBatch_ && HasMoreLeftTuples_ ) { + JoinTable1->CurrIterIndex++; + continue; + } + + std::set<ui32> & leftMatchedIds = TableBuckets[JoinTable1->CurrIterBucket].AllLeftMatchedIds; + if ( leftMatchedIds.contains( (ui32) JoinTable1->CurrIterIndex)) { + JoinTable1->CurrIterIndex++; + continue; + } + + JoinTable1->GetTupleData(CurrIterBucket, JoinTable1->CurrIterIndex, td2); td1.AllNulls = true; } @@ -698,9 +755,20 @@ bool TTable::NextJoinedData( TupleData & td1, TupleData & td2) { if (JoinKind == EJoinKind::LeftOnly ) { + + if ( RightTableBatch_ && HasMoreRightTuples_ ) + return false; + while(HasMoreTuples(JoinTable1->TableBuckets, JoinTable1->CurrIterBucket, JoinTable1->CurrIterIndex)) { ui32 tupleId2; - if (!HasJoinedTupleId(JoinTable1, tupleId2)) + + bool globalMatchedId = false; + if ( RightTableBatch_ || LeftTableBatch_ ) { + std::set<ui32> & leftMatchedIds = TableBuckets[JoinTable1->CurrIterBucket].AllLeftMatchedIds; + globalMatchedId = leftMatchedIds.contains( (ui32) JoinTable1->CurrIterIndex); + } + + if (!HasJoinedTupleId(JoinTable1, tupleId2) && !globalMatchedId ) { JoinTable1->GetTupleData(CurrIterBucket, JoinTable1->CurrIterIndex, td1); td2.AllNulls = true; @@ -718,9 +786,20 @@ bool TTable::NextJoinedData( TupleData & td1, TupleData & td2) { } if (JoinKind == EJoinKind::RightOnly ) { + + if (LeftTableBatch_ && HasMoreLeftTuples_ ) + return false; + while(HasMoreTuples(JoinTable1->TableBuckets, JoinTable1->CurrIterBucket, JoinTable1->CurrIterIndex)) { ui32 tupleId2; - if (!HasJoinedTupleId(JoinTable1, tupleId2)) + + bool globalMatchedId = false; + if ( RightTableBatch_ || LeftTableBatch_ ) { + std::set<ui32> & leftMatchedIds = TableBuckets[JoinTable1->CurrIterBucket].AllLeftMatchedIds; + globalMatchedId = leftMatchedIds.contains( (ui32) JoinTable1->CurrIterIndex); + } + + if (!HasJoinedTupleId(JoinTable1, tupleId2) && !globalMatchedId ) { JoinTable1->GetTupleData(CurrIterBucket, JoinTable1->CurrIterIndex, td2); td1.AllNulls = true; @@ -738,10 +817,15 @@ bool TTable::NextJoinedData( TupleData & td1, TupleData & td2) { } - if ( JoinKind == EJoinKind::LeftSemi ) { + if ( JoinKind == EJoinKind::LeftSemi) { + + if (RightTableBatch_ && HasMoreRightTuples_ ) + return false; + while(HasMoreTuples(JoinTable1->TableBuckets, JoinTable1->CurrIterBucket, JoinTable1->CurrIterIndex)) { ui32 tupleId2; - if (HasJoinedTupleId(JoinTable1, tupleId2)) + + if ( !RightTableBatch_ && HasJoinedTupleId(JoinTable1, tupleId2)) { JoinTable1->GetTupleData(CurrIterBucket, JoinTable1->CurrIterIndex, td1); @@ -752,15 +836,29 @@ bool TTable::NextJoinedData( TupleData & td1, TupleData & td2) { JoinTable1->CurrIterIndex++; return true; } + + + std::set<ui32> & leftMatchedIds = TableBuckets[JoinTable1->CurrIterBucket].AllLeftMatchedIds; + if ( RightTableBatch_ && leftMatchedIds.contains( (ui32) JoinTable1->CurrIterIndex) ) { + JoinTable1->GetTupleData(JoinTable1->CurrIterBucket, JoinTable1->CurrIterIndex, td1); + td2.AllNulls = true; + JoinTable1->CurrIterIndex++; + return true; + } + JoinTable1->CurrIterIndex++; } return false; } if ( JoinKind == EJoinKind::RightSemi ) { + + if (LeftTableBatch_ && HasMoreLeftTuples_ ) + return false; + while(HasMoreTuples(JoinTable1->TableBuckets, JoinTable1->CurrIterBucket, JoinTable1->CurrIterIndex)) { ui32 tupleId2; - if (HasJoinedTupleId(JoinTable1, tupleId2)) + if ( !LeftTableBatch_ && HasJoinedTupleId(JoinTable1, tupleId2)) { JoinTable1->GetTupleData(CurrIterBucket, JoinTable1->CurrIterIndex, td2); td1.AllNulls = true; @@ -770,6 +868,15 @@ bool TTable::NextJoinedData( TupleData & td1, TupleData & td2) { JoinTable1->CurrIterIndex++; return true; } + + std::set<ui32> & leftMatchedIds = TableBuckets[JoinTable1->CurrIterBucket].AllLeftMatchedIds; + if ( LeftTableBatch_ && leftMatchedIds.contains( (ui32) JoinTable1->CurrIterIndex) ) { + JoinTable1->GetTupleData(JoinTable1->CurrIterBucket, JoinTable1->CurrIterIndex, td2); + td2.AllNulls = true; + JoinTable1->CurrIterIndex++; + return true; + } + JoinTable1->CurrIterIndex++; } return false; diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_grace_join_imp.h b/ydb/library/yql/minikql/comp_nodes/mkql_grace_join_imp.h index 2f34e19e4f..7725f03849 100644 --- a/ydb/library/yql/minikql/comp_nodes/mkql_grace_join_imp.h +++ b/ydb/library/yql/minikql/comp_nodes/mkql_grace_join_imp.h @@ -52,9 +52,14 @@ struct TTableBucket { std::vector<char, TMKQLAllocator<char>> InterfaceValues; // Vector to store types to work through external-provided IHash, IEquate interfaces std::vector<ui32, TMKQLAllocator<ui32>> InterfaceOffsets; // Vector to store sizes of columns to work through IHash, IEquate interfaces - std::vector<JoinTuplesIds, TMKQLAllocator<JoinTuplesIds>> JoinIds; // Results of join operations stored as index of tuples in buckets - // of two tables with the same number + std::vector<JoinTuplesIds, TMKQLAllocator<JoinTuplesIds>> JoinIds; // Results of join operations stored as index of tuples in buckets + // of two tables with the same number + std::vector<ui32, TMKQLAllocator<ui32>> RightIds; // Sorted Ids of right table joined tuples to process full join and exclusion join + + std::set<ui32> AllLeftMatchedIds; // All row ids of left join table which have matching rows in right table. To process streaming join mode. + std::set<ui32> AllRightMatchedIds; // All row ids of right join table which matching rows in left table. To process streaming join mode. + }; @@ -143,6 +148,12 @@ class TTable { ui64 TotalPacked = 0; // Total number of packed tuples ui64 TotalUnpacked = 0; // Total number of unpacked tuples + bool LeftTableBatch_ = false; // True if left table is processed in batch mode + bool RightTableBatch_ = false; // True if right table is procesed in batch mode + + bool HasMoreLeftTuples_ = false; // True if join is not completed, rows from left table are coming + bool HasMoreRightTuples_ = false; // True if join is not completed, rows from right table are coming + public: // Adds new tuple to the table. intColumns, stringColumns - data of columns, @@ -158,7 +169,8 @@ public: // Joins two tables and stores join result in table data. Tuples of joined table could be received by // joined table iterator. Life time of t1, t2 should be greater than lifetime of joined table - void Join(TTable& t1, TTable& t2, EJoinKind joinKind = EJoinKind::Inner); + // hasMoreLeftTuples, hasMoreRightTuples is true if join is partial and more rows are coming. For final batch hasMoreLeftTuples = false, hasMoreRightTuples = false + void Join(TTable& t1, TTable& t2, EJoinKind joinKind = EJoinKind::Inner, bool hasMoreLeftTuples = false, bool hasMoreRightTuples = false ); // Returns next jointed tuple data. Returs true if there are more tuples diff --git a/ydb/library/yql/minikql/comp_nodes/ut/mkql_grace_join_ut.cpp b/ydb/library/yql/minikql/comp_nodes/ut/mkql_grace_join_ut.cpp index 2bb910d4ad..53d36b03bc 100644 --- a/ydb/library/yql/minikql/comp_nodes/ut/mkql_grace_join_ut.cpp +++ b/ydb/library/yql/minikql/comp_nodes/ut/mkql_grace_join_ut.cpp @@ -332,7 +332,6 @@ Y_UNIT_TEST_SUITE(TMiniKQLGraceJoinImpTest) { } - Y_UNIT_TEST_SUITE(TMiniKQLGraceJoinTest) { Y_UNIT_TEST_LLVM(TestInner1) { @@ -827,12 +826,14 @@ Y_UNIT_TEST_SUITE(TMiniKQLGraceJoinTest) { const auto key2 = pb.NewDataLiteral<ui32>(2); const auto key3 = pb.NewDataLiteral<ui32>(2); const auto key4 = pb.NewDataLiteral<ui32>(3); + const auto key5 = pb.NewDataLiteral<ui32>(4); const auto payload1 = pb.NewDataLiteral<NUdf::EDataSlot::String>("A"); const auto payload2 = pb.NewDataLiteral<NUdf::EDataSlot::String>("B"); const auto payload3 = pb.NewDataLiteral<NUdf::EDataSlot::String>("C"); - const auto payload4 = pb.NewDataLiteral<NUdf::EDataSlot::String>("X"); - const auto payload5 = pb.NewDataLiteral<NUdf::EDataSlot::String>("Y"); - const auto payload6 = pb.NewDataLiteral<NUdf::EDataSlot::String>("Z"); + const auto payload4 = pb.NewDataLiteral<NUdf::EDataSlot::String>("D"); + const auto payload5 = pb.NewDataLiteral<NUdf::EDataSlot::String>("X"); + const auto payload6 = pb.NewDataLiteral<NUdf::EDataSlot::String>("Y"); + const auto payload7 = pb.NewDataLiteral<NUdf::EDataSlot::String>("Z"); const auto tupleType = pb.NewTupleType({ pb.NewDataType(NUdf::TDataType<ui32>::Id), @@ -842,13 +843,15 @@ Y_UNIT_TEST_SUITE(TMiniKQLGraceJoinTest) { const auto list1 = pb.NewList(tupleType, { pb.NewTuple({key1, payload1}), pb.NewTuple({key2, payload2}), - pb.NewTuple({key3, payload3}) + pb.NewTuple({key3, payload3}), + pb.NewTuple({key4, payload4}), + pb.NewTuple({key5, payload4}) }); const auto list2 = pb.NewList(tupleType, { - pb.NewTuple({key2, payload4}), - pb.NewTuple({key3, payload5}), - pb.NewTuple({key4, payload6}) + pb.NewTuple({key2, payload5}), + pb.NewTuple({key3, payload6}), + pb.NewTuple({key4, payload7}) }); const auto resultType = pb.NewFlowType(pb.NewMultiType({ @@ -868,8 +871,13 @@ Y_UNIT_TEST_SUITE(TMiniKQLGraceJoinTest) { NUdf::TUnboxedValue tuple; UNIT_ASSERT(iterator.Next(tuple)); + UNBOXED_VALUE_STR_EQUAL(tuple.GetElement(0), "D"); + UNIT_ASSERT_VALUES_EQUAL(tuple.GetElement(1).Get<ui32>(), 4); + + UNIT_ASSERT(iterator.Next(tuple)); UNBOXED_VALUE_STR_EQUAL(tuple.GetElement(0), "A"); UNIT_ASSERT_VALUES_EQUAL(tuple.GetElement(1).Get<ui32>(), 1); + UNIT_ASSERT(!iterator.Next(tuple)); UNIT_ASSERT(!iterator.Next(tuple)); } |