about summary refs log tree commit diff stats
diff options
context:
space:
mode:
author Iuliia Sidorina <yulia@ydb.tech> 2024-04-11 17:11:06 +0200
committer GitHub <noreply@github.com> 2024-04-11 17:11:06 +0200
commit cde0bcf70e3870035d8beaaca3c7501f6576b53d (patch)
tree 6f5e102061f013d243a4275ce83287ef462d4231
parent dccc8c7a2fa1b0596997437703df8545895d1345 (diff)
download ydb-cde0bcf70e3870035d8beaaca3c7501f6576b53d.tar.gz
fix(kqp): pass column order to KqpIndexLookupJoin (#3654)
-rw-r--r-- ydb/core/kqp/host/kqp_type_ann.cpp 14
-rw-r--r-- ydb/core/kqp/runtime/kqp_compute.cpp 50
-rw-r--r-- ydb/core/kqp/runtime/kqp_program_builder.cpp 34
-rw-r--r-- ydb/core/kqp/ut/join/kqp_index_lookup_join_ut.cpp 47
4 files changed, 118 insertions, 27 deletions
diff --git a/ydb/core/kqp/host/kqp_type_ann.cpp b/ydb/core/kqp/host/kqp_type_ann.cpp
index 7ec2a3f6c73..c697be8a853 100644
--- a/ydb/core/kqp/host/kqp_type_ann.cpp
+++ b/ydb/core/kqp/host/kqp_type_ann.cpp
@@ -1718,9 +1718,10 @@ TStatus AnnotateIndexLookupJoin(const TExprNode::TPtr& node, TExprContext& ctx)
TVector<const TItemExprType*> resultStructItems;
for (const auto& item : leftRowType->GetItems()) {
- resultStructItems.emplace_back(
- ctx.MakeType<TItemExprType>(TString::Join(leftLabel.Value(), ".", item->GetName()), item->GetItemType())
- );
+ TString itemName = leftLabel.Value().empty()
+ ? TString(item->GetName())
+ : TString::Join(leftLabel.Value(), ".", item->GetName());
+ resultStructItems.emplace_back(ctx.MakeType<TItemExprType>(itemName, item->GetItemType()));
}
if (RightJoinSideAllowed(joinType.Value())) {
@@ -1731,9 +1732,10 @@ TStatus AnnotateIndexLookupJoin(const TExprNode::TPtr& node, TExprContext& ctx)
? ctx.MakeType<TOptionalExprType>(item->GetItemType())
: item->GetItemType();
- resultStructItems.emplace_back(
- ctx.MakeType<TItemExprType>(TString::Join(rightLabel.Value(), ".", item->GetName()), itemType)
- );
+ TString itemName = rightLabel.Value().empty()
+ ? TString(item->GetName())
+ : TString::Join(rightLabel.Value(), ".", item->GetName());
+ resultStructItems.emplace_back(ctx.MakeType<TItemExprType>(itemName, itemType));
}
}
diff --git a/ydb/core/kqp/runtime/kqp_compute.cpp b/ydb/core/kqp/runtime/kqp_compute.cpp
index 412ed9252cf..6443cb6f985 100644
--- a/ydb/core/kqp/runtime/kqp_compute.cpp
+++ b/ydb/core/kqp/runtime/kqp_compute.cpp
@@ -103,26 +103,24 @@ public:
};
NUdf::TUnboxedValue FillResultItems(NUdf::TUnboxedValue leftRow, NUdf::TUnboxedValue rightRow, EOutputMode mode) {
- auto resultRowSize = (mode == EOutputMode::OnlyLeftRow) ? Self->LeftColumnsCount
- : Self->LeftColumnsCount + Self->RightColumnsCount;
+ auto resultRowSize = (mode == EOutputMode::OnlyLeftRow) ? Self->LeftColumnsIndices.size()
+ : Self->LeftColumnsIndices.size() + Self->RightColumnsIndices.size();
auto resultRow = Self->ResultRowCache.NewArray(Ctx, resultRowSize, ResultItems);
- size_t resIdx = 0;
-
if (mode == EOutputMode::OnlyLeftRow || mode == EOutputMode::Both) {
- for (size_t i = 0; i < Self->LeftColumnsCount; ++i) {
- ResultItems[resIdx++] = std::move(leftRow.GetElement(i));
+ for (size_t i = 0; i < Self->LeftColumnsIndices.size(); ++i) {
+ ResultItems[Self->LeftColumnsIndices[i]] = std::move(leftRow.GetElement(i));
}
}
if (mode == EOutputMode::Both) {
if (rightRow.HasValue()) {
- for (size_t i = 0; i < Self->RightColumnsCount; ++i) {
- ResultItems[resIdx++] = std::move(rightRow.GetElement(i));
+ for (size_t i = 0; i < Self->RightColumnsIndices.size(); ++i) {
+ ResultItems[Self->RightColumnsIndices[i]] = std::move(rightRow.GetElement(i));
}
} else {
- for (size_t i = 0; i < Self->RightColumnsCount; ++i) {
- ResultItems[resIdx++] = NUdf::TUnboxedValuePod();
+ for (size_t i = 0; i < Self->RightColumnsIndices.size(); ++i) {
+ ResultItems[Self->RightColumnsIndices[i]] = NUdf::TUnboxedValuePod();
}
}
}
@@ -189,12 +187,12 @@ public:
public:
TKqpIndexLookupJoinWrapper(TComputationMutables& mutables, IComputationNode* inputNode,
- EJoinKind joinType, ui64 leftColumnsCount, ui64 rightColumnsCount)
+ EJoinKind joinType, TVector<ui32>&& leftColumnsIndices, TVector<ui32>&& rightColumnsIndices)
: TMutableComputationNode<TKqpIndexLookupJoinWrapper>(mutables)
, InputNode(inputNode)
, JoinType(joinType)
- , LeftColumnsCount(leftColumnsCount)
- , RightColumnsCount(rightColumnsCount)
+ , LeftColumnsIndices(std::move(leftColumnsIndices))
+ , RightColumnsIndices(std::move(rightColumnsIndices))
, ResultRowCache(mutables) {
}
@@ -210,8 +208,8 @@ private:
private:
IComputationNode* InputNode;
const EJoinKind JoinType;
- const ui64 LeftColumnsCount;
- const ui64 RightColumnsCount;
+ const TVector<ui32> LeftColumnsIndices;
+ const TVector<ui32> RightColumnsIndices;
const TContainerCacheOnContext ResultRowCache;
};
@@ -236,10 +234,26 @@ IComputationNode* WrapKqpIndexLookupJoin(TCallable& callable, const TComputation
auto inputNode = LocateNode(ctx.NodeLocator, callable, 0);
ui32 joinKind = AS_VALUE(TDataLiteral, callable.GetInput(1))->AsValue().Get<ui32>();
- ui64 leftColumnsCount = AS_VALUE(TDataLiteral, callable.GetInput(2))->AsValue().Get<ui64>();
- ui64 rightColumnsCount = AS_VALUE(TDataLiteral, callable.GetInput(3))->AsValue().Get<ui64>();
+ auto leftColumnsIndicesMap = AS_VALUE(TDictLiteral, callable.GetInput(2));
+ auto rightColumnsIndicesMap = AS_VALUE(TDictLiteral, callable.GetInput(3));
+
+ TVector<ui32> leftColumnsIndices(leftColumnsIndicesMap->GetItemsCount());
+ for (ui32 i = 0; i < leftColumnsIndicesMap->GetItemsCount(); ++i) {
+ auto item = leftColumnsIndicesMap->GetItem(i);
+ ui32 leftIndex = AS_VALUE(TDataLiteral, item.first)->AsValue().Get<ui32>();
+ ui32 resultIndex = AS_VALUE(TDataLiteral, item.second)->AsValue().Get<ui32>();
+ leftColumnsIndices[leftIndex] = resultIndex;
+ }
+
+ TVector<ui32> rightColumnsIndices(rightColumnsIndicesMap->GetItemsCount());
+ for (ui32 i = 0; i < rightColumnsIndicesMap->GetItemsCount(); ++i) {
+ auto item = rightColumnsIndicesMap->GetItem(i);
+ ui32 rightIndex = AS_VALUE(TDataLiteral, item.first)->AsValue().Get<ui32>();
+ ui32 resultIndex = AS_VALUE(TDataLiteral, item.second)->AsValue().Get<ui32>();
+ rightColumnsIndices[rightIndex] = resultIndex;
+ }
- return new TKqpIndexLookupJoinWrapper(ctx.Mutables, inputNode, GetJoinKind(joinKind), leftColumnsCount, rightColumnsCount);
+ return new TKqpIndexLookupJoinWrapper(ctx.Mutables, inputNode, GetJoinKind(joinKind), std::move(leftColumnsIndices), std::move(rightColumnsIndices));
}
} // namespace NMiniKQL
diff --git a/ydb/core/kqp/runtime/kqp_program_builder.cpp b/ydb/core/kqp/runtime/kqp_program_builder.cpp
index 5a5ac97937b..e1be871344f 100644
--- a/ydb/core/kqp/runtime/kqp_program_builder.cpp
+++ b/ydb/core/kqp/runtime/kqp_program_builder.cpp
@@ -348,12 +348,17 @@ TRuntimeNode TKqpProgramBuilder::KqpIndexLookupJoin(const TRuntimeNode& input, c
TStructTypeBuilder rowTypeBuilder(GetTypeEnvironment());
+ TVector<TString> leftRowColumns;
+ leftRowColumns.reserve(leftRowType->GetMembersCount());
for (ui32 i = 0; i < leftRowType->GetMembersCount(); ++i) {
TString newMemberName = leftLabel.empty() ? TString(leftRowType->GetMemberName(i))
: TString::Join(leftLabel, ".", leftRowType->GetMemberName(i));
rowTypeBuilder.Add(newMemberName, leftRowType->GetMemberType(i));
+ leftRowColumns.push_back(newMemberName);
}
+ TVector<TString> rightRowColumns;
+ rightRowColumns.reserve(rightRowType->GetMembersCount());
if (RightJoinSideAllowed(joinType)) {
for (ui32 i = 0; i < rightRowType->GetMembersCount(); ++i) {
TString newMemberName = rightLabel.empty() ? TString(rightRowType->GetMemberName(i))
@@ -368,16 +373,39 @@ TRuntimeNode TKqpProgramBuilder::KqpIndexLookupJoin(const TRuntimeNode& input, c
: rightRowType->GetMemberType(i);
rowTypeBuilder.Add(newMemberName, memberType);
+ rightRowColumns.push_back(newMemberName);
}
}
- auto returnType = NewStreamType(rowTypeBuilder.Build());
+ auto resultRowStruct = rowTypeBuilder.Build();
+
+ TDictLiteralBuilder leftIndicesMap(GetTypeEnvironment(),
+ TDataType::Create(NUdf::TDataType<ui32>::Id, GetTypeEnvironment()),
+ TDataType::Create(NUdf::TDataType<ui32>::Id, GetTypeEnvironment())
+ );
+
+ for (ui32 i = 0; i < leftRowColumns.size(); ++i) {
+ auto resultIndex = resultRowStruct->GetMemberIndex(leftRowColumns[i]);
+ leftIndicesMap.Add(NewDataLiteral<ui32>(i), NewDataLiteral<ui32>(resultIndex));
+ }
+
+ TDictLiteralBuilder rightIndicesMap(GetTypeEnvironment(),
+ TDataType::Create(NUdf::TDataType<ui32>::Id, GetTypeEnvironment()),
+ TDataType::Create(NUdf::TDataType<ui32>::Id, GetTypeEnvironment())
+ );
+
+ for (ui32 i = 0; i < rightRowColumns.size(); ++i) {
+ auto resultIndex = resultRowStruct->GetMemberIndex(rightRowColumns[i]);
+ rightIndicesMap.Add(NewDataLiteral<ui32>(i), NewDataLiteral<ui32>(resultIndex));
+ }
+
+ auto returnType = NewStreamType(resultRowStruct);
TCallableBuilder callableBuilder(Env, __func__, returnType);
callableBuilder.Add(input);
callableBuilder.Add(NewDataLiteral<ui32>((ui32)GetIndexLookupJoinKind(joinType)));
- callableBuilder.Add(NewDataLiteral<ui64>(leftRowType->GetMembersCount()));
- callableBuilder.Add(NewDataLiteral<ui64>(rightRowType->GetMembersCount()));
+ callableBuilder.Add(TRuntimeNode(leftIndicesMap.Build(), true));
+ callableBuilder.Add(TRuntimeNode(rightIndicesMap.Build(), true));
return TRuntimeNode(callableBuilder.Build(), false);
}
diff --git a/ydb/core/kqp/ut/join/kqp_index_lookup_join_ut.cpp b/ydb/core/kqp/ut/join/kqp_index_lookup_join_ut.cpp
index 71340d420b7..eed4c92f29e 100644
--- a/ydb/core/kqp/ut/join/kqp_index_lookup_join_ut.cpp
+++ b/ydb/core/kqp/ut/join/kqp_index_lookup_join_ut.cpp
@@ -24,6 +24,12 @@ void PrepareTables(TSession session) {
PRIMARY KEY (Key)
);
+ CREATE TABLE `/Root/Kv` (
+ Key Int32,
+ Value String,
+ PRIMARY KEY (Key)
+ );
+
CREATE TABLE `/Root/LaunchByProcessIdAndPinned` (
idx_processId Utf8,
idx_pinned Bool,
@@ -64,6 +70,12 @@ void PrepareTables(TSession session) {
(NULL, "Value24"),
(104, NULL);
+ REPLACE INTO `/Root/Kv` (Key, Value) VALUES
+ (1, "Value1"),
+ (2, "Value2"),
+ (3, "Value3"),
+ (4, "Value4");
+
REPLACE INTO `/Root/LaunchByProcessIdAndPinned` (idx_processId, idx_pinned, idx_launchNumber) VALUES
("eProcess", false, 4),
("eProcess", true, 5),
@@ -190,6 +202,41 @@ Y_UNIT_TEST_TWIN(Inner, StreamLookup) {
])", 2, StreamLookup);
}
+Y_UNIT_TEST_TWIN(JoinWithSubquery, StreamLookup) {
+ const auto query = R"(
+ $join = (SELECT l.Key AS lKey, l.Value AS lValue, r.Value AS rValue
+ FROM `/Root/Left` AS l
+ INNER JOIN `/Root/Right` AS r
+ ON l.Fk = r.Key
+ );
+ SELECT j.lValue AS Value FROM $join AS j INNER JOIN `/Root/Kv` AS kv
+ ON j.lKey = kv.Key;
+ )";
+
+ NKikimrConfig::TAppConfig appConfig;
+ appConfig.MutableTableServiceConfig()->SetEnableKqpDataQueryStreamIdxLookupJoin(StreamLookup);
+
+ auto settings = TKikimrSettings().SetAppConfig(appConfig);
+ TKikimrRunner kikimr(settings);
+ auto db = kikimr.GetTableClient();
+ auto session = db.CreateSession().GetValueSync().GetSession();
+
+ PrepareTables(session);
+
+ TExecDataQuerySettings execSettings;
+ execSettings.CollectQueryStats(ECollectQueryStatsMode::Profile);
+
+ auto result = session.ExecuteDataQuery(Q_(query), TTxControl::BeginTx().CommitTx(), execSettings).ExtractValueSync();
+ UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
+
+ CompareYson(R"([
+ [["Value1"]];
+ [["Value1"]];
+ [["Value2"]];
+ [["Value2"]]
+ ])", FormatResultSetYson(result.GetResultSet(0)));
+}
+
Y_UNIT_TEST_TWIN(Left, StreamLookup) {
Test(
R"(