diff options
18 files changed, 647 insertions, 73 deletions
diff --git a/yql/essentials/core/expr_nodes/yql_expr_nodes.json b/yql/essentials/core/expr_nodes/yql_expr_nodes.json index cc4becb1e1a..76e556533f7 100644 --- a/yql/essentials/core/expr_nodes/yql_expr_nodes.json +++ b/yql/essentials/core/expr_nodes/yql_expr_nodes.json @@ -1613,6 +1613,21 @@ ] }, { + "Name": "TCoBlockMapJoinCore", + "Base": "TCallable", + "Match": {"Type": "Callable", "Name": "BlockMapJoinCore"}, + "Children": [ + {"Index": 0, "Name": "LeftInput", "Type": "TExprBase"}, + {"Index": 1, "Name": "RightInput", "Type": "TExprBase"}, + {"Index": 2, "Name": "JoinKind", "Type": "TCoAtom"}, + {"Index": 3, "Name": "LeftKeyColumns", "Type": "TCoAtomList"}, + {"Index": 4, "Name": "LeftKeyDrops", "Type": "TCoAtomList"}, + {"Index": 5, "Name": "RightKeyColumns", "Type": "TCoAtomList"}, + {"Index": 6, "Name": "RightKeyDrops", "Type": "TCoAtomList"}, + {"Index": 7, "Name": "Options", "Type": "TExprList"} + ] + }, + { "Name": "TCoGraceJoinCore", "Base": "TCallable", "Match": {"Type": "Callable", "Name": "GraceJoinCore"}, diff --git a/yql/essentials/core/type_ann/type_ann_core.cpp b/yql/essentials/core/type_ann/type_ann_core.cpp index d448c7ed545..882adedcc56 100644 --- a/yql/essentials/core/type_ann/type_ann_core.cpp +++ b/yql/essentials/core/type_ann/type_ann_core.cpp @@ -12850,6 +12850,8 @@ template <NKikimr::NUdf::EDataSlot DataSlot> ExtFunctions["BlockFunc"] = &BlockFuncWrapper; ExtFunctions["BlockBitCast"] = &BlockBitCastWrapper; + Functions["BlockMapJoinCore"] = &BlockMapJoinCoreWrapper; + ExtFunctions["AsScalar"] = &AsScalarWrapper; ExtFunctions["WideToBlocks"] = &WideToBlocksWrapper; ExtFunctions["BlockCombineAll"] = &BlockCombineAllWrapper; diff --git a/yql/essentials/core/type_ann/type_ann_impl.h b/yql/essentials/core/type_ann/type_ann_impl.h index 46d929b2350..717cb7b3ac8 100644 --- a/yql/essentials/core/type_ann/type_ann_impl.h +++ b/yql/essentials/core/type_ann/type_ann_impl.h @@ -35,6 +35,7 @@ namespace NTypeAnnImpl { IGraphTransformer::TStatus 
CombineCoreWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx); IGraphTransformer::TStatus GroupingCoreWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx); IGraphTransformer::TStatus DecimalBinaryWrapperBase(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx, bool blocks); + IGraphTransformer::TStatus BlockMapJoinCoreWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx); TMaybe<ui32> FindOrReportMissingMember(TStringBuf memberName, TPositionHandle pos, const TStructExprType& structType, TExprContext& ctx); diff --git a/yql/essentials/core/type_ann/type_ann_join.cpp b/yql/essentials/core/type_ann/type_ann_join.cpp index 140b15eca29..498237b3561 100644 --- a/yql/essentials/core/type_ann/type_ann_join.cpp +++ b/yql/essentials/core/type_ann/type_ann_join.cpp @@ -980,5 +980,138 @@ namespace NTypeAnnImpl { } } + IGraphTransformer::TStatus BlockMapJoinCoreWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx) { + Y_UNUSED(output); + + if (!EnsureArgsCount(*input, 8, ctx.Expr)) { + return IGraphTransformer::TStatus::Error; + } + + TTypeAnnotationNode::TListType leftItemTypes; + if (!EnsureWideStreamBlockType(input->Head(), leftItemTypes, ctx.Expr)) { + return IGraphTransformer::TStatus::Error; + } + leftItemTypes.pop_back(); + auto leftItemType = input->Head().GetTypeAnn()->Cast<TStreamExprType>()->GetItemType()->Cast<TMultiExprType>(); + + TTypeAnnotationNode::TListType rightItemTypes; + if (!EnsureWideStreamBlockType(*input->Child(1), rightItemTypes, ctx.Expr)) { + return IGraphTransformer::TStatus::Error; + } + rightItemTypes.pop_back(); + auto rightItemType = input->Child(1)->GetTypeAnn()->Cast<TStreamExprType>()->GetItemType()->Cast<TMultiExprType>(); + + if (!EnsureAtom(*input->Child(2), ctx.Expr)) { + return IGraphTransformer::TStatus::Error; + } + + const auto joinKind = input->Child(2)->Content(); + if (joinKind != "Inner" && joinKind != "Left" && 
joinKind != "LeftSemi" && joinKind != "LeftOnly") { + ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(input->Child(2)->Pos()), TStringBuilder() << "Unknown join kind: " << joinKind + << ", supported: Inner, Left, LeftSemi, LeftOnly")); + return IGraphTransformer::TStatus::Error; + } + + if (input->Child(3)->ChildrenSize() != input->Child(5)->ChildrenSize()) { + ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(input->Child(5)->Pos()), TStringBuilder() << "Mismatch of key column count")); + return IGraphTransformer::TStatus::Error; + } + + auto checkKeyColumns = [&](std::unordered_set<ui32>& keyColumns, bool isLeft, const TExprNode& keyColumnsNode, const TMultiExprType* itemType) { + for (const auto& keyColumnNode : keyColumnsNode.Children()) { + auto position = GetWideBlockFieldPosition(*itemType, keyColumnNode->Content()); + if (!position) { + ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(keyColumnNode->Pos()), TStringBuilder() << "Unknown " << (isLeft ? "left" : "right") << " key column: " << keyColumnNode->Content())); + return false; + } + keyColumns.insert(*position); + } + return true; + }; + + auto checkKeyDrops = [&](std::unordered_set<ui32>& keyDrops, bool isLeft, const std::unordered_set<ui32>& keyColumns, const TExprNode& keyDropsNode, const TMultiExprType* itemType) { + for (const auto& keyDropNode : keyDropsNode.Children()) { + auto position = GetWideBlockFieldPosition(*itemType, keyDropNode->Content()); + if (!position) { + ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(keyDropNode->Pos()), TStringBuilder() << "Unknown " << (isLeft ? "left" : "right") << " key column: " << keyDropNode->Content())); + return false; + } + if (!keyColumns.contains(*position)) { + ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(keyDropNode->Pos()), TStringBuilder() << "Attempted to drop " << (isLeft ? 
"left" : "right") << " non-key column: " << keyDropNode->Content())); + return false; + } + if (!keyDrops.insert(*position).second) { + ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(keyDropNode->Pos()), TStringBuilder() << "Duplicated " << (isLeft ? "left" : "right") << " key drop: " << keyDropNode->Content())); + return false; + } + } + return true; + }; + + for (size_t childIdx = 3; childIdx <= 6; childIdx++) { + if (!EnsureTupleOfAtoms(*input->Child(childIdx), ctx.Expr)) { + return IGraphTransformer::TStatus::Error; + } + } + + std::unordered_set<ui32> leftKeyColumns; + if (!checkKeyColumns(leftKeyColumns, true, *input->Child(3), leftItemType)) { + return IGraphTransformer::TStatus::Error; + } + + std::unordered_set<ui32> leftKeyDrops; + if (!checkKeyDrops(leftKeyDrops, true, leftKeyColumns, *input->Child(4), leftItemType)) { + return IGraphTransformer::TStatus::Error; + } + + std::unordered_set<ui32> rightKeyColumns; + if (!checkKeyColumns(rightKeyColumns, false, *input->Child(5), rightItemType)) { + return IGraphTransformer::TStatus::Error; + } + + std::unordered_set<ui32> rightKeyDrops; + if (!checkKeyDrops(rightKeyDrops, false, rightKeyColumns, *input->Child(6), rightItemType)) { + return IGraphTransformer::TStatus::Error; + } + + auto settingsValidator = [&](TStringBuf, TExprNode& node, TExprContext&) { return node.ChildrenSize() == 1; }; + if (!EnsureValidSettings(input->Tail(), {"rightAny"}, settingsValidator, ctx.Expr)) { + return IGraphTransformer::TStatus::Error; + } + + std::vector<const TTypeAnnotationNode*> resultItems; + for (ui32 pos = 0; pos < leftItemTypes.size(); pos++) { + if (leftKeyDrops.contains(pos)) { + continue; + } + + resultItems.push_back(ctx.Expr.MakeType<TBlockExprType>(leftItemTypes[pos])); + } + + if (joinKind != "LeftSemi" && joinKind != "LeftOnly") { + for (ui32 pos = 0; pos < rightItemTypes.size(); pos++) { + if (rightKeyDrops.contains(pos)) { + continue; + } + + auto columnType = rightItemTypes[pos]; + if (joinKind == "Left" 
&& !rightItemTypes[pos]->IsOptionalOrNull()) { + columnType = ctx.Expr.MakeType<TOptionalExprType>(columnType); + } + + resultItems.push_back(ctx.Expr.MakeType<TBlockExprType>(columnType)); + } + } else { + if (!rightKeyDrops.empty()) { + ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(input->Child(6)->Pos()), TStringBuilder() << "Right key drops are not allowed for semi/only join")); + return IGraphTransformer::TStatus::Error; + } + } + + resultItems.push_back(ctx.Expr.MakeType<TScalarExprType>(ctx.Expr.MakeType<TDataExprType>(EDataSlot::Uint64))); + input->SetTypeAnn(ctx.Expr.MakeType<TStreamExprType>(ctx.Expr.MakeType<TMultiExprType>(resultItems))); + return IGraphTransformer::TStatus::Ok; + } + } // namespace NTypeAnnImpl } // namespace NYql diff --git a/yql/essentials/core/yql_expr_type_annotation.cpp b/yql/essentials/core/yql_expr_type_annotation.cpp index 56626afe87e..e8372de39f2 100644 --- a/yql/essentials/core/yql_expr_type_annotation.cpp +++ b/yql/essentials/core/yql_expr_type_annotation.cpp @@ -6035,6 +6035,13 @@ std::optional<ui32> GetFieldPosition(const TStructExprType& structType, const TS return std::nullopt; } +std::optional<ui32> GetWideBlockFieldPosition(const TMultiExprType& multiType, const TStringBuf& field) { + YQL_ENSURE(multiType.GetSize() >= 1); + if (ui32 pos; TryFromString(field, pos) && pos < multiType.GetSize() - 1) + return {pos}; + return std::nullopt; +} + bool ExtractPgType(const TTypeAnnotationNode* type, ui32& pgType, bool& convertToPg, TPositionHandle pos, TExprContext& ctx) { pgType = 0; convertToPg = false; diff --git a/yql/essentials/core/yql_expr_type_annotation.h b/yql/essentials/core/yql_expr_type_annotation.h index a8def26cee8..508c9e42581 100644 --- a/yql/essentials/core/yql_expr_type_annotation.h +++ b/yql/essentials/core/yql_expr_type_annotation.h @@ -319,6 +319,7 @@ IGraphTransformer::TStatus NormalizeKeyValueTuples(const TExprNode::TPtr& input, std::optional<ui32> GetFieldPosition(const TMultiExprType& tupleType, const 
TStringBuf& field); std::optional<ui32> GetFieldPosition(const TTupleExprType& tupleType, const TStringBuf& field); std::optional<ui32> GetFieldPosition(const TStructExprType& structType, const TStringBuf& field); +std::optional<ui32> GetWideBlockFieldPosition(const TMultiExprType& tupleType, const TStringBuf& field); bool ExtractPgType(const TTypeAnnotationNode* type, ui32& pgType, bool& convertToPg, TPositionHandle pos, TExprContext& ctx); bool HasContextFuncs(const TExprNode& input); diff --git a/yql/essentials/minikql/comp_nodes/mkql_block_map_join.cpp b/yql/essentials/minikql/comp_nodes/mkql_block_map_join.cpp index 847e4409b6c..189f8a0d2df 100644 --- a/yql/essentials/minikql/comp_nodes/mkql_block_map_join.cpp +++ b/yql/essentials/minikql/comp_nodes/mkql_block_map_join.cpp @@ -7,6 +7,7 @@ #include <yql/essentials/minikql/computation/mkql_computation_node_holders_codegen.h> #include <yql/essentials/minikql/comp_nodes/mkql_rh_hash.h> #include <yql/essentials/minikql/invoke_builtins/mkql_builtins.h> +#include <yql/essentials/minikql/mkql_block_map_join_utils.h> #include <yql/essentials/minikql/mkql_node_cast.h> #include <yql/essentials/minikql/mkql_program_builder.h> @@ -308,8 +309,6 @@ class TBlockIndex : public TComputationValue<TBlockIndex> { }; }; - static_assert(sizeof(TIndexMapValue) == 8); - using TBase = TComputationValue<TBlockIndex>; using TIndexMap = TRobinHoodHashFixedMap< ui64, @@ -319,6 +318,9 @@ class TBlockIndex : public TComputationValue<TBlockIndex> { TMKQLAllocator<char> >; + static_assert(sizeof(TIndexMapValue) == 8); + static_assert(std::max(TIndexMap::GetCellSize(), static_cast<ui32>(sizeof(TIndexNode))) == BlockMapJoinIndexEntrySize); + public: class TIterator { enum class EIteratorType { diff --git a/yql/essentials/minikql/comp_nodes/mkql_rh_hash.h b/yql/essentials/minikql/comp_nodes/mkql_rh_hash.h index f5c40a69448..3387039477e 100644 --- a/yql/essentials/minikql/comp_nodes/mkql_rh_hash.h +++ 
b/yql/essentials/minikql/comp_nodes/mkql_rh_hash.h @@ -6,6 +6,7 @@ #include <vector> #include <span> +#include <yql/essentials/minikql/mkql_rh_hash_utils.h> #include <yql/essentials/utils/prefetch.h> #include <util/digest/city.h> @@ -109,7 +110,7 @@ public: // should be called after Insert if isNew is true Y_FORCE_INLINE void CheckGrow() { - if (Size * 2 >= Capacity) { + if (RHHashTableNeedsGrow(Size, Capacity)) { Grow(); } } @@ -124,7 +125,7 @@ public: template <typename TSink> Y_NO_INLINE void BatchInsert(std::span<TRobinHoodBatchRequestItem<TKey>> batchRequest, TSink&& sink) { - while (2 * (Size + batchRequest.size()) >= Capacity) { + while (RHHashTableNeedsGrow(Size + batchRequest.size(), Capacity)) { Grow(); } @@ -331,15 +332,7 @@ private: } Y_NO_INLINE void Grow() { - ui64 growFactor; - if (Capacity < 100'000) { - growFactor = 8; - } else if (Capacity < 1'000'000) { - growFactor = 4; - } else { - growFactor = 2; - } - auto newCapacity = Capacity * growFactor; + auto newCapacity = Capacity * CalculateRHHashTableGrowFactor(Capacity); auto newCapacityShift = 64 - MostSignificantBit(newCapacity); char *newData, *newDataEnd; Allocate(newCapacity, newData, newDataEnd); @@ -522,8 +515,7 @@ public: TBase::Init(); } - - ui32 GetCellSize() const { + static constexpr ui32 GetCellSize() { return sizeof(typename TBase::TPSLStorage) + sizeof(TKey) + sizeof(TPayload); } @@ -569,7 +561,7 @@ public: TBase::Init(); } - ui32 GetCellSize() const { + static constexpr ui32 GetCellSize() { return sizeof(typename TBase::TPSLStorage) + sizeof(TKey); } diff --git a/yql/essentials/minikql/mkql_block_map_join_utils.cpp b/yql/essentials/minikql/mkql_block_map_join_utils.cpp new file mode 100644 index 00000000000..e85390c08a0 --- /dev/null +++ b/yql/essentials/minikql/mkql_block_map_join_utils.cpp @@ -0,0 +1,12 @@ +#include "mkql_block_map_join_utils.h" +#include "mkql_rh_hash_utils.h" + +namespace NKikimr { +namespace NMiniKQL { + +ui64 EstimateBlockMapJoinIndexSize(ui64 rowsCount) { + 
return CalculateRHHashTableCapacity(rowsCount) * BlockMapJoinIndexEntrySize; +} + +} // namespace NMiniKQL +} // namespace NKikimr diff --git a/yql/essentials/minikql/mkql_block_map_join_utils.h b/yql/essentials/minikql/mkql_block_map_join_utils.h new file mode 100644 index 00000000000..8f3468a68ed --- /dev/null +++ b/yql/essentials/minikql/mkql_block_map_join_utils.h @@ -0,0 +1,13 @@ +#pragma once + +#include <util/system/types.h> + +namespace NKikimr { +namespace NMiniKQL { + +constexpr ui64 BlockMapJoinIndexEntrySize = 20; + +ui64 EstimateBlockMapJoinIndexSize(ui64 rowsCount); + +} // namespace NMiniKQL +} // namespace NKikimr diff --git a/yql/essentials/minikql/mkql_rh_hash_utils.cpp b/yql/essentials/minikql/mkql_rh_hash_utils.cpp new file mode 100644 index 00000000000..c620e7af436 --- /dev/null +++ b/yql/essentials/minikql/mkql_rh_hash_utils.cpp @@ -0,0 +1,33 @@ +#include "mkql_rh_hash_utils.h" + +namespace NKikimr { +namespace NMiniKQL { + +bool RHHashTableNeedsGrow(ui64 size, ui64 capacity) { + return size * 2 >= capacity; // predicate: true once the table is at least half full +} + +ui64 CalculateRHHashTableGrowFactor(ui64 currentCapacity) { + ui64 growFactor; + if (currentCapacity < 100'000) { + growFactor = 8; + } else if (currentCapacity < 1'000'000) { + growFactor = 4; + } else { + growFactor = 2; + } + + return growFactor; +} + +ui64 CalculateRHHashTableCapacity(ui64 targetSize) { + ui64 capacity = 256; + while (RHHashTableNeedsGrow(targetSize, capacity)) { + capacity *= CalculateRHHashTableGrowFactor(capacity); + } + + return capacity; +} + +} // namespace NMiniKQL +} // namespace NKikimr diff --git a/yql/essentials/minikql/mkql_rh_hash_utils.h b/yql/essentials/minikql/mkql_rh_hash_utils.h new file mode 100644 index 00000000000..fb981f6979c --- /dev/null +++ b/yql/essentials/minikql/mkql_rh_hash_utils.h @@ -0,0 +1,13 @@ +#pragma once + +#include <util/system/types.h> + +namespace NKikimr { +namespace NMiniKQL { + +bool RHHashTableNeedsGrow(ui64 size, ui64 capacity); +ui64 
CalculateRHHashTableGrowFactor(ui64 currentCapacity); +ui64 CalculateRHHashTableCapacity(ui64 targetSize); + +} // namespace NMiniKQL +} // namespace NKikimr diff --git a/yql/essentials/minikql/ya.make b/yql/essentials/minikql/ya.make index b308132cb99..c1d03fb5875 100644 --- a/yql/essentials/minikql/ya.make +++ b/yql/essentials/minikql/ya.make @@ -7,6 +7,8 @@ SRCS( compact_hash.h defs.h mkql_alloc.cpp + mkql_block_map_join_utils.cpp + mkql_block_map_join_utils.h mkql_buffer.cpp mkql_buffer.h mkql_function_metadata.cpp @@ -30,6 +32,8 @@ SRCS( mkql_opt_literal.h mkql_program_builder.cpp mkql_program_builder.h + mkql_rh_hash_utils.cpp + mkql_rh_hash_utils.h mkql_runtime_version.cpp mkql_runtime_version.h mkql_stats_registry.cpp diff --git a/yql/essentials/providers/common/mkql/yql_provider_mkql.cpp b/yql/essentials/providers/common/mkql/yql_provider_mkql.cpp index 211fc6c84c7..92ab62adb1b 100644 --- a/yql/essentials/providers/common/mkql/yql_provider_mkql.cpp +++ b/yql/essentials/providers/common/mkql/yql_provider_mkql.cpp @@ -1672,6 +1672,26 @@ TMkqlCommonCallableCompiler::TShared::TShared() { return ctx.ProgramBuilder.MapJoinCore(list, dict, joinKind, leftKeyColumns, leftRenames, rightRenames, returnType); }); + AddCallable("BlockMapJoinCore", [](const TExprNode& node, TMkqlBuildContext& ctx) { + const auto leftStream = MkqlBuildExpr(node.Head(), ctx); + const auto rightStream = MkqlBuildExpr(*node.Child(1), ctx); + const auto joinKind = GetJoinKind(node, node.Child(2)->Content()); + + const auto leftItemType = node.Head().GetTypeAnn()->Cast<TStreamExprType>()->GetItemType()->Cast<TMultiExprType>(); + const auto rightItemType = node.Child(1U)->GetTypeAnn()->Cast<TStreamExprType>()->GetItemType()->Cast<TMultiExprType>(); + + std::vector<ui32> leftKeyColumns, leftKeyDrops, rightKeyColumns, rightKeyDrops; + node.Child(3)->ForEachChild([&](const TExprNode& child){ leftKeyColumns.emplace_back(*GetWideBlockFieldPosition(*leftItemType, child.Content())); }); + 
node.Child(4)->ForEachChild([&](const TExprNode& child){ leftKeyDrops.emplace_back(*GetWideBlockFieldPosition(*leftItemType, child.Content())); }); + node.Child(5)->ForEachChild([&](const TExprNode& child){ rightKeyColumns.emplace_back(*GetWideBlockFieldPosition(*rightItemType, child.Content())); }); + node.Child(6)->ForEachChild([&](const TExprNode& child){ rightKeyDrops.emplace_back(*GetWideBlockFieldPosition(*rightItemType, child.Content())); }); + + bool rightAny = HasSetting(node.Tail(), "rightAny"); + + const auto returnType = BuildType(node, *node.GetTypeAnn(), ctx.ProgramBuilder); + return ctx.ProgramBuilder.BlockMapJoinCore(leftStream, rightStream, joinKind, leftKeyColumns, leftKeyDrops, rightKeyColumns, rightKeyDrops, rightAny, returnType); + }); + AddCallable({"GraceJoinCore", "GraceSelfJoinCore"}, [](const TExprNode& node, TMkqlBuildContext& ctx) { bool selfJoin = node.Content() == "GraceSelfJoinCore"; int shift = selfJoin ? 0 : 1; diff --git a/yt/yql/providers/yt/common/yql_yt_settings.cpp b/yt/yql/providers/yt/common/yql_yt_settings.cpp index ffd5e8f0227..edf95828850 100644 --- a/yt/yql/providers/yt/common/yql_yt_settings.cpp +++ b/yt/yql/providers/yt/common/yql_yt_settings.cpp @@ -308,6 +308,7 @@ TYtConfiguration::TYtConfiguration(TTypeAnnotationContext& typeCtx) REGISTER_SETTING(*this, MapJoinShardMinRows); REGISTER_SETTING(*this, MapJoinShardCount).Lower(1).Upper(10); REGISTER_SETTING(*this, MapJoinUseFlow); + REGISTER_SETTING(*this, BlockMapJoin); REGISTER_SETTING(*this, EvaluationTableSizeLimit).Upper(10_MB); // Max 10Mb REGISTER_SETTING(*this, LookupJoinLimit).Upper(10_MB); // Same as EvaluationTableSizeLimit REGISTER_SETTING(*this, LookupJoinMaxRows).Upper(10000); diff --git a/yt/yql/providers/yt/common/yql_yt_settings.h b/yt/yql/providers/yt/common/yql_yt_settings.h index 700f36aae5e..1163f9ccbb7 100644 --- a/yt/yql/providers/yt/common/yql_yt_settings.h +++ b/yt/yql/providers/yt/common/yql_yt_settings.h @@ -227,6 +227,7 @@ struct TYtSettings { 
NCommon::TConfSetting<ui64, false> MapJoinShardMinRows; NCommon::TConfSetting<ui64, false> MapJoinShardCount; // [1-10] NCommon::TConfSetting<bool, false> MapJoinUseFlow; + NCommon::TConfSetting<bool, false> BlockMapJoin; NCommon::TConfSetting<NSize::TSize, false> LookupJoinLimit; NCommon::TConfSetting<ui64, false> LookupJoinMaxRows; NCommon::TConfSetting<NSize::TSize, false> EvaluationTableSizeLimit; diff --git a/yt/yql/providers/yt/opt/yql_yt_join.h b/yt/yql/providers/yt/opt/yql_yt_join.h index 3ccc860f7fe..fa5b057afbe 100644 --- a/yt/yql/providers/yt/opt/yql_yt_join.h +++ b/yt/yql/providers/yt/opt/yql_yt_join.h @@ -35,7 +35,9 @@ struct TMapJoinSettings { ui64 LeftSize = 0; ui64 RightSize = 0; ui64 LeftMemSize = 0; + ui64 LeftMemSizeUsingBlocks = 0; ui64 RightMemSize = 0; + ui64 RightMemSizeUsingBlocks = 0; bool LeftUnique = false; bool RightUnique = false; ui64 LeftCount = 0; diff --git a/yt/yql/providers/yt/provider/yql_yt_join_impl.cpp b/yt/yql/providers/yt/provider/yql_yt_join_impl.cpp index b40c14e5a32..9f4c2aa4a1b 100644 --- a/yt/yql/providers/yt/provider/yql_yt_join_impl.cpp +++ b/yt/yql/providers/yt/provider/yql_yt_join_impl.cpp @@ -12,6 +12,7 @@ #include <yql/essentials/core/yql_expr_optimize.h> #include <yql/essentials/core/yql_opt_utils.h> #include <yql/essentials/core/yql_type_helpers.h> +#include <yql/essentials/minikql/mkql_block_map_join_utils.h> #include <yql/essentials/utils/log/log.h> #include <util/string/join.h> @@ -268,6 +269,28 @@ TStatus UpdateInMemorySizeSetting(TMapJoinSettings& settings, TYtSection& inputS return TStatus::Ok; } +TStatus UpdateInMemorySizeUsingBlocksSetting(TMapJoinSettings& settings, TYtSection& inputSection, + const TYtJoinNodeOp& op, TExprContext& ctx, bool isLeft, + const TStructExprType* itemType, const TVector<TString>& joinKeyList, const TYtState::TPtr& state, const TString& cluster, + const TVector<TYtPathInfo::TPtr>& tables) +{ + Y_ENSURE(!op.JoinKind->IsAtom("Cross")); + + ui64 dataSize = 0; + auto status = 
CalculateJoinLeafSize(dataSize, settings, inputSection, op, ctx, isLeft, itemType, joinKeyList, state, cluster, tables); + if (status != TStatus::Ok) { + return status; + } + + const ui64 rows = isLeft ? settings.LeftRows : settings.RightRows; + const ui64 indexSize = NKikimr::NMiniKQL::EstimateBlockMapJoinIndexSize(rows); + YQL_CLOG(INFO, ProviderYt) << "Estimated block map join index size for " << (isLeft ? "left" : "right") << " table: " << indexSize << " bytes"; + + const ui64 result = dataSize + indexSize; + (isLeft ? settings.LeftMemSizeUsingBlocks : settings.RightMemSizeUsingBlocks) = result; + return TStatus::Ok; +} + TYtJoinNodeLeaf::TPtr ConvertYtEquiJoinToLeaf(const TYtJoinNodeOp& op, TPositionHandle pos, TExprContext& ctx) { auto joinLabelBuilder = Build<TCoAtomList>(ctx, pos); for (auto& x : op.Scope) { @@ -1432,9 +1455,223 @@ bool RewriteYtMergeJoin(TYtEquiJoin equiJoin, const TJoinLabels& labels, TYtJoin return true; } +TExprNode::TPtr BuildBlockMapJoin(TExprNode::TPtr leftFlow, TExprNode::TPtr rightFlow, + const TExprNode::TListType& leftKeyColumnNodes, const std::vector<TStringBuf>& leftOutputColumns, + const THashMap<TStringBuf, TString>& leftOutputColumnSources, const THashSet<TString>& leftUsedSourceColumns, + const TExprNode::TListType& rightKeyColumnNodes, const std::vector<TStringBuf>& rightOutputColumns, + const THashMap<TStringBuf, TString>& rightOutputColumnSources, const THashSet<TString>& rightUsedSourceColumns, + const TStructExprType* outItemType, TExprNode::TPtr joinType, TPositionHandle pos, bool needPayload, bool isUniqueKey, + TExprContext& ctx +) { + THashSet<TStringBuf> leftSourceKeyDrops; + for (auto& keyColumnNode : leftKeyColumnNodes) { + auto memberName = keyColumnNode->Content(); + if (!leftUsedSourceColumns.contains(memberName)) { + leftSourceKeyDrops.insert(memberName); + } + } + + THashSet<TStringBuf> rightSourceKeyDrops; + for (auto& keyColumnNode : rightKeyColumnNodes) { + auto memberName = keyColumnNode->Content(); + 
if (!rightUsedSourceColumns.contains(memberName)) { + rightSourceKeyDrops.insert(memberName); + } + } + + auto expandLambdaBuilder = [&]( + TExprNodeBuilder& builder, + THashMap<TStringBuf, ui32>& columnPositions, + THashMap<TString, ui32>& sourceKeyColumnPositions, + const std::vector<TStringBuf>& outputColumns, + const THashMap<TStringBuf, TString>& outputColumnSources, + const TExprNode::TListType& keyColumnNodes, + const THashSet<TStringBuf>& sourceKeyDrops + ) { + ui32 pos = 0; + size_t dropCount = 0; + + THashMap<TStringBuf, ui32> sourceColumnPositions; + for (auto& keyColumnNode : keyColumnNodes) { + auto memberName = keyColumnNode->Content(); + if (!sourceKeyColumnPositions.contains(memberName)) { + builder.Callable(pos, "Member") + .Arg(0, "item") + .Atom(1, memberName) + .Seal(); + + if (!sourceKeyDrops.contains(memberName)) { + sourceColumnPositions.emplace(memberName, pos - dropCount); + } else { + dropCount++; + } + + sourceKeyColumnPositions.emplace(memberName, pos); + pos++; + } + } + + for (auto& newName : outputColumns) { + auto& memberName = outputColumnSources.at(newName); + if (!sourceColumnPositions.contains(memberName)) { + builder.Callable(pos, "Member") + .Arg(0, "item") + .Atom(1, memberName) + .Seal(); + sourceColumnPositions.emplace(memberName, pos - dropCount); + pos++; + } + + columnPositions.emplace(newName, sourceColumnPositions[memberName]); + } + + return sourceColumnPositions.size(); + }; + + THashMap<TStringBuf, ui32> leftColumnPositions; + THashMap<TString, ui32> leftSourceKeyColumnPositions; // before drop + size_t leftInputSizeAfterDrop = 0; + auto leftExpandLambda = ctx.Builder(pos) + .Lambda() + .Param("item") + .Do([&](TExprNodeBuilder& builder) -> TExprNodeBuilder& { + leftInputSizeAfterDrop = expandLambdaBuilder( + builder, + leftColumnPositions, + leftSourceKeyColumnPositions, + leftOutputColumns, + leftOutputColumnSources, + leftKeyColumnNodes, + leftSourceKeyDrops + ); + return builder; + }) + .Seal() + .Build(); + + 
THashMap<TStringBuf, ui32> rightColumnPositions; + THashMap<TString, ui32> rightSourceKeyColumnPositions; // before drop + size_t rightInputSizeAfterDrop = 0; + auto rightExpandLambda = ctx.Builder(pos) + .Lambda() + .Param("item") + .Do([&](TExprNodeBuilder& builder) -> TExprNodeBuilder& { + rightInputSizeAfterDrop = expandLambdaBuilder( + builder, + rightColumnPositions, + rightSourceKeyColumnPositions, + rightOutputColumns, + rightOutputColumnSources, + rightKeyColumnNodes, + rightSourceKeyDrops + ); + return builder; + }) + .Seal() + .Build(); + + auto narrowLambda = ctx.Builder(pos) + .Lambda() + .Params("items", leftInputSizeAfterDrop + rightInputSizeAfterDrop) + .Callable("AsStruct") + .Do([&](TExprNodeBuilder& builder) -> TExprNodeBuilder& { + size_t i = 0; + + for (auto& newName : leftOutputColumns) { + builder.List(i) + .Atom(0, newName) + .Arg(1, "items", leftColumnPositions.at(newName)) + .Seal(); + i++; + } + + for (auto& newName : rightOutputColumns) { + builder.List(i) + .Atom(0, newName) + .Arg(1, "items", leftInputSizeAfterDrop + rightColumnPositions.at(newName)) + .Seal(); + i++; + } + + YQL_ENSURE(i == outItemType->GetSize()); + return builder; + }) + .Seal() + .Seal() + .Build(); + + TExprNode::TListType leftKeyColumnPositionNodes; + for (auto& keyColumnNode : leftKeyColumnNodes) { + auto memberName = keyColumnNode->Content(); + leftKeyColumnPositionNodes.push_back(ctx.NewAtom(pos, leftSourceKeyColumnPositions.at(memberName))); + } + + TExprNode::TListType leftKeyDropPositionNodes; + for (auto& memberName : leftSourceKeyDrops) { + leftKeyDropPositionNodes.push_back(ctx.NewAtom(pos, leftSourceKeyColumnPositions.at(memberName))); + } + + TExprNode::TListType rightKeyColumnPositionNodes; + for (auto& keyColumnNode : rightKeyColumnNodes) { + auto memberName = keyColumnNode->Content(); + rightKeyColumnPositionNodes.push_back(ctx.NewAtom(pos, rightSourceKeyColumnPositions.at(memberName))); + } + + TExprNode::TListType rightKeyDropPositionNodes; + if 
(needPayload) { + for (auto& memberName : rightSourceKeyDrops) { + rightKeyDropPositionNodes.push_back(ctx.NewAtom(pos, rightSourceKeyColumnPositions.at(memberName))); + } + } + + auto settingsBuilder = Build<TCoNameValueTupleList>(ctx, pos); + if (isUniqueKey) { + settingsBuilder + .Add() + .Name() + .Value("rightAny") + .Build() + .Build(); + } + + return ctx.Builder(pos) + .Callable("NarrowMap") + .Callable(0, "WideFromBlocks") + .Callable(0, "ToFlow") + .Callable(0, "BlockMapJoinCore") + .Callable(0, "FromFlow") + .Callable(0, "WideToBlocks") + .Callable(0, "ExpandMap") + .Add(0, std::move(leftFlow)) + .Add(1, std::move(leftExpandLambda)) + .Seal() + .Seal() + .Seal() + .Callable(1, "FromFlow") + .Callable(0, "WideToBlocks") + .Callable(0, "ExpandMap") + .Add(0, std::move(rightFlow)) + .Add(1, std::move(rightExpandLambda)) + .Seal() + .Seal() + .Seal() + .Add(2, std::move(joinType)) + .Add(3, ctx.NewList(pos, std::move(leftKeyColumnPositionNodes))) + .Add(4, ctx.NewList(pos, std::move(leftKeyDropPositionNodes))) + .Add(5, ctx.NewList(pos, std::move(rightKeyColumnPositionNodes))) + .Add(6, ctx.NewList(pos, std::move(rightKeyDropPositionNodes))) + .Add(7, settingsBuilder.Done().Ptr()) + .Seal() + .Seal() + .Seal() + .Add(1, std::move(narrowLambda)) + .Seal() + .Build(); +} + bool RewriteYtMapJoin(TYtEquiJoin equiJoin, const TJoinLabels& labels, bool isLookupJoin, TYtJoinNodeOp& op, const TYtJoinNodeLeaf& leftLeaf, const TYtJoinNodeLeaf& rightLeaf, - TExprContext& ctx, const TMapJoinSettings& settings, bool useShards, const TYtState::TPtr& state) + TExprContext& ctx, const TMapJoinSettings& settings, bool useShards, bool useBlocks, const TYtState::TPtr& state) { auto pos = equiJoin.Pos(); auto joinType = op.JoinKind; @@ -1787,6 +2024,15 @@ bool RewriteYtMapJoin(TYtEquiJoin equiJoin, const TJoinLabels& labels, bool isLo TExprNode::TListType joinedOutNodes; TExprNode::TListType rightRenameNodes; TExprNode::TListType leftRenameNodes; + + std::vector<TStringBuf> 
leftOutputColumns; + THashMap<TStringBuf, TString> leftOutputColumnSources; + THashSet<TString> leftUsedSourceColumns; + + std::vector<TStringBuf> rightOutputColumns; + THashMap<TStringBuf, TString> rightOutputColumnSources; + THashSet<TString> rightUsedSourceColumns; + for (ui32 index = 0; index < outputLeftSchemeType->GetSize(); ++index) { auto item = outputLeftSchemeType->GetItems()[index]; TVector<TStringBuf> newNames; @@ -1806,6 +2052,9 @@ bool RewriteYtMapJoin(TYtEquiJoin equiJoin, const TJoinLabels& labels, bool isLo for (auto newName : newNames) { leftRenameNodes.push_back(ctx.NewAtom(pos, memberName)); leftRenameNodes.push_back(ctx.NewAtom(pos, newName)); + leftUsedSourceColumns.insert(memberName); + leftOutputColumns.push_back(newName); + leftOutputColumnSources.emplace(newName, memberName); AddJoinRemappedColumn(pos, mainArg, joinedOutNodes, memberName, newName, ctx); } } @@ -1830,6 +2079,9 @@ bool RewriteYtMapJoin(TYtEquiJoin equiJoin, const TJoinLabels& labels, bool isLo for (auto newName : newNames) { rightRenameNodes.push_back(ctx.NewAtom(pos, memberName)); rightRenameNodes.push_back(ctx.NewAtom(pos, newName)); + rightUsedSourceColumns.insert(memberName); + rightOutputColumns.push_back(newName); + rightOutputColumnSources.emplace(newName, memberName); AddJoinRemappedColumn(pos, lookupArg, joinedOutNodes, memberName, newName, ctx); } } @@ -1840,61 +2092,96 @@ bool RewriteYtMapJoin(TYtEquiJoin equiJoin, const TJoinLabels& labels, bool isLo TExprNode::TListType leftKeyColumnNodes; TExprNode::TListType leftKeyColumnNodesNullable; auto mapInput = RemapNonConvertibleItems(listArg, mainLabel, *leftKeyColumns, outputKeyType, leftKeyColumnNodes, leftKeyColumnNodesNullable, ctx); - if (mapJoinUseFlow) { - joined = ctx.Builder(pos) - .Callable("FlatMap") - .Callable(0, "SqueezeToDict") - .Callable(0, "ToFlow") - .Add(0, std::move(tableContent)) - .Callable(1, "DependsOn") - .Add(0, listArg) + + if (useBlocks) { + for (auto& [_, columnType] : columnTypes) { + if 
(!IsSupportedAsBlockType(pos, *columnType, ctx, *state->Types)) { + useBlocks = false; + YQL_CLOG(INFO, ProviderYt) << "Block mapjoin won't be used because of unsupported type: " << *columnType; + break; + } + } + } + + if (useBlocks) { + if (!mapJoinUseFlow) { + mapInput = ctx.Builder(pos) + .Callable("ToFlow") + .Add(0, std::move(mapInput)) + .Seal() + .Build(); + } + + tableContent = ctx.Builder(pos) + .Callable("ToFlow") + .Add(0, std::move(tableContent)) + .Callable(1, "DependsOn") + .Add(0, listArg) + .Seal() + .Seal() + .Build(); + + joined = BuildBlockMapJoin(std::move(mapInput), std::move(tableContent), + leftKeyColumnNodes, leftOutputColumns, leftOutputColumnSources, leftUsedSourceColumns, + remappedMembers, rightOutputColumns, rightOutputColumnSources, rightUsedSourceColumns, + outItemType, joinType, pos, needPayload, isUniqueKey, ctx + ); + } else { + if (mapJoinUseFlow) { + joined = ctx.Builder(pos) + .Callable("FlatMap") + .Callable(0, "SqueezeToDict") + .Callable(0, "ToFlow") + .Add(0, std::move(tableContent)) + .Callable(1, "DependsOn") + .Add(0, listArg) + .Seal() .Seal() - .Seal() - .Add(1, std::move(smallKeySelector)) - .Add(2, std::move(smallPayloadSelector)) - .List(3) - .Atom(0, "Hashed", TNodeFlags::Default) - .Atom(1, needPayload && !isUniqueKey ? "Many" : "One", TNodeFlags::Default) - .Atom(2, "Compact", TNodeFlags::Default) + .Add(1, std::move(smallKeySelector)) + .Add(2, std::move(smallPayloadSelector)) .List(3) - .Atom(0, "ItemsCount", TNodeFlags::Default) - .Atom(1, ToString(dictItemsCount), TNodeFlags::Default) + .Atom(0, "Hashed", TNodeFlags::Default) + .Atom(1, needPayload && !isUniqueKey ? 
"Many" : "One", TNodeFlags::Default) + .Atom(2, "Compact", TNodeFlags::Default) + .List(3) + .Atom(0, "ItemsCount", TNodeFlags::Default) + .Atom(1, ToString(dictItemsCount), TNodeFlags::Default) + .Seal() .Seal() .Seal() - .Seal() - .Lambda(1) - .Param("dict") - .Callable("MapJoinCore") - .Add(0, std::move(mapInput)) - .Arg(1, "dict") - .Add(2, joinType) - .Add(3, ctx.NewList(pos, std::move(leftKeyColumnNodes))) - .Add(4, ctx.NewList(pos, std::move(remappedMembers))) - .Add(5, ctx.NewList(pos, std::move(leftRenameNodes))) - .Add(6, ctx.NewList(pos, std::move(rightRenameNodes))) - .Add(7, leftKeyColumns) - .Add(8, rightKeyColumns) + .Lambda(1) + .Param("dict") + .Callable("MapJoinCore") + .Add(0, std::move(mapInput)) + .Arg(1, "dict") + .Add(2, joinType) + .Add(3, ctx.NewList(pos, std::move(leftKeyColumnNodes))) + .Add(4, ctx.NewList(pos, std::move(remappedMembers))) + .Add(5, ctx.NewList(pos, std::move(leftRenameNodes))) + .Add(6, ctx.NewList(pos, std::move(rightRenameNodes))) + .Add(7, leftKeyColumns) + .Add(8, rightKeyColumns) + .Seal() .Seal() .Seal() - .Seal() - .Build(); - } else { - joined = ctx.Builder(pos) - .Callable("MapJoinCore") - .Add(0, mapInput) - .Add(1, dict) - .Add(2, joinType) - .Add(3, ctx.NewList(pos, std::move(leftKeyColumnNodes))) - .Add(4, ctx.NewList(pos, std::move(remappedMembers))) - .Add(5, ctx.NewList(pos, std::move(leftRenameNodes))) - .Add(6, ctx.NewList(pos, std::move(rightRenameNodes))) - .Add(7, leftKeyColumns) - .Add(8, rightKeyColumns) - .Seal() - .Build(); + .Build(); + } else { + joined = ctx.Builder(pos) + .Callable("MapJoinCore") + .Add(0, mapInput) + .Add(1, dict) + .Add(2, joinType) + .Add(3, ctx.NewList(pos, std::move(leftKeyColumnNodes))) + .Add(4, ctx.NewList(pos, std::move(remappedMembers))) + .Add(5, ctx.NewList(pos, std::move(leftRenameNodes))) + .Add(6, ctx.NewList(pos, std::move(rightRenameNodes))) + .Add(7, leftKeyColumns) + .Add(8, rightKeyColumns) + .Seal() + .Build(); + } } - } - else { + } else { auto joinedOut 
= ctx.NewCallable(pos, "AsStruct", std::move(joinedOutNodes)); auto joinedBody = ctx.Builder(pos) .Callable("Map") @@ -3036,12 +3323,12 @@ TStatus RewriteYtEquiJoinLeaf(TYtEquiJoin equiJoin, TYtJoinNodeOp& op, TYtJoinNo DoSwap(mapSettings.LeftUnique, mapSettings.RightUnique); YQL_CLOG(INFO, ProviderYt) << "Selected LookupJoin: filter over the right table, use content of the left one, " << (op.LinkSettings.JoinAlgo != EJoinAlgoType::Undefined ? ToString(op.LinkSettings.JoinAlgo).c_str() : "no") << " cbo algo"; - return RewriteYtMapJoin(equiJoin, labels, true, op, rightLeaf, leftLeaf, ctx, mapSettings, false, state) ? + return RewriteYtMapJoin(equiJoin, labels, true, op, rightLeaf, leftLeaf, ctx, mapSettings, false, false, state) ? TStatus::Ok : TStatus::Error; } else { YQL_CLOG(INFO, ProviderYt) << "Selected LookupJoin: filter over the left table, use content of the right one, " << (op.LinkSettings.JoinAlgo != EJoinAlgoType::Undefined ? ToString(op.LinkSettings.JoinAlgo).c_str() : "no") << " cbo algo"; - return RewriteYtMapJoin(equiJoin, labels, true, op, leftLeaf, rightLeaf, ctx, mapSettings, false, state) ? + return RewriteYtMapJoin(equiJoin, labels, true, op, leftLeaf, rightLeaf, ctx, mapSettings, false, false, state) ? 
TStatus::Ok : TStatus::Error; } } @@ -3316,12 +3603,24 @@ TStatus RewriteYtEquiJoinLeaf(TYtEquiJoin equiJoin, TYtJoinNodeOp& op, TYtJoinNo leftLimit *= *leftPartCount; } - auto mapJoinUseFlow = state->Configuration->MapJoinUseFlow.Get().GetOrElse(DEFAULT_MAP_JOIN_USE_FLOW); + bool mapJoinUseFlow = state->Configuration->MapJoinUseFlow.Get().GetOrElse(DEFAULT_MAP_JOIN_USE_FLOW); + bool mapJoinUseBlocks = state->Configuration->BlockMapJoin.Get().GetOrElse(state->Types->UseBlocks); + if (joinType == "Cross") { + mapJoinUseBlocks = false; + } + if (leftTablesReady) { auto status = UpdateInMemorySizeSetting(mapSettings, leftLeaf.Section, labels, op, ctx, true, leftItemType, leftJoinKeyList, state, cluster, leftTables, mapJoinUseFlow); if (status.Level != TStatus::Ok) { return (status.Level == TStatus::Repeat) ? TStatus::Ok : status; } + + if (mapJoinUseBlocks) { + auto status = UpdateInMemorySizeUsingBlocksSetting(mapSettings, leftLeaf.Section, op, ctx, true, leftItemType, leftJoinKeyList, state, cluster, leftTables); + if (status.Level != TStatus::Ok) { + return (status.Level == TStatus::Repeat) ? TStatus::Ok : status; + } + } } if (rightTablesReady) { @@ -3329,22 +3628,34 @@ TStatus RewriteYtEquiJoinLeaf(TYtEquiJoin equiJoin, TYtJoinNodeOp& op, TYtJoinNo if (status.Level != TStatus::Ok) { return (status.Level == TStatus::Repeat) ? TStatus::Ok : status; } + + if (mapJoinUseBlocks) { + auto status = UpdateInMemorySizeUsingBlocksSetting(mapSettings, rightLeaf.Section, op, ctx, false, rightItemType, rightJoinKeyList, state, cluster, rightTables); + if (status.Level != TStatus::Ok) { + return (status.Level == TStatus::Repeat) ? 
TStatus::Ok : status; + } + } } YQL_CLOG(INFO, ProviderYt) << "MapJoinShardMinRows: " << mapSettings.MapJoinShardMinRows << ", MapJoinShardCount: " << mapSettings.MapJoinShardCount << ", left is present: " << leftTablesReady << ", left size limit: " << leftLimit << ", left mem size:" << mapSettings.LeftMemSize + << ", left mem size using blocks: " << mapSettings.LeftMemSizeUsingBlocks << ", right is present: " << rightTablesReady - << ", right size limit: " << rightLimit << ", right mem size: " << mapSettings.RightMemSize; + << ", right size limit: " << rightLimit << ", right mem size: " << mapSettings.RightMemSize + << ", right mem size using blocks: " << mapSettings.RightMemSizeUsingBlocks; bool leftAny = linkSettings.LeftHints.contains("any"); bool rightAny = linkSettings.RightHints.contains("any"); - const bool isLeftAllowMapJoin = !leftAny && rightTablesReady && (mapSettings.RightMemSize <= rightLimit) && + const bool isLeftAllowBlockMapJoin = mapJoinUseBlocks && (mapSettings.RightMemSizeUsingBlocks <= rightLimit); + const bool isRightAllowBlockMapJoin = mapJoinUseBlocks && (mapSettings.LeftMemSizeUsingBlocks <= leftLimit); + + const bool isLeftAllowMapJoin = !leftAny && rightTablesReady && (mapSettings.RightMemSize <= rightLimit || isLeftAllowBlockMapJoin) && (joinType == "Inner" || joinType == "Left" || joinType == "LeftOnly" || joinType == "LeftSemi" || joinType == "Cross") && !rightStats.IsDynamic; - const bool isRightAllowMapJoin = !rightAny && leftTablesReady && (mapSettings.LeftMemSize <= leftLimit) && + const bool isRightAllowMapJoin = !rightAny && leftTablesReady && (mapSettings.LeftMemSize <= leftLimit || isRightAllowBlockMapJoin) && (joinType == "Inner" || joinType == "Right" || joinType == "RightOnly" || joinType == "RightSemi" || joinType == "Cross") && !leftStats.IsDynamic; YQL_CLOG(INFO, ProviderYt) << "MapJoin: isLeftAllowMapJoin: " << isLeftAllowMapJoin @@ -3361,17 +3672,28 @@ TStatus RewriteYtEquiJoinLeaf(TYtEquiJoin equiJoin, TYtJoinNodeOp& 
op, TYtJoinNo mapSettings.SwapTables = swapTables; + if (mapJoinUseBlocks && !state->Types->UseBlocks && ( + (mapSettings.RightMemSize < mapSettings.RightMemSizeUsingBlocks) || + (swapTables && (mapSettings.LeftMemSize < mapSettings.LeftMemSizeUsingBlocks)) + )) { + ui64 memSize = swapTables ? mapSettings.LeftMemSize : mapSettings.RightMemSize; + ui64 memSizeUsingBlocks = swapTables ? mapSettings.LeftMemSizeUsingBlocks : mapSettings.RightMemSizeUsingBlocks; + YQL_CLOG(INFO, ProviderYt) << "Block mapjoin won't be used: memSize=" << memSize << " is less than memSizeUsingBlocks=" << memSizeUsingBlocks; + mapJoinUseBlocks = false; + } + if (swapTables) { DoSwap(mapSettings.LeftRows, mapSettings.RightRows); DoSwap(mapSettings.LeftSize, mapSettings.RightSize); DoSwap(mapSettings.LeftMemSize, mapSettings.RightMemSize); + DoSwap(mapSettings.LeftMemSizeUsingBlocks, mapSettings.RightMemSizeUsingBlocks); DoSwap(mapSettings.LeftUnique, mapSettings.RightUnique); YQL_CLOG(INFO, ProviderYt) << "Selected MapJoin: map over the right table, use content of the left one, " << (op.LinkSettings.JoinAlgo != EJoinAlgoType::Undefined ? ToString(op.LinkSettings.JoinAlgo).c_str() : "no") << " cbo algo"; - return RewriteYtMapJoin(equiJoin, labels, false, op, rightLeaf, leftLeaf, ctx, mapSettings, allowShardLeft, state) ? + return RewriteYtMapJoin(equiJoin, labels, false, op, rightLeaf, leftLeaf, ctx, mapSettings, allowShardLeft, mapJoinUseBlocks, state) ? TStatus::Ok : TStatus::Error; } else { YQL_CLOG(INFO, ProviderYt) << "Selected MapJoin: map over the left table, use content of the right one, " << (op.LinkSettings.JoinAlgo != EJoinAlgoType::Undefined ? ToString(op.LinkSettings.JoinAlgo).c_str() : "no") << " cbo algo"; - return RewriteYtMapJoin(equiJoin, labels, false, op, leftLeaf, rightLeaf, ctx, mapSettings, allowShardRight, state) ? + return RewriteYtMapJoin(equiJoin, labels, false, op, leftLeaf, rightLeaf, ctx, mapSettings, allowShardRight, mapJoinUseBlocks, state) ? 
TStatus::Ok : TStatus::Error; } } |
