summary refs log tree commit diff stats
diff options
context:
space:
mode:
-rw-r--r-- yql/essentials/core/expr_nodes/yql_expr_nodes.json | 15
-rw-r--r-- yql/essentials/core/type_ann/type_ann_core.cpp | 2
-rw-r--r-- yql/essentials/core/type_ann/type_ann_impl.h | 1
-rw-r--r-- yql/essentials/core/type_ann/type_ann_join.cpp | 133
-rw-r--r-- yql/essentials/core/yql_expr_type_annotation.cpp | 7
-rw-r--r-- yql/essentials/core/yql_expr_type_annotation.h | 1
-rw-r--r-- yql/essentials/minikql/comp_nodes/mkql_block_map_join.cpp | 6
-rw-r--r-- yql/essentials/minikql/comp_nodes/mkql_rh_hash.h | 20
-rw-r--r-- yql/essentials/minikql/mkql_block_map_join_utils.cpp | 12
-rw-r--r-- yql/essentials/minikql/mkql_block_map_join_utils.h | 13
-rw-r--r-- yql/essentials/minikql/mkql_rh_hash_utils.cpp | 33
-rw-r--r-- yql/essentials/minikql/mkql_rh_hash_utils.h | 13
-rw-r--r-- yql/essentials/minikql/ya.make | 4
-rw-r--r-- yql/essentials/providers/common/mkql/yql_provider_mkql.cpp | 20
-rw-r--r-- yt/yql/providers/yt/common/yql_yt_settings.cpp | 1
-rw-r--r-- yt/yql/providers/yt/common/yql_yt_settings.h | 1
-rw-r--r-- yt/yql/providers/yt/opt/yql_yt_join.h | 2
-rw-r--r-- yt/yql/providers/yt/provider/yql_yt_join_impl.cpp | 436
18 files changed, 647 insertions, 73 deletions
diff --git a/yql/essentials/core/expr_nodes/yql_expr_nodes.json b/yql/essentials/core/expr_nodes/yql_expr_nodes.json
index cc4becb1e1a..76e556533f7 100644
--- a/yql/essentials/core/expr_nodes/yql_expr_nodes.json
+++ b/yql/essentials/core/expr_nodes/yql_expr_nodes.json
@@ -1613,6 +1613,21 @@
]
},
{
+ "Name": "TCoBlockMapJoinCore",
+ "Base": "TCallable",
+ "Match": {"Type": "Callable", "Name": "BlockMapJoinCore"},
+ "Children": [
+ {"Index": 0, "Name": "LeftInput", "Type": "TExprBase"},
+ {"Index": 1, "Name": "RightInput", "Type": "TExprBase"},
+ {"Index": 2, "Name": "JoinKind", "Type": "TCoAtom"},
+ {"Index": 3, "Name": "LeftKeyColumns", "Type": "TCoAtomList"},
+ {"Index": 4, "Name": "LeftKeyDrops", "Type": "TCoAtomList"},
+ {"Index": 5, "Name": "RightKeyColumns", "Type": "TCoAtomList"},
+ {"Index": 6, "Name": "RightKeyDrops", "Type": "TCoAtomList"},
+ {"Index": 7, "Name": "Options", "Type": "TExprList"}
+ ]
+ },
+ {
"Name": "TCoGraceJoinCore",
"Base": "TCallable",
"Match": {"Type": "Callable", "Name": "GraceJoinCore"},
diff --git a/yql/essentials/core/type_ann/type_ann_core.cpp b/yql/essentials/core/type_ann/type_ann_core.cpp
index d448c7ed545..882adedcc56 100644
--- a/yql/essentials/core/type_ann/type_ann_core.cpp
+++ b/yql/essentials/core/type_ann/type_ann_core.cpp
@@ -12850,6 +12850,8 @@ template <NKikimr::NUdf::EDataSlot DataSlot>
ExtFunctions["BlockFunc"] = &BlockFuncWrapper;
ExtFunctions["BlockBitCast"] = &BlockBitCastWrapper;
+ Functions["BlockMapJoinCore"] = &BlockMapJoinCoreWrapper;
+
ExtFunctions["AsScalar"] = &AsScalarWrapper;
ExtFunctions["WideToBlocks"] = &WideToBlocksWrapper;
ExtFunctions["BlockCombineAll"] = &BlockCombineAllWrapper;
diff --git a/yql/essentials/core/type_ann/type_ann_impl.h b/yql/essentials/core/type_ann/type_ann_impl.h
index 46d929b2350..717cb7b3ac8 100644
--- a/yql/essentials/core/type_ann/type_ann_impl.h
+++ b/yql/essentials/core/type_ann/type_ann_impl.h
@@ -35,6 +35,7 @@ namespace NTypeAnnImpl {
IGraphTransformer::TStatus CombineCoreWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx);
IGraphTransformer::TStatus GroupingCoreWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx);
IGraphTransformer::TStatus DecimalBinaryWrapperBase(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx, bool blocks);
+ IGraphTransformer::TStatus BlockMapJoinCoreWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx);
TMaybe<ui32> FindOrReportMissingMember(TStringBuf memberName, TPositionHandle pos, const TStructExprType& structType, TExprContext& ctx);
diff --git a/yql/essentials/core/type_ann/type_ann_join.cpp b/yql/essentials/core/type_ann/type_ann_join.cpp
index 140b15eca29..498237b3561 100644
--- a/yql/essentials/core/type_ann/type_ann_join.cpp
+++ b/yql/essentials/core/type_ann/type_ann_join.cpp
@@ -980,5 +980,138 @@ namespace NTypeAnnImpl {
}
}
+ // Type annotation for the BlockMapJoinCore callable. Expected children:
+ //   0 - left input, 1 - right input (both must pass EnsureWideStreamBlockType),
+ //   2 - join kind atom (Inner, Left, LeftSemi or LeftOnly),
+ //   3 - left key columns, 4 - left key drops,
+ //   5 - right key columns, 6 - right key drops (tuples of atoms holding column positions),
+ //   7 - options (only "rightAny" is accepted).
+ // The resulting type is a stream of a multi type made of: the left columns that are not
+ // dropped, then (for Inner/Left only) the right columns that are not dropped - made optional
+ // for Left joins when they are not already optional/null - and finally a Scalar<Uint64> item.
+ IGraphTransformer::TStatus BlockMapJoinCoreWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx) {
+     Y_UNUSED(output);
+
+     if (!EnsureArgsCount(*input, 8, ctx.Expr)) {
+         return IGraphTransformer::TStatus::Error;
+     }
+
+     TTypeAnnotationNode::TListType leftItemTypes;
+     if (!EnsureWideStreamBlockType(input->Head(), leftItemTypes, ctx.Expr)) {
+         return IGraphTransformer::TStatus::Error;
+     }
+     // The trailing item of a wide block is not a data column, so it is excluded here
+     // (presumably the block length; a Scalar<Uint64> is re-appended to the result below).
+     leftItemTypes.pop_back();
+     auto leftItemType = input->Head().GetTypeAnn()->Cast<TStreamExprType>()->GetItemType()->Cast<TMultiExprType>();
+
+     TTypeAnnotationNode::TListType rightItemTypes;
+     if (!EnsureWideStreamBlockType(*input->Child(1), rightItemTypes, ctx.Expr)) {
+         return IGraphTransformer::TStatus::Error;
+     }
+     rightItemTypes.pop_back();
+     auto rightItemType = input->Child(1)->GetTypeAnn()->Cast<TStreamExprType>()->GetItemType()->Cast<TMultiExprType>();
+
+     if (!EnsureAtom(*input->Child(2), ctx.Expr)) {
+         return IGraphTransformer::TStatus::Error;
+     }
+
+     const auto joinKind = input->Child(2)->Content();
+     if (joinKind != "Inner" && joinKind != "Left" && joinKind != "LeftSemi" && joinKind != "LeftOnly") {
+         ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(input->Child(2)->Pos()), TStringBuilder() << "Unknown join kind: " << joinKind
+             << ", supported: Inner, Left, LeftSemi, LeftOnly"));
+         return IGraphTransformer::TStatus::Error;
+     }
+
+     // Validate the shape of all key column / key drop lists before looking at their children,
+     // so that a malformed node is reported as such rather than as a key count mismatch.
+     for (size_t childIdx = 3; childIdx <= 6; childIdx++) {
+         if (!EnsureTupleOfAtoms(*input->Child(childIdx), ctx.Expr)) {
+             return IGraphTransformer::TStatus::Error;
+         }
+     }
+
+     if (input->Child(3)->ChildrenSize() != input->Child(5)->ChildrenSize()) {
+         ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(input->Child(5)->Pos()), TStringBuilder() << "Mismatch of key column count"));
+         return IGraphTransformer::TStatus::Error;
+     }
+
+     // Resolves every key column atom to a position inside itemType and collects the positions.
+     auto checkKeyColumns = [&](std::unordered_set<ui32>& keyColumns, bool isLeft, const TExprNode& keyColumnsNode, const TMultiExprType* itemType) {
+         for (const auto& keyColumnNode : keyColumnsNode.Children()) {
+             auto position = GetWideBlockFieldPosition(*itemType, keyColumnNode->Content());
+             if (!position) {
+                 ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(keyColumnNode->Pos()), TStringBuilder() << "Unknown " << (isLeft ? "left" : "right") << " key column: " << keyColumnNode->Content()));
+                 return false;
+             }
+             keyColumns.insert(*position);
+         }
+         return true;
+     };
+
+     // Resolves every key drop atom; a drop must refer to a key column and must not repeat.
+     auto checkKeyDrops = [&](std::unordered_set<ui32>& keyDrops, bool isLeft, const std::unordered_set<ui32>& keyColumns, const TExprNode& keyDropsNode, const TMultiExprType* itemType) {
+         for (const auto& keyDropNode : keyDropsNode.Children()) {
+             auto position = GetWideBlockFieldPosition(*itemType, keyDropNode->Content());
+             if (!position) {
+                 ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(keyDropNode->Pos()), TStringBuilder() << "Unknown " << (isLeft ? "left" : "right") << " key column: " << keyDropNode->Content()));
+                 return false;
+             }
+             if (!keyColumns.contains(*position)) {
+                 ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(keyDropNode->Pos()), TStringBuilder() << "Attempted to drop " << (isLeft ? "left" : "right") << " non-key column: " << keyDropNode->Content()));
+                 return false;
+             }
+             if (!keyDrops.insert(*position).second) {
+                 ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(keyDropNode->Pos()), TStringBuilder() << "Duplicated " << (isLeft ? "left" : "right") << " key drop: " << keyDropNode->Content()));
+                 return false;
+             }
+         }
+         return true;
+     };
+
+     std::unordered_set<ui32> leftKeyColumns;
+     if (!checkKeyColumns(leftKeyColumns, true, *input->Child(3), leftItemType)) {
+         return IGraphTransformer::TStatus::Error;
+     }
+
+     std::unordered_set<ui32> leftKeyDrops;
+     if (!checkKeyDrops(leftKeyDrops, true, leftKeyColumns, *input->Child(4), leftItemType)) {
+         return IGraphTransformer::TStatus::Error;
+     }
+
+     std::unordered_set<ui32> rightKeyColumns;
+     if (!checkKeyColumns(rightKeyColumns, false, *input->Child(5), rightItemType)) {
+         return IGraphTransformer::TStatus::Error;
+     }
+
+     std::unordered_set<ui32> rightKeyDrops;
+     if (!checkKeyDrops(rightKeyDrops, false, rightKeyColumns, *input->Child(6), rightItemType)) {
+         return IGraphTransformer::TStatus::Error;
+     }
+
+     auto settingsValidator = [&](TStringBuf, TExprNode& node, TExprContext&) { return node.ChildrenSize() == 1; };
+     if (!EnsureValidSettings(input->Tail(), {"rightAny"}, settingsValidator, ctx.Expr)) {
+         return IGraphTransformer::TStatus::Error;
+     }
+
+     // Left side: every column that is not dropped goes to the output as is.
+     std::vector<const TTypeAnnotationNode*> resultItems;
+     for (ui32 pos = 0; pos < leftItemTypes.size(); pos++) {
+         if (leftKeyDrops.contains(pos)) {
+             continue;
+         }
+
+         resultItems.push_back(ctx.Expr.MakeType<TBlockExprType>(leftItemTypes[pos]));
+     }
+
+     if (joinKind != "LeftSemi" && joinKind != "LeftOnly") {
+         // Right side columns are emitted only for Inner/Left joins.
+         for (ui32 pos = 0; pos < rightItemTypes.size(); pos++) {
+             if (rightKeyDrops.contains(pos)) {
+                 continue;
+             }
+
+             auto columnType = rightItemTypes[pos];
+             // A Left join may produce rows without a right match, hence the optional wrapper.
+             if (joinKind == "Left" && !rightItemTypes[pos]->IsOptionalOrNull()) {
+                 columnType = ctx.Expr.MakeType<TOptionalExprType>(columnType);
+             }
+
+             resultItems.push_back(ctx.Expr.MakeType<TBlockExprType>(columnType));
+         }
+     } else {
+         if (!rightKeyDrops.empty()) {
+             ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(input->Child(6)->Pos()), TStringBuilder() << "Right key drops are not allowed for semi/only join"));
+             return IGraphTransformer::TStatus::Error;
+         }
+     }
+
+     resultItems.push_back(ctx.Expr.MakeType<TScalarExprType>(ctx.Expr.MakeType<TDataExprType>(EDataSlot::Uint64)));
+     input->SetTypeAnn(ctx.Expr.MakeType<TStreamExprType>(ctx.Expr.MakeType<TMultiExprType>(resultItems)));
+     return IGraphTransformer::TStatus::Ok;
+ }
+
} // namespace NTypeAnnImpl
} // namespace NYql
diff --git a/yql/essentials/core/yql_expr_type_annotation.cpp b/yql/essentials/core/yql_expr_type_annotation.cpp
index 56626afe87e..e8372de39f2 100644
--- a/yql/essentials/core/yql_expr_type_annotation.cpp
+++ b/yql/essentials/core/yql_expr_type_annotation.cpp
@@ -6035,6 +6035,13 @@ std::optional<ui32> GetFieldPosition(const TStructExprType& structType, const TS
return std::nullopt;
}
+// Parses `field` as a column position inside a wide block multi type.
+// The last item of the multi type is not addressable, so only positions
+// strictly below GetSize() - 1 are accepted; anything else yields std::nullopt.
+std::optional<ui32> GetWideBlockFieldPosition(const TMultiExprType& multiType, const TStringBuf& field) {
+    YQL_ENSURE(multiType.GetSize() >= 1);
+
+    ui32 index;
+    if (!TryFromString(field, index)) {
+        return std::nullopt;
+    }
+    if (!(index < multiType.GetSize() - 1)) {
+        return std::nullopt;
+    }
+    return {index};
+}
+
bool ExtractPgType(const TTypeAnnotationNode* type, ui32& pgType, bool& convertToPg, TPositionHandle pos, TExprContext& ctx) {
pgType = 0;
convertToPg = false;
diff --git a/yql/essentials/core/yql_expr_type_annotation.h b/yql/essentials/core/yql_expr_type_annotation.h
index a8def26cee8..508c9e42581 100644
--- a/yql/essentials/core/yql_expr_type_annotation.h
+++ b/yql/essentials/core/yql_expr_type_annotation.h
@@ -319,6 +319,7 @@ IGraphTransformer::TStatus NormalizeKeyValueTuples(const TExprNode::TPtr& input,
std::optional<ui32> GetFieldPosition(const TMultiExprType& tupleType, const TStringBuf& field);
std::optional<ui32> GetFieldPosition(const TTupleExprType& tupleType, const TStringBuf& field);
std::optional<ui32> GetFieldPosition(const TStructExprType& structType, const TStringBuf& field);
+std::optional<ui32> GetWideBlockFieldPosition(const TMultiExprType& tupleType, const TStringBuf& field);
bool ExtractPgType(const TTypeAnnotationNode* type, ui32& pgType, bool& convertToPg, TPositionHandle pos, TExprContext& ctx);
bool HasContextFuncs(const TExprNode& input);
diff --git a/yql/essentials/minikql/comp_nodes/mkql_block_map_join.cpp b/yql/essentials/minikql/comp_nodes/mkql_block_map_join.cpp
index 847e4409b6c..189f8a0d2df 100644
--- a/yql/essentials/minikql/comp_nodes/mkql_block_map_join.cpp
+++ b/yql/essentials/minikql/comp_nodes/mkql_block_map_join.cpp
@@ -7,6 +7,7 @@
#include <yql/essentials/minikql/computation/mkql_computation_node_holders_codegen.h>
#include <yql/essentials/minikql/comp_nodes/mkql_rh_hash.h>
#include <yql/essentials/minikql/invoke_builtins/mkql_builtins.h>
+#include <yql/essentials/minikql/mkql_block_map_join_utils.h>
#include <yql/essentials/minikql/mkql_node_cast.h>
#include <yql/essentials/minikql/mkql_program_builder.h>
@@ -308,8 +309,6 @@ class TBlockIndex : public TComputationValue<TBlockIndex> {
};
};
- static_assert(sizeof(TIndexMapValue) == 8);
-
using TBase = TComputationValue<TBlockIndex>;
using TIndexMap = TRobinHoodHashFixedMap<
ui64,
@@ -319,6 +318,9 @@ class TBlockIndex : public TComputationValue<TBlockIndex> {
TMKQLAllocator<char>
>;
+ static_assert(sizeof(TIndexMapValue) == 8);
+ static_assert(std::max(TIndexMap::GetCellSize(), static_cast<ui32>(sizeof(TIndexNode))) == BlockMapJoinIndexEntrySize);
+
public:
class TIterator {
enum class EIteratorType {
diff --git a/yql/essentials/minikql/comp_nodes/mkql_rh_hash.h b/yql/essentials/minikql/comp_nodes/mkql_rh_hash.h
index f5c40a69448..3387039477e 100644
--- a/yql/essentials/minikql/comp_nodes/mkql_rh_hash.h
+++ b/yql/essentials/minikql/comp_nodes/mkql_rh_hash.h
@@ -6,6 +6,7 @@
#include <vector>
#include <span>
+#include <yql/essentials/minikql/mkql_rh_hash_utils.h>
#include <yql/essentials/utils/prefetch.h>
#include <util/digest/city.h>
@@ -109,7 +110,7 @@ public:
// should be called after Insert if isNew is true
Y_FORCE_INLINE void CheckGrow() {
- if (Size * 2 >= Capacity) {
+ if (RHHashTableNeedsGrow(Size, Capacity)) {
Grow();
}
}
@@ -124,7 +125,7 @@ public:
template <typename TSink>
Y_NO_INLINE void BatchInsert(std::span<TRobinHoodBatchRequestItem<TKey>> batchRequest, TSink&& sink) {
- while (2 * (Size + batchRequest.size()) >= Capacity) {
+ while (RHHashTableNeedsGrow(Size + batchRequest.size(), Capacity)) {
Grow();
}
@@ -331,15 +332,7 @@ private:
}
Y_NO_INLINE void Grow() {
- ui64 growFactor;
- if (Capacity < 100'000) {
- growFactor = 8;
- } else if (Capacity < 1'000'000) {
- growFactor = 4;
- } else {
- growFactor = 2;
- }
- auto newCapacity = Capacity * growFactor;
+ auto newCapacity = Capacity * CalculateRHHashTableGrowFactor(Capacity);
auto newCapacityShift = 64 - MostSignificantBit(newCapacity);
char *newData, *newDataEnd;
Allocate(newCapacity, newData, newDataEnd);
@@ -522,8 +515,7 @@ public:
TBase::Init();
}
-
- ui32 GetCellSize() const {
+ static constexpr ui32 GetCellSize() {
return sizeof(typename TBase::TPSLStorage) + sizeof(TKey) + sizeof(TPayload);
}
@@ -569,7 +561,7 @@ public:
TBase::Init();
}
- ui32 GetCellSize() const {
+ static constexpr ui32 GetCellSize() {
return sizeof(typename TBase::TPSLStorage) + sizeof(TKey);
}
diff --git a/yql/essentials/minikql/mkql_block_map_join_utils.cpp b/yql/essentials/minikql/mkql_block_map_join_utils.cpp
new file mode 100644
index 00000000000..e85390c08a0
--- /dev/null
+++ b/yql/essentials/minikql/mkql_block_map_join_utils.cpp
@@ -0,0 +1,12 @@
+#include "mkql_block_map_join_utils.h"
+#include "mkql_rh_hash_utils.h"
+
+namespace NKikimr {
+namespace NMiniKQL {
+
+// Estimated size in bytes of the block map join index built over `rowsCount` rows:
+// the hash table capacity needed for that many rows times the per-entry size.
+ui64 EstimateBlockMapJoinIndexSize(ui64 rowsCount) {
+    const ui64 tableCapacity = CalculateRHHashTableCapacity(rowsCount);
+    return tableCapacity * BlockMapJoinIndexEntrySize;
+}
+
+} // namespace NMiniKQL
+} // namespace NKikimr
diff --git a/yql/essentials/minikql/mkql_block_map_join_utils.h b/yql/essentials/minikql/mkql_block_map_join_utils.h
new file mode 100644
index 00000000000..8f3468a68ed
--- /dev/null
+++ b/yql/essentials/minikql/mkql_block_map_join_utils.h
@@ -0,0 +1,13 @@
+#pragma once
+
+#include <util/system/types.h>
+
+namespace NKikimr {
+namespace NMiniKQL {
+
+// Size in bytes accounted per index entry when estimating the block map join index.
+// mkql_block_map_join.cpp static_asserts that this equals
+// max(TIndexMap::GetCellSize(), sizeof(TIndexNode)), so the two must be changed together.
+constexpr ui64 BlockMapJoinIndexEntrySize = 20;
+
+// Returns the estimated index size in bytes for the given number of rows
+// (hash table capacity for `rowsCount` multiplied by BlockMapJoinIndexEntrySize).
+ui64 EstimateBlockMapJoinIndexSize(ui64 rowsCount);
+
+} // namespace NMiniKQL
+} // namespace NKikimr
diff --git a/yql/essentials/minikql/mkql_rh_hash_utils.cpp b/yql/essentials/minikql/mkql_rh_hash_utils.cpp
new file mode 100644
index 00000000000..c620e7af436
--- /dev/null
+++ b/yql/essentials/minikql/mkql_rh_hash_utils.cpp
@@ -0,0 +1,33 @@
+#include "mkql_rh_hash_utils.h"
+
+namespace NKikimr {
+namespace NMiniKQL {
+
+// Growth predicate: non-zero when the doubled size reaches the capacity.
+ui64 RHHashTableNeedsGrow(ui64 size, ui64 capacity) {
+    const ui64 doubledSize = size * 2;
+    return doubledSize >= capacity;
+}
+
+// Multiplier applied to the capacity on growth: 8 below 100'000 cells,
+// 4 below 1'000'000 cells and 2 for anything larger.
+ui64 CalculateRHHashTableGrowFactor(ui64 currentCapacity) {
+    if (currentCapacity < 100'000) {
+        return 8;
+    }
+    if (currentCapacity < 1'000'000) {
+        return 4;
+    }
+    return 2;
+}
+
+// Replays the growth sequence from a capacity of 256 until a table holding
+// `targetSize` elements no longer needs to grow, and returns that capacity.
+ui64 CalculateRHHashTableCapacity(ui64 targetSize) {
+    ui64 result = 256;
+    for (;;) {
+        if (!RHHashTableNeedsGrow(targetSize, result)) {
+            return result;
+        }
+        result *= CalculateRHHashTableGrowFactor(result);
+    }
+}
+
+} // namespace NMiniKQL
+} // namespace NKikimr
diff --git a/yql/essentials/minikql/mkql_rh_hash_utils.h b/yql/essentials/minikql/mkql_rh_hash_utils.h
new file mode 100644
index 00000000000..fb981f6979c
--- /dev/null
+++ b/yql/essentials/minikql/mkql_rh_hash_utils.h
@@ -0,0 +1,13 @@
+#pragma once
+
+#include <util/system/types.h>
+
+namespace NKikimr {
+namespace NMiniKQL {
+
+// Non-zero when a table with `size` elements and the given `capacity` has to grow
+// (size * 2 >= capacity).
+ui64 RHHashTableNeedsGrow(ui64 size, ui64 capacity);
+// Factor by which the capacity is multiplied on growth; depends on the current capacity.
+ui64 CalculateRHHashTableGrowFactor(ui64 currentCapacity);
+// Capacity reached by repeated growth from 256 until `targetSize` elements fit without growing.
+ui64 CalculateRHHashTableCapacity(ui64 targetSize);
+
+} // namespace NMiniKQL
+} // namespace NKikimr
diff --git a/yql/essentials/minikql/ya.make b/yql/essentials/minikql/ya.make
index b308132cb99..c1d03fb5875 100644
--- a/yql/essentials/minikql/ya.make
+++ b/yql/essentials/minikql/ya.make
@@ -7,6 +7,8 @@ SRCS(
compact_hash.h
defs.h
mkql_alloc.cpp
+ mkql_block_map_join_utils.cpp
+ mkql_block_map_join_utils.h
mkql_buffer.cpp
mkql_buffer.h
mkql_function_metadata.cpp
@@ -30,6 +32,8 @@ SRCS(
mkql_opt_literal.h
mkql_program_builder.cpp
mkql_program_builder.h
+ mkql_rh_hash_utils.cpp
+ mkql_rh_hash_utils.h
mkql_runtime_version.cpp
mkql_runtime_version.h
mkql_stats_registry.cpp
diff --git a/yql/essentials/providers/common/mkql/yql_provider_mkql.cpp b/yql/essentials/providers/common/mkql/yql_provider_mkql.cpp
index 211fc6c84c7..92ab62adb1b 100644
--- a/yql/essentials/providers/common/mkql/yql_provider_mkql.cpp
+++ b/yql/essentials/providers/common/mkql/yql_provider_mkql.cpp
@@ -1672,6 +1672,26 @@ TMkqlCommonCallableCompiler::TShared::TShared() {
return ctx.ProgramBuilder.MapJoinCore(list, dict, joinKind, leftKeyColumns, leftRenames, rightRenames, returnType);
});
+ // Compiles the BlockMapJoinCore expression node into the corresponding program builder call.
+ AddCallable("BlockMapJoinCore", [](const TExprNode& node, TMkqlBuildContext& ctx) {
+     const auto leftStream = MkqlBuildExpr(node.Head(), ctx);
+     const auto rightStream = MkqlBuildExpr(*node.Child(1), ctx);
+     const auto joinKind = GetJoinKind(node, node.Child(2)->Content());
+
+     const auto leftItemType = node.Head().GetTypeAnn()->Cast<TStreamExprType>()->GetItemType()->Cast<TMultiExprType>();
+     const auto rightItemType = node.Child(1U)->GetTypeAnn()->Cast<TStreamExprType>()->GetItemType()->Cast<TMultiExprType>();
+
+     // Converts a list of column atoms into positions within the given item type.
+     // The positions are dereferenced unchecked, exactly as before.
+     const auto collectPositions = [](const TExprNode& columns, const TMultiExprType& itemType) {
+         std::vector<ui32> positions;
+         columns.ForEachChild([&](const TExprNode& child) {
+             positions.emplace_back(*GetWideBlockFieldPosition(itemType, child.Content()));
+         });
+         return positions;
+     };
+
+     std::vector<ui32> leftKeyColumns = collectPositions(*node.Child(3), *leftItemType);
+     std::vector<ui32> leftKeyDrops = collectPositions(*node.Child(4), *leftItemType);
+     std::vector<ui32> rightKeyColumns = collectPositions(*node.Child(5), *rightItemType);
+     std::vector<ui32> rightKeyDrops = collectPositions(*node.Child(6), *rightItemType);
+
+     bool rightAny = HasSetting(node.Tail(), "rightAny");
+
+     const auto returnType = BuildType(node, *node.GetTypeAnn(), ctx.ProgramBuilder);
+     return ctx.ProgramBuilder.BlockMapJoinCore(leftStream, rightStream, joinKind, leftKeyColumns, leftKeyDrops, rightKeyColumns, rightKeyDrops, rightAny, returnType);
+ });
+
AddCallable({"GraceJoinCore", "GraceSelfJoinCore"}, [](const TExprNode& node, TMkqlBuildContext& ctx) {
bool selfJoin = node.Content() == "GraceSelfJoinCore";
int shift = selfJoin ? 0 : 1;
diff --git a/yt/yql/providers/yt/common/yql_yt_settings.cpp b/yt/yql/providers/yt/common/yql_yt_settings.cpp
index ffd5e8f0227..edf95828850 100644
--- a/yt/yql/providers/yt/common/yql_yt_settings.cpp
+++ b/yt/yql/providers/yt/common/yql_yt_settings.cpp
@@ -308,6 +308,7 @@ TYtConfiguration::TYtConfiguration(TTypeAnnotationContext& typeCtx)
REGISTER_SETTING(*this, MapJoinShardMinRows);
REGISTER_SETTING(*this, MapJoinShardCount).Lower(1).Upper(10);
REGISTER_SETTING(*this, MapJoinUseFlow);
+ REGISTER_SETTING(*this, BlockMapJoin);
REGISTER_SETTING(*this, EvaluationTableSizeLimit).Upper(10_MB); // Max 10Mb
REGISTER_SETTING(*this, LookupJoinLimit).Upper(10_MB); // Same as EvaluationTableSizeLimit
REGISTER_SETTING(*this, LookupJoinMaxRows).Upper(10000);
diff --git a/yt/yql/providers/yt/common/yql_yt_settings.h b/yt/yql/providers/yt/common/yql_yt_settings.h
index 700f36aae5e..1163f9ccbb7 100644
--- a/yt/yql/providers/yt/common/yql_yt_settings.h
+++ b/yt/yql/providers/yt/common/yql_yt_settings.h
@@ -227,6 +227,7 @@ struct TYtSettings {
NCommon::TConfSetting<ui64, false> MapJoinShardMinRows;
NCommon::TConfSetting<ui64, false> MapJoinShardCount; // [1-10]
NCommon::TConfSetting<bool, false> MapJoinUseFlow;
+ NCommon::TConfSetting<bool, false> BlockMapJoin;
NCommon::TConfSetting<NSize::TSize, false> LookupJoinLimit;
NCommon::TConfSetting<ui64, false> LookupJoinMaxRows;
NCommon::TConfSetting<NSize::TSize, false> EvaluationTableSizeLimit;
diff --git a/yt/yql/providers/yt/opt/yql_yt_join.h b/yt/yql/providers/yt/opt/yql_yt_join.h
index 3ccc860f7fe..fa5b057afbe 100644
--- a/yt/yql/providers/yt/opt/yql_yt_join.h
+++ b/yt/yql/providers/yt/opt/yql_yt_join.h
@@ -35,7 +35,9 @@ struct TMapJoinSettings {
ui64 LeftSize = 0;
ui64 RightSize = 0;
ui64 LeftMemSize = 0;
+ ui64 LeftMemSizeUsingBlocks = 0;
ui64 RightMemSize = 0;
+ ui64 RightMemSizeUsingBlocks = 0;
bool LeftUnique = false;
bool RightUnique = false;
ui64 LeftCount = 0;
diff --git a/yt/yql/providers/yt/provider/yql_yt_join_impl.cpp b/yt/yql/providers/yt/provider/yql_yt_join_impl.cpp
index b40c14e5a32..9f4c2aa4a1b 100644
--- a/yt/yql/providers/yt/provider/yql_yt_join_impl.cpp
+++ b/yt/yql/providers/yt/provider/yql_yt_join_impl.cpp
@@ -12,6 +12,7 @@
#include <yql/essentials/core/yql_expr_optimize.h>
#include <yql/essentials/core/yql_opt_utils.h>
#include <yql/essentials/core/yql_type_helpers.h>
+#include <yql/essentials/minikql/mkql_block_map_join_utils.h>
#include <yql/essentials/utils/log/log.h>
#include <util/string/join.h>
@@ -268,6 +269,28 @@ TStatus UpdateInMemorySizeSetting(TMapJoinSettings& settings, TYtSection& inputS
return TStatus::Ok;
}
+// Fills LeftMemSizeUsingBlocks / RightMemSizeUsingBlocks in `settings` with the join leaf size
+// reported by CalculateJoinLeafSize plus the estimated block map join index size for that side.
+// Must not be called for Cross joins. Returns the status of CalculateJoinLeafSize on failure.
+TStatus UpdateInMemorySizeUsingBlocksSetting(TMapJoinSettings& settings, TYtSection& inputSection,
+    const TYtJoinNodeOp& op, TExprContext& ctx, bool isLeft,
+    const TStructExprType* itemType, const TVector<TString>& joinKeyList, const TYtState::TPtr& state, const TString& cluster,
+    const TVector<TYtPathInfo::TPtr>& tables)
+{
+    Y_ENSURE(!op.JoinKind->IsAtom("Cross"));
+
+    ui64 leafDataSize = 0;
+    if (auto status = CalculateJoinLeafSize(leafDataSize, settings, inputSection, op, ctx, isLeft, itemType, joinKeyList, state, cluster, tables);
+        status != TStatus::Ok)
+    {
+        return status;
+    }
+
+    const ui64 rowCount = isLeft ? settings.LeftRows : settings.RightRows;
+    const ui64 estimatedIndexSize = NKikimr::NMiniKQL::EstimateBlockMapJoinIndexSize(rowCount);
+    YQL_CLOG(INFO, ProviderYt) << "Estimated block map join index size for " << (isLeft ? "left" : "right") << " table: " << estimatedIndexSize << " bytes";
+
+    ui64& memSizeUsingBlocks = isLeft ? settings.LeftMemSizeUsingBlocks : settings.RightMemSizeUsingBlocks;
+    memSizeUsingBlocks = leafDataSize + estimatedIndexSize;
+    return TStatus::Ok;
+}
+
TYtJoinNodeLeaf::TPtr ConvertYtEquiJoinToLeaf(const TYtJoinNodeOp& op, TPositionHandle pos, TExprContext& ctx) {
auto joinLabelBuilder = Build<TCoAtomList>(ctx, pos);
for (auto& x : op.Scope) {
@@ -1432,9 +1455,223 @@ bool RewriteYtMergeJoin(TYtEquiJoin equiJoin, const TJoinLabels& labels, TYtJoin
return true;
}
+// Builds the expression for a block map join of two flows:
+//   NarrowMap(WideFromBlocks(ToFlow(BlockMapJoinCore(
+//       FromFlow(WideToBlocks(ExpandMap(leftFlow, ...))),
+//       FromFlow(WideToBlocks(ExpandMap(rightFlow, ...))),
+//       joinType, key column / key drop positions, settings))), narrowLambda)
+// Both inputs are expanded from structs into wide rows (key columns first), the join works on
+// column positions, and the final NarrowMap reassembles a struct with the output column names.
+// Key columns that are absent from *UsedSourceColumns are passed to the join as key drops;
+// right key drops are passed only when needPayload is set. isUniqueKey adds the "rightAny" setting.
+TExprNode::TPtr BuildBlockMapJoin(TExprNode::TPtr leftFlow, TExprNode::TPtr rightFlow,
+ const TExprNode::TListType& leftKeyColumnNodes, const std::vector<TStringBuf>& leftOutputColumns,
+ const THashMap<TStringBuf, TString>& leftOutputColumnSources, const THashSet<TString>& leftUsedSourceColumns,
+ const TExprNode::TListType& rightKeyColumnNodes, const std::vector<TStringBuf>& rightOutputColumns,
+ const THashMap<TStringBuf, TString>& rightOutputColumnSources, const THashSet<TString>& rightUsedSourceColumns,
+ const TStructExprType* outItemType, TExprNode::TPtr joinType, TPositionHandle pos, bool needPayload, bool isUniqueKey,
+ TExprContext& ctx
+) {
+ // Key columns that are not referenced by any output column can be dropped by the join itself.
+ THashSet<TStringBuf> leftSourceKeyDrops;
+ for (auto& keyColumnNode : leftKeyColumnNodes) {
+ auto memberName = keyColumnNode->Content();
+ if (!leftUsedSourceColumns.contains(memberName)) {
+ leftSourceKeyDrops.insert(memberName);
+ }
+ }
+
+ THashSet<TStringBuf> rightSourceKeyDrops;
+ for (auto& keyColumnNode : rightKeyColumnNodes) {
+ auto memberName = keyColumnNode->Content();
+ if (!rightUsedSourceColumns.contains(memberName)) {
+ rightSourceKeyDrops.insert(memberName);
+ }
+ }
+
+ // Fills the body of an ExpandMap lambda for one side: emits a Member() per distinct key column,
+ // then one per remaining source column needed by the output.
+ //   sourceKeyColumnPositions - key column name -> position in the expanded row (before drops)
+ //   columnPositions          - output column name -> position of its source after key drops
+ // Returns the number of columns that remain after the key drops are applied.
+ auto expandLambdaBuilder = [&](
+ TExprNodeBuilder& builder,
+ THashMap<TStringBuf, ui32>& columnPositions,
+ THashMap<TString, ui32>& sourceKeyColumnPositions,
+ const std::vector<TStringBuf>& outputColumns,
+ const THashMap<TStringBuf, TString>& outputColumnSources,
+ const TExprNode::TListType& keyColumnNodes,
+ const THashSet<TStringBuf>& sourceKeyDrops
+ ) {
+ ui32 pos = 0;
+ size_t dropCount = 0;
+
+ THashMap<TStringBuf, ui32> sourceColumnPositions;
+ for (auto& keyColumnNode : keyColumnNodes) {
+ auto memberName = keyColumnNode->Content();
+ if (!sourceKeyColumnPositions.contains(memberName)) {
+ builder.Callable(pos, "Member")
+ .Arg(0, "item")
+ .Atom(1, memberName)
+ .Seal();
+
+ // Post-drop positions are shifted left by the number of dropped keys seen so far.
+ if (!sourceKeyDrops.contains(memberName)) {
+ sourceColumnPositions.emplace(memberName, pos - dropCount);
+ } else {
+ dropCount++;
+ }
+
+ sourceKeyColumnPositions.emplace(memberName, pos);
+ pos++;
+ }
+ }
+
+ for (auto& newName : outputColumns) {
+ auto& memberName = outputColumnSources.at(newName);
+ if (!sourceColumnPositions.contains(memberName)) {
+ builder.Callable(pos, "Member")
+ .Arg(0, "item")
+ .Atom(1, memberName)
+ .Seal();
+ sourceColumnPositions.emplace(memberName, pos - dropCount);
+ pos++;
+ }
+
+ columnPositions.emplace(newName, sourceColumnPositions[memberName]);
+ }
+
+ return sourceColumnPositions.size();
+ };
+
+ THashMap<TStringBuf, ui32> leftColumnPositions;
+ THashMap<TString, ui32> leftSourceKeyColumnPositions; // before drop
+ size_t leftInputSizeAfterDrop = 0;
+ auto leftExpandLambda = ctx.Builder(pos)
+ .Lambda()
+ .Param("item")
+ .Do([&](TExprNodeBuilder& builder) -> TExprNodeBuilder& {
+ leftInputSizeAfterDrop = expandLambdaBuilder(
+ builder,
+ leftColumnPositions,
+ leftSourceKeyColumnPositions,
+ leftOutputColumns,
+ leftOutputColumnSources,
+ leftKeyColumnNodes,
+ leftSourceKeyDrops
+ );
+ return builder;
+ })
+ .Seal()
+ .Build();
+
+ THashMap<TStringBuf, ui32> rightColumnPositions;
+ THashMap<TString, ui32> rightSourceKeyColumnPositions; // before drop
+ size_t rightInputSizeAfterDrop = 0;
+ auto rightExpandLambda = ctx.Builder(pos)
+ .Lambda()
+ .Param("item")
+ .Do([&](TExprNodeBuilder& builder) -> TExprNodeBuilder& {
+ rightInputSizeAfterDrop = expandLambdaBuilder(
+ builder,
+ rightColumnPositions,
+ rightSourceKeyColumnPositions,
+ rightOutputColumns,
+ rightOutputColumnSources,
+ rightKeyColumnNodes,
+ rightSourceKeyDrops
+ );
+ return builder;
+ })
+ .Seal()
+ .Build();
+
+ // Narrowing lambda: takes the joined wide row (left columns followed by right columns, both
+ // after key drops) and builds the output struct; right positions are offset by the left width.
+ auto narrowLambda = ctx.Builder(pos)
+ .Lambda()
+ .Params("items", leftInputSizeAfterDrop + rightInputSizeAfterDrop)
+ .Callable("AsStruct")
+ .Do([&](TExprNodeBuilder& builder) -> TExprNodeBuilder& {
+ size_t i = 0;
+
+ for (auto& newName : leftOutputColumns) {
+ builder.List(i)
+ .Atom(0, newName)
+ .Arg(1, "items", leftColumnPositions.at(newName))
+ .Seal();
+ i++;
+ }
+
+ for (auto& newName : rightOutputColumns) {
+ builder.List(i)
+ .Atom(0, newName)
+ .Arg(1, "items", leftInputSizeAfterDrop + rightColumnPositions.at(newName))
+ .Seal();
+ i++;
+ }
+
+ YQL_ENSURE(i == outItemType->GetSize());
+ return builder;
+ })
+ .Seal()
+ .Seal()
+ .Build();
+
+ // Key columns and key drops are passed to BlockMapJoinCore as pre-drop positions.
+ TExprNode::TListType leftKeyColumnPositionNodes;
+ for (auto& keyColumnNode : leftKeyColumnNodes) {
+ auto memberName = keyColumnNode->Content();
+ leftKeyColumnPositionNodes.push_back(ctx.NewAtom(pos, leftSourceKeyColumnPositions.at(memberName)));
+ }
+
+ TExprNode::TListType leftKeyDropPositionNodes;
+ for (auto& memberName : leftSourceKeyDrops) {
+ leftKeyDropPositionNodes.push_back(ctx.NewAtom(pos, leftSourceKeyColumnPositions.at(memberName)));
+ }
+
+ TExprNode::TListType rightKeyColumnPositionNodes;
+ for (auto& keyColumnNode : rightKeyColumnNodes) {
+ auto memberName = keyColumnNode->Content();
+ rightKeyColumnPositionNodes.push_back(ctx.NewAtom(pos, rightSourceKeyColumnPositions.at(memberName)));
+ }
+
+ // Right key drops are emitted only when the right side contributes payload columns.
+ TExprNode::TListType rightKeyDropPositionNodes;
+ if (needPayload) {
+ for (auto& memberName : rightSourceKeyDrops) {
+ rightKeyDropPositionNodes.push_back(ctx.NewAtom(pos, rightSourceKeyColumnPositions.at(memberName)));
+ }
+ }
+
+ auto settingsBuilder = Build<TCoNameValueTupleList>(ctx, pos);
+ if (isUniqueKey) {
+ settingsBuilder
+ .Add()
+ .Name()
+ .Value("rightAny")
+ .Build()
+ .Build();
+ }
+
+ return ctx.Builder(pos)
+ .Callable("NarrowMap")
+ .Callable(0, "WideFromBlocks")
+ .Callable(0, "ToFlow")
+ .Callable(0, "BlockMapJoinCore")
+ .Callable(0, "FromFlow")
+ .Callable(0, "WideToBlocks")
+ .Callable(0, "ExpandMap")
+ .Add(0, std::move(leftFlow))
+ .Add(1, std::move(leftExpandLambda))
+ .Seal()
+ .Seal()
+ .Seal()
+ .Callable(1, "FromFlow")
+ .Callable(0, "WideToBlocks")
+ .Callable(0, "ExpandMap")
+ .Add(0, std::move(rightFlow))
+ .Add(1, std::move(rightExpandLambda))
+ .Seal()
+ .Seal()
+ .Seal()
+ .Add(2, std::move(joinType))
+ .Add(3, ctx.NewList(pos, std::move(leftKeyColumnPositionNodes)))
+ .Add(4, ctx.NewList(pos, std::move(leftKeyDropPositionNodes)))
+ .Add(5, ctx.NewList(pos, std::move(rightKeyColumnPositionNodes)))
+ .Add(6, ctx.NewList(pos, std::move(rightKeyDropPositionNodes)))
+ .Add(7, settingsBuilder.Done().Ptr())
+ .Seal()
+ .Seal()
+ .Seal()
+ .Add(1, std::move(narrowLambda))
+ .Seal()
+ .Build();
+}
+
bool RewriteYtMapJoin(TYtEquiJoin equiJoin, const TJoinLabels& labels, bool isLookupJoin,
TYtJoinNodeOp& op, const TYtJoinNodeLeaf& leftLeaf, const TYtJoinNodeLeaf& rightLeaf,
- TExprContext& ctx, const TMapJoinSettings& settings, bool useShards, const TYtState::TPtr& state)
+ TExprContext& ctx, const TMapJoinSettings& settings, bool useShards, bool useBlocks, const TYtState::TPtr& state)
{
auto pos = equiJoin.Pos();
auto joinType = op.JoinKind;
@@ -1787,6 +2024,15 @@ bool RewriteYtMapJoin(TYtEquiJoin equiJoin, const TJoinLabels& labels, bool isLo
TExprNode::TListType joinedOutNodes;
TExprNode::TListType rightRenameNodes;
TExprNode::TListType leftRenameNodes;
+
+ std::vector<TStringBuf> leftOutputColumns;
+ THashMap<TStringBuf, TString> leftOutputColumnSources;
+ THashSet<TString> leftUsedSourceColumns;
+
+ std::vector<TStringBuf> rightOutputColumns;
+ THashMap<TStringBuf, TString> rightOutputColumnSources;
+ THashSet<TString> rightUsedSourceColumns;
+
for (ui32 index = 0; index < outputLeftSchemeType->GetSize(); ++index) {
auto item = outputLeftSchemeType->GetItems()[index];
TVector<TStringBuf> newNames;
@@ -1806,6 +2052,9 @@ bool RewriteYtMapJoin(TYtEquiJoin equiJoin, const TJoinLabels& labels, bool isLo
for (auto newName : newNames) {
leftRenameNodes.push_back(ctx.NewAtom(pos, memberName));
leftRenameNodes.push_back(ctx.NewAtom(pos, newName));
+ leftUsedSourceColumns.insert(memberName);
+ leftOutputColumns.push_back(newName);
+ leftOutputColumnSources.emplace(newName, memberName);
AddJoinRemappedColumn(pos, mainArg, joinedOutNodes, memberName, newName, ctx);
}
}
@@ -1830,6 +2079,9 @@ bool RewriteYtMapJoin(TYtEquiJoin equiJoin, const TJoinLabels& labels, bool isLo
for (auto newName : newNames) {
rightRenameNodes.push_back(ctx.NewAtom(pos, memberName));
rightRenameNodes.push_back(ctx.NewAtom(pos, newName));
+ rightUsedSourceColumns.insert(memberName);
+ rightOutputColumns.push_back(newName);
+ rightOutputColumnSources.emplace(newName, memberName);
AddJoinRemappedColumn(pos, lookupArg, joinedOutNodes, memberName, newName, ctx);
}
}
@@ -1840,61 +2092,96 @@ bool RewriteYtMapJoin(TYtEquiJoin equiJoin, const TJoinLabels& labels, bool isLo
TExprNode::TListType leftKeyColumnNodes;
TExprNode::TListType leftKeyColumnNodesNullable;
auto mapInput = RemapNonConvertibleItems(listArg, mainLabel, *leftKeyColumns, outputKeyType, leftKeyColumnNodes, leftKeyColumnNodesNullable, ctx);
- if (mapJoinUseFlow) {
- joined = ctx.Builder(pos)
- .Callable("FlatMap")
- .Callable(0, "SqueezeToDict")
- .Callable(0, "ToFlow")
- .Add(0, std::move(tableContent))
- .Callable(1, "DependsOn")
- .Add(0, listArg)
+
+ if (useBlocks) {
+ for (auto& [_, columnType] : columnTypes) {
+ if (!IsSupportedAsBlockType(pos, *columnType, ctx, *state->Types)) {
+ useBlocks = false;
+ YQL_CLOG(INFO, ProviderYt) << "Block mapjoin won't be used because of unsupported type: " << *columnType;
+ break;
+ }
+ }
+ }
+
+ if (useBlocks) {
+ if (!mapJoinUseFlow) {
+ mapInput = ctx.Builder(pos)
+ .Callable("ToFlow")
+ .Add(0, std::move(mapInput))
+ .Seal()
+ .Build();
+ }
+
+ tableContent = ctx.Builder(pos)
+ .Callable("ToFlow")
+ .Add(0, std::move(tableContent))
+ .Callable(1, "DependsOn")
+ .Add(0, listArg)
+ .Seal()
+ .Seal()
+ .Build();
+
+ joined = BuildBlockMapJoin(std::move(mapInput), std::move(tableContent),
+ leftKeyColumnNodes, leftOutputColumns, leftOutputColumnSources, leftUsedSourceColumns,
+ remappedMembers, rightOutputColumns, rightOutputColumnSources, rightUsedSourceColumns,
+ outItemType, joinType, pos, needPayload, isUniqueKey, ctx
+ );
+ } else {
+ if (mapJoinUseFlow) {
+ joined = ctx.Builder(pos)
+ .Callable("FlatMap")
+ .Callable(0, "SqueezeToDict")
+ .Callable(0, "ToFlow")
+ .Add(0, std::move(tableContent))
+ .Callable(1, "DependsOn")
+ .Add(0, listArg)
+ .Seal()
.Seal()
- .Seal()
- .Add(1, std::move(smallKeySelector))
- .Add(2, std::move(smallPayloadSelector))
- .List(3)
- .Atom(0, "Hashed", TNodeFlags::Default)
- .Atom(1, needPayload && !isUniqueKey ? "Many" : "One", TNodeFlags::Default)
- .Atom(2, "Compact", TNodeFlags::Default)
+ .Add(1, std::move(smallKeySelector))
+ .Add(2, std::move(smallPayloadSelector))
.List(3)
- .Atom(0, "ItemsCount", TNodeFlags::Default)
- .Atom(1, ToString(dictItemsCount), TNodeFlags::Default)
+ .Atom(0, "Hashed", TNodeFlags::Default)
+ .Atom(1, needPayload && !isUniqueKey ? "Many" : "One", TNodeFlags::Default)
+ .Atom(2, "Compact", TNodeFlags::Default)
+ .List(3)
+ .Atom(0, "ItemsCount", TNodeFlags::Default)
+ .Atom(1, ToString(dictItemsCount), TNodeFlags::Default)
+ .Seal()
.Seal()
.Seal()
- .Seal()
- .Lambda(1)
- .Param("dict")
- .Callable("MapJoinCore")
- .Add(0, std::move(mapInput))
- .Arg(1, "dict")
- .Add(2, joinType)
- .Add(3, ctx.NewList(pos, std::move(leftKeyColumnNodes)))
- .Add(4, ctx.NewList(pos, std::move(remappedMembers)))
- .Add(5, ctx.NewList(pos, std::move(leftRenameNodes)))
- .Add(6, ctx.NewList(pos, std::move(rightRenameNodes)))
- .Add(7, leftKeyColumns)
- .Add(8, rightKeyColumns)
+ .Lambda(1)
+ .Param("dict")
+ .Callable("MapJoinCore")
+ .Add(0, std::move(mapInput))
+ .Arg(1, "dict")
+ .Add(2, joinType)
+ .Add(3, ctx.NewList(pos, std::move(leftKeyColumnNodes)))
+ .Add(4, ctx.NewList(pos, std::move(remappedMembers)))
+ .Add(5, ctx.NewList(pos, std::move(leftRenameNodes)))
+ .Add(6, ctx.NewList(pos, std::move(rightRenameNodes)))
+ .Add(7, leftKeyColumns)
+ .Add(8, rightKeyColumns)
+ .Seal()
.Seal()
.Seal()
- .Seal()
- .Build();
- } else {
- joined = ctx.Builder(pos)
- .Callable("MapJoinCore")
- .Add(0, mapInput)
- .Add(1, dict)
- .Add(2, joinType)
- .Add(3, ctx.NewList(pos, std::move(leftKeyColumnNodes)))
- .Add(4, ctx.NewList(pos, std::move(remappedMembers)))
- .Add(5, ctx.NewList(pos, std::move(leftRenameNodes)))
- .Add(6, ctx.NewList(pos, std::move(rightRenameNodes)))
- .Add(7, leftKeyColumns)
- .Add(8, rightKeyColumns)
- .Seal()
- .Build();
+ .Build();
+ } else {
+ joined = ctx.Builder(pos)
+ .Callable("MapJoinCore")
+ .Add(0, mapInput)
+ .Add(1, dict)
+ .Add(2, joinType)
+ .Add(3, ctx.NewList(pos, std::move(leftKeyColumnNodes)))
+ .Add(4, ctx.NewList(pos, std::move(remappedMembers)))
+ .Add(5, ctx.NewList(pos, std::move(leftRenameNodes)))
+ .Add(6, ctx.NewList(pos, std::move(rightRenameNodes)))
+ .Add(7, leftKeyColumns)
+ .Add(8, rightKeyColumns)
+ .Seal()
+ .Build();
+ }
}
- }
- else {
+ } else {
auto joinedOut = ctx.NewCallable(pos, "AsStruct", std::move(joinedOutNodes));
auto joinedBody = ctx.Builder(pos)
.Callable("Map")
@@ -3036,12 +3323,12 @@ TStatus RewriteYtEquiJoinLeaf(TYtEquiJoin equiJoin, TYtJoinNodeOp& op, TYtJoinNo
DoSwap(mapSettings.LeftUnique, mapSettings.RightUnique);
YQL_CLOG(INFO, ProviderYt) << "Selected LookupJoin: filter over the right table, use content of the left one, " << (op.LinkSettings.JoinAlgo != EJoinAlgoType::Undefined ? ToString(op.LinkSettings.JoinAlgo).c_str() : "no") << " cbo algo";
- return RewriteYtMapJoin(equiJoin, labels, true, op, rightLeaf, leftLeaf, ctx, mapSettings, false, state) ?
+ return RewriteYtMapJoin(equiJoin, labels, true, op, rightLeaf, leftLeaf, ctx, mapSettings, false, false, state) ?
TStatus::Ok : TStatus::Error;
} else {
YQL_CLOG(INFO, ProviderYt) << "Selected LookupJoin: filter over the left table, use content of the right one, " << (op.LinkSettings.JoinAlgo != EJoinAlgoType::Undefined ? ToString(op.LinkSettings.JoinAlgo).c_str() : "no") << " cbo algo";
- return RewriteYtMapJoin(equiJoin, labels, true, op, leftLeaf, rightLeaf, ctx, mapSettings, false, state) ?
+ return RewriteYtMapJoin(equiJoin, labels, true, op, leftLeaf, rightLeaf, ctx, mapSettings, false, false, state) ?
TStatus::Ok : TStatus::Error;
}
}
@@ -3316,12 +3603,24 @@ TStatus RewriteYtEquiJoinLeaf(TYtEquiJoin equiJoin, TYtJoinNodeOp& op, TYtJoinNo
leftLimit *= *leftPartCount;
}
- auto mapJoinUseFlow = state->Configuration->MapJoinUseFlow.Get().GetOrElse(DEFAULT_MAP_JOIN_USE_FLOW);
+ bool mapJoinUseFlow = state->Configuration->MapJoinUseFlow.Get().GetOrElse(DEFAULT_MAP_JOIN_USE_FLOW);
+ bool mapJoinUseBlocks = state->Configuration->BlockMapJoin.Get().GetOrElse(state->Types->UseBlocks);
+ if (joinType == "Cross") {
+ mapJoinUseBlocks = false;
+ }
+
if (leftTablesReady) {
auto status = UpdateInMemorySizeSetting(mapSettings, leftLeaf.Section, labels, op, ctx, true, leftItemType, leftJoinKeyList, state, cluster, leftTables, mapJoinUseFlow);
if (status.Level != TStatus::Ok) {
return (status.Level == TStatus::Repeat) ? TStatus::Ok : status;
}
+
+ if (mapJoinUseBlocks) {
+ auto status = UpdateInMemorySizeUsingBlocksSetting(mapSettings, leftLeaf.Section, op, ctx, true, leftItemType, leftJoinKeyList, state, cluster, leftTables);
+ if (status.Level != TStatus::Ok) {
+ return (status.Level == TStatus::Repeat) ? TStatus::Ok : status;
+ }
+ }
}
if (rightTablesReady) {
@@ -3329,22 +3628,34 @@ TStatus RewriteYtEquiJoinLeaf(TYtEquiJoin equiJoin, TYtJoinNodeOp& op, TYtJoinNo
if (status.Level != TStatus::Ok) {
return (status.Level == TStatus::Repeat) ? TStatus::Ok : status;
}
+
+ if (mapJoinUseBlocks) {
+ auto status = UpdateInMemorySizeUsingBlocksSetting(mapSettings, rightLeaf.Section, op, ctx, false, rightItemType, rightJoinKeyList, state, cluster, rightTables);
+ if (status.Level != TStatus::Ok) {
+ return (status.Level == TStatus::Repeat) ? TStatus::Ok : status;
+ }
+ }
}
YQL_CLOG(INFO, ProviderYt) << "MapJoinShardMinRows: " << mapSettings.MapJoinShardMinRows
<< ", MapJoinShardCount: " << mapSettings.MapJoinShardCount
<< ", left is present: " << leftTablesReady
<< ", left size limit: " << leftLimit << ", left mem size:" << mapSettings.LeftMemSize
+ << ", left mem size using blocks: " << mapSettings.LeftMemSizeUsingBlocks
<< ", right is present: " << rightTablesReady
- << ", right size limit: " << rightLimit << ", right mem size: " << mapSettings.RightMemSize;
+ << ", right size limit: " << rightLimit << ", right mem size: " << mapSettings.RightMemSize
+ << ", right mem size using blocks: " << mapSettings.RightMemSizeUsingBlocks;
bool leftAny = linkSettings.LeftHints.contains("any");
bool rightAny = linkSettings.RightHints.contains("any");
- const bool isLeftAllowMapJoin = !leftAny && rightTablesReady && (mapSettings.RightMemSize <= rightLimit) &&
+ const bool isLeftAllowBlockMapJoin = mapJoinUseBlocks && (mapSettings.RightMemSizeUsingBlocks <= rightLimit);
+ const bool isRightAllowBlockMapJoin = mapJoinUseBlocks && (mapSettings.LeftMemSizeUsingBlocks <= leftLimit);
+
+ const bool isLeftAllowMapJoin = !leftAny && rightTablesReady && (mapSettings.RightMemSize <= rightLimit || isLeftAllowBlockMapJoin) &&
(joinType == "Inner" || joinType == "Left" || joinType == "LeftOnly" || joinType == "LeftSemi" || joinType == "Cross")
&& !rightStats.IsDynamic;
- const bool isRightAllowMapJoin = !rightAny && leftTablesReady && (mapSettings.LeftMemSize <= leftLimit) &&
+ const bool isRightAllowMapJoin = !rightAny && leftTablesReady && (mapSettings.LeftMemSize <= leftLimit || isRightAllowBlockMapJoin) &&
(joinType == "Inner" || joinType == "Right" || joinType == "RightOnly" || joinType == "RightSemi" || joinType == "Cross")
&& !leftStats.IsDynamic;
YQL_CLOG(INFO, ProviderYt) << "MapJoin: isLeftAllowMapJoin: " << isLeftAllowMapJoin
@@ -3361,17 +3672,28 @@ TStatus RewriteYtEquiJoinLeaf(TYtEquiJoin equiJoin, TYtJoinNodeOp& op, TYtJoinNo
mapSettings.SwapTables = swapTables;
+ if (mapJoinUseBlocks && !state->Types->UseBlocks && (
+ (mapSettings.RightMemSize < mapSettings.RightMemSizeUsingBlocks) ||
+ (swapTables && (mapSettings.LeftMemSize < mapSettings.LeftMemSizeUsingBlocks))
+ )) {
+ ui64 memSize = swapTables ? mapSettings.LeftMemSize : mapSettings.RightMemSize;
+ ui64 memSizeUsingBlocks = swapTables ? mapSettings.LeftMemSizeUsingBlocks : mapSettings.RightMemSizeUsingBlocks;
+ YQL_CLOG(INFO, ProviderYt) << "Block mapjoin won't be used: memSize=" << memSize << " is less than memSizeUsingBlocks=" << memSizeUsingBlocks;
+ mapJoinUseBlocks = false;
+ }
+
if (swapTables) {
DoSwap(mapSettings.LeftRows, mapSettings.RightRows);
DoSwap(mapSettings.LeftSize, mapSettings.RightSize);
DoSwap(mapSettings.LeftMemSize, mapSettings.RightMemSize);
+ DoSwap(mapSettings.LeftMemSizeUsingBlocks, mapSettings.RightMemSizeUsingBlocks);
DoSwap(mapSettings.LeftUnique, mapSettings.RightUnique);
YQL_CLOG(INFO, ProviderYt) << "Selected MapJoin: map over the right table, use content of the left one, " << (op.LinkSettings.JoinAlgo != EJoinAlgoType::Undefined ? ToString(op.LinkSettings.JoinAlgo).c_str() : "no") << " cbo algo";
- return RewriteYtMapJoin(equiJoin, labels, false, op, rightLeaf, leftLeaf, ctx, mapSettings, allowShardLeft, state) ?
+ return RewriteYtMapJoin(equiJoin, labels, false, op, rightLeaf, leftLeaf, ctx, mapSettings, allowShardLeft, mapJoinUseBlocks, state) ?
TStatus::Ok : TStatus::Error;
} else {
YQL_CLOG(INFO, ProviderYt) << "Selected MapJoin: map over the left table, use content of the right one, " << (op.LinkSettings.JoinAlgo != EJoinAlgoType::Undefined ? ToString(op.LinkSettings.JoinAlgo).c_str() : "no") << " cbo algo";
- return RewriteYtMapJoin(equiJoin, labels, false, op, leftLeaf, rightLeaf, ctx, mapSettings, allowShardRight, state) ?
+ return RewriteYtMapJoin(equiJoin, labels, false, op, leftLeaf, rightLeaf, ctx, mapSettings, allowShardRight, mapJoinUseBlocks, state) ?
TStatus::Ok : TStatus::Error;
}
}