diff options
author | Iuliia Sidorina <yulia@ydb.tech> | 2024-04-11 17:11:06 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-04-11 17:11:06 +0200 |
commit | cde0bcf70e3870035d8beaaca3c7501f6576b53d (patch) | |
tree | 6f5e102061f013d243a4275ce83287ef462d4231 | |
parent | dccc8c7a2fa1b0596997437703df8545895d1345 (diff) | |
download | ydb-cde0bcf70e3870035d8beaaca3c7501f6576b53d.tar.gz |
fix(kqp): pass column order to KqpIndexLookupJoin (#3654)
-rw-r--r-- | ydb/core/kqp/host/kqp_type_ann.cpp | 14 | ||||
-rw-r--r-- | ydb/core/kqp/runtime/kqp_compute.cpp | 50 | ||||
-rw-r--r-- | ydb/core/kqp/runtime/kqp_program_builder.cpp | 34 | ||||
-rw-r--r-- | ydb/core/kqp/ut/join/kqp_index_lookup_join_ut.cpp | 47 |
4 files changed, 118 insertions, 27 deletions
diff --git a/ydb/core/kqp/host/kqp_type_ann.cpp b/ydb/core/kqp/host/kqp_type_ann.cpp index 7ec2a3f6c73..c697be8a853 100644 --- a/ydb/core/kqp/host/kqp_type_ann.cpp +++ b/ydb/core/kqp/host/kqp_type_ann.cpp @@ -1718,9 +1718,10 @@ TStatus AnnotateIndexLookupJoin(const TExprNode::TPtr& node, TExprContext& ctx) TVector<const TItemExprType*> resultStructItems; for (const auto& item : leftRowType->GetItems()) { - resultStructItems.emplace_back( - ctx.MakeType<TItemExprType>(TString::Join(leftLabel.Value(), ".", item->GetName()), item->GetItemType()) - ); + TString itemName = leftLabel.Value().empty() + ? TString(item->GetName()) + : TString::Join(leftLabel.Value(), ".", item->GetName()); + resultStructItems.emplace_back(ctx.MakeType<TItemExprType>(itemName, item->GetItemType())); } if (RightJoinSideAllowed(joinType.Value())) { @@ -1731,9 +1732,10 @@ TStatus AnnotateIndexLookupJoin(const TExprNode::TPtr& node, TExprContext& ctx) ? ctx.MakeType<TOptionalExprType>(item->GetItemType()) : item->GetItemType(); - resultStructItems.emplace_back( - ctx.MakeType<TItemExprType>(TString::Join(rightLabel.Value(), ".", item->GetName()), itemType) - ); + TString itemName = rightLabel.Value().empty() + ? TString(item->GetName()) + : TString::Join(rightLabel.Value(), ".", item->GetName()); + resultStructItems.emplace_back(ctx.MakeType<TItemExprType>(itemName, itemType)); } } diff --git a/ydb/core/kqp/runtime/kqp_compute.cpp b/ydb/core/kqp/runtime/kqp_compute.cpp index 412ed9252cf..6443cb6f985 100644 --- a/ydb/core/kqp/runtime/kqp_compute.cpp +++ b/ydb/core/kqp/runtime/kqp_compute.cpp @@ -103,26 +103,24 @@ public: }; NUdf::TUnboxedValue FillResultItems(NUdf::TUnboxedValue leftRow, NUdf::TUnboxedValue rightRow, EOutputMode mode) { - auto resultRowSize = (mode == EOutputMode::OnlyLeftRow) ? Self->LeftColumnsCount - : Self->LeftColumnsCount + Self->RightColumnsCount; + auto resultRowSize = (mode == EOutputMode::OnlyLeftRow) ? Self->LeftColumnsIndices.size() + : Self->LeftColumnsIndices.size() + Self->RightColumnsIndices.size(); auto resultRow = Self->ResultRowCache.NewArray(Ctx, resultRowSize, ResultItems); - size_t resIdx = 0; - if (mode == EOutputMode::OnlyLeftRow || mode == EOutputMode::Both) { - for (size_t i = 0; i < Self->LeftColumnsCount; ++i) { - ResultItems[resIdx++] = std::move(leftRow.GetElement(i)); + for (size_t i = 0; i < Self->LeftColumnsIndices.size(); ++i) { + ResultItems[Self->LeftColumnsIndices[i]] = std::move(leftRow.GetElement(i)); } } if (mode == EOutputMode::Both) { if (rightRow.HasValue()) { - for (size_t i = 0; i < Self->RightColumnsCount; ++i) { - ResultItems[resIdx++] = std::move(rightRow.GetElement(i)); + for (size_t i = 0; i < Self->RightColumnsIndices.size(); ++i) { + ResultItems[Self->RightColumnsIndices[i]] = std::move(rightRow.GetElement(i)); } } else { - for (size_t i = 0; i < Self->RightColumnsCount; ++i) { - ResultItems[resIdx++] = NUdf::TUnboxedValuePod(); + for (size_t i = 0; i < Self->RightColumnsIndices.size(); ++i) { + ResultItems[Self->RightColumnsIndices[i]] = NUdf::TUnboxedValuePod(); } } } @@ -189,12 +187,12 @@ public: public: TKqpIndexLookupJoinWrapper(TComputationMutables& mutables, IComputationNode* inputNode, - EJoinKind joinType, ui64 leftColumnsCount, ui64 rightColumnsCount) + EJoinKind joinType, TVector<ui32>&& leftColumnsIndices, TVector<ui32>&& rightColumnsIndices) : TMutableComputationNode<TKqpIndexLookupJoinWrapper>(mutables) , InputNode(inputNode) , JoinType(joinType) - , LeftColumnsCount(leftColumnsCount) - , RightColumnsCount(rightColumnsCount) + , LeftColumnsIndices(std::move(leftColumnsIndices)) + , RightColumnsIndices(std::move(rightColumnsIndices)) , ResultRowCache(mutables) { } @@ -210,8 +208,8 @@ private: private: IComputationNode* InputNode; const EJoinKind JoinType; - const ui64 LeftColumnsCount; - const ui64 RightColumnsCount; + const TVector<ui32> LeftColumnsIndices; + const TVector<ui32> RightColumnsIndices; const TContainerCacheOnContext ResultRowCache; }; @@ -236,10 +234,26 @@ IComputationNode* WrapKqpIndexLookupJoin(TCallable& callable, const TComputation auto inputNode = LocateNode(ctx.NodeLocator, callable, 0); ui32 joinKind = AS_VALUE(TDataLiteral, callable.GetInput(1))->AsValue().Get<ui32>(); - ui64 leftColumnsCount = AS_VALUE(TDataLiteral, callable.GetInput(2))->AsValue().Get<ui64>(); - ui64 rightColumnsCount = AS_VALUE(TDataLiteral, callable.GetInput(3))->AsValue().Get<ui64>(); + auto leftColumnsIndicesMap = AS_VALUE(TDictLiteral, callable.GetInput(2)); + auto rightColumnsIndicesMap = AS_VALUE(TDictLiteral, callable.GetInput(3)); + + TVector<ui32> leftColumnsIndices(leftColumnsIndicesMap->GetItemsCount()); + for (ui32 i = 0; i < leftColumnsIndicesMap->GetItemsCount(); ++i) { + auto item = leftColumnsIndicesMap->GetItem(i); + ui32 leftIndex = AS_VALUE(TDataLiteral, item.first)->AsValue().Get<ui32>(); + ui32 resultIndex = AS_VALUE(TDataLiteral, item.second)->AsValue().Get<ui32>(); + leftColumnsIndices[leftIndex] = resultIndex; + } + + TVector<ui32> rightColumnsIndices(rightColumnsIndicesMap->GetItemsCount()); + for (ui32 i = 0; i < rightColumnsIndicesMap->GetItemsCount(); ++i) { + auto item = rightColumnsIndicesMap->GetItem(i); + ui32 rightIndex = AS_VALUE(TDataLiteral, item.first)->AsValue().Get<ui32>(); + ui32 resultIndex = AS_VALUE(TDataLiteral, item.second)->AsValue().Get<ui32>(); + rightColumnsIndices[rightIndex] = resultIndex; + } - return new TKqpIndexLookupJoinWrapper(ctx.Mutables, inputNode, GetJoinKind(joinKind), leftColumnsCount, rightColumnsCount); + return new TKqpIndexLookupJoinWrapper(ctx.Mutables, inputNode, GetJoinKind(joinKind), std::move(leftColumnsIndices), std::move(rightColumnsIndices)); } } // namespace NMiniKQL diff --git a/ydb/core/kqp/runtime/kqp_program_builder.cpp b/ydb/core/kqp/runtime/kqp_program_builder.cpp index 5a5ac97937b..e1be871344f 100644 --- a/ydb/core/kqp/runtime/kqp_program_builder.cpp +++ b/ydb/core/kqp/runtime/kqp_program_builder.cpp @@ -348,12 +348,17 @@ TRuntimeNode TKqpProgramBuilder::KqpIndexLookupJoin(const TRuntimeNode& input, c TStructTypeBuilder rowTypeBuilder(GetTypeEnvironment()); + TVector<TString> leftRowColumns; + leftRowColumns.reserve(leftRowType->GetMembersCount()); for (ui32 i = 0; i < leftRowType->GetMembersCount(); ++i) { TString newMemberName = leftLabel.empty() ? TString(leftRowType->GetMemberName(i)) : TString::Join(leftLabel, ".", leftRowType->GetMemberName(i)); rowTypeBuilder.Add(newMemberName, leftRowType->GetMemberType(i)); + leftRowColumns.push_back(newMemberName); } + TVector<TString> rightRowColumns; + rightRowColumns.reserve(rightRowType->GetMembersCount()); if (RightJoinSideAllowed(joinType)) { for (ui32 i = 0; i < rightRowType->GetMembersCount(); ++i) { TString newMemberName = rightLabel.empty() ? TString(rightRowType->GetMemberName(i)) @@ -368,16 +373,39 @@ TRuntimeNode TKqpProgramBuilder::KqpIndexLookupJoin(const TRuntimeNode& input, c : rightRowType->GetMemberType(i); rowTypeBuilder.Add(newMemberName, memberType); + rightRowColumns.push_back(newMemberName); } } - auto returnType = NewStreamType(rowTypeBuilder.Build()); + auto resultRowStruct = rowTypeBuilder.Build(); + + TDictLiteralBuilder leftIndicesMap(GetTypeEnvironment(), + TDataType::Create(NUdf::TDataType<ui32>::Id, GetTypeEnvironment()), + TDataType::Create(NUdf::TDataType<ui32>::Id, GetTypeEnvironment()) + ); + + for (ui32 i = 0; i < leftRowColumns.size(); ++i) { + auto resultIndex = resultRowStruct->GetMemberIndex(leftRowColumns[i]); + leftIndicesMap.Add(NewDataLiteral<ui32>(i), NewDataLiteral<ui32>(resultIndex)); + } + + TDictLiteralBuilder rightIndicesMap(GetTypeEnvironment(), + TDataType::Create(NUdf::TDataType<ui32>::Id, GetTypeEnvironment()), + TDataType::Create(NUdf::TDataType<ui32>::Id, GetTypeEnvironment()) + ); + + for (ui32 i = 0; i < rightRowColumns.size(); ++i) { + auto resultIndex = resultRowStruct->GetMemberIndex(rightRowColumns[i]); + rightIndicesMap.Add(NewDataLiteral<ui32>(i), NewDataLiteral<ui32>(resultIndex)); + } + + auto returnType = NewStreamType(resultRowStruct); TCallableBuilder callableBuilder(Env, __func__, returnType); callableBuilder.Add(input); callableBuilder.Add(NewDataLiteral<ui32>((ui32)GetIndexLookupJoinKind(joinType))); - callableBuilder.Add(NewDataLiteral<ui64>(leftRowType->GetMembersCount())); - callableBuilder.Add(NewDataLiteral<ui64>(rightRowType->GetMembersCount())); + callableBuilder.Add(TRuntimeNode(leftIndicesMap.Build(), true)); + callableBuilder.Add(TRuntimeNode(rightIndicesMap.Build(), true)); return TRuntimeNode(callableBuilder.Build(), false); } diff --git a/ydb/core/kqp/ut/join/kqp_index_lookup_join_ut.cpp b/ydb/core/kqp/ut/join/kqp_index_lookup_join_ut.cpp index 71340d420b7..eed4c92f29e 100644 --- a/ydb/core/kqp/ut/join/kqp_index_lookup_join_ut.cpp +++ b/ydb/core/kqp/ut/join/kqp_index_lookup_join_ut.cpp @@ -24,6 +24,12 @@ void PrepareTables(TSession session) { PRIMARY KEY (Key) ); + CREATE TABLE `/Root/Kv` ( + Key Int32, + Value String, + PRIMARY KEY (Key) + ); + CREATE TABLE `/Root/LaunchByProcessIdAndPinned` ( idx_processId Utf8, idx_pinned Bool, @@ -64,6 +70,12 @@ void PrepareTables(TSession session) { (NULL, "Value24"), (104, NULL); + REPLACE INTO `/Root/Kv` (Key, Value) VALUES + (1, "Value1"), + (2, "Value2"), + (3, "Value3"), + (4, "Value4"); + REPLACE INTO `/Root/LaunchByProcessIdAndPinned` (idx_processId, idx_pinned, idx_launchNumber) VALUES ("eProcess", false, 4), ("eProcess", true, 5), @@ -190,6 +202,41 @@ Y_UNIT_TEST_TWIN(Inner, StreamLookup) { ])", 2, StreamLookup); } +Y_UNIT_TEST_TWIN(JoinWithSubquery, StreamLookup) { + const auto query = R"( + $join = (SELECT l.Key AS lKey, l.Value AS lValue, r.Value AS rValue + FROM `/Root/Left` AS l + INNER JOIN `/Root/Right` AS r + ON l.Fk = r.Key + ); + SELECT j.lValue AS Value FROM $join AS j INNER JOIN `/Root/Kv` AS kv + ON j.lKey = kv.Key; + )"; + + NKikimrConfig::TAppConfig appConfig; + appConfig.MutableTableServiceConfig()->SetEnableKqpDataQueryStreamIdxLookupJoin(StreamLookup); + + auto settings = TKikimrSettings().SetAppConfig(appConfig); + TKikimrRunner kikimr(settings); + auto db = kikimr.GetTableClient(); + auto session = db.CreateSession().GetValueSync().GetSession(); + + PrepareTables(session); + + TExecDataQuerySettings execSettings; + execSettings.CollectQueryStats(ECollectQueryStatsMode::Profile); + + auto result = session.ExecuteDataQuery(Q_(query), TTxControl::BeginTx().CommitTx(), execSettings).ExtractValueSync(); + UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString()); + + CompareYson(R"([ + [["Value1"]]; + [["Value1"]]; + [["Value2"]]; + [["Value2"]] + ])", FormatResultSetYson(result.GetResultSet(0))); +} + Y_UNIT_TEST_TWIN(Left, StreamLookup) { Test( R"( |