aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authoraakulaga <aakulaga@ydb.tech>2023-06-08 15:13:31 +0300
committeraakulaga <aakulaga@ydb.tech>2023-06-08 15:13:31 +0300
commitd2704d925a4e0569a9ed1e8ce9b4287bbd9503b4 (patch)
treecfe08d99db82b56ced7e868d0eba7d8f3c1b43a5
parent51b3066edda71a1c65129e7e5f85aac77a4138b2 (diff)
downloadydb-d2704d925a4e0569a9ed1e8ce9b4287bbd9503b4.tar.gz
Any join attribute added
Any join attribute added First version of Any
-rw-r--r--ydb/library/yql/minikql/comp_nodes/mkql_grace_join.cpp53
-rw-r--r--ydb/library/yql/minikql/comp_nodes/mkql_grace_join_imp.cpp193
-rw-r--r--ydb/library/yql/minikql/comp_nodes/mkql_grace_join_imp.h28
-rw-r--r--ydb/library/yql/minikql/comp_nodes/ut/mkql_grace_join_ut.cpp171
4 files changed, 386 insertions, 59 deletions
diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_grace_join.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_grace_join.cpp
index 0e968459c2e..89aa7b057e1 100644
--- a/ydb/library/yql/minikql/comp_nodes/mkql_grace_join.cpp
+++ b/ydb/library/yql/minikql/comp_nodes/mkql_grace_join.cpp
@@ -69,16 +69,19 @@ struct TGraceJoinPacker {
ui64 TotalIntColumnsNum = 0; // Total number of int columns
ui64 TotalStrColumnsNum = 0; // Total number of string columns
ui64 TotalIColumnsNum = 0; // Total number of interface-based columns
- ui64 KeyIntColumnsNum = 0; // Total number of key int columns
+ ui64 KeyIntColumnsNum = 0; // Total number of key int columns in original table
+ ui64 PackedKeyIntColumnsNum = 0; // Length of ui64 array containing data of all key int columns after packing
ui64 KeyStrColumnsNum = 0; // Total number of key string columns
ui64 KeyIColumnsNum = 0; // Total number of interface-based columns
ui64 DataIntColumnsNum = TotalIntColumnsNum - KeyIntColumnsNum;
+ ui64 PackedDataIntColumnsNum = 0; // Length of ui64 array containing data of all non-key int columns after packing
ui64 DataStrColumnsNum = TotalStrColumnsNum - KeyStrColumnsNum;
ui64 DataIColumnsNum = TotalIColumnsNum - KeyIColumnsNum;
std::vector<GraceJoin::TColTypeInterface> ColumnInterfaces;
+ bool IsAny; // Flag to support any join attribute
inline void Pack() ; // Packs new tuple from TupleHolder and TuplePtrs to TupleIntVals, TupleStrSizes, TupleStrings
inline void UnPack(); // Unpacks packed values from TupleIntVals, TupleStrSizes, TupleStrings into TupleHolder and TuplePtrs
- TGraceJoinPacker(const std::vector<TType*>& columnTypes, const std::vector<ui32>& keyColumns, const THolderFactory& holderFactory);
+ TGraceJoinPacker(const std::vector<TType*>& columnTypes, const std::vector<ui32>& keyColumns, const THolderFactory& holderFactory, bool isAny);
};
@@ -198,10 +201,13 @@ void TGraceJoinPacker::Pack() {
NYql::NUdf::TUnboxedValue value = *TuplePtrs[pi.ColumnIdx];
if (!value) { // Null value
- ui64 currNullsIdx = i / (sizeof(ui64) * 8);
- ui64 remShift = ( i - currNullsIdx * (sizeof(ui64) * 8) );
+ ui64 currNullsIdx = (i + 1) / (sizeof(ui64) * 8);
+ ui64 remShift = ( (i + 1) - currNullsIdx * (sizeof(ui64) * 8) );
ui64 bitMask = (0x1) << remShift;
TupleIntVals[currNullsIdx] |= bitMask;
+ if (pi.IsKeyColumn) {
+ TupleIntVals[0] |= (0x1);
+ }
continue;
}
TType* type = pi.MKQLType;
@@ -309,8 +315,8 @@ void TGraceJoinPacker::UnPack() {
value = NYql::NUdf::TUnboxedValue();
continue;
}
- ui64 currNullsIdx = i / (sizeof(ui64) * 8);
- ui64 remShift = ( i - currNullsIdx * (sizeof(ui64) * 8) );
+ ui64 currNullsIdx = (i + 1) / (sizeof(ui64) * 8);
+ ui64 remShift = ( (i + 1) - currNullsIdx * (sizeof(ui64) * 8) );
ui64 bitMask = (0x1) << remShift;
if ( TupleIntVals[currNullsIdx] & bitMask ) {
value = NYql::NUdf::TUnboxedValue();
@@ -411,9 +417,10 @@ void TGraceJoinPacker::UnPack() {
}
-TGraceJoinPacker::TGraceJoinPacker(const std::vector<TType *> & columnTypes, const std::vector<ui32>& keyColumns, const THolderFactory& holderFactory) :
+TGraceJoinPacker::TGraceJoinPacker(const std::vector<TType *> & columnTypes, const std::vector<ui32>& keyColumns, const THolderFactory& holderFactory, bool isAny) :
ColumnTypes(columnTypes)
- , HolderFactory(holderFactory) {
+ , HolderFactory(holderFactory)
+ , IsAny(isAny) {
ui64 nColumns = ColumnTypes.size();
ui64 nKeyColumns = keyColumns.size();
@@ -467,7 +474,7 @@ TGraceJoinPacker::TGraceJoinPacker(const std::vector<TType *> & columnTypes, con
DataStrColumnsNum = TotalStrColumnsNum - KeyStrColumnsNum;
DataIColumnsNum = TotalIColumnsNum - KeyIColumnsNum;
- NullsBitmapSize = (nColumns / (8 * sizeof(ui64)) + 1) ;
+ NullsBitmapSize = ( (nColumns + 1)/ (8 * sizeof(ui64)) + 1) ;
TupleIntVals.resize(2 * totalIntColumnsNum + NullsBitmapSize);
TupleStrings.resize(totalStrColumnsNum);
@@ -533,8 +540,8 @@ TGraceJoinPacker::TGraceJoinPacker(const std::vector<TType *> & columnTypes, con
currIdx++;
}
- KeyIntColumnsNum = (keyIntOffset + sizeof(ui64) - 1 ) / sizeof(ui64) - NullsBitmapSize;
- DataIntColumnsNum = (currIntOffset + sizeof(ui64) - 1) / sizeof(ui64) - KeyIntColumnsNum - NullsBitmapSize;
+ PackedKeyIntColumnsNum = (keyIntOffset + sizeof(ui64) - 1 ) / sizeof(ui64) - NullsBitmapSize;
+ PackedDataIntColumnsNum = (currIntOffset + sizeof(ui64) - 1) / sizeof(ui64) - PackedKeyIntColumnsNum - NullsBitmapSize;
GraceJoin::TColTypeInterface * cti_p = nullptr;
@@ -542,7 +549,9 @@ TGraceJoinPacker::TGraceJoinPacker(const std::vector<TType *> & columnTypes, con
cti_p = ColumnInterfaces.data();
}
- TablePtr = std::make_unique<GraceJoin::TTable>(KeyIntColumnsNum, KeyStrColumnsNum, DataIntColumnsNum, DataStrColumnsNum, KeyIColumnsNum, DataIColumnsNum, cti_p );
+ TablePtr = std::make_unique<GraceJoin::TTable>(
+ PackedKeyIntColumnsNum, KeyStrColumnsNum, PackedDataIntColumnsNum,
+ DataStrColumnsNum, KeyIColumnsNum, DataIColumnsNum, NullsBitmapSize, cti_p, IsAny );
}
@@ -553,12 +562,12 @@ public:
TGraceJoinState(TMemoryUsageInfo* memInfo,
IComputationWideFlowNode* flowLeft, IComputationWideFlowNode* flowRight,
- EJoinKind joinKind, const std::vector<ui32>& leftKeyColumns, const std::vector<ui32>& rightKeyColumns,
+ EJoinKind joinKind, EAnyJoinSettings anyJoinSettings, const std::vector<ui32>& leftKeyColumns, const std::vector<ui32>& rightKeyColumns,
const std::vector<ui32>& leftRenames, const std::vector<ui32>& rightRenames,
const std::vector<TType*>& leftColumnsTypes, const std::vector<TType*>& rightColumnsTypes, const THolderFactory & holderFactory)
: TBase(memInfo)
- , LeftPacker(std::make_unique<TGraceJoinPacker>(leftColumnsTypes, leftKeyColumns, holderFactory))
- , RightPacker(std::make_unique<TGraceJoinPacker>(rightColumnsTypes, rightKeyColumns, holderFactory))
+ , LeftPacker(std::make_unique<TGraceJoinPacker>(leftColumnsTypes, leftKeyColumns, holderFactory, (anyJoinSettings == EAnyJoinSettings::Left || anyJoinSettings == EAnyJoinSettings::Both)))
+ , RightPacker(std::make_unique<TGraceJoinPacker>(rightColumnsTypes, rightKeyColumns, holderFactory, (anyJoinSettings == EAnyJoinSettings::Right || anyJoinSettings == EAnyJoinSettings::Both)))
, JoinedTablePtr(std::make_unique<GraceJoin::TTable>())
, JoinCompleted(std::make_unique<bool>(false))
, PartialJoinCompleted(std::make_unique<bool>(false))
@@ -568,6 +577,7 @@ public:
, FlowLeft(flowLeft)
, FlowRight(flowRight)
, JoinKind(joinKind)
+ , AnyJoinSettings_(anyJoinSettings)
, LeftKeyColumns(leftKeyColumns)
, RightKeyColumns(rightKeyColumns)
, LeftRenames(leftRenames)
@@ -583,6 +593,7 @@ private:
IComputationWideFlowNode* const FlowRight;
const EJoinKind JoinKind;
+ const EAnyJoinSettings AnyJoinSettings_;
const std::vector<ui32> LeftKeyColumns;
const std::vector<ui32> RightKeyColumns;
const std::vector<ui32> LeftRenames;
@@ -604,7 +615,7 @@ class TGraceJoinWrapper : public TStatefulWideFlowCodegeneratorNode<TGraceJoinWr
public:
TGraceJoinWrapper(TComputationMutables& mutables, IComputationWideFlowNode* flowLeft, IComputationWideFlowNode* flowRight,
- EJoinKind joinKind, std::vector<ui32>&& leftKeyColumns, std::vector<ui32>&& rightKeyColumns,
+ EJoinKind joinKind, EAnyJoinSettings anyJoinSettings, std::vector<ui32>&& leftKeyColumns, std::vector<ui32>&& rightKeyColumns,
std::vector<ui32>&& leftRenames, std::vector<ui32>&& rightRenames,
std::vector<TType*>&& leftColumnsTypes, std::vector<TType*>&& rightColumnsTypes,
std::vector<EValueRepresentation>&& outputRepresentations)
@@ -612,6 +623,7 @@ class TGraceJoinWrapper : public TStatefulWideFlowCodegeneratorNode<TGraceJoinWr
, FlowLeft(flowLeft)
, FlowRight(flowRight)
, JoinKind(joinKind)
+ , AnyJoinSettings_(anyJoinSettings)
, LeftKeyColumns(std::move(leftKeyColumns))
, RightKeyColumns(std::move(rightKeyColumns))
, LeftRenames(std::move(leftRenames))
@@ -718,7 +730,7 @@ class TGraceJoinWrapper : public TStatefulWideFlowCodegeneratorNode<TGraceJoinWr
void MakeState(TComputationContext& ctx, NUdf::TUnboxedValue& state) const {
state = ctx.HolderFactory.Create<TGraceJoinState>(
- FlowLeft, FlowRight, JoinKind, LeftKeyColumns, RightKeyColumns,
+ FlowLeft, FlowRight, JoinKind, AnyJoinSettings_, LeftKeyColumns, RightKeyColumns,
LeftRenames, RightRenames, LeftColumnsTypes, RightColumnsTypes,
ctx.HolderFactory);
}
@@ -726,6 +738,7 @@ class TGraceJoinWrapper : public TStatefulWideFlowCodegeneratorNode<TGraceJoinWr
IComputationWideFlowNode *const FlowLeft;
IComputationWideFlowNode *const FlowRight;
const EJoinKind JoinKind;
+ const EAnyJoinSettings AnyJoinSettings_;
const std::vector<ui32> LeftKeyColumns;
const std::vector<ui32> RightKeyColumns;
const std::vector<ui32> LeftRenames;
@@ -880,6 +893,10 @@ IComputationNode* WrapGraceJoin(TCallable& callable, const TComputationNodeFacto
const auto rightRenamesNode = AS_VALUE(TTupleLiteral, callable.GetInput(6));
const ui32 rawJoinKind = AS_VALUE(TDataLiteral, joinKindNode)->AsValue().Get<ui32>();
+// const EAnyJoinSettings anyJoinSettings = GetAnyJoinSettings(AS_VALUE(TDataLiteral, callable.GetInput(8))->AsValue().Get<ui32>());
+
+ const EAnyJoinSettings anyJoinSettings = EAnyJoinSettings::None;
+
const auto flowLeft = dynamic_cast<IComputationWideFlowNode*> (LocateNode(ctx.NodeLocator, callable, 0));
const auto flowRight = dynamic_cast<IComputationWideFlowNode*> (LocateNode(ctx.NodeLocator, callable, 1));
@@ -915,7 +932,7 @@ IComputationNode* WrapGraceJoin(TCallable& callable, const TComputationNodeFacto
}
return new TGraceJoinWrapper(
- ctx.Mutables, flowLeft, flowRight, GetJoinKind(rawJoinKind),
+ ctx.Mutables, flowLeft, flowRight, GetJoinKind(rawJoinKind), anyJoinSettings,
std::move(leftKeyColumns), std::move(rightKeyColumns), std::move(leftRenames), std::move(rightRenames),
std::move(leftColumnsTypes), std::move(rightColumnsTypes), std::move(outputRepresentations));
diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_grace_join_imp.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_grace_join_imp.cpp
index 0d94e0ba62b..89c82d99d6d 100644
--- a/ydb/library/yql/minikql/comp_nodes/mkql_grace_join_imp.cpp
+++ b/ydb/library/yql/minikql/comp_nodes/mkql_grace_join_imp.cpp
@@ -20,7 +20,7 @@ void TTable::AddTuple( ui64 * intColumns, char ** stringColumns, ui32 * strings
TotalPacked++;
TempTuple.clear();
- TempTuple.insert(TempTuple.end(), intColumns, intColumns + NullsBitmapSize + NumberOfKeyIntColumns);
+ TempTuple.insert(TempTuple.end(), intColumns, intColumns + NullsBitmapSize_ + NumberOfKeyIntColumns);
if ( NumberOfKeyIColumns > 0 ) {
for (ui32 i = 0; i < NumberOfKeyIColumns; i++) {
@@ -69,6 +69,7 @@ void TTable::AddTuple( ui64 * intColumns, char ** stringColumns, ui32 * strings
}
+/*
ui64 nullsBitmapIdx = NumberOfKeyColumns / (sizeof(ui64) * 8);
ui64 remBits = (nullsBitmapIdx + 1) * sizeof(ui64) * 8 - NumberOfKeyColumns;
@@ -80,6 +81,13 @@ void TTable::AddTuple( ui64 * intColumns, char ** stringColumns, ui32 * strings
TempTuple[nullsBitmapIdx] = 0;
nullsBitmapIdx++;
}
+*/
+
+ TempTuple[0] &= (0x1); // Setting only nulls in key bit, all other bits are ignored for key hash
+ for (ui32 i = 1; i < NullsBitmapSize_; i ++) {
+ TempTuple[i] = 0;
+ }
+
XXH64_hash_t hash = XXH64(TempTuple.data(), TempTuple.size() * sizeof(ui64), 0);
@@ -87,18 +95,31 @@ void TTable::AddTuple( ui64 * intColumns, char ** stringColumns, ui32 * strings
ui64 bucket = hash & BucketsMask;
- TableBuckets[bucket].TuplesNum++;
+
std::vector<ui64, TMKQLAllocator<ui64>> & keyIntVals = TableBuckets[bucket].KeyIntVals;
std::vector<ui32, TMKQLAllocator<ui32>> & stringsOffsets = TableBuckets[bucket].StringsOffsets;
std::vector<ui64, TMKQLAllocator<ui64>> & dataIntVals = TableBuckets[bucket].DataIntVals;
std::vector<char, TMKQLAllocator<char>> & stringVals = TableBuckets[bucket].StringsValues;
+ KeysHashTable & kh = TableBuckets[bucket].AnyHashTable;
ui32 offset = keyIntVals.size(); // Offset of tuple inside the keyIntVals vector
keyIntVals.push_back(hash);
- keyIntVals.insert(keyIntVals.end(), intColumns, intColumns + NullsBitmapSize);
- keyIntVals.insert(keyIntVals.end(), TempTuple.begin() + NullsBitmapSize, TempTuple.end());
+ keyIntVals.insert(keyIntVals.end(), intColumns, intColumns + NullsBitmapSize_);
+ keyIntVals.insert(keyIntVals.end(), TempTuple.begin() + NullsBitmapSize_, TempTuple.end());
+
+
+
+ if (IsAny_) {
+ if ( !AddKeysToHashTable(kh, keyIntVals.begin() + offset) ) {
+ keyIntVals.resize(offset);
+ return;
+ }
+ }
+
+
+ TableBuckets[bucket].TuplesNum++;
if (NumberOfStringColumns || NumberOfIColumns ) {
stringsOffsets.push_back(offset); // Adding offset to tuple in keyIntVals vector
@@ -125,7 +146,7 @@ void TTable::AddTuple( ui64 * intColumns, char ** stringColumns, ui32 * strings
// Adding data values
- ui64 * dataColumns = intColumns + NullsBitmapSize + NumberOfKeyIntColumns;
+ ui64 * dataColumns = intColumns + NullsBitmapSize_ + NumberOfKeyIntColumns;
dataIntVals.insert(dataIntVals.end(), dataColumns, dataColumns + NumberOfDataIntColumns);
// Adding strings values for data columns
@@ -241,6 +262,30 @@ inline bool CompareIColumns( const ui32* stringSizes1, const char * vals1,
return true;
}
+// Resizes KeysHashTable to newSlots slots, keeps old content: every occupied slot is re-inserted into a fresh zero-filled table at (hash % newSlots) with linear probing. Slot width (t.SlotSize), t.SpillData and t.FillCount are left untouched.
+void ResizeHashTable(KeysHashTable &t, ui64 newSlots){
+ // NOTE(review): no guard here against newSlots == 0 or a new table with no free slot; callers presumably always grow the table - confirm.
+ std::vector<ui64> newTable(newSlots * t.SlotSize , 0);
+ for ( auto it = t.Table.begin(); it != t.Table.end(); it += t.SlotSize ) {
+ if ( *it == 0) // First ui64 of a slot equal to 0 marks the slot as empty
+ continue;
+ ui64 hash = *it; // First ui64 of an occupied slot is the key hash
+ ui64 newSlotNum = hash % (newSlots);
+ auto newIt = newTable.begin() + t.SlotSize * newSlotNum;
+ while (*newIt != 0) { // Home slot is taken: step to the next slot until an empty one is found
+ newIt += t.SlotSize;
+ if (newIt >= newTable.end()) { // Wrap around to the first slot
+ newIt = newTable.begin();
+ }
+ }
+ std::copy_n(it, t.SlotSize, newIt); // Whole slot is copied as is, so a stored spill offset stays valid
+ }
+ t.NSlots = newSlots;
+ t.Table = std::move(newTable);
+ // Old table storage is released when newTable (now holding it after the move) goes out of scope
+}
+
+
// Joins two tables and returns join result in joined table. Tuples of joined table could be received by
// joined table iterator
void TTable::Join( TTable & t1, TTable & t2, EJoinKind joinKind, bool hasMoreLeftTuples, bool hasMoreRightTuples ) {
@@ -271,6 +316,8 @@ void TTable::Join( TTable & t1, TTable & t2, EJoinKind joinKind, bool hasMoreLef
ui64 tuplesFound = 0;
ui64 leftIdsMatch = 0;
ui64 rightIdsMatch = 0;
+ ui64 t2AnySkipped = 0;
+ ui64 t1AnySkipped = 0;
std::vector<ui64, TMKQLAllocator<ui64, EMemorySubPool::Temporary>> joinSlots, spillSlots, slotToIdx;
std::vector<ui32, TMKQLAllocator<ui32, EMemorySubPool::Temporary>> stringsOffsets1, stringsOffsets2;
@@ -288,10 +335,13 @@ void TTable::Join( TTable & t1, TTable & t2, EJoinKind joinKind, bool hasMoreLef
TTableBucket * bucket1 = &JoinTable1->TableBuckets[bucket];
TTableBucket * bucket2 = &JoinTable2->TableBuckets[bucket];
+ KeysHashTable& kh1 = bucket1->AnyHashTable;
+ KeysHashTable& kh2 = bucket2->AnyHashTable;
+
ui64 headerSize1 = JoinTable1->HeaderSize;
ui64 headerSize2 = JoinTable2->HeaderSize;
- ui64 nullsSize1 = JoinTable1->NullsBitmapSize;
- ui64 nullsSize2 = JoinTable2->NullsBitmapSize;
+ ui64 nullsSize1 = JoinTable1->NullsBitmapSize_;
+ ui64 nullsSize2 = JoinTable2->NullsBitmapSize_;
ui64 numberOfKeyIntColumns1 = JoinTable1->NumberOfKeyIntColumns;
ui64 keyIntOffset1 = HashSize + nullsSize1;
ui64 keyIntOffset2 = HashSize + nullsSize2;
@@ -314,6 +364,7 @@ void TTable::Join( TTable & t1, TTable & t2, EJoinKind joinKind, bool hasMoreLef
slotSize = slotSize + avgStringsSize;
}
+
ui64 nSlots = 3 * bucket2->TuplesNum + 1;
joinSlots.clear();
spillSlots.clear();
@@ -321,18 +372,27 @@ void TTable::Join( TTable & t1, TTable & t2, EJoinKind joinKind, bool hasMoreLef
joinSlots.resize(nSlots*slotSize, 0);
slotToIdx.resize(nSlots, 0);
+ kh1.NSlots = nSlots;
+ kh1.SlotSize = slotSize;
+
+
+ kh2.NSlots = nSlots;
+ kh2.SlotSize = slotSize;
+
ui32 tuple2Idx = 0;
auto it2 = bucket2->KeyIntVals.begin();
while (it2 != bucket2->KeyIntVals.end() ) {
+
ui64 keysValSize;
if ( JoinTable2->NumberOfKeyStringColumns > 0 || JoinTable2->NumberOfKeyIColumns > 0) {
keysValSize = headerSize2 + *(it2 + headerSize2 - 1) ;
} else {
keysValSize = headerSize2;
}
+
ui64 hash = *it2;
ui64 * nullsPtr = it2+1;
- if (!HasBitSet(nullsPtr, JoinTable1->NumberOfKeyColumns))
+ if (!HasBitSet(nullsPtr, 1))
{
ui64 slotNum = hash % nSlots;
@@ -375,9 +435,10 @@ void TTable::Join( TTable & t1, TTable & t2, EJoinKind joinKind, bool hasMoreLef
keysValSize = headerSize1;
}
+
ui64 hash = *it1;
ui64 * nullsPtr = it1+1;
- if (HasBitSet(nullsPtr, JoinTable1->NumberOfKeyColumns))
+ if (HasBitSet(nullsPtr, 1))
{
it1 += keysValSize;
tuple1Idx ++;
@@ -430,7 +491,8 @@ void TTable::Join( TTable & t1, TTable & t2, EJoinKind joinKind, bool hasMoreLef
if ( JoinTable1->NumberOfKeyStringColumns == 0) {
stringsMatch = true;
} else {
- if (headerMatch && std::equal( it1 + headerSize1, it1 + headerSize1 + JoinTable1->NumberOfKeyStringColumns, slotStringsStart )) {
+ ui64 stringsSize = *(it1 + headerSize1 - 1);
+ if (headerMatch && std::equal( it1 + headerSize1, it1 + headerSize1 + stringsSize, slotStringsStart )) {
stringsMatch = true;
}
}
@@ -533,6 +595,7 @@ void TTable::Join( TTable & t1, TTable & t2, EJoinKind joinKind, bool hasMoreLef
HasMoreLeftTuples_ = hasMoreLeftTuples;
HasMoreRightTuples_ = hasMoreRightTuples;
+
}
inline void TTable::GetTupleData(ui32 bucketNum, ui32 tupleId, TupleData & td) {
@@ -556,14 +619,14 @@ inline void TTable::GetTupleData(ui32 bucketNum, ui32 tupleId, TupleData & td) {
}
- for ( ui64 i = 0; i < NumberOfKeyIntColumns + NullsBitmapSize; ++i) {
+ for ( ui64 i = 0; i < NumberOfKeyIntColumns + NullsBitmapSize_; ++i) {
td.IntColumns[i] = tb.KeyIntVals[keyIntsOffset + HashSize + i];
}
dataIntsOffset = NumberOfDataIntColumns * tupleId;
for ( ui64 i = 0; i < NumberOfDataIntColumns; ++i) {
- td.IntColumns[NumberOfKeyIntColumns + NullsBitmapSize + i] = tb.DataIntVals[dataIntsOffset + i];
+ td.IntColumns[NumberOfKeyIntColumns + NullsBitmapSize_ + i] = tb.DataIntVals[dataIntsOffset + i];
}
char *strPtr = nullptr;
@@ -636,6 +699,101 @@ inline bool TTable::HasJoinedTupleId(TTable *joinedTable, ui32 &tupleId2) {
}
}
+
+
+// Adds the key part of a tuple to the ANY-join hash table t. Returns true if the key was inserted,
+// false if it was rejected (key contains nulls, or an equal key is already stored).
+// keys points at the tuple header: [hash][NullsBitmapSize_ words][key int and interface words][total strings size, if any],
+// followed by keyStringsSize words of key string data.
+// NOTE(review): a slot whose first word is 0 is treated as empty, so a key whose hash is 0 is never seen as stored - confirm this is acceptable.
+// NOTE(review): with only interface key columns SlotSize equals HeaderSize, so the spill offset written at (it + HeaderSize) lands outside the slot - confirm.
+inline bool TTable::AddKeysToHashTable(KeysHashTable& t, ui64* keys) {
+
+    // First use of this table: allocate DefaultTuplesNum zero-filled slots.
+    if (t.NSlots == 0) {
+        t.SlotSize = HeaderSize + NumberOfKeyStringColumns * 2;
+        t.Table.resize(DefaultTuplesNum * t.SlotSize, 0);
+        t.NSlots = DefaultTuplesNum;
+    }
+
+    // Double the table once less than 50% of its slots are free.
+    if ( ( (t.NSlots - t.FillCount) * 100 ) / t.NSlots < 50 ) {
+        ResizeHashTable(t, 2 * t.NSlots);
+    }
+
+    if ( HasBitSet(keys + HashSize, 1)) // Keys with null value
+        return false;
+
+    const bool hasVarLenKeys = NumberOfKeyStringColumns > 0 || NumberOfKeyIColumns > 0;
+
+    const ui64 hash = *keys;
+    const ui64 slot = hash % t.NSlots;
+    auto it = t.Table.begin() + slot * t.SlotSize;
+
+    const ui64 keyIntOffset = HashSize + NullsBitmapSize_;
+    // Last header word holds the total size of the key strings when variable-length key columns exist.
+    const ui64 keyStringsSize = hasVarLenKeys ? *(keys + HeaderSize - 1) : 0;
+    const ui64 keysSize = HeaderSize + keyStringsSize;
+
+    // Probe occupied slots starting from the home slot; stop at the first empty one.
+    while (*it != 0) {
+
+        // Same hash and same fixed-size key part (nulls bitmap is skipped in the comparison).
+        if (*it == hash && std::equal(it + keyIntOffset, it + HeaderSize, keys + keyIntOffset)) {
+
+            if (!hasVarLenKeys) {
+                return false; // Fixed-size keys only: duplicate found
+            }
+
+            const ui64 storedStringsSize = *(it + HeaderSize - 1);
+            if (storedStringsSize == keyStringsSize) {
+                const ui64 storedKeysSize = HeaderSize + storedStringsSize;
+
+                ui64 * stringsStart;
+                // Must mirror the insertion rule below: a key is stored inline when it fits the slot,
+                // including the case when it fills the slot exactly.
+                if (storedKeysSize <= t.SlotSize) {
+                    stringsStart = it + HeaderSize;
+                } else {
+                    ui64 spillOffset = *(it + HeaderSize);
+                    stringsStart = t.SpillData.begin() + spillOffset;
+                }
+
+                if (std::equal(keys + HeaderSize, keys + HeaderSize + keyStringsSize, stringsStart)) {
+                    return false; // Header and strings are equal: duplicate found
+                }
+            }
+        }
+
+        it += t.SlotSize;
+        if (it >= t.Table.end()) {
+            it = t.Table.begin();
+        }
+    }
+
+    // it points at an empty slot: store the key inline if it fits, otherwise keep the header
+    // in the slot and the strings in SpillData, with their offset written right after the header.
+    if (keysSize > t.SlotSize) {
+        ui64 spillDataOffset = t.SpillData.size();
+        t.SpillData.insert(t.SpillData.end(), keys + HeaderSize, keys + keysSize);
+        std::copy_n(keys, HeaderSize, it);
+        *(it + HeaderSize) = spillDataOffset;
+    } else {
+        std::copy_n(keys, keysSize, it);
+    }
+
+    t.FillCount++;
+    return true;
+
+}
+
inline bool HasRightIdMatch(ui64 currId, ui64 & rightIdIter, const std::vector<ui32, TMKQLAllocator<ui32>> & rightIds) {
if (rightIdIter >= rightIds.size()) return false;
@@ -993,6 +1151,7 @@ void TTable::Clear() {
tb.InterfaceOffsets.clear();
tb.JoinIds.clear();
tb.RightIds.clear();
+// tb.AnyHashTable = KeysHashTable{0, 0, 0, {}, {}};
}
@@ -1001,7 +1160,8 @@ void TTable::Clear() {
// Creates new table with key columns and data columns
TTable::TTable( ui64 numberOfKeyIntColumns, ui64 numberOfKeyStringColumns,
ui64 numberOfDataIntColumns, ui64 numberOfDataStringColumns,
- ui64 numberOfKeyIColumns, ui64 numberOfDataIColumns, TColTypeInterface * colInterfaces ) :
+ ui64 numberOfKeyIColumns, ui64 numberOfDataIColumns,
+ ui64 nullsBitmapSize, TColTypeInterface * colInterfaces, bool isAny ) :
NumberOfKeyIntColumns(numberOfKeyIntColumns),
NumberOfKeyStringColumns(numberOfKeyStringColumns),
@@ -1009,7 +1169,9 @@ TTable::TTable( ui64 numberOfKeyIntColumns, ui64 numberOfKeyStringColumns,
NumberOfDataStringColumns(numberOfDataStringColumns),
NumberOfKeyIColumns(numberOfKeyIColumns),
NumberOfDataIColumns(numberOfDataIColumns),
- ColInterfaces(colInterfaces) {
+ NullsBitmapSize_(nullsBitmapSize),
+ ColInterfaces(colInterfaces),
+ IsAny_(isAny) {
NumberOfKeyColumns = NumberOfKeyIntColumns + NumberOfKeyStringColumns + NumberOfKeyIColumns;
NumberOfDataColumns = NumberOfDataIntColumns + NumberOfDataStringColumns + NumberOfDataIColumns;
@@ -1019,11 +1181,10 @@ TTable::TTable( ui64 numberOfKeyIntColumns, ui64 numberOfKeyStringColumns,
BytesInKeyIntColumns = NumberOfKeyIntColumns * sizeof(ui64);
- NullsBitmapSize = NumberOfColumns / (8 * sizeof(ui64)) + 1;
TotalStringsSize = (numberOfKeyStringColumns > 0 || NumberOfKeyIColumns > 0 ) ? 1 : 0;
- HeaderSize = HashSize + NullsBitmapSize + NumberOfKeyIntColumns + NumberOfKeyIColumns + TotalStringsSize;
+ HeaderSize = HashSize + NullsBitmapSize_ + NumberOfKeyIntColumns + NumberOfKeyIColumns + TotalStringsSize;
TableBuckets.resize(NumberOfBuckets);
diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_grace_join_imp.h b/ydb/library/yql/minikql/comp_nodes/mkql_grace_join_imp.h
index 7725f038496..70cf0a16f79 100644
--- a/ydb/library/yql/minikql/comp_nodes/mkql_grace_join_imp.h
+++ b/ydb/library/yql/minikql/comp_nodes/mkql_grace_join_imp.h
@@ -9,7 +9,7 @@ namespace NKikimr {
namespace NMiniKQL {
namespace GraceJoin {
-const ui64 BitsForNumberOfBuckets = 8; // 2^8 = 256
+const ui64 BitsForNumberOfBuckets = 5; // 2^5 = 32
const ui64 BucketsMask = (0x00000001 << BitsForNumberOfBuckets) - 1;
const ui64 NumberOfBuckets = (0x00000001 << BitsForNumberOfBuckets); // Number of hashed keys buckets to distribute incoming tables tuples
const ui64 DefaultTuplesNum = 1000; // Default initial number of tuples in one bucket to allocate memory
@@ -42,6 +42,14 @@ struct JoinTuplesIds {
ui32 id2 = 0; // Identifier of second table tuple as index in bucket
};
+// To store keys values when making join only for unique keys (any join attribute). Filled by TTable::AddKeysToHashTable, regrown by ResizeHashTable.
+struct KeysHashTable {
+ ui64 SlotSize = 0; // Width of one slot, counted in ui64 words
+ ui64 NSlots = 0; // Total number of slots in table; 0 means the table is not allocated yet
+ ui64 FillCount = 0; // Number of occupied slots (incremented once per inserted key)
+ std::vector<ui64> Table; // Flat storage of NSlots * SlotSize words; first word of a slot is the key hash, 0 marks an empty slot
+ std::vector<ui64> SpillData; // Key string data which does not fit in a single hash table slot; the slot keeps the offset into this vector right after the key header.
+};
struct TTableBucket {
ui64 TuplesNum = 0; // Total number of tuples in bucket
@@ -50,8 +58,7 @@ struct TTableBucket {
std::vector<char, TMKQLAllocator<char>> StringsValues; // Vector to store data strings values
std::vector<ui32, TMKQLAllocator<ui32>> StringsOffsets; // Vector to store strings values sizes (offsets in StringsValues are calculated) for particular tuple.
std::vector<char, TMKQLAllocator<char>> InterfaceValues; // Vector to store types to work through external-provided IHash, IEquate interfaces
- std::vector<ui32, TMKQLAllocator<ui32>> InterfaceOffsets; // Vector to store sizes of columns to work through IHash, IEquate interfaces
-
+ std::vector<ui32, TMKQLAllocator<ui32>> InterfaceOffsets; // Vector to store sizes of columns to work through IHash, IEquate interfaces
std::vector<JoinTuplesIds, TMKQLAllocator<JoinTuplesIds>> JoinIds; // Results of join operations stored as index of tuples in buckets
// of two tables with the same number
@@ -59,6 +66,7 @@ struct TTableBucket {
std::set<ui32> AllLeftMatchedIds; // All row ids of left join table which have matching rows in right table. To process streaming join mode.
std::set<ui32> AllRightMatchedIds; // All row ids of right join table which matching rows in left table. To process streaming join mode.
+ KeysHashTable AnyHashTable; // Hash table to process join only for unique keys (any join attribute)
};
@@ -100,11 +108,11 @@ class TTable {
ui64 NumberOfDataColumns = 0; // Number of data columns in the Table
ui64 NumberOfStringColumns = 0; // Total number of String Columns
ui64 NumberOfIColumns = 0; // Total number of interface-based columns
- ui64 NullsBitmapSize = 1; // Default size of ui64 values used for null columns bitmap.
+ ui64 NullsBitmapSize_ = 1; // Default size of ui64 values used for null columns bitmap.
// Every bit set means null value. Order of columns is equal to order in AddTuple call.
- // First key int column is bit 0 in bit mask, second - bit 1, etc. Bit 0 is least significant in bitmask.
+ // First key int column is bit 1 in bit mask, second - bit 2, etc. Bit 0 is least significant in bitmask and tells if key columns contain nulls.
ui64 TotalStringsSize = 0; // Bytes in tuple header reserved to store total strings size key tuple columns
- ui64 HeaderSize = HashSize + NullsBitmapSize + NumberOfKeyIntColumns + NumberOfKeyIColumns + TotalStringsSize; // Header of all tuples size
+ ui64 HeaderSize = HashSize + NullsBitmapSize_ + NumberOfKeyIntColumns + NumberOfKeyIColumns + TotalStringsSize; // Header of all tuples size
ui64 BytesInKeyIntColumns = sizeof(ui64) * NumberOfKeyIntColumns;
@@ -145,6 +153,9 @@ class TTable {
// True if current iterator of tuple in joinedTable has corresponding joined tuple in second table. Id of joined tuple in second table returns in tupleId2.
inline bool HasJoinedTupleId(TTable* joinedTable, ui32& tupleId2);
+ // Adds keys to KeysHashTable, return true if added, false if equal key already added
+ inline bool AddKeysToHashTable(KeysHashTable& t, ui64* keys);
+
ui64 TotalPacked = 0; // Total number of packed tuples
ui64 TotalUnpacked = 0; // Total number of unpacked tuples
@@ -154,6 +165,8 @@ class TTable {
bool HasMoreLeftTuples_ = false; // True if join is not completed, rows from left table are coming
bool HasMoreRightTuples_ = false; // True if join is not completed, rows from right table are coming
+ bool IsAny_ = false; // True if key duplicates need to be removed from table (any join)
+
public:
// Adds new tuple to the table. intColumns, stringColumns - data of columns,
@@ -182,7 +195,8 @@ public:
// Creates new table with key columns and data columns
TTable(ui64 numberOfKeyIntColumns = 0, ui64 numberOfKeyStringColumns = 0,
ui64 numberOfDataIntColumns = 0, ui64 numberOfDataStringColumns = 0,
- ui64 numberOfKeyIColumns = 0, ui64 numberOfDataIColumns = 0, TColTypeInterface * colInterfaces = nullptr);
+ ui64 numberOfKeyIColumns = 0, ui64 numberOfDataIColumns = 0,
+ ui64 nullsBitmapSize = 1, TColTypeInterface * colInterfaces = nullptr, bool isAny = false);
};
diff --git a/ydb/library/yql/minikql/comp_nodes/ut/mkql_grace_join_ut.cpp b/ydb/library/yql/minikql/comp_nodes/ut/mkql_grace_join_ut.cpp
index 53d36b03bcd..a1939e7e786 100644
--- a/ydb/library/yql/minikql/comp_nodes/ut/mkql_grace_join_ut.cpp
+++ b/ydb/library/yql/minikql/comp_nodes/ut/mkql_grace_join_ut.cpp
@@ -199,15 +199,15 @@ Y_UNIT_TEST_SUITE(TMiniKQLGraceJoinImpTest) {
- const ui64 BigTableTuples = 60000;
- const ui64 SmallTableTuples = 15000;
+ const ui64 BigTableTuples = 600000;
+ const ui64 SmallTableTuples = 150000;
const ui64 BigTupleSize = 32;
std::chrono::steady_clock::time_point begin03 = std::chrono::steady_clock::now();
for ( ui64 i = 0; i < BigTableTuples; i++) {
- tuple[1] = std::rand() / ( RAND_MAX / SmallTableTuples );
+ tuple[1] = std::rand() % SmallTableTuples;
tuple[2] = tuple[1];
bigTable.AddTuple(tuple, strVals, strSizes);
}
@@ -215,7 +215,7 @@ Y_UNIT_TEST_SUITE(TMiniKQLGraceJoinImpTest) {
smallTable.AddTuple(tuple, bigStrVal, bigStrSize);
for ( ui64 i = 0; i < SmallTableTuples + 1; i++) {
- tuple[1] = std::rand() / ( RAND_MAX / SmallTableTuples );
+ tuple[1] = std::rand() % SmallTableTuples;
tuple[2] = tuple[1];
smallTable.AddTuple(tuple, strVals, strSizes);
}
@@ -233,7 +233,7 @@ Y_UNIT_TEST_SUITE(TMiniKQLGraceJoinImpTest) {
for ( ui64 i = 0; i < BigTableTuples; i++) {
- tuple[1] = std::rand() / ( RAND_MAX / SmallTableTuples );
+ tuple[1] = std::rand() % SmallTableTuples;
tuple[2] = tuple[1];
bigTable.AddTuple(tuple, strVals, strSizes);
}
@@ -241,7 +241,7 @@ Y_UNIT_TEST_SUITE(TMiniKQLGraceJoinImpTest) {
smallTable.AddTuple(tuple, bigStrVal, bigStrSize);
for ( ui64 i = 0; i < SmallTableTuples + 1; i++) {
- tuple[1] = std::rand() / ( RAND_MAX / SmallTableTuples );
+ tuple[1] = std::rand() % SmallTableTuples;
tuple[2] = tuple[1];
smallTable.AddTuple(tuple, strVals, strSizes);
}
@@ -326,6 +326,137 @@ Y_UNIT_TEST_SUITE(TMiniKQLGraceJoinImpTest) {
CTEST << Endl;
+ }
+}
+
+Y_UNIT_TEST_SUITE(TMiniKQLGraceJoinAnyTest) {
+ Y_UNIT_TEST_LLVM(TestImp2) {
+ TSetup<LLVM> setup;
+ ui64 tuple[11] = {0,1,2,3,4,5,6,7,8,9,10};
+ ui32 strSizes[2] = {4, 4};
+ char * strVals[] = {(char *)"aaaaa", (char *)"bbbb"};
+
+ char * bigStrVal[] = {(char *)"aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa",
+ (char *)"bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb"};
+ ui32 bigStrSize[2] = {151, 151};
+ // Timing smoke test for GraceJoin::TTable: fills two tables, iterates the big one twice,
+ // joins them and counts the joined tuples. Results are only printed via CTEST; nothing is asserted.
+ // The last constructor argument below is presumably the new isAny flag - confirm against TTable's declaration.
+ GraceJoin::TTable bigTable (1,1,1,1,0,0,1, nullptr, true);
+ GraceJoin::TTable smallTable(1,1,1,1,0,0,1, nullptr, true);
+ GraceJoin::TTable joinTable (1,1,1,1,0,0,1, nullptr, true);
+
+ std::chrono::steady_clock::time_point begin = std::chrono::steady_clock::now();
+
+ const ui64 TupleSize = 1024;
+
+ ui64 bigTuple[TupleSize];
+
+ for (ui64 i = 0; i < TupleSize; i++) {
+ bigTuple[i] = std::rand() / ( RAND_MAX / 10000 );
+ }
+
+ ui64 milliseconds = 0;
+
+ // bigTable gets tuple[1] = i % SmallTableTuples, so each value 0..SmallTableTuples-1 is added
+ // BigTableTuples / SmallTableTuples times; smallTable gets each value 0..SmallTableTuples once.
+ const ui64 BigTableTuples = 600000;
+ const ui64 SmallTableTuples = 150000;
+ const ui64 BigTupleSize = 32;
+
+ std::chrono::steady_clock::time_point begin03 = std::chrono::steady_clock::now();
+
+
+ for ( ui64 i = 0; i < BigTableTuples; i++) {
+ tuple[1] = i % SmallTableTuples;
+ tuple[2] = tuple[1];
+ bigTable.AddTuple(tuple, strVals, strSizes);
+ }
+
+ smallTable.AddTuple(tuple, bigStrVal, bigStrSize);
+
+ for ( ui64 i = 0; i < SmallTableTuples + 1; i++) {
+ tuple[1] = i;
+ tuple[2] = tuple[1];
+ smallTable.AddTuple(tuple, strVals, strSizes);
+ }
+ // Report fill time; the divisor is clamped to 1 ms so a sub-millisecond run cannot divide by zero.
+ std::chrono::steady_clock::time_point end03 = std::chrono::steady_clock::now();
+ milliseconds = std::chrono::duration_cast<std::chrono::milliseconds>(end03 - begin03).count();
+ CTEST << "Time for hash = " << milliseconds << "[ms]" << Endl;
+ CTEST << "Adding tuples speed: " << (BigTupleSize * (BigTableTuples + SmallTableTuples) * 1000) / ( (milliseconds ? milliseconds : 1) * 1024 * 1024) << "MB/sec" << Endl;
+ CTEST << Endl;
+
+ std::vector<ui64> vals1, vals2;
+ std::vector<char *> strVals1, strVals2;
+ std::vector<ui32> strSizes1, strSizes2;
+ GraceJoin::TupleData td1, td2;
+ vals1.resize(100);
+ vals2.resize(100);
+ strVals1.resize(100);
+ strVals2.resize(100);
+ strSizes1.resize(100);
+ strSizes2.resize(100);
+ td1.IntColumns = vals1.data();
+ td1.StrColumns = strVals1.data();
+ td1.StrSizes = strSizes1.data();
+ td2.IntColumns = vals2.data();
+ td2.StrColumns = strVals2.data();
+ td2.StrSizes = strSizes2.data();
+
+ ui64 numBigTuples = 0;
+ bigTable.ResetIterator();
+
+ std::chrono::steady_clock::time_point begin04 = std::chrono::steady_clock::now();
+
+ while(bigTable.NextTuple(td1)) { numBigTuples++; }
+
+ CTEST << "Num of big tuples 1: " << numBigTuples << Endl;
+
+ std::chrono::steady_clock::time_point end04 = std::chrono::steady_clock::now();
+ CTEST << "Time for get 1 = " << std::chrono::duration_cast<std::chrono::milliseconds>(end04 - begin04).count() << "[ms]" << Endl;
+ CTEST << Endl;
+
+ numBigTuples = 0;
+ bigTable.ResetIterator();
+
+ std::chrono::steady_clock::time_point begin041 = std::chrono::steady_clock::now();
+
+ while(bigTable.NextTuple(td2)) { numBigTuples++; }
+
+ CTEST << "Num of big tuples 2: " << numBigTuples << Endl;
+
+ std::chrono::steady_clock::time_point end041 = std::chrono::steady_clock::now();
+ CTEST << "Time for get 2 = " << std::chrono::duration_cast<std::chrono::milliseconds>(end041 - begin041).count() << "[ms]" << Endl;
+ CTEST << Endl;
+
+
+ std::chrono::steady_clock::time_point begin05 = std::chrono::steady_clock::now();
+
+ joinTable.Join(smallTable,bigTable);
+
+ std::chrono::steady_clock::time_point end05 = std::chrono::steady_clock::now();
+ CTEST << "Time for join = " << std::chrono::duration_cast<std::chrono::milliseconds>(end05 - begin05).count() << "[ms]" << Endl;
+ CTEST << Endl;
+
+ joinTable.ResetIterator();
+ ui64 numJoinedTuples = 0;
+
+
+ std::chrono::steady_clock::time_point begin042 = std::chrono::steady_clock::now();
+
+ while(joinTable.NextJoinedData(td1, td2)) { numJoinedTuples++; }
+
+ CTEST << "Num of joined tuples : " << numJoinedTuples << Endl;
+
+ std::chrono::steady_clock::time_point end042 = std::chrono::steady_clock::now();
+ CTEST << "Time for get joined tuples: = " << std::chrono::duration_cast<std::chrono::milliseconds>(end042 - begin042).count() << "[ms]" << Endl;
+ CTEST << Endl;
+
+
+ std::chrono::steady_clock::time_point end = std::chrono::steady_clock::now();
+ CTEST << "Time difference = " << std::chrono::duration_cast<std::chrono::milliseconds>(end - begin).count() << "[ms]" << Endl;
+ CTEST << Endl;
}
@@ -388,12 +519,14 @@ Y_UNIT_TEST_SUITE(TMiniKQLGraceJoinTest) {
NUdf::TUnboxedValue tuple;
UNIT_ASSERT(iterator.Next(tuple));
- UNBOXED_VALUE_STR_EQUAL(tuple.GetElement(0), "B");
- UNBOXED_VALUE_STR_EQUAL(tuple.GetElement(1), "X");
- UNIT_ASSERT(iterator.Next(tuple));
UNBOXED_VALUE_STR_EQUAL(tuple.GetElement(0), "C");
UNBOXED_VALUE_STR_EQUAL(tuple.GetElement(1), "Y");
UNIT_ASSERT(iterator.Next(tuple));
+ UNBOXED_VALUE_STR_EQUAL(tuple.GetElement(0), "C");
+ UNBOXED_VALUE_STR_EQUAL(tuple.GetElement(1), "Z");
+ UNIT_ASSERT(iterator.Next(tuple));
+ UNBOXED_VALUE_STR_EQUAL(tuple.GetElement(0), "B");
+ UNBOXED_VALUE_STR_EQUAL(tuple.GetElement(1), "X");
UNIT_ASSERT(!iterator.Next(tuple));
}
@@ -525,12 +658,14 @@ Y_UNIT_TEST_SUITE(TMiniKQLGraceJoinTest) {
NUdf::TUnboxedValue tuple;
UNIT_ASSERT(iterator.Next(tuple));
- UNBOXED_VALUE_STR_EQUAL(tuple.GetElement(0), "B");
- UNBOXED_VALUE_STR_EQUAL(tuple.GetElement(1), "X");
- UNIT_ASSERT(iterator.Next(tuple));
UNBOXED_VALUE_STR_EQUAL(tuple.GetElement(0), "C");
UNBOXED_VALUE_STR_EQUAL(tuple.GetElement(1), "Y");
UNIT_ASSERT(iterator.Next(tuple));
+ UNBOXED_VALUE_STR_EQUAL(tuple.GetElement(0), "C");
+ UNBOXED_VALUE_STR_EQUAL(tuple.GetElement(1), "Z");
+ UNIT_ASSERT(iterator.Next(tuple));
+ UNBOXED_VALUE_STR_EQUAL(tuple.GetElement(0), "B");
+ UNBOXED_VALUE_STR_EQUAL(tuple.GetElement(1), "X");
UNIT_ASSERT(!iterator.Next(tuple));
}
@@ -1263,6 +1398,9 @@ Y_UNIT_TEST_SUITE(TMiniKQLGraceJoinTest) {
UNIT_ASSERT(iterator.Next(tuple));
+ UNIT_ASSERT(!tuple.GetElement(0));
+ UNBOXED_VALUE_STR_EQUAL(tuple.GetElement(1), "Z");
+ UNIT_ASSERT(iterator.Next(tuple));
UNBOXED_VALUE_STR_EQUAL(tuple.GetElement(0), "B");
UNBOXED_VALUE_STR_EQUAL(tuple.GetElement(1), "X");
UNIT_ASSERT(iterator.Next(tuple));
@@ -1274,9 +1412,6 @@ Y_UNIT_TEST_SUITE(TMiniKQLGraceJoinTest) {
UNIT_ASSERT(iterator.Next(tuple));
UNBOXED_VALUE_STR_EQUAL(tuple.GetElement(0), "C");
UNBOXED_VALUE_STR_EQUAL(tuple.GetElement(1), "Y");
- UNIT_ASSERT(iterator.Next(tuple));
- UNIT_ASSERT(!tuple.GetElement(0));
- UNBOXED_VALUE_STR_EQUAL(tuple.GetElement(1), "Z");
UNIT_ASSERT(!iterator.Next(tuple));
UNIT_ASSERT(!iterator.Next(tuple));
}
@@ -1403,11 +1538,11 @@ Y_UNIT_TEST_SUITE(TMiniKQLGraceJoinTest) {
NUdf::TUnboxedValue tuple;
UNIT_ASSERT(iterator.Next(tuple));
- UNBOXED_VALUE_STR_EQUAL(tuple.GetElement(0), "C");
- UNIT_ASSERT(!tuple.GetElement(1));
- UNIT_ASSERT(iterator.Next(tuple));
UNBOXED_VALUE_STR_EQUAL(tuple.GetElement(0), "Z");
UNIT_ASSERT_VALUES_EQUAL(tuple.GetElement(1).Get<ui32>(), 3);
+ UNIT_ASSERT(iterator.Next(tuple));
+ UNBOXED_VALUE_STR_EQUAL(tuple.GetElement(0), "C");
+ UNIT_ASSERT(!tuple.GetElement(1));
UNIT_ASSERT(!iterator.Next(tuple));
UNIT_ASSERT(!iterator.Next(tuple));
}