aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorziganshinmr <ziganshinmr@yandex-team.com>2025-02-04 18:15:20 +0300
committerziganshinmr <ziganshinmr@yandex-team.com>2025-02-04 21:17:12 +0300
commit3baeef8f8741d96e951eaf9b62176fb8e9589041 (patch)
tree210bc40c6fe1fe558a1eaa5c307f8987f4fe2b65
parentb70e82eebea77757f33e6b77b6f1512cc326bfc6 (diff)
downloadydb-3baeef8f8741d96e951eaf9b62176fb8e9589041.tar.gz
YT block cross join
commit_hash:c685c6dae1c5a6babaecb0ec7771113b18614118
-rw-r--r--yql/essentials/core/type_ann/type_ann_join.cpp12
-rw-r--r--yt/yql/providers/yt/provider/yql_yt_join_impl.cpp121
2 files changed, 73 insertions, 60 deletions
diff --git a/yql/essentials/core/type_ann/type_ann_join.cpp b/yql/essentials/core/type_ann/type_ann_join.cpp
index 498237b356..78c1c89f28 100644
--- a/yql/essentials/core/type_ann/type_ann_join.cpp
+++ b/yql/essentials/core/type_ann/type_ann_join.cpp
@@ -1006,9 +1006,9 @@ namespace NTypeAnnImpl {
}
const auto joinKind = input->Child(2)->Content();
- if (joinKind != "Inner" && joinKind != "Left" && joinKind != "LeftSemi" && joinKind != "LeftOnly") {
+ if (joinKind != "Inner" && joinKind != "Left" && joinKind != "LeftSemi" && joinKind != "LeftOnly"&& joinKind != "Cross") {
ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(input->Child(2)->Pos()), TStringBuilder() << "Unknown join kind: " << joinKind
- << ", supported: Inner, Left, LeftSemi, LeftOnly"));
+ << ", supported: Inner, Left, LeftSemi, LeftOnly, Cross"));
return IGraphTransformer::TStatus::Error;
}
@@ -1018,6 +1018,10 @@ namespace NTypeAnnImpl {
}
auto checkKeyColumns = [&](std::unordered_set<ui32>& keyColumns, bool isLeft, const TExprNode& keyColumnsNode, const TMultiExprType* itemType) {
+ if (joinKind == "Cross" && !keyColumnsNode.Children().empty()) {
+ ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(keyColumnsNode.Pos()), "Specifying key columns is not allowed for cross join"));
+ return false;
+ }
for (const auto& keyColumnNode : keyColumnsNode.Children()) {
auto position = GetWideBlockFieldPosition(*itemType, keyColumnNode->Content());
if (!position) {
@@ -1030,6 +1034,10 @@ namespace NTypeAnnImpl {
};
auto checkKeyDrops = [&](std::unordered_set<ui32>& keyDrops, bool isLeft, const std::unordered_set<ui32>& keyColumns, const TExprNode& keyDropsNode, const TMultiExprType* itemType) {
+ if (joinKind == "Cross" && !keyDropsNode.Children().empty()) {
+ ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(keyDropsNode.Pos()), "Specifying key drops is not allowed for cross join"));
+ return false;
+ }
for (const auto& keyDropNode : keyDropsNode.Children()) {
auto position = GetWideBlockFieldPosition(*itemType, keyDropNode->Content());
if (!position) {
diff --git a/yt/yql/providers/yt/provider/yql_yt_join_impl.cpp b/yt/yql/providers/yt/provider/yql_yt_join_impl.cpp
index 5d9949a257..5002d3cac2 100644
--- a/yt/yql/providers/yt/provider/yql_yt_join_impl.cpp
+++ b/yt/yql/providers/yt/provider/yql_yt_join_impl.cpp
@@ -274,8 +274,6 @@ TStatus UpdateInMemorySizeUsingBlocksSetting(TMapJoinSettings& settings, TYtSect
const TStructExprType* itemType, const TVector<TString>& joinKeyList, const TYtState::TPtr& state, const TString& cluster,
const TVector<TYtPathInfo::TPtr>& tables)
{
- Y_ENSURE(!op.JoinKind->IsAtom("Cross"));
-
ui64 dataSize = 0;
auto status = CalculateJoinLeafSize(dataSize, settings, inputSection, op, ctx, isLeft, itemType, joinKeyList, state, cluster, tables);
if (status != TStatus::Ok) {
@@ -2087,46 +2085,56 @@ bool RewriteYtMapJoin(TYtEquiJoin equiJoin, const TJoinLabels& labels, bool isLo
}
}
+ if (useBlocks) {
+ for (auto& [_, columnType] : columnTypes) {
+ if (!IsSupportedAsBlockType(pos, *columnType, ctx, *state->Types)) {
+ useBlocks = false;
+ YQL_CLOG(INFO, ProviderYt) << "Block mapjoin won't be used because of unsupported type: " << *columnType;
+ break;
+ }
+ }
+ }
+
TExprNode::TPtr joined;
- if (!isCross) {
+ if (useBlocks) {
TExprNode::TListType leftKeyColumnNodes;
TExprNode::TListType leftKeyColumnNodesNullable;
- auto mapInput = RemapNonConvertibleItems(listArg, mainLabel, *leftKeyColumns, outputKeyType, leftKeyColumnNodes, leftKeyColumnNodesNullable, ctx);
-
- if (useBlocks) {
- for (auto& [_, columnType] : columnTypes) {
- if (!IsSupportedAsBlockType(pos, *columnType, ctx, *state->Types)) {
- useBlocks = false;
- YQL_CLOG(INFO, ProviderYt) << "Block mapjoin won't be used because of unsupported type: " << *columnType;
- break;
- }
- }
- }
- if (useBlocks) {
- if (!mapJoinUseFlow) {
- mapInput = ctx.Builder(pos)
- .Callable("ToFlow")
- .Add(0, std::move(mapInput))
- .Seal()
- .Build();
- }
+ TExprNode::TPtr mapInput;
+ if (!isCross) {
+ mapInput = RemapNonConvertibleItems(listArg, mainLabel, *leftKeyColumns, outputKeyType, leftKeyColumnNodes, leftKeyColumnNodesNullable, ctx);
+ } else {
+ YQL_ENSURE(remappedMembers.empty());
+ mapInput = listArg;
+ }
- tableContent = ctx.Builder(pos)
+ if (!mapJoinUseFlow) {
+ mapInput = ctx.Builder(pos)
.Callable("ToFlow")
- .Add(0, std::move(tableContent))
- .Callable(1, "DependsOn")
- .Add(0, listArg)
- .Seal()
+ .Add(0, std::move(mapInput))
.Seal()
.Build();
+ }
- joined = BuildBlockMapJoin(std::move(mapInput), std::move(tableContent),
- leftKeyColumnNodes, leftOutputColumns, leftOutputColumnSources, leftUsedSourceColumns,
- remappedMembers, rightOutputColumns, rightOutputColumnSources, rightUsedSourceColumns,
- outItemType, joinType, pos, needPayload, isUniqueKey, ctx
- );
- } else {
+ tableContent = ctx.Builder(pos)
+ .Callable("ToFlow")
+ .Add(0, std::move(tableContent))
+ .Callable(1, "DependsOn")
+ .Add(0, listArg)
+ .Seal()
+ .Seal()
+ .Build();
+
+ joined = BuildBlockMapJoin(std::move(mapInput), std::move(tableContent),
+ leftKeyColumnNodes, leftOutputColumns, leftOutputColumnSources, leftUsedSourceColumns,
+ remappedMembers, rightOutputColumns, rightOutputColumnSources, rightUsedSourceColumns,
+ outItemType, joinType, pos, needPayload, isUniqueKey, ctx
+ );
+ } else {
+ if (!isCross) {
+ TExprNode::TListType leftKeyColumnNodes;
+ TExprNode::TListType leftKeyColumnNodesNullable;
+ auto mapInput = RemapNonConvertibleItems(listArg, mainLabel, *leftKeyColumns, outputKeyType, leftKeyColumnNodes, leftKeyColumnNodesNullable, ctx);
if (mapJoinUseFlow) {
joined = ctx.Builder(pos)
.Callable("FlatMap")
@@ -2180,31 +2188,31 @@ bool RewriteYtMapJoin(TYtEquiJoin equiJoin, const TJoinLabels& labels, bool isLo
.Seal()
.Build();
}
- }
- } else {
- auto joinedOut = ctx.NewCallable(pos, "AsStruct", std::move(joinedOutNodes));
- auto joinedBody = ctx.Builder(pos)
- .Callable("Map")
- .Callable(0, "ToFlow")
- .Add(0, std::move(tableContent))
- .Callable(1, "DependsOn")
- .Add(0, listArg)
+ } else {
+ auto joinedOut = ctx.NewCallable(pos, "AsStruct", std::move(joinedOutNodes));
+ auto joinedBody = ctx.Builder(pos)
+ .Callable("Map")
+ .Callable(0, "ToFlow")
+ .Add(0, std::move(tableContent))
+ .Callable(1, "DependsOn")
+ .Add(0, listArg)
+ .Seal()
+ .Seal()
+ .Lambda(1)
+ .Param("smallRow")
+ .ApplyPartial(nullptr, std::move(joinedOut)).WithNode(*lookupArg, "smallRow").Seal()
.Seal()
.Seal()
- .Lambda(1)
- .Param("smallRow")
- .ApplyPartial(nullptr, std::move(joinedOut)).WithNode(*lookupArg, "smallRow").Seal()
- .Seal()
- .Seal()
- .Build();
+ .Build();
- auto joinedLambda = ctx.NewLambda(pos, ctx.NewArguments(pos, { mainArg }), std::move(joinedBody));
- joined = ctx.Builder(pos)
- .Callable("FlatMap")
- .Add(0, listArg)
- .Add(1, std::move(joinedLambda))
- .Seal()
- .Build();
+ auto joinedLambda = ctx.NewLambda(pos, ctx.NewArguments(pos, { mainArg }), std::move(joinedBody));
+ joined = ctx.Builder(pos)
+ .Callable("FlatMap")
+ .Add(0, listArg)
+ .Add(1, std::move(joinedLambda))
+ .Seal()
+ .Build();
+ }
}
auto mapLambda = ctx.NewLambda(pos, ctx.NewArguments(pos, {std::move(listArg)}), std::move(joined));
@@ -3605,9 +3613,6 @@ TStatus RewriteYtEquiJoinLeaf(TYtEquiJoin equiJoin, TYtJoinNodeOp& op, TYtJoinNo
bool mapJoinUseFlow = state->Configuration->MapJoinUseFlow.Get().GetOrElse(DEFAULT_MAP_JOIN_USE_FLOW);
bool mapJoinUseBlocks = state->Configuration->BlockMapJoin.Get().GetOrElse(state->Types->UseBlocks);
- if (joinType == "Cross") {
- mapJoinUseBlocks = false;
- }
if (leftTablesReady) {
auto status = UpdateInMemorySizeSetting(mapSettings, leftLeaf.Section, labels, op, ctx, true, leftItemType, leftJoinKeyList, state, cluster, leftTables, mapJoinUseFlow);