diff options
author | ziganshinmr <ziganshinmr@yandex-team.com> | 2025-02-17 23:25:34 +0300 |
---|---|---|
committer | ziganshinmr <ziganshinmr@yandex-team.com> | 2025-02-17 23:43:14 +0300 |
commit | 8fe93946bc369873a7ffbb3a7403463aa80e3117 (patch) | |
tree | 9881b73381be7912e07909359d277e1016b43d2c /yql | |
parent | 3af8bdc1b9f36eea116453da1a8b456810d3038e (diff) | |
download | ydb-8fe93946bc369873a7ffbb3a7403463aa80e3117.tar.gz |
BlockMapJoinCore refactor
* Split storage and index parts from BlockMapJoinCore computation node into separate BlockStorage and BlockIndex nodes in order to allow multiple join nodes to reuse the same block data and index for the right table where possible
* Corresponding s-expressions changes
commit_hash:40e39fb0b22c2f929c184963b5bd901006122c14
Diffstat (limited to 'yql')
17 files changed, 925 insertions, 282 deletions
diff --git a/yql/essentials/ast/yql_type_string.cpp b/yql/essentials/ast/yql_type_string.cpp index fc360d2f90..6dfbaa5664 100644 --- a/yql/essentials/ast/yql_type_string.cpp +++ b/yql/essentials/ast/yql_type_string.cpp @@ -91,6 +91,7 @@ enum EToken TOKEN_TZDATE32 = -56, TOKEN_TZDATETIME64 = -57, TOKEN_TZTIMESTAMP64 = -58, + TOKEN_MULTI = -59, // identifiers TOKEN_IDENTIFIER = -100, @@ -121,6 +122,7 @@ EToken TokenTypeFromStr(TStringBuf str) { TStringBuf("Dict"), TOKEN_DICT }, { TStringBuf("Tuple"), TOKEN_TUPLE }, { TStringBuf("Struct"), TOKEN_STRUCT }, + { TStringBuf("Multi"), TOKEN_MULTI }, { TStringBuf("Resource"), TOKEN_RESOURCE }, { TStringBuf("Void"), TOKEN_VOID }, { TStringBuf("Callable"), TOKEN_CALLABLE }, @@ -267,6 +269,10 @@ private: type = ParseStructType(); break; + case TOKEN_MULTI: + type = ParseMultiType(); + break; + case TOKEN_RESOURCE: type = ParseResourceType(); break; @@ -752,9 +758,9 @@ private: return MakeDictType(keyType, MakeVoidType()); } - TAstNode* ParseTupleTypeImpl() { + TAstNode* ParseTupleTypeImpl(TAstNode* (TTypeParser::*typeCreator)(TSmallVec<TAstNode*>&)) { TSmallVec<TAstNode*> items; - items.push_back(nullptr); // reserve for TupleType + items.push_back(nullptr); // reserve for type callable if (Token != '>') { for (;;) { @@ -773,13 +779,13 @@ private: } } - return MakeTupleType(items); + return (this->*typeCreator)(items); } TAstNode* ParseTupleType() { GetNextToken(); // eat keyword EXPECT_AND_SKIP_TOKEN('<', nullptr); - TAstNode* tupleType = ParseTupleTypeImpl(); + TAstNode* tupleType = ParseTupleTypeImpl(&TTypeParser::MakeTupleType); if (tupleType) { EXPECT_AND_SKIP_TOKEN('>', nullptr); } @@ -836,6 +842,16 @@ private: return structType; } + TAstNode* ParseMultiType() { + GetNextToken(); // eat keyword + EXPECT_AND_SKIP_TOKEN('<', nullptr); + TAstNode* tupleType = ParseTupleTypeImpl(&TTypeParser::MakeMultiType); + if (tupleType) { + EXPECT_AND_SKIP_TOKEN('>', nullptr); + } + return tupleType; + } + TAstNode* ParseVariantType() { GetNextToken(); // eat keyword EXPECT_AND_SKIP_TOKEN('<', nullptr); @@ -844,7 +860,7 @@ private: if (Token == TOKEN_IDENTIFIER || Token == TOKEN_ESCAPED_IDENTIFIER) { underlyingType = ParseStructTypeImpl(); } else if (IsTypeKeyword(Token) || Token == '(') { - underlyingType = ParseTupleTypeImpl(); + underlyingType = ParseTupleTypeImpl(&TTypeParser::MakeTupleType); } else { return AddError("Expected type"); } @@ -995,6 +1011,11 @@ private: return MakeList(items.data(), items.size()); } + TAstNode* MakeMultiType(TSmallVec<TAstNode*>& items) { + items[0] = MakeLiteralAtom(TStringBuf("MultiType")); + return MakeList(items.data(), items.size()); + } + TAstNode* ParseResourceType() { GetNextToken(); // eat keyword EXPECT_AND_SKIP_TOKEN('<', nullptr); diff --git a/yql/essentials/core/expr_nodes/yql_expr_nodes.json b/yql/essentials/core/expr_nodes/yql_expr_nodes.json index 09de074b5e..cd33706ac7 100644 --- a/yql/essentials/core/expr_nodes/yql_expr_nodes.json +++ b/yql/essentials/core/expr_nodes/yql_expr_nodes.json @@ -1613,18 +1613,37 @@ ] }, { + "Name": "TCoBlockStorage", + "Base": "TCallable", + "Match": {"Type": "Callable", "Name": "BlockStorage"}, + "Children": [ + {"Index": 0, "Name": "Input", "Type": "TExprBase"} + ] + }, + { + "Name": "TCoBlockMapJoinIndex", + "Base": "TCallable", + "Match": {"Type": "Callable", "Name": "BlockMapJoinIndex"}, + "Children": [ + {"Index": 0, "Name": "BlockStorage", "Type": "TExprBase"}, + {"Index": 1, "Name": "InputType", "Type": "TExprBase"}, + {"Index": 2, "Name": "KeyColumns", "Type": "TCoAtomList"}, + {"Index": 3, "Name": "Options", "Type": "TExprList"} + ] + }, + { "Name": "TCoBlockMapJoinCore", "Base": "TCallable", "Match": {"Type": "Callable", "Name": "BlockMapJoinCore"}, "Children": [ {"Index": 0, "Name": "LeftInput", "Type": "TExprBase"}, {"Index": 1, "Name": "RightInput", "Type": "TExprBase"}, - {"Index": 2, "Name": "JoinKind", "Type": "TCoAtom"}, - {"Index": 3, "Name": "LeftKeyColumns", "Type": "TCoAtomList"}, - {"Index": 4, "Name": "LeftKeyDrops", "Type": "TCoAtomList"}, - {"Index": 5, "Name": "RightKeyColumns", "Type": "TCoAtomList"}, - {"Index": 6, "Name": "RightKeyDrops", "Type": "TCoAtomList"}, - {"Index": 7, "Name": "Options", "Type": "TExprList"} + {"Index": 2, "Name": "RightInputType", "Type": "TExprBase"}, + {"Index": 3, "Name": "JoinKind", "Type": "TCoAtom"}, + {"Index": 4, "Name": "LeftKeyColumns", "Type": "TCoAtomList"}, + {"Index": 5, "Name": "LeftKeyDrops", "Type": "TCoAtomList"}, + {"Index": 6, "Name": "RightKeyColumns", "Type": "TCoAtomList"}, + {"Index": 7, "Name": "RightKeyDrops", "Type": "TCoAtomList"} ] }, { diff --git a/yql/essentials/core/type_ann/type_ann_core.cpp b/yql/essentials/core/type_ann/type_ann_core.cpp index fdba180472..732edecc45 100644 --- a/yql/essentials/core/type_ann/type_ann_core.cpp +++ b/yql/essentials/core/type_ann/type_ann_core.cpp @@ -12987,17 +12987,18 @@ template <NKikimr::NUdf::EDataSlot DataSlot> Functions["BlockDecimalMul"] = &BlockDecimalBinaryWrapper; Functions["BlockDecimalMod"] = &BlockDecimalBinaryWrapper; Functions["BlockDecimalDiv"] = &BlockDecimalBinaryWrapper; + Functions["BlockStorage"] = &BlockStorageWrapper; ExtFunctions["BlockFunc"] = &BlockFuncWrapper; - Functions["BlockMapJoinCore"] = &BlockMapJoinCoreWrapper; - ExtFunctions["AsScalar"] = &AsScalarWrapper; ExtFunctions["WideToBlocks"] = &WideToBlocksWrapper; ExtFunctions["BlockCombineAll"] = &BlockCombineAllWrapper; ExtFunctions["BlockCombineHashed"] = &BlockCombineHashedWrapper; ExtFunctions["BlockMergeFinalizeHashed"] = &BlockMergeFinalizeHashedWrapper; ExtFunctions["BlockMergeManyFinalizeHashed"] = &BlockMergeFinalizeHashedWrapper; + ExtFunctions["BlockMapJoinIndex"] = &BlockMapJoinIndexWrapper; + ExtFunctions["BlockMapJoinCore"] = &BlockMapJoinCoreWrapper; ExtFunctions["SqlRename"] = &SqlRenameWrapper; ExtFunctions["OrderedSqlRename"] = &SqlRenameWrapper; diff --git a/yql/essentials/core/type_ann/type_ann_impl.h b/yql/essentials/core/type_ann/type_ann_impl.h index 717cb7b3ac..20379c8bfa 100644 --- a/yql/essentials/core/type_ann/type_ann_impl.h +++ b/yql/essentials/core/type_ann/type_ann_impl.h @@ -35,7 +35,9 @@ namespace NTypeAnnImpl { IGraphTransformer::TStatus CombineCoreWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx); IGraphTransformer::TStatus GroupingCoreWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx); IGraphTransformer::TStatus DecimalBinaryWrapperBase(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx, bool blocks); - IGraphTransformer::TStatus BlockMapJoinCoreWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx); + IGraphTransformer::TStatus BlockStorageWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx); + IGraphTransformer::TStatus BlockMapJoinIndexWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TExtContext& ctx); + IGraphTransformer::TStatus BlockMapJoinCoreWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TExtContext& ctx); TMaybe<ui32> FindOrReportMissingMember(TStringBuf memberName, TPositionHandle pos, const TStructExprType& structType, TExprContext& ctx); diff --git a/yql/essentials/core/type_ann/type_ann_join.cpp b/yql/essentials/core/type_ann/type_ann_join.cpp index 78c1c89f28..cc57859624 100644 --- a/yql/essentials/core/type_ann/type_ann_join.cpp +++ b/yql/essentials/core/type_ann/type_ann_join.cpp @@ -1,7 +1,12 @@ #include "type_ann_core.h" #include "type_ann_impl.h" + #include <util/string/join.h> +#include <util/string/split.h> + +#include <yql/essentials/core/type_ann/type_ann_types.h> #include <yql/essentials/core/yql_join.h> +#include <yql/essentials/minikql/mkql_program_builder.h> namespace NYql { namespace NTypeAnnImpl { @@ -980,40 +985,212 @@ namespace NTypeAnnImpl { } } - IGraphTransformer::TStatus BlockMapJoinCoreWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx) { + IGraphTransformer::TStatus BlockStorageWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx) { + Y_UNUSED(output); + + if (!EnsureArgsCount(*input, 1, ctx.Expr)) { + return IGraphTransformer::TStatus::Error; + } + + TTypeAnnotationNode::TListType itemTypes; + if (!EnsureWideStreamBlockType(input->Head(), itemTypes, ctx.Expr)) { + return IGraphTransformer::TStatus::Error; + } + + auto streamItemType = input->Head().GetTypeAnn()->Cast<TStreamExprType>()->GetItemType(); + input->SetTypeAnn(ctx.Expr.MakeType<TResourceExprType>(TStringBuilder() << + NKikimr::NMiniKQL::BlockStorageResourcePrefix << FormatType(streamItemType))); + return IGraphTransformer::TStatus::Ok; + } + + bool EnsureBlockStorageResource(const TExprNode* resource, const TMultiExprType*& streamItemType, TExtContext& ctx) { + using NKikimr::NMiniKQL::BlockStorageResourcePrefix; + + if (!EnsureResourceType(*resource, ctx.Expr)) { + return false; + } + const auto resourceType = resource->GetTypeAnn()->Cast<TResourceExprType>(); + const auto resourceTag = resourceType->GetTag(); + if (!resourceTag.StartsWith(BlockStorageResourcePrefix)) { + ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(resource->Pos()), "Expected block storage resource")); + return false; + } + + auto typeString = TStringBuf(resourceTag.data() + BlockStorageResourcePrefix.size(), resourceTag.size() - BlockStorageResourcePrefix.size()); + auto typeNode = ctx.Expr.Builder(resource->Pos()) + .Callable("ParseType") + .Atom(0, typeString) + .Seal() + .Build(); + + auto status = ParseTypeWrapper(typeNode, typeNode, ctx); + if (status == IGraphTransformer::TStatus::Error) { + return false; + } + if (!EnsureType(*typeNode, ctx.Expr)) { + return false; + } + + streamItemType = typeNode->GetTypeAnn()->Cast<TTypeExprType>()->GetType()->UserCast<TMultiExprType>(ctx.Expr.GetPosition(resource->Pos()), ctx.Expr); + return true; + } + + IGraphTransformer::TStatus BlockMapJoinIndexWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TExtContext& ctx) { + Y_UNUSED(output); + + using NKikimr::NMiniKQL::BlockMapJoinIndexResourcePrefix; + using NKikimr::NMiniKQL::BlockMapJoinIndexResourceSeparator; + + if (!EnsureArgsCount(*input, 4, ctx.Expr)) { + return IGraphTransformer::TStatus::Error; + } + + const TMultiExprType* expectedStreamItemType = nullptr; + if (!EnsureBlockStorageResource(input->Child(0), expectedStreamItemType, ctx)) { + return IGraphTransformer::TStatus::Error; + } + + TTypeAnnotationNode::TListType itemTypes; + if (!EnsureType(*input->Child(1), ctx.Expr)) { + return IGraphTransformer::TStatus::Error; + } + auto inputType = input->Child(1)->GetTypeAnn()->Cast<TTypeExprType>()->GetType(); + if (!EnsureWideBlockType(input->Child(1)->Pos(), *inputType, itemTypes, ctx.Expr)) { + return IGraphTransformer::TStatus::Error; + } + auto streamItemType = inputType->Cast<TMultiExprType>(); + + if (!IsSameAnnotation(*streamItemType, *expectedStreamItemType)) { + ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(input->Pos()), + TStringBuilder() << "Mismatch between provided stream item type " << static_cast<const TTypeAnnotationNode&>(*streamItemType) + << "and block storage item type " << static_cast<const TTypeAnnotationNode&>(*expectedStreamItemType))); + return IGraphTransformer::TStatus::Error; + } + + if (!EnsureTupleOfAtoms(*input->Child(2), ctx.Expr)) { + return IGraphTransformer::TStatus::Error; + } + + TVector<ui32> keyColumns; + for (const auto& keyColumnNode : input->Child(2)->Children()) { + auto position = GetWideBlockFieldPosition(*streamItemType, keyColumnNode->Content()); + if (!position) { + ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(keyColumnNode->Pos()), TStringBuilder() << "Unknown key column: " << keyColumnNode->Content())); + return IGraphTransformer::TStatus::Error; + } + keyColumns.push_back(*position); + } + + auto settingsValidator = [&](TStringBuf settingName, TExprNode& node, TExprContext& ctx) { + if (node.ChildrenSize() != 1) { + ctx.AddError(TIssue(ctx.GetPosition(node.Pos()), + TStringBuilder() << "No extra parameters are expected by setting '" << settingName << "'")); + return false; + } + return true; + }; + if (!EnsureValidSettings(input->Tail(), {"any"}, settingsValidator, ctx.Expr)) { + return IGraphTransformer::TStatus::Error; + } + + input->SetTypeAnn(ctx.Expr.MakeType<TResourceExprType>(TStringBuilder() << + BlockMapJoinIndexResourcePrefix << FormatType(streamItemType) << BlockMapJoinIndexResourceSeparator << JoinSeq(",", keyColumns))); + return IGraphTransformer::TStatus::Ok; + } + + bool EnsureBlockMapJoinIndexResource(const TExprNode* resource, const TMultiExprType*& streamItemType, TVector<TStringBuf>& keyColumns, TExtContext& ctx) { + using NKikimr::NMiniKQL::BlockMapJoinIndexResourcePrefix; + using NKikimr::NMiniKQL::BlockMapJoinIndexResourceSeparator; + + if (!EnsureResourceType(*resource, ctx.Expr)) { + return false; + } + const auto resourceType = resource->GetTypeAnn()->Cast<TResourceExprType>(); + const auto resourceTag = resourceType->GetTag(); + if (!resourceTag.StartsWith(BlockMapJoinIndexResourcePrefix)) { + ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(resource->Pos()), "Expected block map join index resource")); + return false; + } + + auto resourceIdentifier = TStringBuf(resourceTag.data() + BlockMapJoinIndexResourcePrefix.size(), resourceTag.size() - BlockMapJoinIndexResourcePrefix.size()); + TStringBuf typeString, keyColumnsString; + Split(resourceIdentifier, BlockMapJoinIndexResourceSeparator, typeString, keyColumnsString); + Split(keyColumnsString, ",", keyColumns); + + auto resourceTypeNode = ctx.Expr.Builder(resource->Pos()) + .Callable("ParseType") + .Atom(0, typeString) + .Seal() + .Build(); + + auto status = ParseTypeWrapper(resourceTypeNode, resourceTypeNode, ctx); + if (status == IGraphTransformer::TStatus::Error) { + return false; + } + if (!EnsureType(*resourceTypeNode, ctx.Expr)) { + return false; + } + + streamItemType = resourceTypeNode->GetTypeAnn()->Cast<TTypeExprType>()->GetType()->UserCast<TMultiExprType>(ctx.Expr.GetPosition(resource->Pos()), ctx.Expr); + return true; + } + + IGraphTransformer::TStatus BlockMapJoinCoreWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TExtContext& ctx) { Y_UNUSED(output); if (!EnsureArgsCount(*input, 8, ctx.Expr)) { return IGraphTransformer::TStatus::Error; } + if (!EnsureAtom(*input->Child(3), ctx.Expr)) { + return IGraphTransformer::TStatus::Error; + } + const auto joinKind = input->Child(3)->Content(); + if (joinKind != "Inner" && joinKind != "Left" && joinKind != "LeftSemi" && joinKind != "LeftOnly"&& joinKind != "Cross") { + ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(input->Child(3)->Pos()), TStringBuilder() << "Unknown join kind: " << joinKind + << ", supported: Inner, Left, LeftSemi, LeftOnly, Cross")); + return IGraphTransformer::TStatus::Error; + } + TTypeAnnotationNode::TListType leftItemTypes; if (!EnsureWideStreamBlockType(input->Head(), leftItemTypes, ctx.Expr)) { return IGraphTransformer::TStatus::Error; } leftItemTypes.pop_back(); - auto leftItemType = input->Head().GetTypeAnn()->Cast<TStreamExprType>()->GetItemType()->Cast<TMultiExprType>(); + auto leftStreamItemType = input->Head().GetTypeAnn()->Cast<TStreamExprType>()->GetItemType()->Cast<TMultiExprType>(); + + const TMultiExprType* expectedRightStreamItemType = nullptr; + TVector<TStringBuf> expectedRightKeyColumns; + if (joinKind != "Cross") { + if (!EnsureBlockMapJoinIndexResource(input->Child(1), expectedRightStreamItemType, expectedRightKeyColumns, ctx)) { + return IGraphTransformer::TStatus::Error; + } + } else { + if (!EnsureBlockStorageResource(input->Child(1), expectedRightStreamItemType, ctx)) { + return IGraphTransformer::TStatus::Error; + } + } TTypeAnnotationNode::TListType rightItemTypes; - if (!EnsureWideStreamBlockType(*input->Child(1), rightItemTypes, ctx.Expr)) { + if (!EnsureType(*input->Child(2), ctx.Expr)) { return IGraphTransformer::TStatus::Error; } - rightItemTypes.pop_back(); - auto rightItemType = input->Child(1)->GetTypeAnn()->Cast<TStreamExprType>()->GetItemType()->Cast<TMultiExprType>(); - - if (!EnsureAtom(*input->Child(2), ctx.Expr)) { + auto rightInputType = input->Child(2)->GetTypeAnn()->Cast<TTypeExprType>()->GetType(); + if (!EnsureWideBlockType(input->Child(2)->Pos(), *rightInputType, rightItemTypes, ctx.Expr)) { return IGraphTransformer::TStatus::Error; } + rightItemTypes.pop_back(); + auto rightStreamItemType = rightInputType->Cast<TMultiExprType>(); - const auto joinKind = input->Child(2)->Content(); - if (joinKind != "Inner" && joinKind != "Left" && joinKind != "LeftSemi" && joinKind != "LeftOnly"&& joinKind != "Cross") { - ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(input->Child(2)->Pos()), TStringBuilder() << "Unknown join kind: " << joinKind - << ", supported: Inner, Left, LeftSemi, LeftOnly, Cross")); + if (!IsSameAnnotation(*rightStreamItemType, *expectedRightStreamItemType)) { + ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(input->Pos()), + TStringBuilder() << "Mismatch between provided right stream item type " << static_cast<const TTypeAnnotationNode&>(*rightStreamItemType) + << "and right block storage item type " << static_cast<const TTypeAnnotationNode&>(*expectedRightStreamItemType))); return IGraphTransformer::TStatus::Error; } - if (input->Child(3)->ChildrenSize() != input->Child(5)->ChildrenSize()) { - ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(input->Child(5)->Pos()), TStringBuilder() << "Mismatch of key column count")); + if (input->Child(4)->ChildrenSize() != input->Child(6)->ChildrenSize()) { + ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(input->Child(6)->Pos()), TStringBuilder() << "Mismatch of key column count")); return IGraphTransformer::TStatus::Error; } @@ -1056,36 +1233,44 @@ namespace NTypeAnnImpl { return true; }; - for (size_t childIdx = 3; childIdx <= 6; childIdx++) { + for (size_t childIdx = 4; childIdx <= 7; childIdx++) { if (!EnsureTupleOfAtoms(*input->Child(childIdx), ctx.Expr)) { return IGraphTransformer::TStatus::Error; } } std::unordered_set<ui32> leftKeyColumns; - if (!checkKeyColumns(leftKeyColumns, true, *input->Child(3), leftItemType)) { + if (!checkKeyColumns(leftKeyColumns, true, *input->Child(4), leftStreamItemType)) { return IGraphTransformer::TStatus::Error; } std::unordered_set<ui32> leftKeyDrops; - if (!checkKeyDrops(leftKeyDrops, true, leftKeyColumns, *input->Child(4), leftItemType)) { + if (!checkKeyDrops(leftKeyDrops, true, leftKeyColumns, *input->Child(5), leftStreamItemType)) { return IGraphTransformer::TStatus::Error; } std::unordered_set<ui32> rightKeyColumns; - if (!checkKeyColumns(rightKeyColumns, false, *input->Child(5), rightItemType)) { + if (!checkKeyColumns(rightKeyColumns, true, *input->Child(6), rightStreamItemType)) { return IGraphTransformer::TStatus::Error; } std::unordered_set<ui32> rightKeyDrops; - if (!checkKeyDrops(rightKeyDrops, false, rightKeyColumns, *input->Child(6), rightItemType)) { + if (!checkKeyDrops(rightKeyDrops, false, rightKeyColumns, *input->Child(7), rightStreamItemType)) { return IGraphTransformer::TStatus::Error; } - auto settingsValidator = [&](TStringBuf, TExprNode& node, TExprContext&) { return node.ChildrenSize() == 1; }; - if (!EnsureValidSettings(input->Tail(), {"rightAny"}, settingsValidator, ctx.Expr)) { + if (input->Child(6)->ChildrenSize() != expectedRightKeyColumns.size()) { + ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(input->Pos()), + "Provided right key columns list differs from right block storage one")); return IGraphTransformer::TStatus::Error; } + for (size_t i = 0; i < input->Child(6)->ChildrenSize(); i++) { + if (input->Child(6)->Child(i)->Content() != expectedRightKeyColumns[i]) { + ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(input->Pos()), + "Provided right key columns list differs from right block storage one")); + return IGraphTransformer::TStatus::Error; + } + } std::vector<const TTypeAnnotationNode*> resultItems; for (ui32 pos = 0; pos < leftItemTypes.size(); pos++) { diff --git a/yql/essentials/minikql/comp_nodes/mkql_block_map_join.cpp b/yql/essentials/minikql/comp_nodes/mkql_block_map_join.cpp index 37217aec85..17c549202c 100644 --- a/yql/essentials/minikql/comp_nodes/mkql_block_map_join.cpp +++ b/yql/essentials/minikql/comp_nodes/mkql_block_map_join.cpp @@ -239,10 +239,8 @@ private: TVector<NYql::NUdf::IBlockItemHasher::TPtr> Hashers_; }; -template <typename TDerived> -class TBlockStorageBase : public TComputationValue<TDerived> { - using TSelf = TBlockStorageBase<TDerived>; - using TBase = TComputationValue<TDerived>; +class TBlockStorage : public TComputationValue<TBlockStorage> { + using TBase = TComputationValue<TBlockStorage>; public: struct TBlock { @@ -268,7 +266,7 @@ public: }; class TRowIterator { - friend class TBlockStorageBase; + friend class TBlockStorage; public: TRowIterator() = default; @@ -304,7 +302,7 @@ public: } private: - TRowIterator(const TSelf* blockStorage) + TRowIterator(const TBlockStorage* blockStorage) : BlockStorage_(blockStorage) {} @@ -312,19 +310,21 @@ public: size_t CurrentBlockOffset_ = 0; size_t CurrentItemOffset_ = 0; - const TSelf* BlockStorage_ = nullptr; + const TBlockStorage* BlockStorage_ = nullptr; }; - TBlockStorageBase( + TBlockStorage( TMemoryUsageInfo* memInfo, const TVector<TType*>& itemTypes, NUdf::TUnboxedValue stream, + TStringBuf resourceTag, arrow::MemoryPool* pool ) : TBase(memInfo) , InputsDescr_(ToValueDescr(itemTypes)) - , Stream_(stream) + , Stream_(std::move(stream)) , Inputs_(itemTypes.size()) + , ResourceTag_(std::move(resourceTag)) { TBlockTypeHelper helper; for (size_t i = 0; i < itemTypes.size(); i++) { @@ -341,11 +341,14 @@ public: case NUdf::EFetchStatus::Yield: return NUdf::EFetchStatus::Yield; case NUdf::EFetchStatus::Finish: + IsFinished_ = true; return NUdf::EFetchStatus::Finish; case NUdf::EFetchStatus::Ok: break; } + Y_ENSURE(!IsFinished_, "Got data on finished stream"); + std::vector<arrow::Datum> blockColumns; for (size_t i = 0; i < Inputs_.size() - 1; i++) { auto& datum = TArrowBlock::From(Inputs_[i]).GetDatum(); @@ -384,19 +387,15 @@ public: return TRowIterator(this); } + size_t GetRowCount() const { + return RowCount_; + } + TBlockItem GetItem(TRowEntry entry, ui32 columnIdx) const { Y_ENSURE(columnIdx < Inputs_.size() - 1); return GetItemFromBlock(GetBlock(entry.BlockOffset), columnIdx, entry.ItemOffset); } - void GetRow(TRowEntry entry, const TVector<ui32>& ioMap, std::vector<NYql::NUdf::TBlockItem>& row) const { - Y_ENSURE(row.size() == ioMap.size()); - for (size_t i = 0; i < row.size(); i++) { - row[i] = GetItem(entry, ioMap[i]); - } - } - -protected: TBlockItem GetItemFromBlock(const TBlock& block, ui32 columnIdx, size_t offset) const { Y_ENSURE(offset < block.Size); const auto& datum = block.Columns[columnIdx]; @@ -408,6 +407,34 @@ protected: } } + void GetRow(TRowEntry entry, const TVector<ui32>& ioMap, std::vector<NYql::NUdf::TBlockItem>& row) const { + Y_ENSURE(row.size() == ioMap.size()); + for (size_t i = 0; i < row.size(); i++) { + row[i] = GetItem(entry, ioMap[i]); + } + } + + const TVector<NUdf::IBlockItemComparator::TPtr>& GetItemComparators() const { + return Comparators_; + } + + const TVector<NUdf::IBlockItemHasher::TPtr>& GetItemHashers() const { + return Hashers_; + } + + bool IsFinished() const { + return IsFinished_; + } + +private: + NUdf::TStringRef GetResourceTag() const override { + return NUdf::TStringRef(ResourceTag_); + } + + void* GetResource() override { + return this; + } + protected: const std::vector<arrow::ValueDescr> InputsDescr_; @@ -418,27 +445,60 @@ protected: std::vector<TBlock> Data_; size_t RowCount_ = 0; + bool IsFinished_ = false; NUdf::TUnboxedValue Stream_; TUnboxedValueVector Inputs_; + + const TStringBuf ResourceTag_; }; -class TBlockStorage: public TBlockStorageBase<TBlockStorage> { -private: - using TBase = TBlockStorageBase<TBlockStorage>; +class TBlockStorageWrapper : public TMutableComputationNode<TBlockStorageWrapper> { + using TBaseComputation = TMutableComputationNode<TBlockStorageWrapper>; + public: - using TBase::TBase; + TBlockStorageWrapper( + TComputationMutables& mutables, + TVector<TType*>&& itemTypes, + IComputationNode* stream, + const TStringBuf& resourceTag + ) + : TBaseComputation(mutables, EValueRepresentation::Boxed) + , ItemTypes_(std::move(itemTypes)) + , Stream_(stream) + , ResourceTag_(resourceTag) + {} + + NUdf::TUnboxedValuePod DoCalculate(TComputationContext& ctx) const { + return ctx.HolderFactory.Create<TBlockStorage>( + ItemTypes_, + std::move(Stream_->GetValue(ctx)), + ResourceTag_, + &ctx.ArrowMemoryPool + ); + } + +private: + void RegisterDependencies() const final { + DependsOn(Stream_); + } + +private: + const TVector<TType*> ItemTypes_; + IComputationNode* const Stream_; + + const TString ResourceTag_; }; -class TIndexedBlockStorage : public TBlockStorageBase<TIndexedBlockStorage> { - using TBase = TBlockStorageBase<TIndexedBlockStorage>; +class TBlockIndex : public TComputationValue<TBlockIndex> { + using TBase = TComputationValue<TBlockIndex>; struct TIndexNode { - TRowEntry Entry; + TBlockStorage::TRowEntry Entry; TIndexNode* Next; TIndexNode() = delete; - TIndexNode(TRowEntry entry, TIndexNode* next = nullptr) + TIndexNode(TBlockStorage::TRowEntry entry, TIndexNode* next = nullptr) : Entry(entry) , Next(next) {} @@ -450,7 +510,7 @@ class TIndexedBlockStorage : public TBlockStorageBase<TIndexedBlockStorage> { : Raw(0) {} - TIndexMapValue(TRowEntry entry) { + TIndexMapValue(TBlockStorage::TRowEntry entry) { TIndexEntryUnion un; un.Entry = entry; @@ -471,7 +531,7 @@ class TIndexedBlockStorage : public TBlockStorageBase<TIndexedBlockStorage> { return EntryList; } - TRowEntry GetEntry() const { + TBlockStorage::TRowEntry GetEntry() const { Y_ENSURE(IsInplace()); TIndexEntryUnion un; @@ -481,7 +541,7 @@ class TIndexedBlockStorage : public TBlockStorageBase<TIndexedBlockStorage> { private: union TIndexEntryUnion { - TRowEntry Entry; + TBlockStorage::TRowEntry Entry; ui64 Raw; }; @@ -504,7 +564,7 @@ class TIndexedBlockStorage : public TBlockStorageBase<TIndexedBlockStorage> { public: class TIterator { - friend class TIndexedBlockStorage; + friend class TBlockIndex; enum class EIteratorType { EMPTY, @@ -547,7 +607,7 @@ public: return *this; } - TMaybe<TRowEntry> Next() { + TMaybe<TBlockStorage::TRowEntry> Next() { Y_ENSURE(IsValid()); switch (Type_) { @@ -560,7 +620,7 @@ public: } EntryConsumed_ = true; - return BlockIndex_->IsKeyEquals(Entry_, ItemsToLookup_) ? TMaybe<TRowEntry>(Entry_) : Nothing(); + return BlockIndex_->IsKeyEquals(Entry_, ItemsToLookup_) ? TMaybe<TBlockStorage::TRowEntry>(Entry_) : Nothing(); case EIteratorType::LIST: for (; Node_ != nullptr; Node_ = Node_->Next) { @@ -597,12 +657,12 @@ public: } private: - TIterator(const TIndexedBlockStorage* blockIndex) + TIterator(const TBlockIndex* blockIndex) : Type_(EIteratorType::EMPTY) , BlockIndex_(blockIndex) {} - TIterator(const TIndexedBlockStorage* blockIndex, TRowEntry entry, std::vector<NYql::NUdf::TBlockItem> itemsToLookup) + TIterator(const TBlockIndex* blockIndex, TBlockStorage::TRowEntry entry, std::vector<NYql::NUdf::TBlockItem> itemsToLookup) : Type_(EIteratorType::INPLACE) , BlockIndex_(blockIndex) , Entry_(entry) @@ -610,7 +670,7 @@ public: , ItemsToLookup_(std::move(itemsToLookup)) {} - TIterator(const TIndexedBlockStorage* blockIndex, TIndexNode* node, std::vector<NYql::NUdf::TBlockItem> itemsToLookup) + TIterator(const TBlockIndex* blockIndex, TIndexNode* node, std::vector<NYql::NUdf::TBlockItem> itemsToLookup) : Type_(EIteratorType::LIST) , BlockIndex_(blockIndex) , Node_(node) @@ -619,12 +679,12 @@ public: private: EIteratorType Type_; - const TIndexedBlockStorage* BlockIndex_ = nullptr; + const TBlockIndex* BlockIndex_ = nullptr; union { TIndexNode* Node_; struct { - TRowEntry Entry_; + TBlockStorage::TRowEntry Entry_; bool EntryConsumed_; }; }; @@ -633,32 +693,35 @@ public: }; public: - TIndexedBlockStorage( + TBlockIndex( TMemoryUsageInfo* memInfo, - const TVector<TType*>& itemTypes, const TVector<ui32>& keyColumns, - NUdf::TUnboxedValue stream, + NUdf::TUnboxedValue blockStorage, bool any, - arrow::MemoryPool* pool + TStringBuf resourceTag ) - : TBase(memInfo, itemTypes, stream, pool) + : TBase(memInfo) , KeyColumns_(keyColumns) + , BlockStorage_(std::move(blockStorage)) , Any_(any) + , ResourceTag_(std::move(resourceTag)) {} - NUdf::EFetchStatus FetchStream() { - Y_ENSURE(!Index_, "Data fetch shouldn't be done after the index has been built"); - return TBase::FetchStream(); - } - void BuildIndex() { - Index_ = std::make_unique<TIndexMap>(CalculateRHHashTableCapacity(RowCount_)); - for (size_t blockOffset = 0; blockOffset < Data_.size(); blockOffset++) { - const auto& block = GetBlock(blockOffset); + if (Index_) { + return; + } + + auto& blockStorage = *static_cast<TBlockStorage*>(BlockStorage_.GetResource()); + Y_ENSURE(blockStorage.IsFinished(), "Index build should be done after all data has been read"); + + Index_ = std::make_unique<TIndexMap>(CalculateRHHashTableCapacity(blockStorage.GetRowCount())); + for (size_t blockOffset = 0; blockOffset < blockStorage.GetBlockCount(); blockOffset++) { + const auto& block = blockStorage.GetBlock(blockOffset); auto blockSize = block.Size; std::array<TRobinHoodBatchRequestItem<ui64>, PrefetchBatchSize> insertBatch; - std::array<TRowEntry, PrefetchBatchSize> insertBatchEntries; + std::array<TBlockStorage::TRowEntry, PrefetchBatchSize> insertBatchEntries; std::array<std::vector<NYql::NUdf::TBlockItem>, PrefetchBatchSize> insertBatchKeys; ui32 insertBatchLen = 0; @@ -692,7 +755,7 @@ public: continue; } - insertBatchEntries[insertBatchLen] = TRowEntry(blockOffset, itemOffset); + insertBatchEntries[insertBatchLen] = TBlockStorage::TRowEntry(blockOffset, itemOffset); insertBatch[insertBatchLen].ConstructKey(keyHash); insertBatchLen++; @@ -709,7 +772,7 @@ public: } template<typename TGetKey> - void BatchLookup(size_t batchSize, std::array<TIndexedBlockStorage::TIterator, PrefetchBatchSize>& iterators, TGetKey&& getKey) { + void BatchLookup(size_t batchSize, std::array<TIterator, PrefetchBatchSize>& iterators, TGetKey&& getKey) { Y_ENSURE(batchSize <= PrefetchBatchSize); std::array<TRobinHoodBatchRequestItem<ui64>, PrefetchBatchSize> lookupBatch; @@ -737,11 +800,13 @@ public: }); } - bool IsKeyEquals(TRowEntry entry, const std::vector<NYql::NUdf::TBlockItem>& keyItems) const { + bool IsKeyEquals(TBlockStorage::TRowEntry entry, const std::vector<NYql::NUdf::TBlockItem>& keyItems) const { + auto& blockStorage = *static_cast<TBlockStorage*>(BlockStorage_.GetResource()); + Y_ENSURE(keyItems.size() == KeyColumns_.size()); for (size_t i = 0; i < KeyColumns_.size(); i++) { - auto indexItem = GetItem(entry, KeyColumns_[i]); - if (Comparators_[KeyColumns_[i]]->Equals(indexItem, keyItems[i])) { + auto indexItem = blockStorage.GetItem(entry, KeyColumns_[i]); + if (blockStorage.GetItemComparators()[KeyColumns_[i]]->Equals(indexItem, keyItems[i])) { return true; } } @@ -749,25 +814,31 @@ public: return false; } + const NUdf::TUnboxedValue& GetBlockStorage() const { + return BlockStorage_; + } + private: - ui64 GetKey(const TBlock& block, size_t offset, std::vector<NYql::NUdf::TBlockItem>& keyItems) const { + ui64 GetKey(const TBlockStorage::TBlock& block, size_t offset, std::vector<NYql::NUdf::TBlockItem>& keyItems) const { + auto& blockStorage = *static_cast<TBlockStorage*>(BlockStorage_.GetResource()); + ui64 keyHash = 0; keyItems.clear(); for (ui32 keyColumn : KeyColumns_) { - auto item = GetItemFromBlock(block, keyColumn, offset); + auto item = blockStorage.GetItemFromBlock(block, keyColumn, offset); if (!item) { keyItems.clear(); return 0; } - keyHash = CombineHashes(keyHash, Hashers_[keyColumn]->Hash(item)); + keyHash = CombineHashes(keyHash, blockStorage.GetItemHashers()[keyColumn]->Hash(item)); keyItems.push_back(std::move(item)); } return keyHash; } - TIndexNode* InsertIndexNode(TRowEntry entry, TIndexNode* currentHead = nullptr) { + TIndexNode* InsertIndexNode(TBlockStorage::TRowEntry entry, TIndexNode* currentHead = nullptr) { return &IndexNodes_.emplace_back(entry, currentHead); } @@ -787,21 +858,72 @@ private: } } + NUdf::TStringRef GetResourceTag() const override { + return NUdf::TStringRef(ResourceTag_); + } + + void* GetResource() override { + return this; + } + private: const TVector<ui32>& KeyColumns_; + NUdf::TUnboxedValue BlockStorage_; std::unique_ptr<TIndexMap> Index_; std::deque<TIndexNode> IndexNodes_; const bool Any_; + const TStringBuf ResourceTag_; +}; + +class TBlockMapJoinIndexWrapper : public TMutableComputationNode<TBlockMapJoinIndexWrapper> { + using TBaseComputation = TMutableComputationNode<TBlockMapJoinIndexWrapper>; + +public: + TBlockMapJoinIndexWrapper( + TComputationMutables& mutables, + TVector<ui32>&& keyColumns, + IComputationNode* blockStorage, + bool any, + const TStringBuf& resourceTag + ) + : TBaseComputation(mutables, EValueRepresentation::Boxed) + , KeyColumns_(std::move(keyColumns)) + , BlockStorage_(blockStorage) + , Any_(any) + , ResourceTag_(resourceTag) + {} + + NUdf::TUnboxedValuePod DoCalculate(TComputationContext& ctx) const { + return ctx.HolderFactory.Create<TBlockIndex>( + KeyColumns_, + std::move(BlockStorage_->GetValue(ctx)), + Any_, + ResourceTag_ + ); + } + +private: + void RegisterDependencies() const final { + DependsOn(BlockStorage_); + } + +private: + const TVector<ui32> KeyColumns_; + IComputationNode* const BlockStorage_; + const bool Any_; + const TString ResourceTag_; }; -template <bool WithoutRight, bool RightRequired, bool RightAny> -class TBlockMapJoinCoreWraper : public TMutableComputationNode<TBlockMapJoinCoreWraper<WithoutRight, RightRequired, RightAny>> +template <bool WithoutRight, bool RightRequired> +class TBlockMapJoinCoreWraper : public TMutableComputationNode<TBlockMapJoinCoreWraper<WithoutRight, RightRequired>> { -using TBaseComputation = TMutableComputationNode<TBlockMapJoinCoreWraper<WithoutRight, RightRequired, RightAny>>; -using TJoinState = TBlockJoinState<RightRequired>; -using TIndexState = TIndexedBlockStorage; + using TBaseComputation = TMutableComputationNode<TBlockMapJoinCoreWraper<WithoutRight, RightRequired>>; + using TJoinState = TBlockJoinState<RightRequired>; + using TStorageState = TBlockStorage; + using TIndexState = TBlockIndex; + public: TBlockMapJoinCoreWraper( TComputationMutables& mutables, @@ -809,22 +931,18 @@ public: const TVector<TType*>&& leftItemTypes, const TVector<ui32>&& leftKeyColumns, const TVector<ui32>&& leftIOMap, - const TVector<TType*>&& rightItemTypes, - const TVector<ui32>&& rightKeyColumns, const TVector<ui32>&& rightIOMap, IComputationNode* leftStream, - IComputationNode* rightStream + IComputationNode* rightBlockIndex ) : TBaseComputation(mutables, EValueRepresentation::Boxed) , ResultItemTypes_(std::move(resultItemTypes)) , LeftItemTypes_(std::move(leftItemTypes)) , LeftKeyColumns_(std::move(leftKeyColumns)) , LeftIOMap_(std::move(leftIOMap)) - , RightItemTypes_(std::move(rightItemTypes)) - , RightKeyColumns_(std::move(rightKeyColumns)) , RightIOMap_(std::move(rightIOMap)) - , LeftStream_(std::move(leftStream)) - , RightStream_(std::move(rightStream)) + , LeftStream_(leftStream) + , RightBlockIndex_(rightBlockIndex) , KeyTupleCache_(mutables) {} @@ -835,60 +953,49 @@ public: LeftIOMap_, ResultItemTypes_ ); - const auto indexState = ctx.HolderFactory.Create<TIndexState>( - RightItemTypes_, - RightKeyColumns_, - std::move(RightStream_->GetValue(ctx)), - RightAny, - &ctx.ArrowMemoryPool - ); return ctx.HolderFactory.Create<TStreamValue>(ctx.HolderFactory, std::move(joinState), - std::move(indexState), - std::move(LeftStream_->GetValue(ctx)), LeftKeyColumns_, - std::move(RightStream_->GetValue(ctx)), - RightKeyColumns_, - RightIOMap_ + RightIOMap_, + std::move(LeftStream_->GetValue(ctx)), + std::move(RightBlockIndex_->GetValue(ctx)) ); } private: class TStreamValue : public TComputationValue<TStreamValue> { - using TBase = TComputationValue<TStreamValue>; + using TBase = TComputationValue<TStreamValue>; + public: TStreamValue( TMemoryUsageInfo* memInfo, const THolderFactory& holderFactory, NUdf::TUnboxedValue&& joinState, - NUdf::TUnboxedValue&& indexState, - NUdf::TUnboxedValue&& leftStream, const TVector<ui32>& leftKeyColumns, - NUdf::TUnboxedValue&& rightStream, - const TVector<ui32>& rightKeyColumns, - const TVector<ui32>& rightIOMap + const TVector<ui32>& rightIOMap, + NUdf::TUnboxedValue&& leftStream, + NUdf::TUnboxedValue&& rightBlockIndex ) : TBase(memInfo) , JoinState_(joinState) - , IndexState_(indexState) - , LeftStream_(leftStream) , LeftKeyColumns_(leftKeyColumns) - , RightStream_(rightStream) - , RightKeyColumns_(rightKeyColumns) , RightIOMap_(rightIOMap) + , LeftStream_(leftStream) + , RightBlockIndex_(rightBlockIndex) , HolderFactory_(holderFactory) {} private: NUdf::EFetchStatus WideFetch(NUdf::TUnboxedValue* output, ui32 width) { auto& joinState = *static_cast<TJoinState*>(JoinState_.AsBoxed().Get()); - auto& indexState = *static_cast<TIndexState*>(IndexState_.AsBoxed().Get()); + auto& indexState = *static_cast<TIndexState*>(RightBlockIndex_.GetResource()); + auto& storageState = *static_cast<TStorageState*>(indexState.GetBlockStorage().GetResource()); if (!RightStreamConsumed_) { auto fetchStatus = NUdf::EFetchStatus::Ok; while (fetchStatus != NUdf::EFetchStatus::Finish) { - fetchStatus = indexState.FetchStream(); + fetchStatus = storageState.FetchStream(); if (fetchStatus == NUdf::EFetchStatus::Yield) { return NUdf::EFetchStatus::Yield; } @@ -931,7 +1038,7 @@ private: while (joinState.IsNotFull() && !iter.IsEmpty()) { auto key = iter.Next(); - indexState.GetRow(*key, RightIOMap_, rightRow); + storageState.GetRow(*key, RightIOMap_, rightRow); joinState.MakeRow(rightRow); } @@ -993,13 +1100,9 @@ private: } NUdf::TUnboxedValue JoinState_; - NUdf::TUnboxedValue IndexState_; - NUdf::TUnboxedValue LeftStream_; const TVector<ui32>& LeftKeyColumns_; - NUdf::TUnboxedValue RightStream_; - const TVector<ui32>& RightKeyColumns_; const TVector<ui32>& RightIOMap_; bool RightStreamConsumed_ = false; @@ -1007,12 +1110,15 @@ private: ui32 LookupBatchCurrent_ = 0; ui32 LookupBatchSize_ = 0; + NUdf::TUnboxedValue LeftStream_; + NUdf::TUnboxedValue RightBlockIndex_; + const THolderFactory& HolderFactory_; }; void RegisterDependencies() const final { this->DependsOn(LeftStream_); - this->DependsOn(RightStream_); + this->DependsOn(RightBlockIndex_); } private: @@ -1022,44 +1128,37 @@ private: const TVector<ui32> LeftKeyColumns_; const TVector<ui32> LeftIOMap_; - const TVector<TType*> RightItemTypes_; - const TVector<ui32> RightKeyColumns_; const TVector<ui32> RightIOMap_; IComputationNode* const LeftStream_; - IComputationNode* const RightStream_; + IComputationNode* const RightBlockIndex_; const TContainerCacheOnContext KeyTupleCache_; }; class TBlockCrossJoinCoreWraper : public TMutableComputationNode<TBlockCrossJoinCoreWraper> { -using TBaseComputation = TMutableComputationNode<TBlockCrossJoinCoreWraper>; -using TJoinState = TBlockJoinState<true>; -using TStorageState = TBlockStorage; + using TBaseComputation = TMutableComputationNode<TBlockCrossJoinCoreWraper>; + using TJoinState = TBlockJoinState<true>; + using TStorageState = TBlockStorage; + public: TBlockCrossJoinCoreWraper( TComputationMutables& mutables, const TVector<TType*>&& resultItemTypes, const TVector<TType*>&& leftItemTypes, - const TVector<ui32>&& leftKeyColumns, const TVector<ui32>&& leftIOMap, - const TVector<TType*>&& rightItemTypes, - const TVector<ui32>&& rightKeyColumns, const TVector<ui32>&& rightIOMap, IComputationNode* leftStream, - IComputationNode* rightStream + IComputationNode* rightBlockStorage ) : TBaseComputation(mutables, EValueRepresentation::Boxed) , ResultItemTypes_(std::move(resultItemTypes)) , LeftItemTypes_(std::move(leftItemTypes)) - , LeftKeyColumns_(std::move(leftKeyColumns)) , LeftIOMap_(std::move(leftIOMap)) - , RightItemTypes_(std::move(rightItemTypes)) - , RightKeyColumns_(std::move(rightKeyColumns)) , RightIOMap_(std::move(rightIOMap)) , LeftStream_(std::move(leftStream)) - , RightStream_(std::move(rightStream)) + , RightBlockStorage_(std::move(rightBlockStorage)) , KeyTupleCache_(mutables) {} @@ -1070,47 +1169,40 @@ public: LeftIOMap_, ResultItemTypes_ ); - const auto indexState = ctx.HolderFactory.Create<TStorageState>( - RightItemTypes_, - std::move(RightStream_->GetValue(ctx)), - &ctx.ArrowMemoryPool - ); return ctx.HolderFactory.Create<TStreamValue>(ctx.HolderFactory, std::move(joinState), - std::move(indexState), + RightIOMap_, std::move(LeftStream_->GetValue(ctx)), - std::move(RightStream_->GetValue(ctx)), - RightIOMap_ + std::move(RightBlockStorage_->GetValue(ctx)) ); } private: class TStreamValue : public TComputationValue<TStreamValue> { - using TBase = TComputationValue<TStreamValue>; + using TBase = TComputationValue<TStreamValue>; + public: TStreamValue( TMemoryUsageInfo* memInfo, const THolderFactory& holderFactory, NUdf::TUnboxedValue&& joinState, - NUdf::TUnboxedValue&& storageState, + const TVector<ui32>& rightIOMap, NUdf::TUnboxedValue&& leftStream, - NUdf::TUnboxedValue&& rightStream, - const TVector<ui32>& rightIOMap + NUdf::TUnboxedValue&& rightBlockStorage ) : TBase(memInfo) , JoinState_(joinState) - , StorageState_(storageState) - , LeftStream_(leftStream) - , RightStream_(rightStream) , RightIOMap_(rightIOMap) + , LeftStream_(leftStream) + , RightBlockStorage_(rightBlockStorage) , HolderFactory_(holderFactory) {} private: NUdf::EFetchStatus WideFetch(NUdf::TUnboxedValue* output, ui32 width) { auto& joinState = *static_cast<TJoinState*>(JoinState_.AsBoxed().Get()); - auto& storageState = *static_cast<TStorageState*>(StorageState_.AsBoxed().Get()); + auto& storageState = *static_cast<TStorageState*>(RightBlockStorage_.GetResource()); if (!RightStreamConsumed_) { auto fetchStatus = NUdf::EFetchStatus::Ok; @@ -1176,43 +1268,109 @@ private: } NUdf::TUnboxedValue JoinState_; - NUdf::TUnboxedValue StorageState_; - - NUdf::TUnboxedValue LeftStream_; - NUdf::TUnboxedValue RightStream_; const TVector<ui32>& RightIOMap_; bool RightStreamConsumed_ = false; TStorageState::TRowIterator RightRowIterator_; + NUdf::TUnboxedValue LeftStream_; + NUdf::TUnboxedValue RightBlockStorage_; + const THolderFactory& HolderFactory_; }; void RegisterDependencies() const final { this->DependsOn(LeftStream_); - this->DependsOn(RightStream_); + this->DependsOn(RightBlockStorage_); } private: const TVector<TType*> ResultItemTypes_; const TVector<TType*> LeftItemTypes_; - const TVector<ui32> LeftKeyColumns_; const TVector<ui32> LeftIOMap_; - const TVector<TType*> RightItemTypes_; - const TVector<ui32> RightKeyColumns_; const TVector<ui32> RightIOMap_; IComputationNode* const LeftStream_; - IComputationNode* const RightStream_; + IComputationNode* const RightBlockStorage_; const TContainerCacheOnContext KeyTupleCache_; }; } // namespace +IComputationNode* WrapBlockStorage(TCallable& callable, const TComputationNodeFactoryContext& ctx) { + MKQL_ENSURE(callable.GetInputsCount() == 1, "Expected 1 arg"); + + const auto resultType = callable.GetType()->GetReturnType(); + MKQL_ENSURE(resultType->IsResource(), "Expected Resource as a result type"); + auto resultResourceType = AS_TYPE(TResourceType, resultType); + MKQL_ENSURE(resultResourceType->GetTag().StartsWith(BlockStorageResourcePrefix), "Expected block storage resource"); + + const auto inputType = callable.GetInput(0).GetStaticType(); + MKQL_ENSURE(inputType->IsStream(), "Expected WideStream as an input stream"); + const auto inputStreamType = AS_TYPE(TStreamType, inputType); + MKQL_ENSURE(inputStreamType->GetItemType()->IsMulti(), + "Expected Multi as a left stream item type"); + const auto inputStreamComponents = GetWideComponents(inputStreamType); + MKQL_ENSURE(inputStreamComponents.size() > 0, "Expected at least one column"); + TVector<TType*> inputStreamItems(inputStreamComponents.cbegin(), inputStreamComponents.cend()); + + const auto inputStream = LocateNode(ctx.NodeLocator, callable, 0); + return new TBlockStorageWrapper( + ctx.Mutables, + std::move(inputStreamItems), + inputStream, + resultResourceType->GetTag() + ); +} + +IComputationNode* WrapBlockMapJoinIndex(TCallable& callable, const TComputationNodeFactoryContext& ctx) { + MKQL_ENSURE(callable.GetInputsCount() == 4, "Expected 4 args"); + + const auto resultType = callable.GetType()->GetReturnType(); + MKQL_ENSURE(resultType->IsResource(), "Expected Resource as a result type"); + auto resultResourceType = AS_TYPE(TResourceType, resultType); + MKQL_ENSURE(resultResourceType->GetTag().StartsWith(BlockMapJoinIndexResourcePrefix), "Expected block map join index resource"); + + const auto inputType = callable.GetInput(0).GetStaticType(); + MKQL_ENSURE(inputType->IsResource(), "Expected Resource as an input type"); + auto inputResourceType = AS_TYPE(TResourceType, inputType); + MKQL_ENSURE(inputResourceType->GetTag().StartsWith(BlockStorageResourcePrefix), "Expected block storage resource"); + + auto origInputItemType = AS_VALUE(TTypeType, callable.GetInput(1)); + MKQL_ENSURE(origInputItemType->IsMulti(), "Expected Multi as an input item type"); + const auto streamComponents = AS_TYPE(TMultiType, origInputItemType)->GetElements(); + MKQL_ENSURE(streamComponents.size() > 0, "Expected at least one column"); + + const auto keyColumnsLiteral = callable.GetInput(2); + const auto keyColumnsTuple = AS_VALUE(TTupleLiteral, keyColumnsLiteral); + TVector<ui32> keyColumns; + keyColumns.reserve(keyColumnsTuple->GetValuesCount()); + for (ui32 i = 0; i < keyColumnsTuple->GetValuesCount(); i++) { + const auto item = AS_VALUE(TDataLiteral, keyColumnsTuple->GetValue(i)); + keyColumns.emplace_back(item->AsValue().Get<ui32>()); + } + + for (ui32 keyColumn : keyColumns) { + MKQL_ENSURE(keyColumn < streamComponents.size() - 1, "Key column out of range"); + } + + const auto anyNode = callable.GetInput(3); + const auto any = AS_VALUE(TDataLiteral, anyNode)->AsValue().Get<bool>(); + + const auto blockStorage = LocateNode(ctx.NodeLocator, callable, 0); + return new TBlockMapJoinIndexWrapper( + ctx.Mutables, + std::move(keyColumns), + blockStorage, + any, + resultResourceType->GetTag() + ); +} + IComputationNode* WrapBlockMapJoinCore(TCallable& callable, const TComputationNodeFactoryContext& ctx) { MKQL_ENSURE(callable.GetInputsCount() == 8, "Expected 8 args"); @@ -1234,22 +1392,28 @@ IComputationNode* WrapBlockMapJoinCore(TCallable& callable, const TComputationNo MKQL_ENSURE(leftStreamComponents.size() > 0, "Expected at least one column"); const TVector<TType*> leftStreamItems(leftStreamComponents.cbegin(), leftStreamComponents.cend()); - const auto rightType = callable.GetInput(1).GetStaticType(); - MKQL_ENSURE(rightType->IsStream(), "Expected WideStream as a right stream"); - const auto rightStreamType = AS_TYPE(TStreamType, rightType); - MKQL_ENSURE(rightStreamType->GetItemType()->IsMulti(), - "Expected Multi as a right stream item type"); - const auto rightStreamComponents = GetWideComponents(rightStreamType); - MKQL_ENSURE(rightStreamComponents.size() > 0, "Expected at least one column"); - const TVector<TType*> rightStreamItems(rightStreamComponents.cbegin(), rightStreamComponents.cend()); - - const auto joinKindNode = callable.GetInput(2); + const auto joinKindNode = callable.GetInput(3); const auto rawKind = AS_VALUE(TDataLiteral, joinKindNode)->AsValue().Get<ui32>(); const auto joinKind = GetJoinKind(rawKind); Y_ENSURE(joinKind == EJoinKind::Inner || joinKind == EJoinKind::Left || joinKind == EJoinKind::LeftSemi || joinKind == EJoinKind::LeftOnly || joinKind == EJoinKind::Cross); - const auto leftKeyColumnsLiteral = callable.GetInput(3); + const auto rightBlockStorageType = callable.GetInput(1).GetStaticType(); + MKQL_ENSURE(rightBlockStorageType->IsResource(), "Expected Resource as a right type"); + auto rightBlockStorageResourceType = AS_TYPE(TResourceType, rightBlockStorageType); + if (joinKind != EJoinKind::Cross) { + MKQL_ENSURE(rightBlockStorageResourceType->GetTag().StartsWith(BlockMapJoinIndexResourcePrefix), "Expected block map join index resource"); + } else { + MKQL_ENSURE(rightBlockStorageResourceType->GetTag().StartsWith(BlockStorageResourcePrefix), "Expected block storage resource"); + } + + auto origRightItemType = AS_VALUE(TTypeType, callable.GetInput(2)); + MKQL_ENSURE(origRightItemType->IsMulti(), "Expected Multi as a right stream item type"); + const auto rightStreamComponents = AS_TYPE(TMultiType, origRightItemType)->GetElements(); + MKQL_ENSURE(rightStreamComponents.size() > 0, "Expected at least one column"); + const TVector<TType*> rightStreamItems(rightStreamComponents.cbegin(), rightStreamComponents.cend()); + + const auto leftKeyColumnsLiteral = callable.GetInput(4); const auto leftKeyColumnsTuple = AS_VALUE(TTupleLiteral, leftKeyColumnsLiteral); TVector<ui32> leftKeyColumns; leftKeyColumns.reserve(leftKeyColumnsTuple->GetValuesCount()); @@ -1259,7 +1423,7 @@ IComputationNode* WrapBlockMapJoinCore(TCallable& callable, const TComputationNo } const THashSet<ui32> leftKeySet(leftKeyColumns.cbegin(), leftKeyColumns.cend()); - const auto leftKeyDropsLiteral = callable.GetInput(4); + const auto leftKeyDropsLiteral = callable.GetInput(5); const auto leftKeyDropsTuple = AS_VALUE(TTupleLiteral, leftKeyDropsLiteral); THashSet<ui32> leftKeyDrops; leftKeyDrops.reserve(leftKeyDropsTuple->GetValuesCount()); @@ -1273,7 +1437,7 @@ IComputationNode* WrapBlockMapJoinCore(TCallable& callable, const TComputationNo "Only key columns has to be specified in drop column set"); } - const auto rightKeyColumnsLiteral = callable.GetInput(5); + const auto rightKeyColumnsLiteral = callable.GetInput(6); const auto rightKeyColumnsTuple = AS_VALUE(TTupleLiteral, rightKeyColumnsLiteral); TVector<ui32> rightKeyColumns; rightKeyColumns.reserve(rightKeyColumnsTuple->GetValuesCount()); @@ -1283,7 +1447,7 @@ IComputationNode* WrapBlockMapJoinCore(TCallable& callable, const TComputationNo } const THashSet<ui32> rightKeySet(rightKeyColumns.cbegin(), rightKeyColumns.cend()); - const auto rightKeyDropsLiteral = callable.GetInput(6); + const auto rightKeyDropsLiteral = callable.GetInput(7); const auto rightKeyDropsTuple = AS_VALUE(TTupleLiteral, rightKeyDropsLiteral); THashSet<ui32> rightKeyDrops; rightKeyDrops.reserve(rightKeyDropsTuple->GetValuesCount()); @@ -1303,9 +1467,6 @@ IComputationNode* WrapBlockMapJoinCore(TCallable& callable, const TComputationNo } MKQL_ENSURE(leftKeyColumns.size() == rightKeyColumns.size(), "Key columns mismatch"); - const auto rightAnyNode = callable.GetInput(7); - const auto rightAny = AS_VALUE(TDataLiteral, rightAnyNode)->AsValue().Get<bool>(); - // XXX: Mind the last wide item, containing block length. TVector<ui32> leftIOMap; for (size_t i = 0; i < leftStreamItems.size() - 1; i++) { @@ -1329,62 +1490,40 @@ IComputationNode* WrapBlockMapJoinCore(TCallable& callable, const TComputationNo } const auto leftStream = LocateNode(ctx.NodeLocator, callable, 0); - const auto rightStream = LocateNode(ctx.NodeLocator, callable, 1); + const auto rightBlockStorage = LocateNode(ctx.NodeLocator, callable, 1); -#define JOIN_WRAPPER(WITHOUT_RIGHT, RIGHT_REQUIRED, RIGHT_ANY) \ - return new TBlockMapJoinCoreWraper<WITHOUT_RIGHT, RIGHT_REQUIRED, RIGHT_ANY>( \ +#define JOIN_WRAPPER(WITHOUT_RIGHT, RIGHT_REQUIRED) \ + return new TBlockMapJoinCoreWraper<WITHOUT_RIGHT, RIGHT_REQUIRED>( \ ctx.Mutables, \ std::move(joinItems), \ std::move(leftStreamItems), \ std::move(leftKeyColumns), \ std::move(leftIOMap), \ - std::move(rightStreamItems), \ - std::move(rightKeyColumns), \ std::move(rightIOMap), \ leftStream, \ - rightStream \ + rightBlockStorage \ ) switch (joinKind) { case EJoinKind::Inner: - if (rightAny) { - JOIN_WRAPPER(false, true, true); - } else { - JOIN_WRAPPER(false, true, false); - } + JOIN_WRAPPER(false, true); case EJoinKind::Left: - if (rightAny) { - JOIN_WRAPPER(false, false, true); - } else { - JOIN_WRAPPER(false, false, false); - } + JOIN_WRAPPER(false, false); case EJoinKind::LeftSemi: MKQL_ENSURE(rightIOMap.empty(), "Can't access right table on left semi join"); - if (rightAny) { - JOIN_WRAPPER(true, true, true); - } else { - JOIN_WRAPPER(true, true, false); - } + JOIN_WRAPPER(true, true); case EJoinKind::LeftOnly: MKQL_ENSURE(rightIOMap.empty(), "Can't access right table on left only join"); - if (rightAny) { - JOIN_WRAPPER(true, false, true); - } else { - JOIN_WRAPPER(true, false, false); - } + JOIN_WRAPPER(true, false); case EJoinKind::Cross: - MKQL_ENSURE(!rightAny, "rightAny can't be used with cross join"); return new TBlockCrossJoinCoreWraper( ctx.Mutables, std::move(joinItems), std::move(leftStreamItems), - std::move(leftKeyColumns), std::move(leftIOMap), - std::move(rightStreamItems), - std::move(rightKeyColumns), std::move(rightIOMap), leftStream, - rightStream + rightBlockStorage ); default: /* TODO: Display the human-readable join kind name. */ diff --git a/yql/essentials/minikql/comp_nodes/mkql_block_map_join.h b/yql/essentials/minikql/comp_nodes/mkql_block_map_join.h index a556828aae..d38e0c6115 100644 --- a/yql/essentials/minikql/comp_nodes/mkql_block_map_join.h +++ b/yql/essentials/minikql/comp_nodes/mkql_block_map_join.h @@ -4,6 +4,8 @@ namespace NKikimr { namespace NMiniKQL { +IComputationNode* WrapBlockStorage(TCallable& callable, const TComputationNodeFactoryContext& ctx); +IComputationNode* WrapBlockMapJoinIndex(TCallable& callable, const TComputationNodeFactoryContext& ctx); IComputationNode* WrapBlockMapJoinCore(TCallable& callable, const TComputationNodeFactoryContext& ctx); } // NKikimr diff --git a/yql/essentials/minikql/comp_nodes/mkql_factory.cpp b/yql/essentials/minikql/comp_nodes/mkql_factory.cpp index 7f5550a46f..3b2119ba21 100644 --- a/yql/essentials/minikql/comp_nodes/mkql_factory.cpp +++ b/yql/essentials/minikql/comp_nodes/mkql_factory.cpp @@ -315,6 +315,8 @@ struct TCallableComputationNodeBuilderFuncMapFiller { {"BlockDecimalDiv", &WrapBlockDecimalDiv}, {"BlockDecimalMod", &WrapBlockDecimalMod}, {"ScalarApply", &WrapScalarApply}, + {"BlockStorage", &WrapBlockStorage}, + {"BlockMapJoinIndex", &WrapBlockMapJoinIndex}, {"BlockMapJoinCore", &WrapBlockMapJoinCore}, {"MakeHeap", &WrapMakeHeap}, {"PushHeap", &WrapPushHeap}, diff --git a/yql/essentials/minikql/comp_nodes/ut/mkql_block_map_join_ut.cpp b/yql/essentials/minikql/comp_nodes/ut/mkql_block_map_join_ut.cpp index b40da26574..add6f4731a 100644 --- a/yql/essentials/minikql/comp_nodes/ut/mkql_block_map_join_ut.cpp +++ b/yql/essentials/minikql/comp_nodes/ut/mkql_block_map_join_ut.cpp @@ -46,53 +46,115 @@ TRuntimeNode BuildBlockJoin(TProgramBuilder& pgmBuilder, EJoinKind joinKind, const auto leftStream = ThrottleStream(pgmBuilder, ToWideStream(pgmBuilder, leftList)); const auto rightStream = ThrottleStream(pgmBuilder, ToWideStream(pgmBuilder, rightList)); - const auto leftStreamItems = ValidateBlockStreamType(leftStream.GetStaticType()); - const auto rightStreamItems = ValidateBlockStreamType(rightStream.GetStaticType()); - - TVector<TType*> joinReturnItems; + const auto joinReturnType = MakeJoinType(pgmBuilder, + joinKind, + leftStream.GetStaticType(), + leftKeyDrops, + rightStream.GetStaticType(), + rightKeyDrops + ); - const THashSet<ui32> leftKeyDropsSet(leftKeyDrops.cbegin(), leftKeyDrops.cend()); - for (size_t i = 0; i < leftStreamItems.size() - 1; i++) { // Excluding block size - if (leftKeyDropsSet.contains(i)) { - continue; - } - joinReturnItems.push_back(pgmBuilder.NewBlockType(leftStreamItems[i], TBlockType::EShape::Many)); + auto rightBlockStorageNode = pgmBuilder.BlockStorage(rightStream, pgmBuilder.NewResourceType(BlockStorageResourcePrefix)); + if (joinKind != EJoinKind::Cross) { + rightBlockStorageNode = pgmBuilder.BlockMapJoinIndex( + rightBlockStorageNode, + AS_TYPE(TStreamType, rightStream.GetStaticType())->GetItemType(), + rightKeyColumns, + rightAny, + pgmBuilder.NewResourceType(BlockMapJoinIndexResourcePrefix) + ); } - if (joinKind != EJoinKind::LeftSemi && joinKind != EJoinKind::LeftOnly) { - const THashSet<ui32> rightKeyDropsSet(rightKeyDrops.cbegin(), rightKeyDrops.cend()); - for (size_t i = 0; i < rightStreamItems.size() - 1; i++) { // Excluding block size - if (rightKeyDropsSet.contains(i)) { - continue; - } + auto joinNode = pgmBuilder.BlockMapJoinCore( + leftStream, + rightBlockStorageNode, + AS_TYPE(TStreamType, rightStream.GetStaticType())->GetItemType(), + joinKind, + leftKeyColumns, + leftKeyDrops, + rightKeyColumns, + rightKeyDrops, + joinReturnType + ); - joinReturnItems.push_back(pgmBuilder.NewBlockType( - joinKind == EJoinKind::Inner ? rightStreamItems[i] - : IsOptionalOrNull(rightStreamItems[i]) ? rightStreamItems[i] - : pgmBuilder.NewOptionalType(rightStreamItems[i]), - TBlockType::EShape::Many - )); - } - } + return FromWideStream(pgmBuilder, DethrottleStream(pgmBuilder, joinNode)); +} - joinReturnItems.push_back(pgmBuilder.NewBlockType(pgmBuilder.NewDataType(NUdf::TDataType<ui64>::Id), TBlockType::EShape::Scalar)); +TRuntimeNode BuildBlockJoinsWithNodeMultipleUsage(TProgramBuilder& pgmBuilder, EJoinKind joinKind, + TRuntimeNode leftList, const TVector<ui32>& leftKeyColumns, const TVector<ui32>& leftKeyDrops, + TRuntimeNode rightList, const TVector<ui32>& rightKeyColumns, const TVector<ui32>& rightKeyDrops, bool rightAny +) { + Y_ENSURE(joinKind == EJoinKind::Inner); + Y_ENSURE(!rightAny); + Y_ENSURE(leftKeyDrops.empty() && rightKeyDrops.empty()); + + const auto leftStream = ThrottleStream(pgmBuilder, ToWideStream(pgmBuilder, leftList)); + const auto leftStream2 = ThrottleStream(pgmBuilder, ToWideStream(pgmBuilder, leftList)); + const auto leftStream3 = ThrottleStream(pgmBuilder, ToWideStream(pgmBuilder, leftList)); + + const auto rightStream = ThrottleStream(pgmBuilder, ToWideStream(pgmBuilder, rightList)); + + const auto joinReturnType = MakeJoinType(pgmBuilder, + joinKind, + leftStream.GetStaticType(), + leftKeyDrops, + rightStream.GetStaticType(), + rightKeyDrops + ); + + auto rightBlockStorageNode = pgmBuilder.BlockStorage(rightStream, pgmBuilder.NewResourceType(BlockStorageResourcePrefix)); + auto rightBlockIndexNode = pgmBuilder.BlockMapJoinIndex( + rightBlockStorageNode, + AS_TYPE(TStreamType, rightStream.GetStaticType())->GetItemType(), + rightKeyColumns, + rightAny, + pgmBuilder.NewResourceType(BlockMapJoinIndexResourcePrefix) + ); - TType* joinReturnType = pgmBuilder.NewStreamType(pgmBuilder.NewMultiType(joinReturnItems)); auto joinNode = pgmBuilder.BlockMapJoinCore( leftStream, - rightStream, - joinKind, + rightBlockIndexNode, + AS_TYPE(TStreamType, rightStream.GetStaticType())->GetItemType(), + EJoinKind::Inner, leftKeyColumns, leftKeyDrops, rightKeyColumns, rightKeyDrops, - rightAny, joinReturnType ); - return FromWideStream(pgmBuilder, DethrottleStream(pgmBuilder, joinNode)); + auto joinNode2 = pgmBuilder.BlockMapJoinCore( + leftStream2, + rightBlockIndexNode, + AS_TYPE(TStreamType, rightStream.GetStaticType())->GetItemType(), + EJoinKind::Inner, + leftKeyColumns, + leftKeyDrops, + rightKeyColumns, + rightKeyDrops, + joinReturnType + ); + + auto joinNode3 = pgmBuilder.BlockMapJoinCore( + leftStream3, + rightBlockStorageNode, + AS_TYPE(TStreamType, rightStream.GetStaticType())->GetItemType(), + EJoinKind::Cross, + {}, + {}, + {}, + {}, + joinReturnType + ); + + return pgmBuilder.OrderedExtend({ + FromWideStream(pgmBuilder, DethrottleStream(pgmBuilder, joinNode)), + FromWideStream(pgmBuilder, DethrottleStream(pgmBuilder, joinNode2)), + FromWideStream(pgmBuilder, DethrottleStream(pgmBuilder, joinNode3)) + }); } +template<auto BuildBlockJoinFunc> NUdf::TUnboxedValue DoTestBlockJoin(TSetup<false>& setup, TType* leftType, NUdf::TUnboxedValue&& leftListValue, const TVector<ui32>& leftKeyColumns, const TVector<ui32>& leftKeyDrops, TType* rightType, NUdf::TUnboxedValue&& rightListValue, const TVector<ui32>& rightKeyColumns, const TVector<ui32>& rightKeyDrops, bool rightAny, @@ -112,7 +174,7 @@ NUdf::TUnboxedValue DoTestBlockJoin(TSetup<false>& setup, TRuntimeNode leftList = pb.Arg(pb.NewListType(leftBlockType)); TRuntimeNode rightList = pb.Arg(pb.NewListType(rightBlockType)); - const auto joinNode = BuildBlockJoin(pb, joinKind, leftList, leftKeyColumns, leftKeyDrops, rightList, rightKeyColumns, rightKeyDrops, rightAny); + const auto joinNode = BuildBlockJoinFunc(pb, joinKind, leftList, leftKeyColumns, leftKeyDrops, rightList, rightKeyColumns, rightKeyDrops, rightAny); const auto joinType = joinNode.GetStaticType(); Y_ENSURE(joinType->IsList(), "Join result has to be list"); @@ -137,6 +199,7 @@ NUdf::TUnboxedValue DoTestBlockJoin(TSetup<false>& setup, return FromBlocks(ctx, AS_TYPE(TTupleType, joinItemType)->GetElements(), graph->GetValue()); } +template<auto BuildBlockJoinFunc = BuildBlockJoin> void RunTestBlockJoin(TSetup<false>& setup, EJoinKind joinKind, TType* expectedType, const NUdf::TUnboxedValue& expected, TType* leftType, NUdf::TUnboxedValue&& leftListValue, const TVector<ui32>& leftKeyColumns, @@ -146,7 +209,7 @@ void RunTestBlockJoin(TSetup<false>& setup, EJoinKind joinKind, ) { const size_t testSize = leftListValue.GetListLength(); for (size_t blockSize = 1; blockSize <= testSize; blockSize <<= 1) { - const auto got = DoTestBlockJoin(setup, + const auto got = DoTestBlockJoin<BuildBlockJoinFunc>(setup, leftType, std::move(leftListValue), leftKeyColumns, leftKeyDrops, rightType, std::move(rightListValue), rightKeyColumns, rightKeyDrops, rightAny, joinKind, blockSize, scalar @@ -1323,5 +1386,84 @@ Y_UNIT_TEST_SUITE(TMiniKQLBlockMapJoinTestCross) { } // Y_UNIT_TEST_SUITE +Y_UNIT_TEST_SUITE(TMiniKQLBlockMapJoinTestNodeMultipleUsage) { + constexpr size_t testSize = 1 << 7; + constexpr size_t valueSize = 3; + static const TVector<TString> threeLetterValues = GenerateValues(valueSize); + static const TSet<ui64> fibonacci = GenerateFibonacci(testSize); + static const TString hugeString(128, '1'); + + Y_UNIT_TEST(TestBasic) { + TSetup<false> setup(GetNodeFactory()); + + // 1. Make input for the "left" stream. + TVector<ui64> leftKeyInit(testSize); + std::iota(leftKeyInit.begin(), leftKeyInit.end(), 1); + TVector<ui64> leftSubkeyInit; + std::transform(leftKeyInit.cbegin(), leftKeyInit.cend(), std::back_inserter(leftSubkeyInit), + [](const auto key) { return key * 1001; }); + TVector<TString> leftValueInit; + std::transform(leftKeyInit.cbegin(), leftKeyInit.cend(), std::back_inserter(leftValueInit), + [](const auto key) { return threeLetterValues[key]; }); + + // 2. Make input for the "right" stream. + const TVector<ui64> rightKeyInit(fibonacci.cbegin(), fibonacci.cend()); + TVector<TString> rightValueInit; + std::transform(rightKeyInit.cbegin(), rightKeyInit.cend(), std::back_inserter(rightValueInit), + [](const auto key) { return std::to_string(key); }); + + // 3. Make "expected" data. + TVector<ui64> expectedKey; + TVector<ui64> expectedSubkey; + TVector<TString> expectedValue; + TVector<ui64> expectedRightKey; + TVector<TString> expectedRightValue; + + TMap<ui64, TString> rightMap; + for (size_t i = 0; i < rightKeyInit.size(); i++) { + rightMap[rightKeyInit[i]] = rightValueInit[i]; + } + + // Two inner joins + for (size_t join = 0; join < 2; join++) { + for (size_t i = 0; i < leftKeyInit.size(); i++) { + const auto& found = rightMap.find(leftKeyInit[i]); + if (found != rightMap.cend()) { + expectedKey.push_back(leftKeyInit[i]); + expectedSubkey.push_back(leftSubkeyInit[i]); + expectedValue.push_back(leftValueInit[i]); + expectedRightKey.push_back(found->first); + expectedRightValue.push_back(found->second); + } + } + } + + // Cross join + for (size_t i = 0; i < leftKeyInit.size(); i++) { + for (size_t j = 0; j < rightKeyInit.size(); j++) { + expectedKey.push_back(leftKeyInit[i]); + expectedSubkey.push_back(leftSubkeyInit[i]); + expectedValue.push_back(leftValueInit[i]); + expectedRightKey.push_back(rightKeyInit[j]); + expectedRightValue.push_back(rightValueInit[j]); + } + } + + auto [leftType, leftList] = ConvertVectorsToTuples(setup, + leftKeyInit, leftSubkeyInit, leftValueInit); + auto [rightType, rightList] = ConvertVectorsToTuples(setup, + rightKeyInit, rightValueInit); + auto [expectedType, expected] = ConvertVectorsToTuples(setup, + expectedKey, expectedSubkey, expectedValue, expectedRightKey, expectedRightValue); + + RunTestBlockJoin<BuildBlockJoinsWithNodeMultipleUsage>(setup, EJoinKind::Inner, expectedType, expected, + leftType, std::move(leftList), {0}, + rightType, std::move(rightList), {0}, + {}, {} + ); + } + +} // Y_UNIT_TEST_SUITE + } // namespace NMiniKQL } // namespace NKikimr diff --git a/yql/essentials/minikql/comp_nodes/ut/mkql_block_map_join_ut_utils.cpp b/yql/essentials/minikql/comp_nodes/ut/mkql_block_map_join_ut_utils.cpp index fdd65bbba2..2dbeb2dbac 100644 --- a/yql/essentials/minikql/comp_nodes/ut/mkql_block_map_join_ut_utils.cpp +++ b/yql/essentials/minikql/comp_nodes/ut/mkql_block_map_join_ut_utils.cpp @@ -153,6 +153,43 @@ TType* MakeBlockTupleType(TProgramBuilder& pgmBuilder, TType* tupleType, bool sc return pgmBuilder.NewTupleType(blockItemTypes); } +TType* MakeJoinType(TProgramBuilder& pgmBuilder, EJoinKind joinKind, + TType* leftStreamType, const TVector<ui32>& leftKeyDrops, + TType* rightStreamType, const TVector<ui32>& rightKeyDrops +) { + const auto leftStreamItems = ValidateBlockStreamType(leftStreamType); + const auto rightStreamItems = ValidateBlockStreamType(rightStreamType); + + TVector<TType*> joinReturnItems; + + const THashSet<ui32> leftKeyDropsSet(leftKeyDrops.cbegin(), leftKeyDrops.cend()); + for (size_t i = 0; i < leftStreamItems.size() - 1; i++) { // Excluding block size + if (leftKeyDropsSet.contains(i)) { + continue; + } + joinReturnItems.push_back(pgmBuilder.NewBlockType(leftStreamItems[i], TBlockType::EShape::Many)); + } + + if (joinKind != EJoinKind::LeftSemi && joinKind != EJoinKind::LeftOnly) { + const THashSet<ui32> rightKeyDropsSet(rightKeyDrops.cbegin(), rightKeyDrops.cend()); + for (size_t i = 0; i < rightStreamItems.size() - 1; i++) { // Excluding block size + if (rightKeyDropsSet.contains(i)) { + continue; + } + + joinReturnItems.push_back(pgmBuilder.NewBlockType( + joinKind == EJoinKind::Inner ? rightStreamItems[i] + : IsOptionalOrNull(rightStreamItems[i]) ? rightStreamItems[i] + : pgmBuilder.NewOptionalType(rightStreamItems[i]), + TBlockType::EShape::Many + )); + } + } + + joinReturnItems.push_back(pgmBuilder.NewBlockType(pgmBuilder.NewDataType(NUdf::TDataType<ui64>::Id), TBlockType::EShape::Scalar)); + return pgmBuilder.NewStreamType(pgmBuilder.NewMultiType(joinReturnItems)); +} + NUdf::TUnboxedValuePod ToBlocks(TComputationContext& ctx, size_t blockSize, const TArrayRef<TType* const> types, const NUdf::TUnboxedValuePod& values ) { diff --git a/yql/essentials/minikql/comp_nodes/ut/mkql_block_map_join_ut_utils.h b/yql/essentials/minikql/comp_nodes/ut/mkql_block_map_join_ut_utils.h index ecff426c92..5d4642a87c 100644 --- a/yql/essentials/minikql/comp_nodes/ut/mkql_block_map_join_ut_utils.h +++ b/yql/essentials/minikql/comp_nodes/ut/mkql_block_map_join_ut_utils.h @@ -10,6 +10,10 @@ inline bool IsOptionalOrNull(const TType* type) { } TType* MakeBlockTupleType(TProgramBuilder& pgmBuilder, TType* tupleType, bool scalar); +TType* MakeJoinType(TProgramBuilder& pgmBuilder, EJoinKind joinKind, + TType* leftStreamType, const TVector<ui32>& leftKeyDrops, + TType* rightStreamType, const TVector<ui32>& rightKeyDrops +); NUdf::TUnboxedValuePod ToBlocks(TComputationContext& ctx, size_t blockSize, const TArrayRef<TType* const> types, const NUdf::TUnboxedValuePod& values); diff --git a/yql/essentials/minikql/mkql_program_builder.cpp b/yql/essentials/minikql/mkql_program_builder.cpp index 717eb9ec32..b139c24058 100644 --- a/yql/essentials/minikql/mkql_program_builder.cpp +++ b/yql/essentials/minikql/mkql_program_builder.cpp @@ -321,6 +321,11 @@ void EnsureDataOrOptionalOfData(TRuntimeNode node) { ->GetItemType()->IsData(), "Expected data or optional of data"); } +std::vector<TType*> ValidateBlockType(const TType* type, bool unwrap) { + const auto wideComponents = AS_TYPE(TMultiType, type)->GetElements(); + return ValidateBlockItems(wideComponents, unwrap); +} + std::vector<TType*> ValidateBlockStreamType(const TType* streamType, bool unwrap) { const auto wideComponents = GetWideComponents(AS_TYPE(TStreamType, streamType)); return ValidateBlockItems(wideComponents, unwrap); @@ -6009,29 +6014,82 @@ TRuntimeNode TProgramBuilder::ScalarApply(const TArrayRef<const TRuntimeNode>& a return TRuntimeNode(builder.Build(), false); } -TRuntimeNode TProgramBuilder::BlockMapJoinCore(TRuntimeNode leftStream, TRuntimeNode rightStream, EJoinKind joinKind, +TRuntimeNode TProgramBuilder::BlockStorage(TRuntimeNode stream, TType* returnType) { + if constexpr (RuntimeVersion < 59U) { + THROW yexception() << "Runtime version (" << RuntimeVersion << ") too old for " << __func__; + } + + ValidateBlockStreamType(stream.GetStaticType()); + + MKQL_ENSURE(returnType->IsResource(), "Expected Resource as a result type"); + auto returnResourceType = AS_TYPE(TResourceType, returnType); + MKQL_ENSURE(returnResourceType->GetTag().StartsWith(BlockStorageResourcePrefix), "Expected block storage resource"); + + TCallableBuilder callableBuilder(Env, __func__, returnType); + callableBuilder.Add(stream); + + return TRuntimeNode(callableBuilder.Build(), false); +} + +TRuntimeNode TProgramBuilder::BlockMapJoinIndex(TRuntimeNode blockStorage, TType* streamItemType, const TArrayRef<const ui32>& keyColumns, bool any, TType* returnType) { + if constexpr (RuntimeVersion < 59U) { + THROW yexception() << "Runtime version (" << RuntimeVersion << ") too old for " << __func__; + } + + MKQL_ENSURE(blockStorage.GetStaticType()->IsResource(), "Expected Resource as an input type"); + auto blockStorageType = AS_TYPE(TResourceType, blockStorage.GetStaticType()); + MKQL_ENSURE(blockStorageType->GetTag().StartsWith(BlockStorageResourcePrefix), "Expected block storage resource"); + + ValidateBlockType(streamItemType); + + MKQL_ENSURE(returnType->IsResource(), "Expected Resource as a result type"); + auto returnResourceType = AS_TYPE(TResourceType, returnType); + MKQL_ENSURE(returnResourceType->GetTag().StartsWith(BlockMapJoinIndexResourcePrefix), "Expected block map join index resource"); + + TRuntimeNode::TList keyColumnsNodes; + keyColumnsNodes.reserve(keyColumns.size()); + std::transform(keyColumns.cbegin(), keyColumns.cend(), + std::back_inserter(keyColumnsNodes), [this](const ui32 idx) { + return NewDataLiteral(idx); + }); + + TCallableBuilder callableBuilder(Env, __func__, returnType); + callableBuilder.Add(blockStorage); + callableBuilder.Add(TRuntimeNode(streamItemType, true)); + callableBuilder.Add(NewTuple(keyColumnsNodes)); + callableBuilder.Add(NewDataLiteral((bool)any)); + + return TRuntimeNode(callableBuilder.Build(), false); +} + +TRuntimeNode TProgramBuilder::BlockMapJoinCore(TRuntimeNode leftStream, TRuntimeNode rightBlockStorage, TType* rightStreamItemType, EJoinKind joinKind, const TArrayRef<const ui32>& leftKeyColumns, const TArrayRef<const ui32>& leftKeyDrops, - const TArrayRef<const ui32>& rightKeyColumns, const TArrayRef<const ui32>& rightKeyDrops, bool rightAny, TType* returnType + const TArrayRef<const ui32>& rightKeyColumns, const TArrayRef<const ui32>& rightKeyDrops, TType* returnType ) { - if constexpr (RuntimeVersion < 53U) { + if constexpr (RuntimeVersion < 59U) { THROW yexception() << "Runtime version (" << RuntimeVersion << ") too old for " << __func__; } - if (RuntimeVersion < 57U && joinKind == EJoinKind::Cross) { - THROW yexception() << __func__ << " does not support cross join in runtime version (" << RuntimeVersion << ")"; + + MKQL_ENSURE(rightBlockStorage.GetStaticType()->IsResource(), "Expected Resource as an input type"); + auto rightBlockStorageType = AS_TYPE(TResourceType, rightBlockStorage.GetStaticType()); + if (joinKind != EJoinKind::Cross) { + MKQL_ENSURE(rightBlockStorageType->GetTag().StartsWith(BlockMapJoinIndexResourcePrefix), "Expected block map join index resource"); + } else { + MKQL_ENSURE(rightBlockStorageType->GetTag().StartsWith(BlockStorageResourcePrefix), "Expected block storage resource"); } MKQL_ENSURE(joinKind == EJoinKind::Inner || joinKind == EJoinKind::Left || joinKind == EJoinKind::LeftSemi || joinKind == EJoinKind::LeftOnly || joinKind == EJoinKind::Cross, "Unsupported join kind"); MKQL_ENSURE(leftKeyColumns.size() == rightKeyColumns.size(), "Key column count mismatch"); - if (joinKind == EJoinKind::Cross) { - MKQL_ENSURE(leftKeyColumns.empty(), "Specifying key columns is not allowed for cross join"); - } else { + if (joinKind != EJoinKind::Cross) { MKQL_ENSURE(!leftKeyColumns.empty(), "At least one key column must be specified"); + } else { + MKQL_ENSURE(leftKeyColumns.empty(), "Specifying key columns is not allowed for cross join"); } ValidateBlockStreamType(leftStream.GetStaticType()); - ValidateBlockStreamType(rightStream.GetStaticType()); + ValidateBlockType(rightStreamItemType); ValidateBlockStreamType(returnType); TRuntimeNode::TList leftKeyColumnsNodes; @@ -6064,13 +6122,13 @@ TRuntimeNode TProgramBuilder::BlockMapJoinCore(TRuntimeNode leftStream, TRuntime TCallableBuilder callableBuilder(Env, __func__, returnType); callableBuilder.Add(leftStream); - callableBuilder.Add(rightStream); + callableBuilder.Add(rightBlockStorage); + callableBuilder.Add(TRuntimeNode(rightStreamItemType, true)); callableBuilder.Add(NewDataLiteral((ui32)joinKind)); callableBuilder.Add(NewTuple(leftKeyColumnsNodes)); callableBuilder.Add(NewTuple(leftKeyDropsNodes)); callableBuilder.Add(NewTuple(rightKeyColumnsNodes)); callableBuilder.Add(NewTuple(rightKeyDropsNodes)); - callableBuilder.Add(NewDataLiteral((bool)rightAny)); return TRuntimeNode(callableBuilder.Build(), false); } diff --git a/yql/essentials/minikql/mkql_program_builder.h b/yql/essentials/minikql/mkql_program_builder.h index b67d80861d..01c0223293 100644 --- a/yql/essentials/minikql/mkql_program_builder.h +++ b/yql/essentials/minikql/mkql_program_builder.h @@ -17,6 +17,9 @@ class TBuiltinFunctionRegistry; constexpr std::string_view RandomMTResource = "MTRand"; constexpr std::string_view ResourceQueuePrefix = "TResourceQueue:"; +constexpr std::string_view BlockStorageResourcePrefix = "TBlockStorage:"; +constexpr std::string_view BlockMapJoinIndexResourcePrefix = "TBlockMapJoinIndex:"; +constexpr std::string_view BlockMapJoinIndexResourceSeparator = ":$YqlKeyColumns:"; enum class EJoinKind { Min = 1, @@ -133,6 +136,7 @@ struct TAggInfo { std::vector<ui32> ArgsColumns; }; +std::vector<TType*> ValidateBlockType(const TType* type, bool unwrap = true); std::vector<TType*> ValidateBlockStreamType(const TType* streamType, bool unwrap = true); std::vector<TType*> ValidateBlockFlowType(const TType* flowType, bool unwrap = true); @@ -258,9 +262,12 @@ public: TRuntimeNode BlockFromPg(TRuntimeNode input, TType* returnType); TRuntimeNode BlockPgResolvedCall(const std::string_view& name, ui32 id, const TArrayRef<const TRuntimeNode>& args, TType* returnType); - TRuntimeNode BlockMapJoinCore(TRuntimeNode leftStream, TRuntimeNode rightStream, EJoinKind joinKind, + TRuntimeNode BlockStorage(TRuntimeNode stream, TType* returnType); + TRuntimeNode BlockMapJoinIndex(TRuntimeNode blockStorage, TType* streamItemType, const TArrayRef<const ui32>& keyColumns, bool any, TType* returnType); + TRuntimeNode BlockMapJoinCore(TRuntimeNode leftStream, TRuntimeNode rightBlockStorage, TType* rightStreamItemType, EJoinKind joinKind, const TArrayRef<const ui32>& leftKeyColumns, const TArrayRef<const ui32>& leftKeyDrops, - const TArrayRef<const ui32>& rightKeyColumns, const TArrayRef<const ui32>& rightKeyDrops, bool rightAny, TType* returnType); + const TArrayRef<const ui32>& rightKeyColumns, const TArrayRef<const ui32>& rightKeyDrops, TType* returnType + ); //-- logical functions TRuntimeNode BlockNot(TRuntimeNode data); diff --git a/yql/essentials/minikql/mkql_runtime_version.h b/yql/essentials/minikql/mkql_runtime_version.h index 8782dd1942..d0dc62b939 100644 --- a/yql/essentials/minikql/mkql_runtime_version.h +++ b/yql/essentials/minikql/mkql_runtime_version.h @@ -24,7 +24,7 @@ namespace NMiniKQL { // 1. Bump this version every time incompatible runtime nodes are introduced. // 2. Make sure you provide runtime node generation for previous runtime versions. #ifndef MKQL_RUNTIME_VERSION -#define MKQL_RUNTIME_VERSION 58U +#define MKQL_RUNTIME_VERSION 59U #endif // History: diff --git a/yql/essentials/providers/common/mkql/yql_provider_mkql.cpp b/yql/essentials/providers/common/mkql/yql_provider_mkql.cpp index 5d7e22fa2f..123b59cc04 100644 --- a/yql/essentials/providers/common/mkql/yql_provider_mkql.cpp +++ b/yql/essentials/providers/common/mkql/yql_provider_mkql.cpp @@ -1672,24 +1672,44 @@ TMkqlCommonCallableCompiler::TShared::TShared() { return ctx.ProgramBuilder.MapJoinCore(list, dict, joinKind, leftKeyColumns, leftRenames, rightRenames, returnType); }); + AddCallable("BlockStorage", [](const TExprNode& node, TMkqlBuildContext& ctx) { + const auto stream = MkqlBuildExpr(node.Head(), ctx); + const auto returnType = BuildType(node, *node.GetTypeAnn(), ctx.ProgramBuilder); + return ctx.ProgramBuilder.BlockStorage(stream, returnType); + }); + + AddCallable("BlockMapJoinIndex", [](const TExprNode& node, TMkqlBuildContext& ctx) { + const auto blockStorage = MkqlBuildExpr(node.Head(), ctx); + const auto itemType = node.Child(1)->GetTypeAnn()->Cast<TTypeExprType>()->GetType()->Cast<TMultiExprType>(); + + std::vector<ui32> keyColumns; + node.Child(2)->ForEachChild([&](const TExprNode& child){ keyColumns.emplace_back(*GetWideBlockFieldPosition(*itemType, child.Content())); }); + + const bool any = HasSetting(node.Tail(), "any"); + + const auto itemMkqlType = BuildType(*node.Child(1), *itemType, ctx.ProgramBuilder); + const auto returnType = BuildType(node, *node.GetTypeAnn(), ctx.ProgramBuilder); + return ctx.ProgramBuilder.BlockMapJoinIndex(blockStorage, itemMkqlType, keyColumns, any, returnType); + }); + AddCallable("BlockMapJoinCore", [](const TExprNode& node, TMkqlBuildContext& ctx) { const auto leftStream = MkqlBuildExpr(node.Head(), ctx); - const auto rightStream = MkqlBuildExpr(*node.Child(1), ctx); - const auto joinKind = GetJoinKind(node, node.Child(2)->Content()); + const auto rightBlockStorage = MkqlBuildExpr(*node.Child(1), ctx); const auto leftItemType = node.Head().GetTypeAnn()->Cast<TStreamExprType>()->GetItemType()->Cast<TMultiExprType>(); - const auto rightItemType = node.Child(1U)->GetTypeAnn()->Cast<TStreamExprType>()->GetItemType()->Cast<TMultiExprType>(); + const auto rightItemType = node.Child(2)->GetTypeAnn()->Cast<TTypeExprType>()->GetType()->Cast<TMultiExprType>(); - std::vector<ui32> leftKeyColumns, leftKeyDrops, rightKeyColumns, rightKeyDrops; - node.Child(3)->ForEachChild([&](const TExprNode& child){ leftKeyColumns.emplace_back(*GetWideBlockFieldPosition(*leftItemType, child.Content())); }); - node.Child(4)->ForEachChild([&](const TExprNode& child){ leftKeyDrops.emplace_back(*GetWideBlockFieldPosition(*leftItemType, child.Content())); }); - node.Child(5)->ForEachChild([&](const TExprNode& child){ rightKeyColumns.emplace_back(*GetWideBlockFieldPosition(*rightItemType, child.Content())); }); - node.Child(6)->ForEachChild([&](const TExprNode& child){ rightKeyDrops.emplace_back(*GetWideBlockFieldPosition(*rightItemType, child.Content())); }); + const auto joinKind = GetJoinKind(node, node.Child(3)->Content()); - bool rightAny = HasSetting(node.Tail(), "rightAny"); + std::vector<ui32> leftKeyColumns, leftKeyDrops, rightKeyColumns, rightKeyDrops; + node.Child(4)->ForEachChild([&](const TExprNode& child){ leftKeyColumns.emplace_back(*GetWideBlockFieldPosition(*leftItemType, child.Content())); }); + node.Child(5)->ForEachChild([&](const TExprNode& child){ leftKeyDrops.emplace_back(*GetWideBlockFieldPosition(*leftItemType, child.Content())); }); + node.Child(6)->ForEachChild([&](const TExprNode& child){ rightKeyColumns.emplace_back(*GetWideBlockFieldPosition(*rightItemType, child.Content())); }); + node.Child(7)->ForEachChild([&](const TExprNode& child){ rightKeyDrops.emplace_back(*GetWideBlockFieldPosition(*rightItemType, child.Content())); }); + const auto rightItemMkqlType = BuildType(*node.Child(2), *rightItemType, ctx.ProgramBuilder); const auto returnType = BuildType(node, *node.GetTypeAnn(), ctx.ProgramBuilder); - return ctx.ProgramBuilder.BlockMapJoinCore(leftStream, rightStream, joinKind, leftKeyColumns, leftKeyDrops, rightKeyColumns, rightKeyDrops, rightAny, returnType); + return ctx.ProgramBuilder.BlockMapJoinCore(leftStream, rightBlockStorage, rightItemMkqlType, joinKind, leftKeyColumns, leftKeyDrops, rightKeyColumns, rightKeyDrops, returnType); }); AddCallable({"GraceJoinCore", "GraceSelfJoinCore"}, [](const TExprNode& node, TMkqlBuildContext& ctx) { diff --git a/yql/essentials/tests/s-expressions/minirun/part7/canondata/result.json b/yql/essentials/tests/s-expressions/minirun/part7/canondata/result.json index f48bcd05bc..9da54766c1 100644 --- a/yql/essentials/tests/s-expressions/minirun/part7/canondata/result.json +++ b/yql/essentials/tests/s-expressions/minirun/part7/canondata/result.json @@ -1,9 +1,9 @@ { "test.test[Blocks-BlockMapJoinCore-default.txt-Debug]": [ { - "checksum": "78ca432550e55028e5a9754d3698bcb4", - "size": 1864, - "uri": "https://{canondata_backend}/1900335/faec43b5ddae0d95b63fd9929fbf0ba6117189f1/resource.tar.gz#test.test_Blocks-BlockMapJoinCore-default.txt-Debug_/opt.yql" + "checksum": "445a5d7f7918455fc74eefafa41b519d", + "size": 2187, + "uri": "https://{canondata_backend}/1871182/dd0654c5ee04d3c397f5f6c37ba02b2ae503ed1f/resource.tar.gz#test.test_Blocks-BlockMapJoinCore-default.txt-Debug_/opt.yql" } ], "test.test[Blocks-BlockMapJoinCore-default.txt-Results]": [ diff --git a/yql/essentials/tests/s-expressions/suites/Blocks/BlockMapJoinCore.yqls b/yql/essentials/tests/s-expressions/suites/Blocks/BlockMapJoinCore.yqls index 8a6ca3b84c..35154e67ac 100644 --- a/yql/essentials/tests/s-expressions/suites/Blocks/BlockMapJoinCore.yqls +++ b/yql/essentials/tests/s-expressions/suites/Blocks/BlockMapJoinCore.yqls @@ -15,7 +15,11 @@ (let narrowLambdaLeftSemi (lambda '(item1 item2) (AsStruct '('"asubkey" item1) '('"avalue" item2)))) (let doJoin (lambda '(left right narrowMapLambda joinKind leftKeyColumns leftKeyDrops rightKeyColumns rightKeyDrops) (block '( - (return (Collect (NarrowMap (ToFlow (WideFromBlocks (BlockMapJoinCore (WideToBlocks (FromFlow (ExpandMap left expandLambda))) (WideToBlocks (FromFlow (ExpandMap right expandLambda))) joinKind leftKeyColumns leftKeyDrops rightKeyColumns rightKeyDrops '()))) narrowMapLambda))) + (let leftStream (WideToBlocks (FromFlow (ExpandMap left expandLambda)))) + (let rightStream (WideToBlocks (FromFlow (ExpandMap right expandLambda)))) + (let rightStreamItemType (StreamItemType (TypeOf rightStream))) + (let rightBlockIndex (BlockMapJoinIndex (BlockStorage rightStream) rightStreamItemType rightKeyColumns '())) + (return (Collect (NarrowMap (ToFlow (WideFromBlocks (BlockMapJoinCore leftStream rightBlockIndex rightStreamItemType joinKind leftKeyColumns leftKeyDrops rightKeyColumns rightKeyDrops))) narrowMapLambda))) )))) (let innerJoin (Apply doJoin (ToFlow table (DependsOn (String '0))) (ToFlow table (DependsOn (String '1))) narrowLambdaInner 'Inner '('0) '() '('0) '())) |