aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authoraakulaga <aakulaga@ydb.tech>2023-04-25 17:06:31 +0300
committeraakulaga <aakulaga@ydb.tech>2023-04-25 17:06:31 +0300
commit65f6a96c441ead6ae099f36f15473eaa3de654b5 (patch)
tree949faadb87e932e3b77d04358909ceebf6fc6f17
parent4ef444a11e91cc3be7a395c0d212a2602f20cce6 (diff)
downloadydb-65f6a96c441ead6ae099f36f15473eaa3de654b5.tar.gz
Fix stream mode for duplicates
Fix stream mode for duplicates
-rw-r--r--ydb/library/yql/minikql/comp_nodes/mkql_grace_join.cpp20
-rw-r--r--ydb/library/yql/minikql/comp_nodes/mkql_grace_join_imp.cpp127
-rw-r--r--ydb/library/yql/minikql/comp_nodes/mkql_grace_join_imp.h18
-rw-r--r--ydb/library/yql/minikql/comp_nodes/ut/mkql_grace_join_ut.cpp24
4 files changed, 162 insertions, 27 deletions
diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_grace_join.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_grace_join.cpp
index c662dcf37b..f94ced2d5f 100644
--- a/ydb/library/yql/minikql/comp_nodes/mkql_grace_join.cpp
+++ b/ydb/library/yql/minikql/comp_nodes/mkql_grace_join.cpp
@@ -20,6 +20,7 @@
#include <chrono>
#include <format>
+#include <limits>
namespace NKikimr {
namespace NMiniKQL {
@@ -48,6 +49,7 @@ struct TGraceJoinPacker {
ui64 TuplesPacked = 0; // Total number of packed tuples
ui64 TuplesBatchPacked = 0; // Number of tuples packed during current join batch
ui64 TuplesUnpacked = 0; // Total number of unpacked tuples
+ ui64 BatchSize = PartialJoinBatchSize; // Batch size for partial table packing and join
std::chrono::time_point<std::chrono::system_clock> StartTime; // Start time of execution
std::chrono::time_point<std::chrono::system_clock> EndTime; // End time of execution
std::vector<ui64> TupleIntVals; // Packed value of all fixed length values of table tuple. Keys columns should be packed first.
@@ -564,7 +566,13 @@ public:
, LeftKeyColumns(leftKeyColumns)
, RightKeyColumns(rightKeyColumns)
, LeftRenames(leftRenames)
- , RightRenames(rightRenames) {}
+ , RightRenames(rightRenames)
+ {
+ if (JoinKind == EJoinKind::Full || JoinKind == EJoinKind::Exclusion ) {
+ LeftPacker->BatchSize = std::numeric_limits<ui64>::max();
+ RightPacker->BatchSize = std::numeric_limits<ui64>::max();
+ }
+ }
private:
IComputationWideFlowNode* const FlowLeft;
IComputationWideFlowNode* const FlowRight;
@@ -820,17 +828,17 @@ EFetchResult TGraceJoinState::FetchValues(TComputationContext& ctx, NUdf::TUnbox
*HaveMoreRightRows = false;
}
- if (!*HaveMoreRightRows && !*PartialJoinCompleted && LeftPacker->TuplesBatchPacked >= PartialJoinBatchSize ) {
+ if (!*HaveMoreRightRows && !*PartialJoinCompleted && LeftPacker->TuplesBatchPacked >= LeftPacker->BatchSize ) {
*PartialJoinCompleted = true;
- JoinedTablePtr->Join(*LeftPacker->TablePtr, *RightPacker->TablePtr, JoinKind);
+ JoinedTablePtr->Join(*LeftPacker->TablePtr, *RightPacker->TablePtr, JoinKind, *HaveMoreLeftRows, *HaveMoreRightRows);
JoinedTablePtr->ResetIterator();
}
- if (!*HaveMoreLeftRows && !*PartialJoinCompleted && RightPacker->TuplesBatchPacked >= PartialJoinBatchSize ) {
+ if (!*HaveMoreLeftRows && !*PartialJoinCompleted && RightPacker->TuplesBatchPacked >= RightPacker->BatchSize ) {
*PartialJoinCompleted = true;
- JoinedTablePtr->Join(*LeftPacker->TablePtr, *RightPacker->TablePtr, JoinKind);
+ JoinedTablePtr->Join(*LeftPacker->TablePtr, *RightPacker->TablePtr, JoinKind, *HaveMoreLeftRows, *HaveMoreRightRows);
JoinedTablePtr->ResetIterator();
}
@@ -839,7 +847,7 @@ EFetchResult TGraceJoinState::FetchValues(TComputationContext& ctx, NUdf::TUnbox
*PartialJoinCompleted = true;
LeftPacker->StartTime = std::chrono::system_clock::now();
RightPacker->StartTime = std::chrono::system_clock::now();
- JoinedTablePtr->Join(*LeftPacker->TablePtr, *RightPacker->TablePtr, JoinKind);
+ JoinedTablePtr->Join(*LeftPacker->TablePtr, *RightPacker->TablePtr, JoinKind, *HaveMoreLeftRows, *HaveMoreRightRows);
JoinedTablePtr->ResetIterator();
LeftPacker->EndTime = std::chrono::system_clock::now();
RightPacker->EndTime = std::chrono::system_clock::now();
diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_grace_join_imp.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_grace_join_imp.cpp
index 76ea555487..31de921820 100644
--- a/ydb/library/yql/minikql/comp_nodes/mkql_grace_join_imp.cpp
+++ b/ydb/library/yql/minikql/comp_nodes/mkql_grace_join_imp.cpp
@@ -243,9 +243,16 @@ inline bool CompareIColumns( const ui32* stringSizes1, const char * vals1,
// Joins two tables and returns join result in joined table. Tuples of joined table could be received by
// joined table iterator
-void TTable::Join( TTable & t1, TTable & t2, EJoinKind joinKind ) {
+void TTable::Join( TTable & t1, TTable & t2, EJoinKind joinKind, bool hasMoreLeftTuples, bool hasMoreRightTuples ) {
+
+ if ( hasMoreLeftTuples )
+ LeftTableBatch_ = true;
+
+ if( hasMoreRightTuples )
+ RightTableBatch_ = true;
+
JoinTable1 = &t1;
JoinTable2 = &t2;
@@ -262,6 +269,9 @@ void TTable::Join( TTable & t1, TTable & t2, EJoinKind joinKind ) {
ui64 tuplesFound = 0;
+ ui64 leftIdsMatch = 0;
+ ui64 rightIdsMatch = 0;
+
std::vector<ui64, TMKQLAllocator<ui64, EMemorySubPool::Temporary>> joinSlots, spillSlots, slotToIdx;
std::vector<ui32, TMKQLAllocator<ui32, EMemorySubPool::Temporary>> stringsOffsets1, stringsOffsets2;
ui64 reservedSize = 6 * (DefaultTupleBytes * DefaultTuplesNum) / sizeof(ui64);
@@ -479,18 +489,42 @@ void TTable::Join( TTable & t1, TTable & t2, EJoinKind joinKind ) {
TableBuckets[bucket].JoinIds.assign(joinResults.begin(), joinResults.end());
+
+ std::vector<ui32, TMKQLAllocator<ui32>> & rightIds = TableBuckets[bucket].RightIds;
+ std::vector<JoinTuplesIds, TMKQLAllocator<JoinTuplesIds>> & joinIds = TableBuckets[bucket].JoinIds;
+ std::set<ui32> & leftMatchedIds = TableBuckets[bucket].AllLeftMatchedIds;
+ std::set<ui32> & rightMatchedIds = TableBuckets[bucket].AllRightMatchedIds;
+
if ( JoinKind == EJoinKind::Full || JoinKind == EJoinKind::Exclusion ) {
- std::vector<ui32, TMKQLAllocator<ui32>> & rightIds = TableBuckets[bucket].RightIds;
- std::vector<JoinTuplesIds, TMKQLAllocator<JoinTuplesIds>> & joinIds = TableBuckets[bucket].JoinIds;
rightIds.clear();
rightIds.reserve(joinIds.size());
for (const auto & id: joinIds) {
rightIds.emplace_back(id.id2);
+ if (LeftTableBatch_ || RightTableBatch_) {
+ leftMatchedIds.insert(id.id1);
+ rightMatchedIds.insert(id.id2);
+ }
}
std::sort(rightIds.begin(), rightIds.end());
}
+
+ if ( (JoinKind == EJoinKind::Left || JoinKind == EJoinKind::LeftOnly || JoinKind == EJoinKind::LeftSemi ||
+ JoinKind == EJoinKind::Right || JoinKind == EJoinKind::RightOnly || JoinKind == EJoinKind::RightSemi ) &&
+ (RightTableBatch_ || LeftTableBatch_) )
+ {
+ for (auto & jid: joinIds ) {
+ leftMatchedIds.insert(jid.id1);
+ }
+ leftIdsMatch += leftMatchedIds.size();
+
+ }
+
+
}
+ HasMoreLeftTuples_ = hasMoreLeftTuples;
+ HasMoreRightTuples_ = hasMoreRightTuples;
+
}
inline void TTable::GetTupleData(ui32 bucketNum, ui32 tupleId, TupleData & td) {
@@ -649,7 +683,7 @@ bool TTable::NextJoinedData( TupleData & td1, TupleData & td2) {
}
if ( JoinKind == EJoinKind::Left ) {
- if(HasMoreTuples(JoinTable1->TableBuckets, JoinTable1->CurrIterBucket, JoinTable1->CurrIterIndex)) {
+ while (HasMoreTuples(JoinTable1->TableBuckets, JoinTable1->CurrIterBucket, JoinTable1->CurrIterIndex)) {
ui32 tupleId2;
if (HasJoinedTupleId(JoinTable1, tupleId2))
{
@@ -661,6 +695,17 @@ bool TTable::NextJoinedData( TupleData & td1, TupleData & td2) {
return true;
} else {
+ if (RightTableBatch_ && HasMoreRightTuples_ ) {
+ JoinTable1->CurrIterIndex++;
+ continue;
+ }
+
+ std::set<ui32> & leftMatchedIds = TableBuckets[JoinTable1->CurrIterBucket].AllLeftMatchedIds;
+ if ( leftMatchedIds.contains( (ui32) JoinTable1->CurrIterIndex)) {
+ JoinTable1->CurrIterIndex++;
+ continue;
+ }
+
JoinTable1->GetTupleData(CurrIterBucket, JoinTable1->CurrIterIndex, td1);
td2.AllNulls = true;
}
@@ -673,7 +718,7 @@ bool TTable::NextJoinedData( TupleData & td1, TupleData & td2) {
}
if ( JoinKind == EJoinKind::Right ) {
- if(HasMoreTuples(JoinTable1->TableBuckets, JoinTable1->CurrIterBucket, JoinTable1->CurrIterIndex)) {
+ while(HasMoreTuples(JoinTable1->TableBuckets, JoinTable1->CurrIterBucket, JoinTable1->CurrIterIndex)) {
ui32 tupleId2;
if (HasJoinedTupleId(JoinTable1, tupleId2))
{
@@ -684,6 +729,18 @@ bool TTable::NextJoinedData( TupleData & td1, TupleData & td2) {
if ( (CurrJoinIdsIterIndex == jids.size()) || ( JoinTable1->CurrIterIndex != jids[CurrJoinIdsIterIndex].id1) ) JoinTable1->CurrIterIndex++;
return true;
} else {
+ if (LeftTableBatch_ && HasMoreLeftTuples_ ) {
+ JoinTable1->CurrIterIndex++;
+ continue;
+ }
+
+ std::set<ui32> & leftMatchedIds = TableBuckets[JoinTable1->CurrIterBucket].AllLeftMatchedIds;
+ if ( leftMatchedIds.contains( (ui32) JoinTable1->CurrIterIndex)) {
+ JoinTable1->CurrIterIndex++;
+ continue;
+ }
+
+
JoinTable1->GetTupleData(CurrIterBucket, JoinTable1->CurrIterIndex, td2);
td1.AllNulls = true;
}
@@ -698,9 +755,20 @@ bool TTable::NextJoinedData( TupleData & td1, TupleData & td2) {
if (JoinKind == EJoinKind::LeftOnly ) {
+
+ if ( RightTableBatch_ && HasMoreRightTuples_ )
+ return false;
+
while(HasMoreTuples(JoinTable1->TableBuckets, JoinTable1->CurrIterBucket, JoinTable1->CurrIterIndex)) {
ui32 tupleId2;
- if (!HasJoinedTupleId(JoinTable1, tupleId2))
+
+ bool globalMatchedId = false;
+ if ( RightTableBatch_ || LeftTableBatch_ ) {
+ std::set<ui32> & leftMatchedIds = TableBuckets[JoinTable1->CurrIterBucket].AllLeftMatchedIds;
+ globalMatchedId = leftMatchedIds.contains( (ui32) JoinTable1->CurrIterIndex);
+ }
+
+ if (!HasJoinedTupleId(JoinTable1, tupleId2) && !globalMatchedId )
{
JoinTable1->GetTupleData(CurrIterBucket, JoinTable1->CurrIterIndex, td1);
td2.AllNulls = true;
@@ -718,9 +786,20 @@ bool TTable::NextJoinedData( TupleData & td1, TupleData & td2) {
}
if (JoinKind == EJoinKind::RightOnly ) {
+
+ if (LeftTableBatch_ && HasMoreLeftTuples_ )
+ return false;
+
while(HasMoreTuples(JoinTable1->TableBuckets, JoinTable1->CurrIterBucket, JoinTable1->CurrIterIndex)) {
ui32 tupleId2;
- if (!HasJoinedTupleId(JoinTable1, tupleId2))
+
+ bool globalMatchedId = false;
+ if ( RightTableBatch_ || LeftTableBatch_ ) {
+ std::set<ui32> & leftMatchedIds = TableBuckets[JoinTable1->CurrIterBucket].AllLeftMatchedIds;
+ globalMatchedId = leftMatchedIds.contains( (ui32) JoinTable1->CurrIterIndex);
+ }
+
+ if (!HasJoinedTupleId(JoinTable1, tupleId2) && !globalMatchedId )
{
JoinTable1->GetTupleData(CurrIterBucket, JoinTable1->CurrIterIndex, td2);
td1.AllNulls = true;
@@ -738,10 +817,15 @@ bool TTable::NextJoinedData( TupleData & td1, TupleData & td2) {
}
- if ( JoinKind == EJoinKind::LeftSemi ) {
+ if ( JoinKind == EJoinKind::LeftSemi) {
+
+ if (RightTableBatch_ && HasMoreRightTuples_ )
+ return false;
+
while(HasMoreTuples(JoinTable1->TableBuckets, JoinTable1->CurrIterBucket, JoinTable1->CurrIterIndex)) {
ui32 tupleId2;
- if (HasJoinedTupleId(JoinTable1, tupleId2))
+
+ if ( !RightTableBatch_ && HasJoinedTupleId(JoinTable1, tupleId2))
{
JoinTable1->GetTupleData(CurrIterBucket, JoinTable1->CurrIterIndex, td1);
@@ -752,15 +836,29 @@ bool TTable::NextJoinedData( TupleData & td1, TupleData & td2) {
JoinTable1->CurrIterIndex++;
return true;
}
+
+
+ std::set<ui32> & leftMatchedIds = TableBuckets[JoinTable1->CurrIterBucket].AllLeftMatchedIds;
+ if ( RightTableBatch_ && leftMatchedIds.contains( (ui32) JoinTable1->CurrIterIndex) ) {
+ JoinTable1->GetTupleData(JoinTable1->CurrIterBucket, JoinTable1->CurrIterIndex, td1);
+ td2.AllNulls = true;
+ JoinTable1->CurrIterIndex++;
+ return true;
+ }
+
JoinTable1->CurrIterIndex++;
}
return false;
}
if ( JoinKind == EJoinKind::RightSemi ) {
+
+ if (LeftTableBatch_ && HasMoreLeftTuples_ )
+ return false;
+
while(HasMoreTuples(JoinTable1->TableBuckets, JoinTable1->CurrIterBucket, JoinTable1->CurrIterIndex)) {
ui32 tupleId2;
- if (HasJoinedTupleId(JoinTable1, tupleId2))
+ if ( !LeftTableBatch_ && HasJoinedTupleId(JoinTable1, tupleId2))
{
JoinTable1->GetTupleData(CurrIterBucket, JoinTable1->CurrIterIndex, td2);
td1.AllNulls = true;
@@ -770,6 +868,15 @@ bool TTable::NextJoinedData( TupleData & td1, TupleData & td2) {
JoinTable1->CurrIterIndex++;
return true;
}
+
+ std::set<ui32> & leftMatchedIds = TableBuckets[JoinTable1->CurrIterBucket].AllLeftMatchedIds;
+ if ( LeftTableBatch_ && leftMatchedIds.contains( (ui32) JoinTable1->CurrIterIndex) ) {
+ JoinTable1->GetTupleData(JoinTable1->CurrIterBucket, JoinTable1->CurrIterIndex, td2);
+ td2.AllNulls = true;
+ JoinTable1->CurrIterIndex++;
+ return true;
+ }
+
JoinTable1->CurrIterIndex++;
}
return false;
diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_grace_join_imp.h b/ydb/library/yql/minikql/comp_nodes/mkql_grace_join_imp.h
index 2f34e19e4f..7725f03849 100644
--- a/ydb/library/yql/minikql/comp_nodes/mkql_grace_join_imp.h
+++ b/ydb/library/yql/minikql/comp_nodes/mkql_grace_join_imp.h
@@ -52,9 +52,14 @@ struct TTableBucket {
std::vector<char, TMKQLAllocator<char>> InterfaceValues; // Vector to store types to work through external-provided IHash, IEquate interfaces
std::vector<ui32, TMKQLAllocator<ui32>> InterfaceOffsets; // Vector to store sizes of columns to work through IHash, IEquate interfaces
- std::vector<JoinTuplesIds, TMKQLAllocator<JoinTuplesIds>> JoinIds; // Results of join operations stored as index of tuples in buckets
- // of two tables with the same number
+ std::vector<JoinTuplesIds, TMKQLAllocator<JoinTuplesIds>> JoinIds; // Results of join operations stored as index of tuples in buckets
+ // of two tables with the same number
+
std::vector<ui32, TMKQLAllocator<ui32>> RightIds; // Sorted Ids of right table joined tuples to process full join and exclusion join
+
+ std::set<ui32> AllLeftMatchedIds; // All row ids of left join table which have matching rows in right table. To process streaming join mode.
+ std::set<ui32> AllRightMatchedIds; // All row ids of right join table which matching rows in left table. To process streaming join mode.
+
};
@@ -143,6 +148,12 @@ class TTable {
ui64 TotalPacked = 0; // Total number of packed tuples
ui64 TotalUnpacked = 0; // Total number of unpacked tuples
+ bool LeftTableBatch_ = false; // True if left table is processed in batch mode
+ bool RightTableBatch_ = false; // True if right table is procesed in batch mode
+
+ bool HasMoreLeftTuples_ = false; // True if join is not completed, rows from left table are coming
+ bool HasMoreRightTuples_ = false; // True if join is not completed, rows from right table are coming
+
public:
// Adds new tuple to the table. intColumns, stringColumns - data of columns,
@@ -158,7 +169,8 @@ public:
// Joins two tables and stores join result in table data. Tuples of joined table could be received by
// joined table iterator. Life time of t1, t2 should be greater than lifetime of joined table
- void Join(TTable& t1, TTable& t2, EJoinKind joinKind = EJoinKind::Inner);
+ // hasMoreLeftTuples, hasMoreRightTuples is true if join is partial and more rows are coming. For final batch hasMoreLeftTuples = false, hasMoreRightTuples = false
+ void Join(TTable& t1, TTable& t2, EJoinKind joinKind = EJoinKind::Inner, bool hasMoreLeftTuples = false, bool hasMoreRightTuples = false );
// Returns next jointed tuple data. Returs true if there are more tuples
diff --git a/ydb/library/yql/minikql/comp_nodes/ut/mkql_grace_join_ut.cpp b/ydb/library/yql/minikql/comp_nodes/ut/mkql_grace_join_ut.cpp
index 2bb910d4ad..53d36b03bc 100644
--- a/ydb/library/yql/minikql/comp_nodes/ut/mkql_grace_join_ut.cpp
+++ b/ydb/library/yql/minikql/comp_nodes/ut/mkql_grace_join_ut.cpp
@@ -332,7 +332,6 @@ Y_UNIT_TEST_SUITE(TMiniKQLGraceJoinImpTest) {
}
-
Y_UNIT_TEST_SUITE(TMiniKQLGraceJoinTest) {
Y_UNIT_TEST_LLVM(TestInner1) {
@@ -827,12 +826,14 @@ Y_UNIT_TEST_SUITE(TMiniKQLGraceJoinTest) {
const auto key2 = pb.NewDataLiteral<ui32>(2);
const auto key3 = pb.NewDataLiteral<ui32>(2);
const auto key4 = pb.NewDataLiteral<ui32>(3);
+ const auto key5 = pb.NewDataLiteral<ui32>(4);
const auto payload1 = pb.NewDataLiteral<NUdf::EDataSlot::String>("A");
const auto payload2 = pb.NewDataLiteral<NUdf::EDataSlot::String>("B");
const auto payload3 = pb.NewDataLiteral<NUdf::EDataSlot::String>("C");
- const auto payload4 = pb.NewDataLiteral<NUdf::EDataSlot::String>("X");
- const auto payload5 = pb.NewDataLiteral<NUdf::EDataSlot::String>("Y");
- const auto payload6 = pb.NewDataLiteral<NUdf::EDataSlot::String>("Z");
+ const auto payload4 = pb.NewDataLiteral<NUdf::EDataSlot::String>("D");
+ const auto payload5 = pb.NewDataLiteral<NUdf::EDataSlot::String>("X");
+ const auto payload6 = pb.NewDataLiteral<NUdf::EDataSlot::String>("Y");
+ const auto payload7 = pb.NewDataLiteral<NUdf::EDataSlot::String>("Z");
const auto tupleType = pb.NewTupleType({
pb.NewDataType(NUdf::TDataType<ui32>::Id),
@@ -842,13 +843,15 @@ Y_UNIT_TEST_SUITE(TMiniKQLGraceJoinTest) {
const auto list1 = pb.NewList(tupleType, {
pb.NewTuple({key1, payload1}),
pb.NewTuple({key2, payload2}),
- pb.NewTuple({key3, payload3})
+ pb.NewTuple({key3, payload3}),
+ pb.NewTuple({key4, payload4}),
+ pb.NewTuple({key5, payload4})
});
const auto list2 = pb.NewList(tupleType, {
- pb.NewTuple({key2, payload4}),
- pb.NewTuple({key3, payload5}),
- pb.NewTuple({key4, payload6})
+ pb.NewTuple({key2, payload5}),
+ pb.NewTuple({key3, payload6}),
+ pb.NewTuple({key4, payload7})
});
const auto resultType = pb.NewFlowType(pb.NewMultiType({
@@ -868,8 +871,13 @@ Y_UNIT_TEST_SUITE(TMiniKQLGraceJoinTest) {
NUdf::TUnboxedValue tuple;
UNIT_ASSERT(iterator.Next(tuple));
+ UNBOXED_VALUE_STR_EQUAL(tuple.GetElement(0), "D");
+ UNIT_ASSERT_VALUES_EQUAL(tuple.GetElement(1).Get<ui32>(), 4);
+
+ UNIT_ASSERT(iterator.Next(tuple));
UNBOXED_VALUE_STR_EQUAL(tuple.GetElement(0), "A");
UNIT_ASSERT_VALUES_EQUAL(tuple.GetElement(1).Get<ui32>(), 1);
+
UNIT_ASSERT(!iterator.Next(tuple));
UNIT_ASSERT(!iterator.Next(tuple));
}