diff options
author | aakulaga <aakulaga@ydb.tech> | 2022-10-19 15:20:35 +0300 |
---|---|---|
committer | aakulaga <aakulaga@ydb.tech> | 2022-10-19 15:20:35 +0300 |
commit | c0012de5c7fd72d2f30add432b82326036545906 (patch) | |
tree | 95248a7669f7e2ae9e5ecd9b7eb2c83a2d3b4304 | |
parent | 377c1b9d204f622afa79cb685c837ca7195d1d3e (diff) | |
download | ydb-c0012de5c7fd72d2f30add432b82326036545906.tar.gz |
TPC-H passed
TPC-H queries passed
6 files changed, 153 insertions, 54 deletions
diff --git a/ydb/library/yql/core/type_ann/type_ann_join.cpp b/ydb/library/yql/core/type_ann/type_ann_join.cpp index dc6eaa097e..291991c48b 100644 --- a/ydb/library/yql/core/type_ann/type_ann_join.cpp +++ b/ydb/library/yql/core/type_ann/type_ann_join.cpp @@ -592,7 +592,7 @@ namespace NTypeAnnImpl { } auto columnType = GetFieldType(leftTupleType, *oldPos); - if ((joinKind == "Right" || joinKind == "Full" || joinKind == "Exclusion") && !columnType->IsOptionalOrNull()) { + if ((joinKind == "Right" || joinKind == "Full" || joinKind == "Exclusion" ) && !columnType->IsOptionalOrNull()) { columnType = ctx.Expr.MakeType<TOptionalExprType>(columnType); } diff --git a/ydb/library/yql/dq/opt/dq_opt_join.cpp b/ydb/library/yql/dq/opt/dq_opt_join.cpp index 5fa805102d..31603195f5 100644 --- a/ydb/library/yql/dq/opt/dq_opt_join.cpp +++ b/ydb/library/yql/dq/opt/dq_opt_join.cpp @@ -848,7 +848,6 @@ TExprBase DqBuildGraceJoin(const TDqJoin& join, TExprContext& ctx) { static const std::set<std::string_view> supportedTypes = { "Inner"sv, "Left"sv, - "Cross"sv, "LeftOnly"sv, "LeftSemi"sv, "Right"sv, diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_grace_join.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_grace_join.cpp index 27c64571cb..c431c1a8a4 100644 --- a/ydb/library/yql/minikql/comp_nodes/mkql_grace_join.cpp +++ b/ydb/library/yql/minikql/comp_nodes/mkql_grace_join.cpp @@ -31,15 +31,20 @@ struct TColumnDataPackInfo { NUdf::EDataSlot DataType = NUdf::EDataSlot::Uint32; // Data type of the column for standard types (TDataType) bool IsKeyColumn = false; // True if this columns is key for join bool IsString = false; // True if value is string + ui32 Offset = 0; // Offset of column in packed data +// TValuePacker Packer; // Packer for composite data types }; struct TGraceJoinPacker { ui64 NullsBitmapSize = 0; // Number of ui64 values for nulls bitmap + ui64 TuplesPacked = 0; // Total number of packed tuples + ui64 TuplesUnpacked = 0; // Total number of unpacked tuples + std::chrono::time_point<std::chrono::system_clock> StartTime; // Start time of execution + std::chrono::time_point<std::chrono::system_clock> EndTime; // End time of execution std::vector<ui64> TupleIntVals; // Packed value of all fixed length values of table tuple. Keys columns should be packed first. std::vector<ui32> TupleStrSizes; // Sizes of all packed strings std::vector<char*> TupleStrings; // All values of tuple strings std::vector<ui32> Offsets; // Offsets of table column values in bytes - std::vector<ui32> PackedIdx; // Indexes of initial columns after packing std::vector<TType*> ColumnTypes; // Types of all columns std::vector<TValuePacker> Packers; // Packers for composite data types const THolderFactory& HolderFactory; // To use during unpacking @@ -77,6 +82,7 @@ TColumnDataPackInfo GetPackInfo(TType* type) { } if (colType->GetKind() != TType::EKind::Data) { + MKQL_ENSURE(false, "Unknown data type."); res.IsString = true; res.DataType = NUdf::EDataSlot::String; return; @@ -106,6 +112,10 @@ TColumnDataPackInfo GetPackInfo(TType* type) { res.Bytes = sizeof(i64); break; case NUdf::EDataSlot::Uint64: res.Bytes = sizeof(ui64); break; + case NUdf::EDataSlot::Float: + res.Bytes = sizeof(float); break; + case NUdf::EDataSlot::Double: + res.Bytes = sizeof(double); break; case NUdf::EDataSlot::Date: res.Bytes = sizeof(ui64); break; case NUdf::EDataSlot::Datetime: @@ -124,8 +134,11 @@ TColumnDataPackInfo GetPackInfo(TType* type) { res.Bytes = 10; break; case NUdf::EDataSlot::Decimal: res.Bytes = 16; break; + case NUdf::EDataSlot::String: + res.IsString = true; break; default: { + MKQL_ENSURE(false, "Unknown data type."); res.IsString = true; } } @@ -135,6 +148,7 @@ TColumnDataPackInfo GetPackInfo(TType* type) { void TGraceJoinPacker::Pack() { + TuplesPacked++; for (ui64 i = 0; i < NullsBitmapSize; ++i) { TupleIntVals[i] = 0; // Clearing nulls bit array. Bit 1 means particular column contains null value } @@ -142,16 +156,17 @@ void TGraceJoinPacker::Pack() { for (ui64 i = 0; i < ColumnsPackInfo.size(); i++) { const TColumnDataPackInfo &pi = ColumnsPackInfo[i]; - ui32 offset = Offsets[i]; - NYql::NUdf::TUnboxedValue value = *TuplePtrs[i]; + ui32 offset = pi.Offset; + + NYql::NUdf::TUnboxedValue value = *TuplePtrs[pi.ColumnIdx]; if (!value) { // Null value - ui64 currNullsIdx = PackedIdx[i] / (sizeof(ui64) * 8); - ui64 remShift = ( PackedIdx[i] - currNullsIdx * (sizeof(ui64) * 8) ); + ui64 currNullsIdx = i / (sizeof(ui64) * 8); + ui64 remShift = ( i - currNullsIdx * (sizeof(ui64) * 8) ); ui64 bitMask = (0x1) << remShift; TupleIntVals[currNullsIdx] |= bitMask; continue; } - TType* type = ColumnTypes[i]; + TType* type = pi.MKQLType; TType* colType; if (type->IsOptional()) { @@ -182,14 +197,13 @@ void TGraceJoinPacker::Pack() { case NUdf::EDataSlot::Uint16: WriteUnaligned<ui16>(buffPtr, value.Get<ui16>()); break; case NUdf::EDataSlot::Int32: - *(reinterpret_cast<i32*> (buffPtr)) = value.Get<i32>(); break; + WriteUnaligned<i32>(buffPtr, value.Get<i32>()); break; case NUdf::EDataSlot::Uint32: WriteUnaligned<ui32>(buffPtr, value.Get<ui32>()); break; case NUdf::EDataSlot::Int64: WriteUnaligned<i64>(buffPtr, value.Get<i64>()); break; case NUdf::EDataSlot::Uint64: WriteUnaligned<ui64>(buffPtr, value.Get<ui64>()); break; - *(reinterpret_cast<ui64*> (buffPtr)) = value.Get<ui64>(); break; case NUdf::EDataSlot::Float: WriteUnaligned<float>(buffPtr, value.Get<float>()); break; case NUdf::EDataSlot::Double: @@ -233,7 +247,8 @@ void TGraceJoinPacker::Pack() { } default: { - auto str = TuplePtrs[i]->AsStringRef(); + + auto str = TuplePtrs[pi.ColumnIdx]->AsStringRef(); TupleStrings[offset] = str.Data(); TupleStrSizes[offset] = str.Size(); } @@ -243,23 +258,24 @@ void TGraceJoinPacker::Pack() { } void TGraceJoinPacker::UnPack() { + TuplesUnpacked++; for (ui64 i = 0; i < ColumnsPackInfo.size(); i++) { const TColumnDataPackInfo &pi = ColumnsPackInfo[i]; - ui32 offset = Offsets[i]; - NYql::NUdf::TUnboxedValue & value = *TuplePtrs[i]; + ui32 offset = pi.Offset; + NYql::NUdf::TUnboxedValue & value = *TuplePtrs[pi.ColumnIdx]; if (JoinTupleData.AllNulls) { value = NYql::NUdf::TUnboxedValue(); continue; } - ui64 currNullsIdx = PackedIdx[i] / (sizeof(ui64) * 8); - ui64 remShift = ( PackedIdx[i] - currNullsIdx * (sizeof(ui64) * 8) ); + ui64 currNullsIdx = i / (sizeof(ui64) * 8); + ui64 remShift = ( i - currNullsIdx * (sizeof(ui64) * 8) ); ui64 bitMask = (0x1) << remShift; if ( TupleIntVals[currNullsIdx] & bitMask ) { value = NYql::NUdf::TUnboxedValue(); continue; } - TType * type = ColumnTypes[i]; + TType * type = pi.MKQLType; TType * colType; if (type->IsOptional()) { @@ -360,16 +376,24 @@ TGraceJoinPacker::TGraceJoinPacker(const std::vector<TType *> & columnTypes, con auto colType = columnTypes[i]; auto packInfo = GetPackInfo(colType); packInfo.ColumnIdx = i; - allColumnsPackInfo.push_back(packInfo); - Packers.push_back(TValuePacker(true,colType)); - } + - for ( auto const & keyColIdx: keyColumns ) { - allColumnsPackInfo[keyColIdx].IsKeyColumn = true; - } + ui32 keyColNums = std::count_if(keyColumns.begin(), keyColumns.end(), [&](ui32 k) {return k == i;}); + + if (keyColNums > 0) { + packInfo.IsKeyColumn = true; + for (ui32 j = 0; j < keyColNums; j++) { + ColumnsPackInfo.push_back(packInfo); + Packers.push_back(TValuePacker(true,colType)); + } + } else { + ColumnsPackInfo.push_back(packInfo); + Packers.push_back(TValuePacker(true,colType)); + } + } - ColumnsPackInfo = allColumnsPackInfo; + nColumns = ColumnsPackInfo.size(); ui64 totalIntColumnsNum = std::count_if(ColumnsPackInfo.begin(), ColumnsPackInfo.end(), [](TColumnDataPackInfo a) { return !a.IsString; }); ui64 totalStrColumnsNum = nColumns - totalIntColumnsNum; @@ -397,7 +421,7 @@ TGraceJoinPacker::TGraceJoinPacker(const std::vector<TType *> & columnTypes, con JoinTupleData.StrColumns = TupleStrings.data(); JoinTupleData.StrSizes = TupleStrSizes.data(); - std::sort(allColumnsPackInfo.begin(), allColumnsPackInfo.end(), [](TColumnDataPackInfo & a, TColumnDataPackInfo & b) + std::sort(ColumnsPackInfo.begin(), ColumnsPackInfo.end(), [](TColumnDataPackInfo & a, TColumnDataPackInfo & b) { if (a.IsKeyColumn && !b.IsKeyColumn) return true; if (a.Bytes > b.Bytes) return true; @@ -406,7 +430,6 @@ TGraceJoinPacker::TGraceJoinPacker(const std::vector<TType *> & columnTypes, con }); Offsets.resize(nColumns); - PackedIdx.resize(nColumns); TupleHolder.resize(nColumns); TupleStringHolder.resize(nColumns); @@ -416,15 +439,16 @@ TGraceJoinPacker::TGraceJoinPacker(const std::vector<TType *> & columnTypes, con ui32 currStrOffset = 0; ui32 currIdx = 0; - for( auto const & p: allColumnsPackInfo ) { + for( auto & p: ColumnsPackInfo ) { if ( !p.IsString ) { + p.Offset = currIntOffset; Offsets[p.ColumnIdx] = currIntOffset; currIntOffset += p.Bytes; } else { + p.Offset = currStrOffset; Offsets[p.ColumnIdx] = currStrOffset; currStrOffset++; } - PackedIdx[p.ColumnIdx] = currIdx; currIdx++; } @@ -625,11 +649,18 @@ EFetchResult TGraceJoinState::FetchValues(TComputationContext& ctx, NUdf::TUnbox const NKikimr::NMiniKQL::EFetchResult resultRight = FlowRight->FetchValues(ctx, RightPacker->TuplePtrs.data()); if (resultLeft == EFetchResult::One) { + if (LeftPacker->TuplesPacked == 0) { + LeftPacker->StartTime = std::chrono::system_clock::now(); + } LeftPacker->Pack(); LeftPacker->TablePtr->AddTuple(LeftPacker->TupleIntVals.data(), LeftPacker->TupleStrings.data(), LeftPacker->TupleStrSizes.data()); } if (resultRight == EFetchResult::One) { + if (RightPacker->TuplesPacked == 0) { + RightPacker->StartTime = std::chrono::system_clock::now(); + } + RightPacker->Pack(); RightPacker->TablePtr->AddTuple(RightPacker->TupleIntVals.data(), RightPacker->TupleStrings.data(), RightPacker->TupleStrSizes.data()); } @@ -640,9 +671,14 @@ EFetchResult TGraceJoinState::FetchValues(TComputationContext& ctx, NUdf::TUnbox if (resultRight == EFetchResult::Finish && resultLeft == EFetchResult::Finish && !*JoinCompleted) { *JoinCompleted = true; + LeftPacker->StartTime = std::chrono::system_clock::now(); + RightPacker->StartTime = std::chrono::system_clock::now(); JoinedTablePtr->Join(*LeftPacker->TablePtr, *RightPacker->TablePtr, JoinKind); JoinedTablePtr->ResetIterator(); - } + LeftPacker->EndTime = std::chrono::system_clock::now(); + RightPacker->EndTime = std::chrono::system_clock::now(); + + } } JoinedTuple->resize((LeftRenames.size() + RightRenames.size()) / 2); diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_grace_join_imp.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_grace_join_imp.cpp index 0f9cad8ab7..896a566d5d 100644 --- a/ydb/library/yql/minikql/comp_nodes/mkql_grace_join_imp.cpp +++ b/ydb/library/yql/minikql/comp_nodes/mkql_grace_join_imp.cpp @@ -15,12 +15,9 @@ namespace GraceJoin { void TTable::AddTuple( ui64 * intColumns, char ** stringColumns, ui32 * stringsSizes ) { + TotalPacked++; TempTuple.clear(); -// TempTuple.resize( NullsBitmapSize + NumberOfKeyIntColumns ); -// ui64 * memCopyStart = TempTuple.data() + TempTuple.size() - (NullsBitmapSize + NumberOfKeyIntColumns); -// std::memcpy(memCopyStart, intColumns, (NullsBitmapSize + NumberOfKeyIntColumns)*sizeof(ui64)); - TempTuple.insert(TempTuple.end(), intColumns, intColumns + NullsBitmapSize + NumberOfKeyIntColumns); @@ -83,7 +80,7 @@ void TTable::AddTuple( ui64 * intColumns, char ** stringColumns, ui32 * strings } // Adding data values - ui64 * dataColumns = intColumns + NumberOfKeyIntColumns; + ui64 * dataColumns = intColumns + NullsBitmapSize + NumberOfKeyIntColumns; dataIntVals.insert(dataIntVals.end(), dataColumns, dataColumns + NumberOfDataIntColumns); // Adding strings values for data columns @@ -251,7 +248,7 @@ void TTable::Join( TTable & t1, TTable & t2, EJoinKind joinKind ) { ui32 tuple1Idx = 0; auto it1 = bucket1->KeyIntVals.begin(); - while ( it1 != bucket1->KeyIntVals.end() ) { + while ( it1 < bucket1->KeyIntVals.end() ) { ui64 valSize = (!JoinTable1->NumberOfKeyStringColumns) ? headerSize : headerSize + *(it1 + headerSize - JoinTable1->TotalStringsSize); ui64 hash = *it1; @@ -307,19 +304,14 @@ void TTable::Join( TTable & t1, TTable & t2, EJoinKind joinKind ) { it1 += valSize; tuple1Idx ++; } - - if ( JoinKind == EJoinKind::Left || JoinKind == EJoinKind::LeftOnly || JoinKind == EJoinKind::LeftSemi || - JoinKind == EJoinKind::Right || JoinKind == EJoinKind::RightOnly || JoinKind == EJoinKind::RightSemi || - JoinKind == EJoinKind::Full || JoinKind == EJoinKind::Exclusion ) + std::sort(joinResults.begin(), joinResults.end(), [](JoinTuplesIds a, JoinTuplesIds b) { - std::sort(joinResults.begin(), joinResults.end(), [](JoinTuplesIds a, JoinTuplesIds b) - { - if (a.id1 < b.id1) return true; - if (a.id1 == b.id1 && (a.id2 < b.id2)) return true; - return false; - }); + if (a.id1 < b.id1) return true; + if (a.id1 == b.id1 && (a.id2 < b.id2)) return true; + return false; + }); - } + TableBuckets[bucket].JoinIds = std::move(joinResults); if ( JoinKind == EJoinKind::Full || JoinKind == EJoinKind::Exclusion ) { std::vector<ui32> & rightIds = TableBuckets[bucket].RightIds; @@ -361,7 +353,7 @@ inline void TTable::GetTupleData(ui32 bucketNum, ui32 tupleId, TupleData & td) { dataIntsOffset = NumberOfDataIntColumns * tupleId; for ( ui64 i = 0; i < NumberOfDataIntColumns; ++i) { - td.IntColumns[NumberOfKeyIntColumns + i] = tb.DataIntVals[dataIntsOffset + i]; + td.IntColumns[NumberOfKeyIntColumns + NullsBitmapSize + i] = tb.DataIntVals[dataIntsOffset + i]; } char *strPtr = nullptr; @@ -410,7 +402,7 @@ inline bool TTable::HasJoinedTupleId(TTable *joinedTable, ui32 &tupleId2) { } else { - return false; + return false; } } @@ -724,7 +716,7 @@ TTable::TTable( ui64 numberOfKeyIntColumns, ui64 numberOfKeyStringColumns, TotalStringsSize = (numberOfKeyStringColumns > 0 ) ? 1 : 0; - HeaderSize = HashSize + NullsBitmapSize + NumberOfKeyIntColumns + TotalStringsSize; + HeaderSize = HashSize + NullsBitmapSize + NumberOfKeyIntColumns + TotalStringsSize; TableBuckets.resize(NumberOfBuckets); diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_grace_join_imp.h b/ydb/library/yql/minikql/comp_nodes/mkql_grace_join_imp.h index 5ea68857df..934a897602 100644 --- a/ydb/library/yql/minikql/comp_nodes/mkql_grace_join_imp.h +++ b/ydb/library/yql/minikql/comp_nodes/mkql_grace_join_imp.h @@ -110,7 +110,9 @@ class TTable { inline void GetTupleData(ui32 bucketNum, ui32 tupleId, TupleData& td); // True if current iterator of tuple in joinedTable has corresponding joined tuple in second table. Id of joined tuple in second table returns in tupleId2. - inline bool HasJoinedTupleId(TTable* joinedTable, ui32& tupleId2); + inline bool HasJoinedTupleId(TTable* joinedTable, ui32& tupleId2); + + ui64 TotalPacked = 0; public: diff --git a/ydb/library/yql/minikql/comp_nodes/ut/mkql_grace_join_ut.cpp b/ydb/library/yql/minikql/comp_nodes/ut/mkql_grace_join_ut.cpp index 99622ac38e..3a7133313b 100644 --- a/ydb/library/yql/minikql/comp_nodes/ut/mkql_grace_join_ut.cpp +++ b/ydb/library/yql/minikql/comp_nodes/ut/mkql_grace_join_ut.cpp @@ -207,7 +207,7 @@ Y_UNIT_TEST_SUITE(TMiniKQLGraceJoinImpTest) { for ( ui64 i = 0; i < BigTableTuples; i++) { - tuple[1] = i % (SmallTableTuples + 2); + tuple[1] = std::rand() / ( RAND_MAX / SmallTableTuples ); tuple[2] = tuple[1]; bigTable.AddTuple(tuple, strVals, strSizes); } @@ -215,7 +215,7 @@ Y_UNIT_TEST_SUITE(TMiniKQLGraceJoinImpTest) { smallTable.AddTuple(tuple, bigStrVal, bigStrSize); for ( ui64 i = 0; i < SmallTableTuples + 1; i++) { - tuple[1] = i; + tuple[1] = std::rand() / ( RAND_MAX / SmallTableTuples ); tuple[2] = tuple[1]; smallTable.AddTuple(tuple, strVals, strSizes); } @@ -233,7 +233,7 @@ Y_UNIT_TEST_SUITE(TMiniKQLGraceJoinImpTest) { for ( ui64 i = 0; i < BigTableTuples; i++) { - tuple[1] = i % (SmallTableTuples + 2); + tuple[1] = std::rand() / ( RAND_MAX / SmallTableTuples ); tuple[2] = tuple[1]; bigTable.AddTuple(tuple, strVals, strSizes); } @@ -241,7 +241,7 @@ Y_UNIT_TEST_SUITE(TMiniKQLGraceJoinImpTest) { smallTable.AddTuple(tuple, bigStrVal, bigStrSize); for ( ui64 i = 0; i < SmallTableTuples + 1; i++) { - tuple[1] = i; + tuple[1] = std::rand() / ( RAND_MAX / SmallTableTuples ); tuple[2] = tuple[1]; smallTable.AddTuple(tuple, strVals, strSizes); } @@ -300,7 +300,7 @@ Y_UNIT_TEST_SUITE(TMiniKQLGraceJoinImpTest) { std::chrono::steady_clock::time_point begin05 = std::chrono::steady_clock::now(); - joinTable.Join(bigTable,smallTable); + joinTable.Join(smallTable,bigTable); std::chrono::steady_clock::time_point end05 = std::chrono::steady_clock::now(); CTEST << "Time for join = " << std::chrono::duration_cast<std::chrono::milliseconds>(end05 - begin05).count() << "[ms]" << Endl; @@ -334,6 +334,7 @@ Y_UNIT_TEST_SUITE(TMiniKQLGraceJoinImpTest) { Y_UNIT_TEST_SUITE(TMiniKQLGraceJoinTest) { + Y_UNIT_TEST_LLVM(TestInner1) { for (ui32 pass = 0; pass < 1; ++pass) { @@ -401,6 +402,75 @@ Y_UNIT_TEST_SUITE(TMiniKQLGraceJoinTest) { } + Y_UNIT_TEST_LLVM(TestInnerDoubleCondition1) { + + for (ui32 pass = 0; pass < 1; ++pass) { + TSetup<LLVM> setup; + TProgramBuilder& pb = *setup.PgmBuilder; + + const auto key1 = pb.NewDataLiteral<ui32>(1); + const auto key2 = pb.NewDataLiteral<ui32>(2); + const auto key3 = pb.NewDataLiteral<ui32>(4); + const auto key4 = pb.NewDataLiteral<ui32>(4); + const auto payload1 = pb.NewDataLiteral<NUdf::EDataSlot::String>("A"); + const auto payload2 = pb.NewDataLiteral<NUdf::EDataSlot::String>("B"); + const auto payload3 = pb.NewDataLiteral<NUdf::EDataSlot::String>("C"); + const auto payload4 = pb.NewDataLiteral<NUdf::EDataSlot::String>("X"); + const auto payload5 = pb.NewDataLiteral<NUdf::EDataSlot::String>("Y"); + const auto payload6 = pb.NewDataLiteral<NUdf::EDataSlot::String>("Z"); + + const auto tupleType1 = pb.NewTupleType({ + pb.NewDataType(NUdf::TDataType<ui32>::Id), + pb.NewDataType(NUdf::TDataType<char*>::Id) + }); + + const auto tupleType2 = pb.NewTupleType({ + pb.NewDataType(NUdf::TDataType<ui32>::Id), + pb.NewDataType(NUdf::TDataType<ui32>::Id), + pb.NewDataType(NUdf::TDataType<char*>::Id) + }); + + + const auto list1 = pb.NewList(tupleType1, { + pb.NewTuple({key1, payload1}), + pb.NewTuple({key2, payload2}), + pb.NewTuple({key3, payload3}) + }); + + const auto list2 = pb.NewList(tupleType2, { + pb.NewTuple({key2, key2, payload4}), + pb.NewTuple({key3, key2, payload5}), + pb.NewTuple({key4, key1, payload6}) + }); + + + const auto resultType = pb.NewFlowType(pb.NewTupleType({ + pb.NewDataType(NUdf::TDataType<char*>::Id), + pb.NewDataType(NUdf::TDataType<char*>::Id) + })); + + const auto pgmReturn = pb.Collect(pb.NarrowMap(pb.GraceJoin( + pb.ExpandMap(pb.ToFlow(list1), [&](TRuntimeNode item) -> TRuntimeNode::TList { return {pb.Nth(item, 0U), pb.Nth(item, 1U)}; }), + pb.ExpandMap(pb.ToFlow(list2), [&](TRuntimeNode item) -> TRuntimeNode::TList { return {pb.Nth(item, 0U), pb.Nth(item, 1U), pb.Nth(item, 2U)}; }), + EJoinKind::Inner, {0U, 0U}, {0U, 1U}, {1U, 0U}, {2U, 1U}, resultType), + [&](TRuntimeNode::TList items) -> TRuntimeNode { return pb.NewTuple(items); }) + ); + + const auto graph = setup.BuildGraph(pgmReturn); + + const auto iterator = graph->GetValue().GetListIterator(); + + NUdf::TUnboxedValue tuple; + + UNIT_ASSERT(iterator.Next(tuple)); + UNBOXED_VALUE_STR_EQUAL(tuple.GetElement(0), "B"); + UNBOXED_VALUE_STR_EQUAL(tuple.GetElement(1), "X"); + UNIT_ASSERT(!iterator.Next(tuple)); + + } + + + } Y_UNIT_TEST_LLVM(TestInnerStringKey1) { |