aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
author aakulaga <aakulaga@ydb.tech> 2022-10-19 15:20:35 +0300
committer aakulaga <aakulaga@ydb.tech> 2022-10-19 15:20:35 +0300
commit c0012de5c7fd72d2f30add432b82326036545906 (patch)
tree 95248a7669f7e2ae9e5ecd9b7eb2c83a2d3b4304
parent 377c1b9d204f622afa79cb685c837ca7195d1d3e (diff)
download ydb-c0012de5c7fd72d2f30add432b82326036545906.tar.gz
TPC-H passed
TPC-H queries passed
-rw-r--r-- ydb/library/yql/core/type_ann/type_ann_join.cpp 2
-rw-r--r-- ydb/library/yql/dq/opt/dq_opt_join.cpp 1
-rw-r--r-- ydb/library/yql/minikql/comp_nodes/mkql_grace_join.cpp 88
-rw-r--r-- ydb/library/yql/minikql/comp_nodes/mkql_grace_join_imp.cpp 32
-rw-r--r-- ydb/library/yql/minikql/comp_nodes/mkql_grace_join_imp.h 4
-rw-r--r-- ydb/library/yql/minikql/comp_nodes/ut/mkql_grace_join_ut.cpp 80
6 files changed, 153 insertions, 54 deletions
diff --git a/ydb/library/yql/core/type_ann/type_ann_join.cpp b/ydb/library/yql/core/type_ann/type_ann_join.cpp
index dc6eaa097e..291991c48b 100644
--- a/ydb/library/yql/core/type_ann/type_ann_join.cpp
+++ b/ydb/library/yql/core/type_ann/type_ann_join.cpp
@@ -592,7 +592,7 @@ namespace NTypeAnnImpl {
}
auto columnType = GetFieldType(leftTupleType, *oldPos);
- if ((joinKind == "Right" || joinKind == "Full" || joinKind == "Exclusion") && !columnType->IsOptionalOrNull()) {
+ if ((joinKind == "Right" || joinKind == "Full" || joinKind == "Exclusion" ) && !columnType->IsOptionalOrNull()) {
columnType = ctx.Expr.MakeType<TOptionalExprType>(columnType);
}
diff --git a/ydb/library/yql/dq/opt/dq_opt_join.cpp b/ydb/library/yql/dq/opt/dq_opt_join.cpp
index 5fa805102d..31603195f5 100644
--- a/ydb/library/yql/dq/opt/dq_opt_join.cpp
+++ b/ydb/library/yql/dq/opt/dq_opt_join.cpp
@@ -848,7 +848,6 @@ TExprBase DqBuildGraceJoin(const TDqJoin& join, TExprContext& ctx) {
static const std::set<std::string_view> supportedTypes = {
"Inner"sv,
"Left"sv,
- "Cross"sv,
"LeftOnly"sv,
"LeftSemi"sv,
"Right"sv,
diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_grace_join.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_grace_join.cpp
index 27c64571cb..c431c1a8a4 100644
--- a/ydb/library/yql/minikql/comp_nodes/mkql_grace_join.cpp
+++ b/ydb/library/yql/minikql/comp_nodes/mkql_grace_join.cpp
@@ -31,15 +31,20 @@ struct TColumnDataPackInfo {
NUdf::EDataSlot DataType = NUdf::EDataSlot::Uint32; // Data type of the column for standard types (TDataType)
bool IsKeyColumn = false; // True if this columns is key for join
bool IsString = false; // True if value is string
+ ui32 Offset = 0; // Offset of column in packed data
+// TValuePacker Packer; // Packer for composite data types
};
struct TGraceJoinPacker {
ui64 NullsBitmapSize = 0; // Number of ui64 values for nulls bitmap
+ ui64 TuplesPacked = 0; // Total number of packed tuples
+ ui64 TuplesUnpacked = 0; // Total number of unpacked tuples
+ std::chrono::time_point<std::chrono::system_clock> StartTime; // Start time of execution
+ std::chrono::time_point<std::chrono::system_clock> EndTime; // End time of execution
std::vector<ui64> TupleIntVals; // Packed value of all fixed length values of table tuple. Keys columns should be packed first.
std::vector<ui32> TupleStrSizes; // Sizes of all packed strings
std::vector<char*> TupleStrings; // All values of tuple strings
std::vector<ui32> Offsets; // Offsets of table column values in bytes
- std::vector<ui32> PackedIdx; // Indexes of initial columns after packing
std::vector<TType*> ColumnTypes; // Types of all columns
std::vector<TValuePacker> Packers; // Packers for composite data types
const THolderFactory& HolderFactory; // To use during unpacking
@@ -77,6 +82,7 @@ TColumnDataPackInfo GetPackInfo(TType* type) {
}
if (colType->GetKind() != TType::EKind::Data) {
+ MKQL_ENSURE(false, "Unknown data type.");
res.IsString = true;
res.DataType = NUdf::EDataSlot::String;
return;
@@ -106,6 +112,10 @@ TColumnDataPackInfo GetPackInfo(TType* type) {
res.Bytes = sizeof(i64); break;
case NUdf::EDataSlot::Uint64:
res.Bytes = sizeof(ui64); break;
+ case NUdf::EDataSlot::Float:
+ res.Bytes = sizeof(float); break;
+ case NUdf::EDataSlot::Double:
+ res.Bytes = sizeof(double); break;
case NUdf::EDataSlot::Date:
res.Bytes = sizeof(ui64); break;
case NUdf::EDataSlot::Datetime:
@@ -124,8 +134,11 @@ TColumnDataPackInfo GetPackInfo(TType* type) {
res.Bytes = 10; break;
case NUdf::EDataSlot::Decimal:
res.Bytes = 16; break;
+ case NUdf::EDataSlot::String:
+ res.IsString = true; break;
default:
{
+ MKQL_ENSURE(false, "Unknown data type.");
res.IsString = true;
}
}
@@ -135,6 +148,7 @@ TColumnDataPackInfo GetPackInfo(TType* type) {
void TGraceJoinPacker::Pack() {
+ TuplesPacked++;
for (ui64 i = 0; i < NullsBitmapSize; ++i) {
TupleIntVals[i] = 0; // Clearing nulls bit array. Bit 1 means particular column contains null value
}
@@ -142,16 +156,17 @@ void TGraceJoinPacker::Pack() {
for (ui64 i = 0; i < ColumnsPackInfo.size(); i++) {
const TColumnDataPackInfo &pi = ColumnsPackInfo[i];
- ui32 offset = Offsets[i];
- NYql::NUdf::TUnboxedValue value = *TuplePtrs[i];
+ ui32 offset = pi.Offset;
+
+ NYql::NUdf::TUnboxedValue value = *TuplePtrs[pi.ColumnIdx];
if (!value) { // Null value
- ui64 currNullsIdx = PackedIdx[i] / (sizeof(ui64) * 8);
- ui64 remShift = ( PackedIdx[i] - currNullsIdx * (sizeof(ui64) * 8) );
+ ui64 currNullsIdx = i / (sizeof(ui64) * 8);
+ ui64 remShift = ( i - currNullsIdx * (sizeof(ui64) * 8) );
ui64 bitMask = (0x1) << remShift;
TupleIntVals[currNullsIdx] |= bitMask;
continue;
}
- TType* type = ColumnTypes[i];
+ TType* type = pi.MKQLType;
TType* colType;
if (type->IsOptional()) {
@@ -182,14 +197,13 @@ void TGraceJoinPacker::Pack() {
case NUdf::EDataSlot::Uint16:
WriteUnaligned<ui16>(buffPtr, value.Get<ui16>()); break;
case NUdf::EDataSlot::Int32:
- *(reinterpret_cast<i32*> (buffPtr)) = value.Get<i32>(); break;
+ WriteUnaligned<i32>(buffPtr, value.Get<i32>()); break;
case NUdf::EDataSlot::Uint32:
WriteUnaligned<ui32>(buffPtr, value.Get<ui32>()); break;
case NUdf::EDataSlot::Int64:
WriteUnaligned<i64>(buffPtr, value.Get<i64>()); break;
case NUdf::EDataSlot::Uint64:
WriteUnaligned<ui64>(buffPtr, value.Get<ui64>()); break;
- *(reinterpret_cast<ui64*> (buffPtr)) = value.Get<ui64>(); break;
case NUdf::EDataSlot::Float:
WriteUnaligned<float>(buffPtr, value.Get<float>()); break;
case NUdf::EDataSlot::Double:
@@ -233,7 +247,8 @@ void TGraceJoinPacker::Pack() {
}
default:
{
- auto str = TuplePtrs[i]->AsStringRef();
+
+ auto str = TuplePtrs[pi.ColumnIdx]->AsStringRef();
TupleStrings[offset] = str.Data();
TupleStrSizes[offset] = str.Size();
}
@@ -243,23 +258,24 @@ void TGraceJoinPacker::Pack() {
}
void TGraceJoinPacker::UnPack() {
+ TuplesUnpacked++;
for (ui64 i = 0; i < ColumnsPackInfo.size(); i++) {
const TColumnDataPackInfo &pi = ColumnsPackInfo[i];
- ui32 offset = Offsets[i];
- NYql::NUdf::TUnboxedValue & value = *TuplePtrs[i];
+ ui32 offset = pi.Offset;
+ NYql::NUdf::TUnboxedValue & value = *TuplePtrs[pi.ColumnIdx];
if (JoinTupleData.AllNulls) {
value = NYql::NUdf::TUnboxedValue();
continue;
}
- ui64 currNullsIdx = PackedIdx[i] / (sizeof(ui64) * 8);
- ui64 remShift = ( PackedIdx[i] - currNullsIdx * (sizeof(ui64) * 8) );
+ ui64 currNullsIdx = i / (sizeof(ui64) * 8);
+ ui64 remShift = ( i - currNullsIdx * (sizeof(ui64) * 8) );
ui64 bitMask = (0x1) << remShift;
if ( TupleIntVals[currNullsIdx] & bitMask ) {
value = NYql::NUdf::TUnboxedValue();
continue;
}
- TType * type = ColumnTypes[i];
+ TType * type = pi.MKQLType;
TType * colType;
if (type->IsOptional()) {
@@ -360,16 +376,24 @@ TGraceJoinPacker::TGraceJoinPacker(const std::vector<TType *> & columnTypes, con
auto colType = columnTypes[i];
auto packInfo = GetPackInfo(colType);
packInfo.ColumnIdx = i;
- allColumnsPackInfo.push_back(packInfo);
- Packers.push_back(TValuePacker(true,colType));
- }
+
- for ( auto const & keyColIdx: keyColumns ) {
- allColumnsPackInfo[keyColIdx].IsKeyColumn = true;
- }
+ ui32 keyColNums = std::count_if(keyColumns.begin(), keyColumns.end(), [&](ui32 k) {return k == i;});
+
+ if (keyColNums > 0) {
+ packInfo.IsKeyColumn = true;
+ for (ui32 j = 0; j < keyColNums; j++) {
+ ColumnsPackInfo.push_back(packInfo);
+ Packers.push_back(TValuePacker(true,colType));
+ }
+ } else {
+ ColumnsPackInfo.push_back(packInfo);
+ Packers.push_back(TValuePacker(true,colType));
+ }
+ }
- ColumnsPackInfo = allColumnsPackInfo;
+ nColumns = ColumnsPackInfo.size();
ui64 totalIntColumnsNum = std::count_if(ColumnsPackInfo.begin(), ColumnsPackInfo.end(), [](TColumnDataPackInfo a) { return !a.IsString; });
ui64 totalStrColumnsNum = nColumns - totalIntColumnsNum;
@@ -397,7 +421,7 @@ TGraceJoinPacker::TGraceJoinPacker(const std::vector<TType *> & columnTypes, con
JoinTupleData.StrColumns = TupleStrings.data();
JoinTupleData.StrSizes = TupleStrSizes.data();
- std::sort(allColumnsPackInfo.begin(), allColumnsPackInfo.end(), [](TColumnDataPackInfo & a, TColumnDataPackInfo & b)
+ std::sort(ColumnsPackInfo.begin(), ColumnsPackInfo.end(), [](TColumnDataPackInfo & a, TColumnDataPackInfo & b)
{
if (a.IsKeyColumn && !b.IsKeyColumn) return true;
if (a.Bytes > b.Bytes) return true;
@@ -406,7 +430,6 @@ TGraceJoinPacker::TGraceJoinPacker(const std::vector<TType *> & columnTypes, con
});
Offsets.resize(nColumns);
- PackedIdx.resize(nColumns);
TupleHolder.resize(nColumns);
TupleStringHolder.resize(nColumns);
@@ -416,15 +439,16 @@ TGraceJoinPacker::TGraceJoinPacker(const std::vector<TType *> & columnTypes, con
ui32 currStrOffset = 0;
ui32 currIdx = 0;
- for( auto const & p: allColumnsPackInfo ) {
+ for( auto & p: ColumnsPackInfo ) {
if ( !p.IsString ) {
+ p.Offset = currIntOffset;
Offsets[p.ColumnIdx] = currIntOffset;
currIntOffset += p.Bytes;
} else {
+ p.Offset = currStrOffset;
Offsets[p.ColumnIdx] = currStrOffset;
currStrOffset++;
}
- PackedIdx[p.ColumnIdx] = currIdx;
currIdx++;
}
@@ -625,11 +649,18 @@ EFetchResult TGraceJoinState::FetchValues(TComputationContext& ctx, NUdf::TUnbox
const NKikimr::NMiniKQL::EFetchResult resultRight = FlowRight->FetchValues(ctx, RightPacker->TuplePtrs.data());
if (resultLeft == EFetchResult::One) {
+ if (LeftPacker->TuplesPacked == 0) {
+ LeftPacker->StartTime = std::chrono::system_clock::now();
+ }
LeftPacker->Pack();
LeftPacker->TablePtr->AddTuple(LeftPacker->TupleIntVals.data(), LeftPacker->TupleStrings.data(), LeftPacker->TupleStrSizes.data());
}
if (resultRight == EFetchResult::One) {
+ if (RightPacker->TuplesPacked == 0) {
+ RightPacker->StartTime = std::chrono::system_clock::now();
+ }
+
RightPacker->Pack();
RightPacker->TablePtr->AddTuple(RightPacker->TupleIntVals.data(), RightPacker->TupleStrings.data(), RightPacker->TupleStrSizes.data());
}
@@ -640,9 +671,14 @@ EFetchResult TGraceJoinState::FetchValues(TComputationContext& ctx, NUdf::TUnbox
if (resultRight == EFetchResult::Finish && resultLeft == EFetchResult::Finish && !*JoinCompleted) {
*JoinCompleted = true;
+ LeftPacker->StartTime = std::chrono::system_clock::now();
+ RightPacker->StartTime = std::chrono::system_clock::now();
JoinedTablePtr->Join(*LeftPacker->TablePtr, *RightPacker->TablePtr, JoinKind);
JoinedTablePtr->ResetIterator();
- }
+ LeftPacker->EndTime = std::chrono::system_clock::now();
+ RightPacker->EndTime = std::chrono::system_clock::now();
+
+ }
}
JoinedTuple->resize((LeftRenames.size() + RightRenames.size()) / 2);
diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_grace_join_imp.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_grace_join_imp.cpp
index 0f9cad8ab7..896a566d5d 100644
--- a/ydb/library/yql/minikql/comp_nodes/mkql_grace_join_imp.cpp
+++ b/ydb/library/yql/minikql/comp_nodes/mkql_grace_join_imp.cpp
@@ -15,12 +15,9 @@ namespace GraceJoin {
void TTable::AddTuple( ui64 * intColumns, char ** stringColumns, ui32 * stringsSizes ) {
+ TotalPacked++;
TempTuple.clear();
-// TempTuple.resize( NullsBitmapSize + NumberOfKeyIntColumns );
-// ui64 * memCopyStart = TempTuple.data() + TempTuple.size() - (NullsBitmapSize + NumberOfKeyIntColumns);
-// std::memcpy(memCopyStart, intColumns, (NullsBitmapSize + NumberOfKeyIntColumns)*sizeof(ui64));
-
TempTuple.insert(TempTuple.end(), intColumns, intColumns + NullsBitmapSize + NumberOfKeyIntColumns);
@@ -83,7 +80,7 @@ void TTable::AddTuple( ui64 * intColumns, char ** stringColumns, ui32 * strings
}
// Adding data values
- ui64 * dataColumns = intColumns + NumberOfKeyIntColumns;
+ ui64 * dataColumns = intColumns + NullsBitmapSize + NumberOfKeyIntColumns;
dataIntVals.insert(dataIntVals.end(), dataColumns, dataColumns + NumberOfDataIntColumns);
// Adding strings values for data columns
@@ -251,7 +248,7 @@ void TTable::Join( TTable & t1, TTable & t2, EJoinKind joinKind ) {
ui32 tuple1Idx = 0;
auto it1 = bucket1->KeyIntVals.begin();
- while ( it1 != bucket1->KeyIntVals.end() ) {
+ while ( it1 < bucket1->KeyIntVals.end() ) {
ui64 valSize = (!JoinTable1->NumberOfKeyStringColumns) ? headerSize : headerSize + *(it1 + headerSize - JoinTable1->TotalStringsSize);
ui64 hash = *it1;
@@ -307,19 +304,14 @@ void TTable::Join( TTable & t1, TTable & t2, EJoinKind joinKind ) {
it1 += valSize;
tuple1Idx ++;
}
-
- if ( JoinKind == EJoinKind::Left || JoinKind == EJoinKind::LeftOnly || JoinKind == EJoinKind::LeftSemi ||
- JoinKind == EJoinKind::Right || JoinKind == EJoinKind::RightOnly || JoinKind == EJoinKind::RightSemi ||
- JoinKind == EJoinKind::Full || JoinKind == EJoinKind::Exclusion )
+ std::sort(joinResults.begin(), joinResults.end(), [](JoinTuplesIds a, JoinTuplesIds b)
{
- std::sort(joinResults.begin(), joinResults.end(), [](JoinTuplesIds a, JoinTuplesIds b)
- {
- if (a.id1 < b.id1) return true;
- if (a.id1 == b.id1 && (a.id2 < b.id2)) return true;
- return false;
- });
+ if (a.id1 < b.id1) return true;
+ if (a.id1 == b.id1 && (a.id2 < b.id2)) return true;
+ return false;
+ });
- }
+
TableBuckets[bucket].JoinIds = std::move(joinResults);
if ( JoinKind == EJoinKind::Full || JoinKind == EJoinKind::Exclusion ) {
std::vector<ui32> & rightIds = TableBuckets[bucket].RightIds;
@@ -361,7 +353,7 @@ inline void TTable::GetTupleData(ui32 bucketNum, ui32 tupleId, TupleData & td) {
dataIntsOffset = NumberOfDataIntColumns * tupleId;
for ( ui64 i = 0; i < NumberOfDataIntColumns; ++i) {
- td.IntColumns[NumberOfKeyIntColumns + i] = tb.DataIntVals[dataIntsOffset + i];
+ td.IntColumns[NumberOfKeyIntColumns + NullsBitmapSize + i] = tb.DataIntVals[dataIntsOffset + i];
}
char *strPtr = nullptr;
@@ -410,7 +402,7 @@ inline bool TTable::HasJoinedTupleId(TTable *joinedTable, ui32 &tupleId2) {
}
else
{
- return false;
+ return false;
}
}
@@ -724,7 +716,7 @@ TTable::TTable( ui64 numberOfKeyIntColumns, ui64 numberOfKeyStringColumns,
TotalStringsSize = (numberOfKeyStringColumns > 0 ) ? 1 : 0;
- HeaderSize = HashSize + NullsBitmapSize + NumberOfKeyIntColumns + TotalStringsSize;
+ HeaderSize = HashSize + NullsBitmapSize + NumberOfKeyIntColumns + TotalStringsSize;
TableBuckets.resize(NumberOfBuckets);
diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_grace_join_imp.h b/ydb/library/yql/minikql/comp_nodes/mkql_grace_join_imp.h
index 5ea68857df..934a897602 100644
--- a/ydb/library/yql/minikql/comp_nodes/mkql_grace_join_imp.h
+++ b/ydb/library/yql/minikql/comp_nodes/mkql_grace_join_imp.h
@@ -110,7 +110,9 @@ class TTable {
inline void GetTupleData(ui32 bucketNum, ui32 tupleId, TupleData& td);
// True if current iterator of tuple in joinedTable has corresponding joined tuple in second table. Id of joined tuple in second table returns in tupleId2.
- inline bool HasJoinedTupleId(TTable* joinedTable, ui32& tupleId2);
+ inline bool HasJoinedTupleId(TTable* joinedTable, ui32& tupleId2);
+
+ ui64 TotalPacked = 0;
public:
diff --git a/ydb/library/yql/minikql/comp_nodes/ut/mkql_grace_join_ut.cpp b/ydb/library/yql/minikql/comp_nodes/ut/mkql_grace_join_ut.cpp
index 99622ac38e..3a7133313b 100644
--- a/ydb/library/yql/minikql/comp_nodes/ut/mkql_grace_join_ut.cpp
+++ b/ydb/library/yql/minikql/comp_nodes/ut/mkql_grace_join_ut.cpp
@@ -207,7 +207,7 @@ Y_UNIT_TEST_SUITE(TMiniKQLGraceJoinImpTest) {
for ( ui64 i = 0; i < BigTableTuples; i++) {
- tuple[1] = i % (SmallTableTuples + 2);
+ tuple[1] = std::rand() / ( RAND_MAX / SmallTableTuples );
tuple[2] = tuple[1];
bigTable.AddTuple(tuple, strVals, strSizes);
}
@@ -215,7 +215,7 @@ Y_UNIT_TEST_SUITE(TMiniKQLGraceJoinImpTest) {
smallTable.AddTuple(tuple, bigStrVal, bigStrSize);
for ( ui64 i = 0; i < SmallTableTuples + 1; i++) {
- tuple[1] = i;
+ tuple[1] = std::rand() / ( RAND_MAX / SmallTableTuples );
tuple[2] = tuple[1];
smallTable.AddTuple(tuple, strVals, strSizes);
}
@@ -233,7 +233,7 @@ Y_UNIT_TEST_SUITE(TMiniKQLGraceJoinImpTest) {
for ( ui64 i = 0; i < BigTableTuples; i++) {
- tuple[1] = i % (SmallTableTuples + 2);
+ tuple[1] = std::rand() / ( RAND_MAX / SmallTableTuples );
tuple[2] = tuple[1];
bigTable.AddTuple(tuple, strVals, strSizes);
}
@@ -241,7 +241,7 @@ Y_UNIT_TEST_SUITE(TMiniKQLGraceJoinImpTest) {
smallTable.AddTuple(tuple, bigStrVal, bigStrSize);
for ( ui64 i = 0; i < SmallTableTuples + 1; i++) {
- tuple[1] = i;
+ tuple[1] = std::rand() / ( RAND_MAX / SmallTableTuples );
tuple[2] = tuple[1];
smallTable.AddTuple(tuple, strVals, strSizes);
}
@@ -300,7 +300,7 @@ Y_UNIT_TEST_SUITE(TMiniKQLGraceJoinImpTest) {
std::chrono::steady_clock::time_point begin05 = std::chrono::steady_clock::now();
- joinTable.Join(bigTable,smallTable);
+ joinTable.Join(smallTable,bigTable);
std::chrono::steady_clock::time_point end05 = std::chrono::steady_clock::now();
CTEST << "Time for join = " << std::chrono::duration_cast<std::chrono::milliseconds>(end05 - begin05).count() << "[ms]" << Endl;
@@ -334,6 +334,7 @@ Y_UNIT_TEST_SUITE(TMiniKQLGraceJoinImpTest) {
Y_UNIT_TEST_SUITE(TMiniKQLGraceJoinTest) {
+
Y_UNIT_TEST_LLVM(TestInner1) {
for (ui32 pass = 0; pass < 1; ++pass) {
@@ -401,6 +402,75 @@ Y_UNIT_TEST_SUITE(TMiniKQLGraceJoinTest) {
}
+ Y_UNIT_TEST_LLVM(TestInnerDoubleCondition1) { // Inner GraceJoin of two small literal lists; checks that exactly one joined row ("B", "X") is produced.
+
+ for (ui32 pass = 0; pass < 1; ++pass) { // The loop body runs exactly once (pass < 1).
+ TSetup<LLVM> setup;
+ TProgramBuilder& pb = *setup.PgmBuilder;
+
+ const auto key1 = pb.NewDataLiteral<ui32>(1);
+ const auto key2 = pb.NewDataLiteral<ui32>(2);
+ const auto key3 = pb.NewDataLiteral<ui32>(4); // Note: key3 and key4 hold the same value (4).
+ const auto key4 = pb.NewDataLiteral<ui32>(4);
+ const auto payload1 = pb.NewDataLiteral<NUdf::EDataSlot::String>("A");
+ const auto payload2 = pb.NewDataLiteral<NUdf::EDataSlot::String>("B");
+ const auto payload3 = pb.NewDataLiteral<NUdf::EDataSlot::String>("C");
+ const auto payload4 = pb.NewDataLiteral<NUdf::EDataSlot::String>("X");
+ const auto payload5 = pb.NewDataLiteral<NUdf::EDataSlot::String>("Y");
+ const auto payload6 = pb.NewDataLiteral<NUdf::EDataSlot::String>("Z");
+
+ const auto tupleType1 = pb.NewTupleType({ // Row type of list1: one ui32 key followed by a payload.
+ pb.NewDataType(NUdf::TDataType<ui32>::Id),
+ pb.NewDataType(NUdf::TDataType<char*>::Id)
+ });
+
+ const auto tupleType2 = pb.NewTupleType({ // Row type of list2: two ui32 columns followed by a payload.
+ pb.NewDataType(NUdf::TDataType<ui32>::Id),
+ pb.NewDataType(NUdf::TDataType<ui32>::Id),
+ pb.NewDataType(NUdf::TDataType<char*>::Id)
+ });
+
+
+ const auto list1 = pb.NewList(tupleType1, { // First join input: rows (1,"A"), (2,"B"), (4,"C").
+ pb.NewTuple({key1, payload1}),
+ pb.NewTuple({key2, payload2}),
+ pb.NewTuple({key3, payload3})
+ });
+
+ const auto list2 = pb.NewList(tupleType2, { // Second join input: rows (2,2,"X"), (4,2,"Y"), (4,1,"Z").
+ pb.NewTuple({key2, key2, payload4}),
+ pb.NewTuple({key3, key2, payload5}),
+ pb.NewTuple({key4, key1, payload6})
+ });
+
+
+ const auto resultType = pb.NewFlowType(pb.NewTupleType({ // Each result row carries two payload columns.
+ pb.NewDataType(NUdf::TDataType<char*>::Id),
+ pb.NewDataType(NUdf::TDataType<char*>::Id)
+ }));
+
+ const auto pgmReturn = pb.Collect(pb.NarrowMap(pb.GraceJoin(
+ pb.ExpandMap(pb.ToFlow(list1), [&](TRuntimeNode item) -> TRuntimeNode::TList { return {pb.Nth(item, 0U), pb.Nth(item, 1U)}; }),
+ pb.ExpandMap(pb.ToFlow(list2), [&](TRuntimeNode item) -> TRuntimeNode::TList { return {pb.Nth(item, 0U), pb.Nth(item, 1U), pb.Nth(item, 2U)}; }),
+ EJoinKind::Inner, {0U, 0U}, {0U, 1U}, {1U, 0U}, {2U, 1U}, resultType), // NOTE(review): presumably left key columns {0,0}, right key columns {0,1}, then left/right rename pairs - confirm against TProgramBuilder::GraceJoin.
+ [&](TRuntimeNode::TList items) -> TRuntimeNode { return pb.NewTuple(items); })
+ );
+
+ const auto graph = setup.BuildGraph(pgmReturn);
+
+ const auto iterator = graph->GetValue().GetListIterator();
+
+ NUdf::TUnboxedValue tuple;
+
+ UNIT_ASSERT(iterator.Next(tuple)); // The first (and only) joined row must exist.
+ UNBOXED_VALUE_STR_EQUAL(tuple.GetElement(0), "B");
+ UNBOXED_VALUE_STR_EQUAL(tuple.GetElement(1), "X");
+ UNIT_ASSERT(!iterator.Next(tuple)); // No further rows are expected.
+
+ }
+
+
+ }
Y_UNIT_TEST_LLVM(TestInnerStringKey1) {