diff options
author | ziganshinmr <ziganshinmr@yandex-team.com> | 2025-02-04 18:15:20 +0300 |
---|---|---|
committer | ziganshinmr <ziganshinmr@yandex-team.com> | 2025-02-04 21:17:12 +0300 |
commit | 3baeef8f8741d96e951eaf9b62176fb8e9589041 (patch) | |
tree | 210bc40c6fe1fe558a1eaa5c307f8987f4fe2b65 | |
parent | b70e82eebea77757f33e6b77b6f1512cc326bfc6 (diff) | |
download | ydb-3baeef8f8741d96e951eaf9b62176fb8e9589041.tar.gz |
YT block cross join
commit_hash:c685c6dae1c5a6babaecb0ec7771113b18614118
-rw-r--r-- | yql/essentials/core/type_ann/type_ann_join.cpp | 12 | ||||
-rw-r--r-- | yt/yql/providers/yt/provider/yql_yt_join_impl.cpp | 121 |
2 files changed, 73 insertions, 60 deletions
diff --git a/yql/essentials/core/type_ann/type_ann_join.cpp b/yql/essentials/core/type_ann/type_ann_join.cpp index 498237b356..78c1c89f28 100644 --- a/yql/essentials/core/type_ann/type_ann_join.cpp +++ b/yql/essentials/core/type_ann/type_ann_join.cpp @@ -1006,9 +1006,9 @@ namespace NTypeAnnImpl { } const auto joinKind = input->Child(2)->Content(); - if (joinKind != "Inner" && joinKind != "Left" && joinKind != "LeftSemi" && joinKind != "LeftOnly") { + if (joinKind != "Inner" && joinKind != "Left" && joinKind != "LeftSemi" && joinKind != "LeftOnly"&& joinKind != "Cross") { ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(input->Child(2)->Pos()), TStringBuilder() << "Unknown join kind: " << joinKind - << ", supported: Inner, Left, LeftSemi, LeftOnly")); + << ", supported: Inner, Left, LeftSemi, LeftOnly, Cross")); return IGraphTransformer::TStatus::Error; } @@ -1018,6 +1018,10 @@ namespace NTypeAnnImpl { } auto checkKeyColumns = [&](std::unordered_set<ui32>& keyColumns, bool isLeft, const TExprNode& keyColumnsNode, const TMultiExprType* itemType) { + if (joinKind == "Cross" && !keyColumnsNode.Children().empty()) { + ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(keyColumnsNode.Pos()), "Specifying key columns is not allowed for cross join")); + return false; + } for (const auto& keyColumnNode : keyColumnsNode.Children()) { auto position = GetWideBlockFieldPosition(*itemType, keyColumnNode->Content()); if (!position) { @@ -1030,6 +1034,10 @@ namespace NTypeAnnImpl { }; auto checkKeyDrops = [&](std::unordered_set<ui32>& keyDrops, bool isLeft, const std::unordered_set<ui32>& keyColumns, const TExprNode& keyDropsNode, const TMultiExprType* itemType) { + if (joinKind == "Cross" && !keyDropsNode.Children().empty()) { + ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(keyDropsNode.Pos()), "Specifying key drops is not allowed for cross join")); + return false; + } for (const auto& keyDropNode : keyDropsNode.Children()) { auto position = GetWideBlockFieldPosition(*itemType, keyDropNode->Content()); if (!position) { diff --git a/yt/yql/providers/yt/provider/yql_yt_join_impl.cpp b/yt/yql/providers/yt/provider/yql_yt_join_impl.cpp index 5d9949a257..5002d3cac2 100644 --- a/yt/yql/providers/yt/provider/yql_yt_join_impl.cpp +++ b/yt/yql/providers/yt/provider/yql_yt_join_impl.cpp @@ -274,8 +274,6 @@ TStatus UpdateInMemorySizeUsingBlocksSetting(TMapJoinSettings& settings, TYtSect const TStructExprType* itemType, const TVector<TString>& joinKeyList, const TYtState::TPtr& state, const TString& cluster, const TVector<TYtPathInfo::TPtr>& tables) { - Y_ENSURE(!op.JoinKind->IsAtom("Cross")); - ui64 dataSize = 0; auto status = CalculateJoinLeafSize(dataSize, settings, inputSection, op, ctx, isLeft, itemType, joinKeyList, state, cluster, tables); if (status != TStatus::Ok) { @@ -2087,46 +2085,56 @@ bool RewriteYtMapJoin(TYtEquiJoin equiJoin, const TJoinLabels& labels, bool isLo } } + if (useBlocks) { + for (auto& [_, columnType] : columnTypes) { + if (!IsSupportedAsBlockType(pos, *columnType, ctx, *state->Types)) { + useBlocks = false; + YQL_CLOG(INFO, ProviderYt) << "Block mapjoin won't be used because of unsupported type: " << *columnType; + break; + } + } + } + TExprNode::TPtr joined; - if (!isCross) { + if (useBlocks) { TExprNode::TListType leftKeyColumnNodes; TExprNode::TListType leftKeyColumnNodesNullable; - auto mapInput = RemapNonConvertibleItems(listArg, mainLabel, *leftKeyColumns, outputKeyType, leftKeyColumnNodes, leftKeyColumnNodesNullable, ctx); - - if (useBlocks) { - for (auto& [_, columnType] : columnTypes) { - if (!IsSupportedAsBlockType(pos, *columnType, ctx, *state->Types)) { - useBlocks = false; - YQL_CLOG(INFO, ProviderYt) << "Block mapjoin won't be used because of unsupported type: " << *columnType; - break; - } - } - } - if (useBlocks) { - if (!mapJoinUseFlow) { - mapInput = ctx.Builder(pos) - .Callable("ToFlow") - .Add(0, std::move(mapInput)) - .Seal() - .Build(); - } + TExprNode::TPtr mapInput; + if (!isCross) { + mapInput = RemapNonConvertibleItems(listArg, mainLabel, *leftKeyColumns, outputKeyType, leftKeyColumnNodes, leftKeyColumnNodesNullable, ctx); + } else { + YQL_ENSURE(remappedMembers.empty()); + mapInput = listArg; + } - tableContent = ctx.Builder(pos) + if (!mapJoinUseFlow) { + mapInput = ctx.Builder(pos) .Callable("ToFlow") - .Add(0, std::move(tableContent)) - .Callable(1, "DependsOn") - .Add(0, listArg) - .Seal() + .Add(0, std::move(mapInput)) .Seal() .Build(); + } - joined = BuildBlockMapJoin(std::move(mapInput), std::move(tableContent), - leftKeyColumnNodes, leftOutputColumns, leftOutputColumnSources, leftUsedSourceColumns, - remappedMembers, rightOutputColumns, rightOutputColumnSources, rightUsedSourceColumns, - outItemType, joinType, pos, needPayload, isUniqueKey, ctx - ); - } else { + tableContent = ctx.Builder(pos) + .Callable("ToFlow") + .Add(0, std::move(tableContent)) + .Callable(1, "DependsOn") + .Add(0, listArg) + .Seal() + .Seal() + .Build(); + + joined = BuildBlockMapJoin(std::move(mapInput), std::move(tableContent), + leftKeyColumnNodes, leftOutputColumns, leftOutputColumnSources, leftUsedSourceColumns, + remappedMembers, rightOutputColumns, rightOutputColumnSources, rightUsedSourceColumns, + outItemType, joinType, pos, needPayload, isUniqueKey, ctx + ); + } else { + if (!isCross) { + TExprNode::TListType leftKeyColumnNodes; + TExprNode::TListType leftKeyColumnNodesNullable; + auto mapInput = RemapNonConvertibleItems(listArg, mainLabel, *leftKeyColumns, outputKeyType, leftKeyColumnNodes, leftKeyColumnNodesNullable, ctx); if (mapJoinUseFlow) { joined = ctx.Builder(pos) .Callable("FlatMap") @@ -2180,31 +2188,31 @@ bool RewriteYtMapJoin(TYtEquiJoin equiJoin, const TJoinLabels& labels, bool isLo .Seal() .Build(); } - } - } else { - auto joinedOut = ctx.NewCallable(pos, "AsStruct", std::move(joinedOutNodes)); - auto joinedBody = ctx.Builder(pos) - .Callable("Map") - .Callable(0, "ToFlow") - .Add(0, std::move(tableContent)) - .Callable(1, "DependsOn") - .Add(0, listArg) + } else { + auto joinedOut = ctx.NewCallable(pos, "AsStruct", std::move(joinedOutNodes)); + auto joinedBody = ctx.Builder(pos) + .Callable("Map") + .Callable(0, "ToFlow") + .Add(0, std::move(tableContent)) + .Callable(1, "DependsOn") + .Add(0, listArg) + .Seal() + .Seal() + .Lambda(1) + .Param("smallRow") + .ApplyPartial(nullptr, std::move(joinedOut)).WithNode(*lookupArg, "smallRow").Seal() .Seal() .Seal() - .Lambda(1) - .Param("smallRow") - .ApplyPartial(nullptr, std::move(joinedOut)).WithNode(*lookupArg, "smallRow").Seal() - .Seal() - .Seal() - .Build(); + .Build(); - auto joinedLambda = ctx.NewLambda(pos, ctx.NewArguments(pos, { mainArg }), std::move(joinedBody)); - joined = ctx.Builder(pos) - .Callable("FlatMap") - .Add(0, listArg) - .Add(1, std::move(joinedLambda)) - .Seal() - .Build(); + auto joinedLambda = ctx.NewLambda(pos, ctx.NewArguments(pos, { mainArg }), std::move(joinedBody)); + joined = ctx.Builder(pos) + .Callable("FlatMap") + .Add(0, listArg) + .Add(1, std::move(joinedLambda)) + .Seal() + .Build(); + } } auto mapLambda = ctx.NewLambda(pos, ctx.NewArguments(pos, {std::move(listArg)}), std::move(joined)); @@ -3605,9 +3613,6 @@ TStatus RewriteYtEquiJoinLeaf(TYtEquiJoin equiJoin, TYtJoinNodeOp& op, TYtJoinNo bool mapJoinUseFlow = state->Configuration->MapJoinUseFlow.Get().GetOrElse(DEFAULT_MAP_JOIN_USE_FLOW); bool mapJoinUseBlocks = state->Configuration->BlockMapJoin.Get().GetOrElse(state->Types->UseBlocks); - if (joinType == "Cross") { - mapJoinUseBlocks = false; - } if (leftTablesReady) { auto status = UpdateInMemorySizeSetting(mapSettings, leftLeaf.Section, labels, op, ctx, true, leftItemType, leftJoinKeyList, state, cluster, leftTables, mapJoinUseFlow); |